diff --git a/api/agent/agent.go b/api/agent/agent.go index 09afc991d..d2a0463ce 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -10,7 +10,6 @@ import ( "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/agent/drivers/docker" - "github.com/fnproject/fn/api/agent/drivers/mock" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" @@ -98,6 +97,9 @@ type Agent interface { // GetAppByID is to get the app by ID GetAppByID(ctx context.Context, appID string) (*models.App, error) + + // GetRoute is to get the route by appId and path + GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) } type agent struct { @@ -119,7 +121,7 @@ type agent struct { // New creates an Agent that executes functions locally as Docker containers. func New(da DataAccess) Agent { - a := createAgent(da, true, nil).(*agent) + a := createAgent(da).(*agent) if !a.shutWg.AddSession(1) { logrus.Fatalf("cannot start agent, unable to add session") } @@ -127,7 +129,7 @@ func New(da DataAccess) Agent { return a } -func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) Agent { +func createAgent(da DataAccess) Agent { cfg, err := NewAgentConfig() if err != nil { logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg) @@ -135,22 +137,14 @@ func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) A logrus.Infof("agent starting cfg=%+v", cfg) // TODO: Create drivers.New(runnerConfig) - var driver drivers.Driver - if withDocker { - driver = docker.NewDocker(drivers.Config{ - ServerVersion: cfg.MinDockerVersion, - PreForkPoolSize: cfg.PreForkPoolSize, - PreForkImage: cfg.PreForkImage, - PreForkCmd: cfg.PreForkCmd, - PreForkUseOnce: cfg.PreForkUseOnce, - PreForkNetworks: cfg.PreForkNetworks, - }) - } else { - driver = mock.New() - } - if withShutWg == nil { - withShutWg = common.NewWaitGroup() - } + driver := docker.NewDocker(drivers.Config{ + ServerVersion: cfg.MinDockerVersion, + PreForkPoolSize: cfg.PreForkPoolSize, + PreForkImage: cfg.PreForkImage, + PreForkCmd: cfg.PreForkCmd, + PreForkUseOnce: cfg.PreForkUseOnce, + PreForkNetworks: cfg.PreForkNetworks, + }) a := &agent{ cfg: *cfg, @@ -158,7 +152,7 @@ func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) A driver: driver, slotMgr: NewSlotQueueMgr(), resources: NewResourceTracker(cfg), - shutWg: withShutWg, + shutWg: common.NewWaitGroup(), } // TODO assert that agent doesn't get started for API nodes up above ? @@ -173,6 +167,10 @@ func (a *agent) GetAppID(ctx context.Context, appName string) (string, error) { return a.da.GetAppID(ctx, appName) } +func (a *agent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return a.da.GetRoute(ctx, appID, path) +} + // TODO shuffle this around somewhere else (maybe) func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { return a.da.Enqueue(ctx, call) diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 11b8b7b9c..8d1a30be5 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -114,7 +114,7 @@ func TestCallConfigurationRequest(t *testing.T) { call, err := a.GetCall( WithWriter(w), // XXX (reed): order matters [for now] - FromRequest(app, path, req), + FromRequest(a, app, path, req), ) if err != nil { t.Fatal(err) @@ -530,7 +530,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { // grab a buffer so we can read what gets written to this guy var out bytes.Buffer - callI, err := a.GetCall(FromRequest(app, path, req), WithWriter(&out)) + callI, err := a.GetCall(FromRequest(a, app, path, req), WithWriter(&out)) if err != nil { t.Fatal(err) } @@ -703,7 +703,7 @@ func TestPipesAreClear(t *testing.T) { req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodOne))) var outOne bytes.Buffer - callI, err := a.GetCall(FromRequest(app, ca.Path, req), WithWriter(&outOne)) + callI, err := a.GetCall(FromRequest(a, app, ca.Path, req), WithWriter(&outOne)) if err != nil { t.Fatal(err) } @@ -737,7 +737,7 @@ func TestPipesAreClear(t *testing.T) { req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodTwo))) var outTwo bytes.Buffer - callI, err = a.GetCall(FromRequest(app, ca.Path, req), WithWriter(&outTwo)) + callI, err = a.GetCall(FromRequest(a, app, ca.Path, req), WithWriter(&outTwo)) if err != nil { t.Fatal(err) } @@ -844,7 +844,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { } var outOne bytes.Buffer - callI, err := a.GetCall(FromRequest(app, call.Path, req), WithWriter(&outOne)) + callI, err := a.GetCall(FromRequest(a, app, call.Path, req), WithWriter(&outOne)) if err != nil { t.Fatal(err) } @@ -869,7 +869,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { } var outTwo bytes.Buffer - callI, err = a.GetCall(FromRequest(app, call.Path, req), WithWriter(&outTwo)) + callI, err = a.GetCall(FromRequest(a, app, call.Path, req), WithWriter(&outTwo)) if err != nil { t.Fatal(err) } diff --git a/api/agent/call.go b/api/agent/call.go index 5d3b78760..6b7448b57 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -41,7 +41,7 @@ type Call interface { } // TODO build w/o closures... lazy -type CallOpt func(a *agent, c *call) error +type CallOpt func(c *call) error type Param struct { Key string @@ -49,9 +49,9 @@ type Param struct { } type Params []Param -func FromRequest(app *models.App, path string, req *http.Request) CallOpt { - return func(a *agent, c *call) error { - route, err := a.da.GetRoute(req.Context(), app.ID, path) +func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallOpt { + return func(c *call) error { + route, err := a.GetRoute(req.Context(), app.ID, path) if err != nil { return err } @@ -160,7 +160,7 @@ func reqURL(req *http.Request) string { // here, to be a fully qualified model. We probably should double check but having a way // to bypass will likely be what's used anyway unless forced. func FromModel(mCall *models.Call) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.Call = mCall req, err := http.NewRequest(c.Method, c.URL, strings.NewReader(c.Payload)) @@ -176,7 +176,7 @@ func FromModel(mCall *models.Call) CallOpt { } func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.Call = mCall req, err := http.NewRequest(c.Method, c.URL, in) @@ -193,26 +193,19 @@ func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt { // TODO this should be required func WithWriter(w io.Writer) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.w = w return nil } } func WithContext(ctx context.Context) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.req = c.req.WithContext(ctx) return nil } } -func WithoutPreemptiveCapacityCheck() CallOpt { - return func(a *agent, c *call) error { - c.disablePreemptiveCapacityCheck = true - return nil - } -} - // GetCall builds a Call that can be used to submit jobs to the agent. // // TODO where to put this? async and sync both call this @@ -220,7 +213,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { var c call for _, o := range opts { - err := o(a, &c) + err := o(&c) if err != nil { return nil, err } @@ -231,11 +224,9 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } - if !c.disablePreemptiveCapacityCheck { - if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) { - // if we're not going to be able to run this call on this machine, bail here. - return nil, models.ErrCallTimeoutServerBusy - } + if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) { + // if we're not going to be able to run this call on this machine, bail here. + return nil, models.ErrCallTimeoutServerBusy } c.da = a.da @@ -274,15 +265,14 @@ type call struct { ct callTrigger slots *slotQueue slotDeadline time.Time + lbDeadline time.Time execDeadline time.Time requestState RequestState containerState ContainerState - // This can be used to disable the preemptive capacity check in GetCall - disablePreemptiveCapacityCheck bool } -func (c *call) SlotDeadline() time.Time { - return c.slotDeadline +func (c *call) LbDeadline() time.Time { + return c.lbDeadline } func (c *call) RequestBody() io.ReadCloser { @@ -311,8 +301,11 @@ func (c *call) Start(ctx context.Context) error { c.StartedAt = strfmt.DateTime(time.Now()) c.Status = "running" - if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in - rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) + // Do not write this header if lb-agent + if c.lbDeadline.IsZero() { + if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in + rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) + } } if c.Type == models.TypeAsync { diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index 4c2e11dff..6dc2f4d46 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -74,6 +74,20 @@ type nopCloser struct { func (n *nopCloser) Close() error { return nil } +type nullReadWriter struct { + io.ReadCloser +} + +func (n *nullReadWriter) Close() error { + return nil +} +func (n *nullReadWriter) Read(b []byte) (int, error) { + return 0, io.EOF +} +func (n *nullReadWriter) Write(b []byte) (int, error) { + return 0, io.EOF +} + // multiWriteCloser returns the first write or close that returns a non-nil // err, if no non-nil err is returned, then the returned bytes written will be // from the last call to write. diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 6e833af76..babdcbd37 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -2,6 +2,8 @@ package agent import ( "context" + "errors" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -25,41 +27,89 @@ const ( ) type lbAgent struct { - delegatedAgent Agent - rp pool.RunnerPool - placer pool.Placer - shutWg *common.WaitGroup + cfg AgentConfig + da DataAccess + callListeners []fnext.CallListener + rp pool.RunnerPool + placer pool.Placer + + shutWg *common.WaitGroup + callEndCount int64 } // NewLBAgent creates an Agent that knows how to load-balance function calls // across a group of runner nodes. func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) { - wg := common.NewWaitGroup() - agent := createAgent(da, false, wg) + + // TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig + cfg, err := NewAgentConfig() + if err != nil { + logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) + } + logrus.Infof("lb-agent starting cfg=%+v", cfg) + a := &lbAgent{ - delegatedAgent: agent, - rp: rp, - placer: p, - shutWg: wg, + cfg: *cfg, + da: da, + rp: rp, + placer: p, + shutWg: common.NewWaitGroup(), } return a, nil } +func (a *lbAgent) AddCallListener(listener fnext.CallListener) { + a.callListeners = append(a.callListeners, listener) +} + +func (a *lbAgent) fireBeforeCall(ctx context.Context, call *models.Call) error { + return fireBeforeCallFun(a.callListeners, ctx, call) +} + +func (a *lbAgent) fireAfterCall(ctx context.Context, call *models.Call) error { + return fireAfterCallFun(a.callListeners, ctx, call) +} + // GetAppID is to get the match of an app name to its ID func (a *lbAgent) GetAppID(ctx context.Context, appName string) (string, error) { - return a.delegatedAgent.GetAppID(ctx, appName) + return a.da.GetAppID(ctx, appName) } // GetAppByID is to get the app by ID func (a *lbAgent) GetAppByID(ctx context.Context, appID string) (*models.App, error) { - return a.delegatedAgent.GetAppByID(ctx, appID) + return a.da.GetAppByID(ctx, appID) +} + +func (a *lbAgent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return a.da.GetRoute(ctx, appID, path) } -// GetCall delegates to the wrapped agent but disables the capacity check as -// this agent isn't actually running the call. func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { - opts = append(opts, WithoutPreemptiveCapacityCheck()) - return a.delegatedAgent.GetCall(opts...) + var c call + + for _, o := range opts { + err := o(&c) + if err != nil { + return nil, err + } + } + + // TODO typed errors to test + if c.req == nil || c.Call == nil { + return nil, errors.New("no model or request provided for call") + } + + c.da = a.da + c.ct = a + c.stderr = &nullReadWriter{} + + ctx, _ := common.LoggerWithFields(c.req.Context(), + logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path}) + c.req = c.req.WithContext(ctx) + + c.lbDeadline = time.Now().Add(time.Duration(c.Call.Timeout) * time.Second) + + return &c, nil } func (a *lbAgent) Close() error { @@ -67,27 +117,17 @@ func (a *lbAgent) Close() error { // start closing the front gate first ch := a.shutWg.CloseGroupNB() - // delegated agent shutdown next, blocks here... - err1 := a.delegatedAgent.Close() - if err1 != nil { - logrus.WithError(err1).Warn("Delegated agent shutdown error") - } - // finally shutdown the runner pool ctx, cancel := context.WithTimeout(context.Background(), runnerPoolShutdownTimeout) defer cancel() - err2 := a.rp.Shutdown(ctx) - if err2 != nil { - logrus.WithError(err2).Warn("Runner pool shutdown error") + err := a.rp.Shutdown(ctx) + if err != nil { + logrus.WithError(err).Warn("Runner pool shutdown error") } // gate-on front-gate, should be completed if delegated agent & runner pool is gone. <-ch - - if err1 != nil { - return err1 - } - return err2 + return err } func GetGroupID(call *models.Call) string { @@ -109,20 +149,20 @@ func (a *lbAgent) Submit(callI Call) error { call := callI.(*call) - ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline) + ctx, cancel := context.WithDeadline(call.req.Context(), call.lbDeadline) call.req = call.req.WithContext(ctx) defer cancel() ctx, span := trace.StartSpan(ctx, "agent_submit") defer span.End() - err := a.submit(ctx, call) - return err -} - -func (a *lbAgent) submit(ctx context.Context, call *call) error { statsEnqueue(ctx) + // first check any excess case of call.End() stacking. + if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) { + a.handleCallEnd(ctx, call, context.DeadlineExceeded, false) + } + err := call.Start(ctx) if err != nil { return a.handleCallEnd(ctx, call, err, false) @@ -130,6 +170,10 @@ func (a *lbAgent) submit(ctx context.Context, call *call) error { statsDequeueAndStart(ctx) + // WARNING: isStarted (handleCallEnd) semantics + // need some consideration here. Similar to runner/agent + // we consider isCommitted true if call.Start() succeeds. + // isStarted=true means we will call Call.End(). err = a.placer.PlaceCall(a.rp, ctx, call) if err != nil { logrus.WithError(err).Error("Failed to place call") @@ -138,16 +182,34 @@ func (a *lbAgent) submit(ctx context.Context, call *call) error { return a.handleCallEnd(ctx, call, err, true) } -func (a *lbAgent) AddCallListener(cl fnext.CallListener) { - a.delegatedAgent.AddCallListener(cl) -} - func (a *lbAgent) Enqueue(context.Context, *models.Call) error { logrus.Fatal("Enqueue not implemented. Panicking.") return nil } -func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isCommitted bool) error { - delegatedAgent := a.delegatedAgent.(*agent) - return delegatedAgent.handleCallEnd(ctx, call, nil, err, isCommitted) +func (a *lbAgent) scheduleCallEnd(fn func()) { + atomic.AddInt64(&a.callEndCount, 1) + go func() { + fn() + atomic.AddInt64(&a.callEndCount, -1) + a.shutWg.AddSession(-1) + }() +} + +func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error { + if isStarted { + a.scheduleCallEnd(func() { + ctx = common.BackgroundContext(ctx) + ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout) + call.End(ctx, err) + cancel() + }) + + handleStatsEnd(ctx, err) + return transformTimeout(err, false) + } + + a.shutWg.AddSession(-1) + handleStatsDequeue(ctx, err) + return transformTimeout(err, true) } diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 0cfc0d81f..1702a54a3 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -117,15 +117,15 @@ func (r *mockRunner) Address() string { } type mockRunnerCall struct { - slotDeadline time.Time - r *http.Request - rw http.ResponseWriter - stdErr io.ReadWriteCloser - model *models.Call + lbDeadline time.Time + r *http.Request + rw http.ResponseWriter + stdErr io.ReadWriteCloser + model *models.Call } -func (c *mockRunnerCall) SlotDeadline() time.Time { - return c.slotDeadline +func (c *mockRunnerCall) LbDeadline() time.Time { + return c.lbDeadline } func (c *mockRunnerCall) RequestBody() io.ReadCloser { @@ -152,7 +152,7 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC func TestOneRunner(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { t.Fatalf("Failed to place call on runner %v", err) @@ -162,7 +162,7 @@ func TestOneRunner(t *testing.T) { func TestEnforceTimeoutFromContext(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} ctx, cancel := context.WithDeadline(context.Background(), time.Now()) defer cancel() err := placer.PlaceCall(rp, ctx, call) @@ -182,7 +182,7 @@ func TestSpilloverToSecondRunner(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) @@ -199,7 +199,7 @@ func TestSpilloverToSecondRunner(t *testing.T) { } } -func TestEnforceSlotTimeout(t *testing.T) { +func TestEnforceLbTimeout(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1) @@ -210,7 +210,7 @@ func TestEnforceSlotTimeout(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) diff --git a/api/agent/listeners.go b/api/agent/listeners.go index 7855c5eba..934c4b474 100644 --- a/api/agent/listeners.go +++ b/api/agent/listeners.go @@ -17,7 +17,15 @@ func (a *agent) AddCallListener(listener fnext.CallListener) { } func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { - for _, l := range a.callListeners { + return fireBeforeCallFun(a.callListeners, ctx, call) +} + +func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { + return fireAfterCallFun(a.callListeners, ctx, call) +} + +func fireBeforeCallFun(callListeners []fnext.CallListener, ctx context.Context, call *models.Call) error { + for _, l := range callListeners { err := l.BeforeCall(ctx, call) if err != nil { return err @@ -26,8 +34,8 @@ func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { return nil } -func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { - for _, l := range a.callListeners { +func fireAfterCallFun(callListeners []fnext.CallListener, ctx context.Context, call *models.Call) error { + for _, l := range callListeners { err := l.AfterCall(ctx, call) if err != nil { return err diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 956cf536c..65816fd29 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -521,6 +521,10 @@ func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) { return pr.a.GetCall(opts...) } +func (pr *pureRunner) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return pr.a.GetRoute(ctx, appID, path) +} + func (pr *pureRunner) Submit(Call) error { return errors.New("Submit cannot be called directly in a Pure Runner.") } @@ -671,7 +675,7 @@ func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, ce } func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error) { - a := createAgent(da, true, nil) + a := createAgent(da) var pr *pureRunner var err error if cert != "" && key != "" && ca != "" { diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index a47d4afda..0ef031eae 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -28,7 +28,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall // The key is just the path in this case key := call.Model().Path sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) - timeout := time.After(call.SlotDeadline().Sub(time.Now())) + timeout := time.After(call.LbDeadline().Sub(time.Now())) for { select { @@ -57,7 +57,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall } } - remaining := call.SlotDeadline().Sub(time.Now()) + remaining := call.LbDeadline().Sub(time.Now()) if remaining <= 0 { return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index 00b146b7b..65ca543e0 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -23,7 +23,7 @@ func NewNaivePlacer() Placer { } func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - timeout := time.After(call.SlotDeadline().Sub(time.Now())) + timeout := time.After(call.LbDeadline().Sub(time.Now())) for { select { @@ -47,7 +47,7 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner } } - remaining := call.SlotDeadline().Sub(time.Now()) + remaining := call.LbDeadline().Sub(time.Now()) if remaining <= 0 { return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 9f49a528d..cc740f5de 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -41,7 +41,7 @@ type Runner interface { // RunnerCall provides access to the necessary details of request in order for it to be // processed by a RunnerPool type RunnerCall interface { - SlotDeadline() time.Time + LbDeadline() time.Time RequestBody() io.ReadCloser ResponseWriter() http.ResponseWriter StdErr() io.ReadWriteCloser diff --git a/api/server/runner.go b/api/server/runner.go index 795878005..dc7a9e200 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -75,7 +75,7 @@ func (s *Server) serve(c *gin.Context, app *models.App, path string) error { call, err := s.agent.GetCall( agent.WithWriter(&writer), // XXX (reed): order matters [for now] - agent.FromRequest(app, path, c.Request), + agent.FromRequest(s.agent, app, path, c.Request), ) if err != nil { return err