Files
fn-serverless/api/agent/lb_agent.go
Reed Allman af94f3f8ac move max_request_size from agent to server (#1145)
moves the config option for max request size up to the front end, adds the env
var for it there, adds a server test for it, and removes it from the agent. a
request either comes through the lb (before grpc) or straight to the server, so
we can handle limiting the request there for now, which may be easier than
having multiple layers of request body checking. this aligns with making the
agent less responsible for http behaviors (and eventually not responsible at
all, once route is fully deprecated).
2018-07-31 08:58:47 -07:00
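
As a rough sketch of the direction the commit message describes, a server front
end can cap request body size with a small middleware. The code below is an
assumption about how such a limit might be wired, not the fn implementation;
the env var name FN_MAX_REQUEST_SIZE and the handler names are illustrative.

package main

import (
	"io"
	"net/http"
	"os"
	"strconv"
)

// withMaxRequestSize is a hypothetical middleware: it reads a byte limit from
// an env var and wraps each request body so reads past the limit fail.
func withMaxRequestSize(next http.Handler) http.Handler {
	max, _ := strconv.ParseInt(os.Getenv("FN_MAX_REQUEST_SIZE"), 10, 64) // assumed env var name
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if max > 0 {
			r.Body = http.MaxBytesReader(w, r.Body, max) // errors on reads past max bytes
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	echo := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if _, err := io.Copy(w, r.Body); err != nil {
			http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
		}
	})
	http.ListenAndServe(":8080", withMaxRequestSize(echo))
}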

274 lines
6.4 KiB
Go

package agent
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"sync/atomic"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/fnext"
)
type lbAgent struct {
cfg Config
cda CallHandler
callListeners []fnext.CallListener
rp pool.RunnerPool
placer pool.Placer
callOverrider CallOverrider
shutWg *common.WaitGroup
callEndCount int64
}
type LBAgentOption func(*lbAgent) error
func WithLBAgentConfig(cfg *Config) LBAgentOption {
return func(a *lbAgent) error {
a.cfg = *cfg
return nil
}
}
// WithLBCallOverrider lets an LB agent register a CallOverrider to modify a Call and its extensions
func WithLBCallOverrider(fn CallOverrider) LBAgentOption {
return func(a *lbAgent) error {
if a.callOverrider != nil {
return errors.New("lb-agent call overriders already exists")
}
a.callOverrider = fn
return nil
}
}
// NewLBAgent creates an Agent that knows how to load-balance function calls
// across a group of runner nodes.
func NewLBAgent(da CallHandler, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error) {
// Yes, LBAgent and Agent both use a Config.
cfg, err := NewConfig()
if err != nil {
logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg)
}
a := &lbAgent{
cfg: *cfg,
cda: da,
rp: rp,
placer: p,
shutWg: common.NewWaitGroup(),
}
// Allow overriding config
for _, option := range options {
err = option(a)
if err != nil {
logrus.WithError(err).Fatalf("error in lb-agent options")
}
}
logrus.Infof("lb-agent starting cfg=%+v", a.cfg)
return a, nil
}
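// Illustrative usage (an assumption about caller code, not part of this file):
// construct an LB agent with the functional options above; handler, runnerPool,
// placer, cfg and myOverrider are placeholder identifiers.
//
//	lb, err := NewLBAgent(handler, runnerPool, placer,
//		WithLBAgentConfig(cfg),
//		WithLBCallOverrider(myOverrider))
//	if err != nil {
//		return err
//	}
//	defer lb.Close()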
// implements Agent
func (a *lbAgent) AddCallListener(listener fnext.CallListener) {
a.callListeners = append(a.callListeners, listener)
}
// implements callTrigger
func (a *lbAgent) fireBeforeCall(ctx context.Context, call *models.Call) error {
return fireBeforeCallFun(a.callListeners, ctx, call)
}
// implements callTrigger
func (a *lbAgent) fireAfterCall(ctx context.Context, call *models.Call) error {
return fireAfterCallFun(a.callListeners, ctx, call)
}
// implements Agent
func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
var c call
for _, o := range opts {
err := o(&c)
if err != nil {
return nil, err
}
}
// TODO typed errors to test
if c.req == nil || c.Call == nil {
return nil, errors.New("no model or request provided for call")
}
// If overrider is present, let's allow it to modify models.Call
// and call extensions
if a.callOverrider != nil {
ext, err := a.callOverrider(c.Call, c.extensions)
if err != nil {
return nil, err
}
c.extensions = ext
}
setupCtx(&c)
c.isLB = true
c.handler = a.cda
c.ct = a
c.stderr = &nullReadWriter{}
c.slotHashId = getSlotQueueKey(&c)
return &c, nil
}
// implements Agent
func (a *lbAgent) Close() error {
// start closing the front gate first
ch := a.shutWg.CloseGroupNB()
// finally shutdown the runner pool
err := a.rp.Shutdown(context.Background())
if err != nil {
logrus.WithError(err).Warn("Runner pool shutdown error")
}
// wait on the front gate; it should complete once the delegated agent and runner pool are gone.
<-ch
return err
}
// implements Agent
func (a *lbAgent) Submit(callI Call) error {
if !a.shutWg.AddSession(1) {
return models.ErrCallTimeoutServerBusy
}
call := callI.(*call)
ctx, span := trace.StartSpan(call.req.Context(), "agent_submit")
defer span.End()
statsEnqueue(ctx)
// first, check for excess call.End() stacking and reject the call early.
if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
return a.handleCallEnd(ctx, call, context.DeadlineExceeded, false)
}
err := call.Start(ctx)
if err != nil {
return a.handleCallEnd(ctx, call, err, false)
}
statsDequeueAndStart(ctx)
// pre-read and buffer the request body if not already done, based
// on GetBody presence.
buf, err := a.setRequestBody(ctx, call)
if buf != nil {
defer bufPool.Put(buf)
}
if err != nil {
common.Logger(call.req.Context()).WithError(err).Error("Failed to process call body")
return a.handleCallEnd(ctx, call, err, true)
}
// WARNING: isStarted (handleCallEnd) semantics
// need some consideration here. Similar to the runner agent,
// we consider the call committed once call.Start() succeeds.
// isStarted=true means we will call Call.End().
err = a.placer.PlaceCall(a.rp, ctx, call)
if err != nil {
common.Logger(call.req.Context()).WithError(err).Error("Failed to place call")
}
return a.handleCallEnd(ctx, call, err, true)
}
// setRequestBody buffers the request body if needed and sets the GetBody function on the
// given http.Request if it is missing. GetBody allows reading the request body without
// mutating the state of the request.
func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer, error) {
r := call.req
if r.Body == nil || r.GetBody != nil {
return nil, nil
}
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
// WARNING: we need to handle IO in a separate goroutine below
// to be able to detect a ctx timeout. When we time out, we
// let the gin/http server unblock the goroutine below.
errApp := make(chan error, 1)
go func() {
_, err := buf.ReadFrom(r.Body)
if err != nil && err != io.EOF {
errApp <- err
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(buf.Bytes()))
// GetBody does not mutate the state of the request body
r.GetBody = func() (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(buf.Bytes())), nil
}
close(errApp)
}()
select {
case err := <-errApp:
return buf, err
case <-ctx.Done():
return buf, ctx.Err()
}
}
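// A small sketch of why GetBody matters (assumed caller behavior, not code from
// this file): a placer that retries a call against several runners can obtain a
// fresh, independent reader for each attempt instead of consuming req.Body once:
//
//	body, err := call.req.GetBody()
//	if err == nil {
//		defer body.Close()
//		// stream body to the runner chosen for this attempt
//	}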
// implements Agent
func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
logrus.Error("Enqueue not implemented")
return errors.New("Enqueue not implemented")
}
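// scheduleCallEnd runs fn asynchronously, tracking it in callEndCount so Submit
// can detect excessive call.End() stacking; once fn completes, the counter is
// decremented and the shutdown WaitGroup session is released.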
func (a *lbAgent) scheduleCallEnd(fn func()) {
atomic.AddInt64(&a.callEndCount, 1)
go func() {
fn()
atomic.AddInt64(&a.callEndCount, -1)
a.shutWg.DoneSession()
}()
}
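// handleCallEnd finalizes a call: if the call was started, Call.End runs
// asynchronously under a CallEndTimeout-bounded context; otherwise only queue
// stats are updated and the shutdown session is released immediately. In both
// cases the returned error is normalized via transformTimeout.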
func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error {
if isStarted {
a.scheduleCallEnd(func() {
ctx = common.BackgroundContext(ctx)
ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
call.End(ctx, err)
cancel()
})
handleStatsEnd(ctx, err)
return transformTimeout(err, false)
}
a.shutWg.DoneSession()
handleStatsDequeue(ctx, err)
return transformTimeout(err, true)
}
var _ Agent = &lbAgent{}
var _ callTrigger = &lbAgent{}