From e47d55056ad12fa23f1d38e70855e0ba4db5e6f9 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Thu, 12 Apr 2018 15:51:58 -0700 Subject: [PATCH] fn: reduce lbagent and agent dependency (#938) * fn: reduce lbagent and agent dependency lbagent and agent code is too dependent. This causes any changed in agent to break lbagent. In reality, for LB there should be no delegated agent. Splitting these two will cause some code duplication, but it reduces dependency and complexity (eg. agent without docker) * fn: post rebase fixup * fn: runner/runnercall should use lbDeadline * fn: fixup ln agent test * fn: remove agent create option for common.WaitGroup --- api/agent/agent.go | 38 ++++----- api/agent/agent_test.go | 12 +-- api/agent/call.go | 47 +++++------ api/agent/func_logger.go | 14 ++++ api/agent/lb_agent.go | 148 +++++++++++++++++++++++---------- api/agent/lb_agent_test.go | 24 +++--- api/agent/listeners.go | 14 +++- api/agent/pure_runner.go | 6 +- api/runnerpool/ch_placer.go | 4 +- api/runnerpool/naive_placer.go | 4 +- api/runnerpool/runner_pool.go | 2 +- api/server/runner.go | 2 +- 12 files changed, 197 insertions(+), 118 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 09afc991d..d2a0463ce 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -10,7 +10,6 @@ import ( "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/agent/drivers/docker" - "github.com/fnproject/fn/api/agent/drivers/mock" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" @@ -98,6 +97,9 @@ type Agent interface { // GetAppByID is to get the app by ID GetAppByID(ctx context.Context, appID string) (*models.App, error) + + // GetRoute is to get the route by appId and path + GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) } type agent struct { @@ -119,7 +121,7 @@ type agent struct { // New creates an Agent that executes functions locally as Docker containers. func New(da DataAccess) Agent { - a := createAgent(da, true, nil).(*agent) + a := createAgent(da).(*agent) if !a.shutWg.AddSession(1) { logrus.Fatalf("cannot start agent, unable to add session") } @@ -127,7 +129,7 @@ func New(da DataAccess) Agent { return a } -func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) Agent { +func createAgent(da DataAccess) Agent { cfg, err := NewAgentConfig() if err != nil { logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg) @@ -135,22 +137,14 @@ func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) A logrus.Infof("agent starting cfg=%+v", cfg) // TODO: Create drivers.New(runnerConfig) - var driver drivers.Driver - if withDocker { - driver = docker.NewDocker(drivers.Config{ - ServerVersion: cfg.MinDockerVersion, - PreForkPoolSize: cfg.PreForkPoolSize, - PreForkImage: cfg.PreForkImage, - PreForkCmd: cfg.PreForkCmd, - PreForkUseOnce: cfg.PreForkUseOnce, - PreForkNetworks: cfg.PreForkNetworks, - }) - } else { - driver = mock.New() - } - if withShutWg == nil { - withShutWg = common.NewWaitGroup() - } + driver := docker.NewDocker(drivers.Config{ + ServerVersion: cfg.MinDockerVersion, + PreForkPoolSize: cfg.PreForkPoolSize, + PreForkImage: cfg.PreForkImage, + PreForkCmd: cfg.PreForkCmd, + PreForkUseOnce: cfg.PreForkUseOnce, + PreForkNetworks: cfg.PreForkNetworks, + }) a := &agent{ cfg: *cfg, @@ -158,7 +152,7 @@ func createAgent(da DataAccess, withDocker bool, withShutWg *common.WaitGroup) A driver: driver, slotMgr: NewSlotQueueMgr(), resources: NewResourceTracker(cfg), - shutWg: withShutWg, + shutWg: common.NewWaitGroup(), } // TODO assert that agent doesn't get started for API nodes up above ? @@ -173,6 +167,10 @@ func (a *agent) GetAppID(ctx context.Context, appName string) (string, error) { return a.da.GetAppID(ctx, appName) } +func (a *agent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return a.da.GetRoute(ctx, appID, path) +} + // TODO shuffle this around somewhere else (maybe) func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { return a.da.Enqueue(ctx, call) diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 11b8b7b9c..8d1a30be5 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -114,7 +114,7 @@ func TestCallConfigurationRequest(t *testing.T) { call, err := a.GetCall( WithWriter(w), // XXX (reed): order matters [for now] - FromRequest(app, path, req), + FromRequest(a, app, path, req), ) if err != nil { t.Fatal(err) @@ -530,7 +530,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { // grab a buffer so we can read what gets written to this guy var out bytes.Buffer - callI, err := a.GetCall(FromRequest(app, path, req), WithWriter(&out)) + callI, err := a.GetCall(FromRequest(a, app, path, req), WithWriter(&out)) if err != nil { t.Fatal(err) } @@ -703,7 +703,7 @@ func TestPipesAreClear(t *testing.T) { req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodOne))) var outOne bytes.Buffer - callI, err := a.GetCall(FromRequest(app, ca.Path, req), WithWriter(&outOne)) + callI, err := a.GetCall(FromRequest(a, app, ca.Path, req), WithWriter(&outOne)) if err != nil { t.Fatal(err) } @@ -737,7 +737,7 @@ func TestPipesAreClear(t *testing.T) { req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodTwo))) var outTwo bytes.Buffer - callI, err = a.GetCall(FromRequest(app, ca.Path, req), WithWriter(&outTwo)) + callI, err = a.GetCall(FromRequest(a, app, ca.Path, req), WithWriter(&outTwo)) if err != nil { t.Fatal(err) } @@ -844,7 +844,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { } var outOne bytes.Buffer - callI, err := a.GetCall(FromRequest(app, call.Path, req), WithWriter(&outOne)) + callI, err := a.GetCall(FromRequest(a, app, call.Path, req), WithWriter(&outOne)) if err != nil { t.Fatal(err) } @@ -869,7 +869,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { } var outTwo bytes.Buffer - callI, err = a.GetCall(FromRequest(app, call.Path, req), WithWriter(&outTwo)) + callI, err = a.GetCall(FromRequest(a, app, call.Path, req), WithWriter(&outTwo)) if err != nil { t.Fatal(err) } diff --git a/api/agent/call.go b/api/agent/call.go index 5d3b78760..6b7448b57 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -41,7 +41,7 @@ type Call interface { } // TODO build w/o closures... lazy -type CallOpt func(a *agent, c *call) error +type CallOpt func(c *call) error type Param struct { Key string @@ -49,9 +49,9 @@ type Param struct { } type Params []Param -func FromRequest(app *models.App, path string, req *http.Request) CallOpt { - return func(a *agent, c *call) error { - route, err := a.da.GetRoute(req.Context(), app.ID, path) +func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallOpt { + return func(c *call) error { + route, err := a.GetRoute(req.Context(), app.ID, path) if err != nil { return err } @@ -160,7 +160,7 @@ func reqURL(req *http.Request) string { // here, to be a fully qualified model. We probably should double check but having a way // to bypass will likely be what's used anyway unless forced. func FromModel(mCall *models.Call) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.Call = mCall req, err := http.NewRequest(c.Method, c.URL, strings.NewReader(c.Payload)) @@ -176,7 +176,7 @@ func FromModel(mCall *models.Call) CallOpt { } func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.Call = mCall req, err := http.NewRequest(c.Method, c.URL, in) @@ -193,26 +193,19 @@ func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt { // TODO this should be required func WithWriter(w io.Writer) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.w = w return nil } } func WithContext(ctx context.Context) CallOpt { - return func(a *agent, c *call) error { + return func(c *call) error { c.req = c.req.WithContext(ctx) return nil } } -func WithoutPreemptiveCapacityCheck() CallOpt { - return func(a *agent, c *call) error { - c.disablePreemptiveCapacityCheck = true - return nil - } -} - // GetCall builds a Call that can be used to submit jobs to the agent. // // TODO where to put this? async and sync both call this @@ -220,7 +213,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { var c call for _, o := range opts { - err := o(a, &c) + err := o(&c) if err != nil { return nil, err } @@ -231,11 +224,9 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } - if !c.disablePreemptiveCapacityCheck { - if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) { - // if we're not going to be able to run this call on this machine, bail here. - return nil, models.ErrCallTimeoutServerBusy - } + if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) { + // if we're not going to be able to run this call on this machine, bail here. + return nil, models.ErrCallTimeoutServerBusy } c.da = a.da @@ -274,15 +265,14 @@ type call struct { ct callTrigger slots *slotQueue slotDeadline time.Time + lbDeadline time.Time execDeadline time.Time requestState RequestState containerState ContainerState - // This can be used to disable the preemptive capacity check in GetCall - disablePreemptiveCapacityCheck bool } -func (c *call) SlotDeadline() time.Time { - return c.slotDeadline +func (c *call) LbDeadline() time.Time { + return c.lbDeadline } func (c *call) RequestBody() io.ReadCloser { @@ -311,8 +301,11 @@ func (c *call) Start(ctx context.Context) error { c.StartedAt = strfmt.DateTime(time.Now()) c.Status = "running" - if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in - rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) + // Do not write this header if lb-agent + if c.lbDeadline.IsZero() { + if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in + rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) + } } if c.Type == models.TypeAsync { diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index 4c2e11dff..6dc2f4d46 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -74,6 +74,20 @@ type nopCloser struct { func (n *nopCloser) Close() error { return nil } +type nullReadWriter struct { + io.ReadCloser +} + +func (n *nullReadWriter) Close() error { + return nil +} +func (n *nullReadWriter) Read(b []byte) (int, error) { + return 0, io.EOF +} +func (n *nullReadWriter) Write(b []byte) (int, error) { + return 0, io.EOF +} + // multiWriteCloser returns the first write or close that returns a non-nil // err, if no non-nil err is returned, then the returned bytes written will be // from the last call to write. diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 6e833af76..babdcbd37 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -2,6 +2,8 @@ package agent import ( "context" + "errors" + "sync/atomic" "time" "github.com/sirupsen/logrus" @@ -25,41 +27,89 @@ const ( ) type lbAgent struct { - delegatedAgent Agent - rp pool.RunnerPool - placer pool.Placer - shutWg *common.WaitGroup + cfg AgentConfig + da DataAccess + callListeners []fnext.CallListener + rp pool.RunnerPool + placer pool.Placer + + shutWg *common.WaitGroup + callEndCount int64 } // NewLBAgent creates an Agent that knows how to load-balance function calls // across a group of runner nodes. func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) { - wg := common.NewWaitGroup() - agent := createAgent(da, false, wg) + + // TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig + cfg, err := NewAgentConfig() + if err != nil { + logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) + } + logrus.Infof("lb-agent starting cfg=%+v", cfg) + a := &lbAgent{ - delegatedAgent: agent, - rp: rp, - placer: p, - shutWg: wg, + cfg: *cfg, + da: da, + rp: rp, + placer: p, + shutWg: common.NewWaitGroup(), } return a, nil } +func (a *lbAgent) AddCallListener(listener fnext.CallListener) { + a.callListeners = append(a.callListeners, listener) +} + +func (a *lbAgent) fireBeforeCall(ctx context.Context, call *models.Call) error { + return fireBeforeCallFun(a.callListeners, ctx, call) +} + +func (a *lbAgent) fireAfterCall(ctx context.Context, call *models.Call) error { + return fireAfterCallFun(a.callListeners, ctx, call) +} + // GetAppID is to get the match of an app name to its ID func (a *lbAgent) GetAppID(ctx context.Context, appName string) (string, error) { - return a.delegatedAgent.GetAppID(ctx, appName) + return a.da.GetAppID(ctx, appName) } // GetAppByID is to get the app by ID func (a *lbAgent) GetAppByID(ctx context.Context, appID string) (*models.App, error) { - return a.delegatedAgent.GetAppByID(ctx, appID) + return a.da.GetAppByID(ctx, appID) +} + +func (a *lbAgent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return a.da.GetRoute(ctx, appID, path) } -// GetCall delegates to the wrapped agent but disables the capacity check as -// this agent isn't actually running the call. func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { - opts = append(opts, WithoutPreemptiveCapacityCheck()) - return a.delegatedAgent.GetCall(opts...) + var c call + + for _, o := range opts { + err := o(&c) + if err != nil { + return nil, err + } + } + + // TODO typed errors to test + if c.req == nil || c.Call == nil { + return nil, errors.New("no model or request provided for call") + } + + c.da = a.da + c.ct = a + c.stderr = &nullReadWriter{} + + ctx, _ := common.LoggerWithFields(c.req.Context(), + logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path}) + c.req = c.req.WithContext(ctx) + + c.lbDeadline = time.Now().Add(time.Duration(c.Call.Timeout) * time.Second) + + return &c, nil } func (a *lbAgent) Close() error { @@ -67,27 +117,17 @@ func (a *lbAgent) Close() error { // start closing the front gate first ch := a.shutWg.CloseGroupNB() - // delegated agent shutdown next, blocks here... - err1 := a.delegatedAgent.Close() - if err1 != nil { - logrus.WithError(err1).Warn("Delegated agent shutdown error") - } - // finally shutdown the runner pool ctx, cancel := context.WithTimeout(context.Background(), runnerPoolShutdownTimeout) defer cancel() - err2 := a.rp.Shutdown(ctx) - if err2 != nil { - logrus.WithError(err2).Warn("Runner pool shutdown error") + err := a.rp.Shutdown(ctx) + if err != nil { + logrus.WithError(err).Warn("Runner pool shutdown error") } // gate-on front-gate, should be completed if delegated agent & runner pool is gone. <-ch - - if err1 != nil { - return err1 - } - return err2 + return err } func GetGroupID(call *models.Call) string { @@ -109,20 +149,20 @@ func (a *lbAgent) Submit(callI Call) error { call := callI.(*call) - ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline) + ctx, cancel := context.WithDeadline(call.req.Context(), call.lbDeadline) call.req = call.req.WithContext(ctx) defer cancel() ctx, span := trace.StartSpan(ctx, "agent_submit") defer span.End() - err := a.submit(ctx, call) - return err -} - -func (a *lbAgent) submit(ctx context.Context, call *call) error { statsEnqueue(ctx) + // first check any excess case of call.End() stacking. + if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) { + a.handleCallEnd(ctx, call, context.DeadlineExceeded, false) + } + err := call.Start(ctx) if err != nil { return a.handleCallEnd(ctx, call, err, false) @@ -130,6 +170,10 @@ func (a *lbAgent) submit(ctx context.Context, call *call) error { statsDequeueAndStart(ctx) + // WARNING: isStarted (handleCallEnd) semantics + // need some consideration here. Similar to runner/agent + // we consider isCommitted true if call.Start() succeeds. + // isStarted=true means we will call Call.End(). err = a.placer.PlaceCall(a.rp, ctx, call) if err != nil { logrus.WithError(err).Error("Failed to place call") @@ -138,16 +182,34 @@ func (a *lbAgent) submit(ctx context.Context, call *call) error { return a.handleCallEnd(ctx, call, err, true) } -func (a *lbAgent) AddCallListener(cl fnext.CallListener) { - a.delegatedAgent.AddCallListener(cl) -} - func (a *lbAgent) Enqueue(context.Context, *models.Call) error { logrus.Fatal("Enqueue not implemented. Panicking.") return nil } -func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isCommitted bool) error { - delegatedAgent := a.delegatedAgent.(*agent) - return delegatedAgent.handleCallEnd(ctx, call, nil, err, isCommitted) +func (a *lbAgent) scheduleCallEnd(fn func()) { + atomic.AddInt64(&a.callEndCount, 1) + go func() { + fn() + atomic.AddInt64(&a.callEndCount, -1) + a.shutWg.AddSession(-1) + }() +} + +func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error { + if isStarted { + a.scheduleCallEnd(func() { + ctx = common.BackgroundContext(ctx) + ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout) + call.End(ctx, err) + cancel() + }) + + handleStatsEnd(ctx, err) + return transformTimeout(err, false) + } + + a.shutWg.AddSession(-1) + handleStatsDequeue(ctx, err) + return transformTimeout(err, true) } diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 0cfc0d81f..1702a54a3 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -117,15 +117,15 @@ func (r *mockRunner) Address() string { } type mockRunnerCall struct { - slotDeadline time.Time - r *http.Request - rw http.ResponseWriter - stdErr io.ReadWriteCloser - model *models.Call + lbDeadline time.Time + r *http.Request + rw http.ResponseWriter + stdErr io.ReadWriteCloser + model *models.Call } -func (c *mockRunnerCall) SlotDeadline() time.Time { - return c.slotDeadline +func (c *mockRunnerCall) LbDeadline() time.Time { + return c.lbDeadline } func (c *mockRunnerCall) RequestBody() io.ReadCloser { @@ -152,7 +152,7 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC func TestOneRunner(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { t.Fatalf("Failed to place call on runner %v", err) @@ -162,7 +162,7 @@ func TestOneRunner(t *testing.T) { func TestEnforceTimeoutFromContext(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} ctx, cancel := context.WithDeadline(context.Background(), time.Now()) defer cancel() err := placer.PlaceCall(rp, ctx, call) @@ -182,7 +182,7 @@ func TestSpilloverToSecondRunner(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) @@ -199,7 +199,7 @@ func TestSpilloverToSecondRunner(t *testing.T) { } } -func TestEnforceSlotTimeout(t *testing.T) { +func TestEnforceLbTimeout(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1) @@ -210,7 +210,7 @@ func TestEnforceSlotTimeout(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)} + call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} err := placer.PlaceCall(rp, context.Background(), call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) diff --git a/api/agent/listeners.go b/api/agent/listeners.go index 7855c5eba..934c4b474 100644 --- a/api/agent/listeners.go +++ b/api/agent/listeners.go @@ -17,7 +17,15 @@ func (a *agent) AddCallListener(listener fnext.CallListener) { } func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { - for _, l := range a.callListeners { + return fireBeforeCallFun(a.callListeners, ctx, call) +} + +func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { + return fireAfterCallFun(a.callListeners, ctx, call) +} + +func fireBeforeCallFun(callListeners []fnext.CallListener, ctx context.Context, call *models.Call) error { + for _, l := range callListeners { err := l.BeforeCall(ctx, call) if err != nil { return err @@ -26,8 +34,8 @@ func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { return nil } -func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { - for _, l := range a.callListeners { +func fireAfterCallFun(callListeners []fnext.CallListener, ctx context.Context, call *models.Call) error { + for _, l := range callListeners { err := l.AfterCall(ctx, call) if err != nil { return err diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 956cf536c..65816fd29 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -521,6 +521,10 @@ func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) { return pr.a.GetCall(opts...) } +func (pr *pureRunner) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { + return pr.a.GetRoute(ctx, appID, path) +} + func (pr *pureRunner) Submit(Call) error { return errors.New("Submit cannot be called directly in a Pure Runner.") } @@ -671,7 +675,7 @@ func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, ce } func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error) { - a := createAgent(da, true, nil) + a := createAgent(da) var pr *pureRunner var err error if cert != "" && key != "" && ca != "" { diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index a47d4afda..0ef031eae 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -28,7 +28,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall // The key is just the path in this case key := call.Model().Path sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) - timeout := time.After(call.SlotDeadline().Sub(time.Now())) + timeout := time.After(call.LbDeadline().Sub(time.Now())) for { select { @@ -57,7 +57,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall } } - remaining := call.SlotDeadline().Sub(time.Now()) + remaining := call.LbDeadline().Sub(time.Now()) if remaining <= 0 { return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index 00b146b7b..65ca543e0 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -23,7 +23,7 @@ func NewNaivePlacer() Placer { } func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - timeout := time.After(call.SlotDeadline().Sub(time.Now())) + timeout := time.After(call.LbDeadline().Sub(time.Now())) for { select { @@ -47,7 +47,7 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner } } - remaining := call.SlotDeadline().Sub(time.Now()) + remaining := call.LbDeadline().Sub(time.Now()) if remaining <= 0 { return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 9f49a528d..cc740f5de 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -41,7 +41,7 @@ type Runner interface { // RunnerCall provides access to the necessary details of request in order for it to be // processed by a RunnerPool type RunnerCall interface { - SlotDeadline() time.Time + LbDeadline() time.Time RequestBody() io.ReadCloser ResponseWriter() http.ResponseWriter StdErr() io.ReadWriteCloser diff --git a/api/server/runner.go b/api/server/runner.go index 795878005..dc7a9e200 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -75,7 +75,7 @@ func (s *Server) serve(c *gin.Context, app *models.App, path string) error { call, err := s.agent.GetCall( agent.WithWriter(&writer), // XXX (reed): order matters [for now] - agent.FromRequest(app, path, c.Request), + agent.FromRequest(s.agent, app, path, c.Request), ) if err != nil { return err