From 182db94fad6abec9c038107a98879568921dd8cb Mon Sep 17 00:00:00 2001 From: Andrea Rosa Date: Fri, 9 Nov 2018 18:25:43 +0000 Subject: [PATCH] Feature/acksync response writer (#1267) This implements a "detached" mechanism to get an ack from the runner once it actually starts to run a function. In this scenario the response returned back is just a 202 if we placed the function in a specific time-frame. If we hit some errors or we fail to place the fn in time we return back different errors. --- api/agent/call.go | 10 +- api/agent/config.go | 5 + api/agent/lb_agent.go | 70 +++++++++- api/agent/lb_agent_test.go | 43 ++++-- api/agent/pure_runner.go | 129 +++++++++++++++--- api/models/call.go | 2 + api/models/error.go | 10 ++ api/runnerpool/ch_placer.go | 9 +- api/runnerpool/fake_placer.go | 34 +++++ api/runnerpool/naive_placer.go | 7 +- api/runnerpool/placer_config.go | 8 +- api/runnerpool/placer_tracker.go | 10 +- api/runnerpool/runner_pool.go | 3 +- api/server/runner_fninvoke.go | 54 ++++++-- test/fn-system-tests/exec_fn_test.go | 110 ++++++++++++++- .../exec_runner_status_test.go | 6 +- test/fn-system-tests/system_test.go | 1 + 17 files changed, 453 insertions(+), 58 deletions(-) create mode 100644 api/runnerpool/fake_placer.go diff --git a/api/agent/call.go b/api/agent/call.go index a23bf2815..1b8bc5463 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -59,7 +59,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp ID: id, Image: fn.Image, // Delay: 0, - Type: "sync", + Type: models.TypeSync, // Payload: TODO, Priority: new(int32), // TODO this is crucial, apparently Timeout: fn.Timeout, @@ -172,6 +172,14 @@ func WithWriter(w io.Writer) CallOpt { } } +// InvokeDetached mark a call to be a detached call +func InvokeDetached() CallOpt { + return func(c *call) error { + c.Model().Type = models.TypeDetached + return nil + } +} + // WithContext overrides the context on the call func WithContext(ctx context.Context) CallOpt { 
return func(c *call) error { diff --git a/api/agent/config.go b/api/agent/config.go index 2c6a0df89..f2578c93a 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -18,6 +18,7 @@ type Config struct { HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"` HotStartTimeout time.Duration `json:"hot_start_timeout_msecs"` AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"` + DetachedHeadRoom time.Duration `json:"detached_head_room_msecs"` MaxResponseSize uint64 `json:"max_response_size_bytes"` MaxLogSize uint64 `json:"max_log_size_bytes"` MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"` @@ -94,6 +95,9 @@ const ( // EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files EnvIOFSOpts = "FN_IOFS_OPTS" + // EnvDetachedHeadroom is the extra room we want to give to a detached function to run. + EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM" + // MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this manuever MaxMsDisabled = time.Duration(math.MaxInt64) @@ -128,6 +132,7 @@ func NewConfig() (*Config, error) { err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute) err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(10)*time.Minute) err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second) + err = setEnvMsecs(err, EnvDetachedHeadroom, &cfg.DetachedHeadRoom, time.Duration(360)*time.Second) err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize) err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize) err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU) diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index d34f1e283..4d9657607 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -6,6 +6,8 @@ import ( "errors" "io" "io/ioutil" + "net/http" + "time" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -26,6 +28,39 @@ type lbAgent struct { shutWg 
*common.WaitGroup } +type DetachedResponseWriter struct { + Headers http.Header + status int + acked chan struct{} +} + +func (w *DetachedResponseWriter) Header() http.Header { + return w.Headers +} + +func (w *DetachedResponseWriter) Write(data []byte) (int, error) { + return len(data), nil +} + +func (w *DetachedResponseWriter) WriteHeader(statusCode int) { + w.status = statusCode + w.acked <- struct{}{} +} + +func (w *DetachedResponseWriter) Status() int { + return w.status +} + +func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter { + return &DetachedResponseWriter{ + Headers: h, + status: statusCode, + acked: make(chan struct{}, 1), + } +} + +var _ http.ResponseWriter = new(DetachedResponseWriter) // keep the compiler happy + type LBAgentOption func(*lbAgent) error func WithLBAgentConfig(cfg *Config) LBAgentOption { @@ -177,10 +212,43 @@ func (a *lbAgent) Submit(callI Call) error { statsDequeue(ctx) statsStartRun(ctx) - err = a.placer.PlaceCall(a.rp, ctx, call) + if call.Type == models.TypeDetached { + return a.placeDetachCall(ctx, call) + } + return a.placeCall(ctx, call) +} + +func (a *lbAgent) placeDetachCall(ctx context.Context, call *call) error { + errPlace := make(chan error, 1) + rw := call.w.(*DetachedResponseWriter) + go a.spawnPlaceCall(ctx, call, errPlace) + select { + case err := <-errPlace: + return err + case <-rw.acked: + return nil + } +} + +func (a *lbAgent) placeCall(ctx context.Context, call *call) error { + err := a.placer.PlaceCall(ctx, a.rp, call) return a.handleCallEnd(ctx, call, err, true) } +func (a *lbAgent) spawnPlaceCall(ctx context.Context, call *call, errCh chan error) { + var cancel func() + ctx = common.BackgroundContext(ctx) + cfg := a.placer.GetPlacerConfig() + + // PlacerTimeout for Detached + call.Timeout (inside container) + headroom (for docker-pull, gRPC network retransmit, etc.) 
+ newCtxTimeout := cfg.DetachedPlacerTimeout + time.Duration(call.Timeout)*time.Second + a.cfg.DetachedHeadRoom + ctx, cancel = context.WithTimeout(ctx, newCtxTimeout) + defer cancel() + + err := a.placer.PlaceCall(ctx, a.rp, call) + errCh <- a.handleCallEnd(ctx, call, err, true) +} + // setRequestGetBody sets GetBody function on the given http.Request if it is missing. GetBody allows // reading from the request body without mutating the state of the request. func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer, error) { diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index fa5dd51cd..dc76677f3 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -160,10 +160,11 @@ func TestOneRunner(t *testing.T) { cfg := pool.NewPlacerConfig() placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{} + modelCall := &models.Call{Type: models.TypeSync} + call := &mockRunnerCall{model: modelCall} ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) defer cancel() - err := placer.PlaceCall(rp, ctx, call) + err := placer.PlaceCall(ctx, rp, call) if err != nil { t.Fatalf("Failed to place call on runner %v", err) } @@ -173,15 +174,37 @@ func TestEnforceTimeoutFromContext(t *testing.T) { cfg := pool.NewPlacerConfig() placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{} + + modelCall := &models.Call{Type: models.TypeSync} + call := &mockRunnerCall{model: modelCall} + ctx, cancel := context.WithDeadline(context.Background(), time.Now()) defer cancel() - err := placer.PlaceCall(rp, ctx, call) + err := placer.PlaceCall(ctx, rp, call) if err == nil { t.Fatal("Call should have timed out") } } +func TestDetachedPlacerTimeout(t *testing.T) { + // In this test we set the detached placer timeout to a value lower than the request 
timeout (call.Timeout) + // the fake placer will just sleep for a time greater of the detached placement timeout and it will return + // the right error only if the detached timeout exceeds but the request timeout is still valid + cfg := pool.NewPlacerConfig() + cfg.DetachedPlacerTimeout = 300 * time.Millisecond + placer := pool.NewFakeDetachedPlacer(&cfg, 400*time.Millisecond) + + modelCall := &models.Call{Type: models.TypeDetached} + call := &mockRunnerCall{model: modelCall} + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + defer cancel() + err := placer.PlaceCall(ctx, nil, call) + if err == nil { + t.Fatal("Detached call should have time out because of the expiration of the placement timeout") + } + +} + func TestRRRunner(t *testing.T) { cfg := pool.NewPlacerConfig() placer := pool.NewNaivePlacer(&cfg) @@ -196,8 +219,10 @@ func TestRRRunner(t *testing.T) { defer wg.Done() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) defer cancel() - call := &mockRunnerCall{} - err := placer.PlaceCall(rp, ctx, call) + modelCall := &models.Call{Type: models.TypeSync} + call := &mockRunnerCall{model: modelCall} + + err := placer.PlaceCall(ctx, rp, call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) } @@ -231,8 +256,10 @@ func TestEnforceLbTimeout(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) defer cancel() - call := &mockRunnerCall{} - err := placer.PlaceCall(rp, ctx, call) + modelCall := &models.Call{Type: models.TypeSync} + call := &mockRunnerCall{model: modelCall} + + err := placer.PlaceCall(ctx, rp, call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) } diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 90b4492e3..720628346 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -44,16 +44,25 @@ import ( The flow of events is as follows: - 1) LB sends 
ClientMsg_Try to runner - 2) Runner allocates its resources and sends an ACK: RunnerMsg_Acknowledged - 3) LB sends ClientMsg_Data messages with an EOF for last message set. - 4) Runner upon receiving with ClientMsg_Data calls agent.Submit() - 5) agent.Submit starts reading data from callHandle io.PipeReader, this reads - data from LB via gRPC receiver (inQueue). - 6) agent.Submit starts sending data via callHandle http.ResponseWriter interface, - which is pushed to gRPC sender (outQueue) to the LB. - 7) agent.Submit() completes, this means, the Function I/O is now completed. - 8) Runner finalizes gRPC session with RunnerMsg_Finished to LB. + LB: + + 1) LB sends ClientMsg_TryCall to runner + 2) LB sends ClientMsg_DataFrame messages with an EOF for last message set. + 3) LB receives RunnerMsg_CallResultStart for http status and headers + 4) LB receives RunnerMsg_DataFrame messages for http body with an EOF for last message set. + 5) LB receives RunnerMsg_CallFinished as the final message. + + LB can be interrupted with RunnerMsg_CallFinished anytime. If this is a NACK, presence of 503 + means LB can retry the call. + + Runner: + + 1) Runner upon receiving ClientMsg_TryCall calls agent.Submit() + 2) Runner allocates its resources but can send a NACK: RunnerMsg_Finished if it cannot service the call in time. + 3) agent.Submit starts reading data from callHandle io.PipeReader, this reads + data from LB via gRPC receiver (inQueue). The http reader detects headers/data + and sends RunnerMsg_CallResultStart and/or RunnerMsg_DataFrame messages to LB. + 4) agent.Submit() completes, this means, the Function I/O is now completed. 
Runner sends RunnerMsg_Finished */ @@ -199,6 +208,25 @@ func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error { return err } +func (ch *callHandle) enqueueDetached(err error) { + statusCode := http.StatusAccepted + if err != nil { + if models.IsAPIError(err) { + statusCode = models.GetAPIErrorCode(err) + } else { + statusCode = http.StatusInternalServerError + } + } + + err = ch.enqueueMsg(&runner.RunnerMsg{ + Body: &runner.RunnerMsg_ResultStart{ + ResultStart: &runner.CallResultStart{ + Meta: &runner.CallResultStart_Http{ + Http: &runner.HttpRespMeta{ + Headers: ch.prepHeaders(), + StatusCode: int32(statusCode)}}}}}) +} + // enqueueCallResponse enqueues a Submit() response to the LB // and initiates a graceful shutdown of the session. func (ch *callHandle) enqueueCallResponse(err error) { @@ -385,6 +413,10 @@ func (ch *callHandle) prepHeaders() []*runner.HttpHeader { // received data is pushed to LB via gRPC sender queue. // Write also sends http headers/state to the LB. func (ch *callHandle) Write(data []byte) (int, error) { + if ch.c.Model().Type == models.TypeDetached { + //If it is an detached call we just /dev/null the data coming back from the container + return len(data), nil + } var err error ch.headerOnce.Do(func() { // WARNING: we do fetch Status and Headers without @@ -410,7 +442,6 @@ func (ch *callHandle) Write(data []byte) (int, error) { if err != nil { return 0, err } - total := 0 // split up data into gRPC chunks for { @@ -526,10 +557,13 @@ type statusTracker struct { // pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it // and provides the gRPC server that implements the LB <-> Runner protocol. 
type pureRunner struct { - gRPCServer *grpc.Server - creds credentials.TransportCredentials - a Agent - status statusTracker + gRPCServer *grpc.Server + creds credentials.TransportCredentials + a Agent + status statusTracker + callHandleMap map[string]*callHandle + callHandleLock sync.Mutex + enableDetach bool } // implements Agent @@ -559,6 +593,19 @@ func (pr *pureRunner) AddCallListener(cl fnext.CallListener) { pr.a.AddCallListener(cl) } +func (pr *pureRunner) saveCallHandle(ch *callHandle) { + pr.callHandleLock.Lock() + pr.callHandleMap[ch.c.Model().ID] = ch + pr.callHandleLock.Unlock() +} + +func (pr *pureRunner) removeCallHandle(cID string) { + pr.callHandleLock.Lock() + delete(pr.callHandleMap, cID) + pr.callHandleLock.Unlock() + +} + func (pr *pureRunner) spawnSubmit(state *callHandle) { go func() { err := pr.a.Submit(state.c) @@ -566,6 +613,15 @@ func (pr *pureRunner) spawnSubmit(state *callHandle) { }() } +func (pr *pureRunner) spawnDetachSubmit(state *callHandle) { + go func() { + pr.saveCallHandle(state) + err := pr.a.Submit(state.c) + pr.removeCallHandle(state.c.Model().ID) + state.enqueueCallResponse(err) + }() +} + // handleTryCall based on the TryCall message, tries to place the call on NBIO Agent func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error { @@ -612,8 +668,17 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error } state.c.slotHashId = string(hashId[:]) } - pr.spawnSubmit(state) + if state.c.Type == models.TypeDetached { + if !pr.enableDetach { + err = models.ErrDetachUnsupported + state.enqueueCallResponse(err) + return err + } + pr.spawnDetachSubmit(state) + return nil + } + pr.spawnSubmit(state) return nil } @@ -862,6 +927,29 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.Runne return pr.handleStatusCall(ctx) } +// BeforeCall called before a function is executed +func (pr *pureRunner) BeforeCall(ctx context.Context, call *models.Call) error { + if 
call.Type != models.TypeDetached { + return nil + } + var err error + // it is an ack sync, so we send a ResultStart message back + pr.callHandleLock.Lock() + ch := pr.callHandleMap[call.ID] + pr.callHandleLock.Unlock() + if ch == nil { + err = models.ErrCallHandlerNotFound + return err + } + ch.enqueueDetached(err) + return nil +} + +// AfterCall called after a function is executed +func (pr *pureRunner) AfterCall(ctx context.Context, call *models.Call) error { + return nil +} + func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error) { agent := New(da) @@ -907,6 +995,14 @@ func PureRunnerWithStatusImage(imgName string) PureRunnerOption { } } +func PureRunnerWithDetached() PureRunnerOption { + return func(pr *pureRunner) error { + pr.AddCallListener(pr) + pr.enableDetach = true + return nil + } +} + func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) { pr := &pureRunner{} @@ -933,6 +1029,7 @@ func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunner logrus.Warn("Running pure runner in insecure mode!") } + pr.callHandleMap = make(map[string]*callHandle) pr.gRPCServer = grpc.NewServer(opts...) runner.RegisterRunnerProtocolServer(pr.gRPCServer, pr) diff --git a/api/models/call.go b/api/models/call.go index 2dd273eba..c8b374af3 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -14,6 +14,8 @@ const ( TypeSync = "sync" // TypeAsync ... 
TypeAsync = "async" + // TypeDetached is used for calls which return an ack to the caller as soon as the call starts + TypeDetached = "detached" ) var possibleStatuses = [...]string{"delayed", "queued", "running", "success", "error", "cancelled"} diff --git a/api/models/error.go b/api/models/error.go index 9326858fb..fa9b18d9e 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -156,6 +156,16 @@ var ( error: errors.New("Async functions are not supported on this server"), } + ErrDetachUnsupported = err{ + code: http.StatusNotImplemented, + error: errors.New("Detach call functions are not supported on this server"), + } + + ErrCallHandlerNotFound = err{ + code: http.StatusInternalServerError, + error: errors.New("Unable to find the call handle"), + } + // TODO consider removal. see rationale at uses, or remove if none. ErrContainerExitedEarly = err{ code: http.StatusBadGateway, diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index 84c6c2dd2..708518029 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -23,12 +23,15 @@ func NewCHPlacer(cfg *PlacerConfig) Placer { } } +func (p *chPlacer) GetPlacerConfig() PlacerConfig { + return p.cfg +} + // This borrows the CH placement algorithm from the original FNLB. // Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it. 
-func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - - state := NewPlacerTracker(ctx, &p.cfg) +func (p *chPlacer) PlaceCall(ctx context.Context, rp RunnerPool, call RunnerCall) error { + state := NewPlacerTracker(ctx, &p.cfg, call) defer state.HandleDone() key := call.Model().FnID diff --git a/api/runnerpool/fake_placer.go b/api/runnerpool/fake_placer.go new file mode 100644 index 000000000..24ba24c11 --- /dev/null +++ b/api/runnerpool/fake_placer.go @@ -0,0 +1,34 @@ +package runnerpool + +import ( + "context" + "time" +) + +type fakeDetachedPlacer struct { + cfg PlacerConfig + sleeptime time.Duration +} + +func NewFakeDetachedPlacer(cfg *PlacerConfig, st time.Duration) Placer { + return &fakeDetachedPlacer{ + cfg: *cfg, + sleeptime: st, + } +} + +func (p *fakeDetachedPlacer) GetPlacerConfig() PlacerConfig { + return p.cfg +} + +// PlaceCall for the fakeDetachedPlacer just sleeps for a period of time to let the placer context to time out. +// It returns the context exceeded error only if the placer context times out and the request context is still valid +func (p *fakeDetachedPlacer) PlaceCall(ctx context.Context, rp RunnerPool, call RunnerCall) error { + state := NewPlacerTracker(ctx, &p.cfg, call) + defer state.HandleDone() + time.Sleep(p.sleeptime) + if state.placerCtx.Err() != nil && state.requestCtx.Err() == nil { + return state.placerCtx.Err() + } + return nil +} diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index d8a65caf2..5e04690c7 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -23,9 +23,12 @@ func NewNaivePlacer(cfg *PlacerConfig) Placer { } } -func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { +func (sp *naivePlacer) GetPlacerConfig() PlacerConfig { + return sp.cfg +} - state := NewPlacerTracker(ctx, &sp.cfg) +func (sp *naivePlacer) PlaceCall(ctx context.Context, rp RunnerPool, call RunnerCall) error { + 
state := NewPlacerTracker(ctx, &sp.cfg, call) defer state.HandleDone() var runnerPoolErr error diff --git a/api/runnerpool/placer_config.go b/api/runnerpool/placer_config.go index 77346f3b5..0f586227f 100644 --- a/api/runnerpool/placer_config.go +++ b/api/runnerpool/placer_config.go @@ -11,11 +11,15 @@ type PlacerConfig struct { // Maximum amount of time a placer can hold a request during runner attempts PlacerTimeout time.Duration `json:"placer_timeout"` + + // Maximum amount of time a placer can hold an ack sync request during runner attempts + DetachedPlacerTimeout time.Duration `json:"detached_placer_timeout"` } func NewPlacerConfig() PlacerConfig { return PlacerConfig{ - RetryAllDelay: 10 * time.Millisecond, - PlacerTimeout: 360 * time.Second, + RetryAllDelay: 10 * time.Millisecond, + PlacerTimeout: 360 * time.Second, + DetachedPlacerTimeout: 30 * time.Second, } } diff --git a/api/runnerpool/placer_tracker.go b/api/runnerpool/placer_tracker.go index 55a292db6..208a347cb 100644 --- a/api/runnerpool/placer_tracker.go +++ b/api/runnerpool/placer_tracker.go @@ -19,8 +19,14 @@ type placerTracker struct { isPlaced bool } -func NewPlacerTracker(requestCtx context.Context, cfg *PlacerConfig) *placerTracker { - ctx, cancel := context.WithTimeout(context.Background(), cfg.PlacerTimeout) +func NewPlacerTracker(requestCtx context.Context, cfg *PlacerConfig, call RunnerCall) *placerTracker { + + timeout := cfg.PlacerTimeout + if call.Model().Type == models.TypeDetached { + timeout = cfg.DetachedPlacerTimeout + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) return &placerTracker{ cfg: cfg, requestCtx: requestCtx, diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 9b5ab9cb4..70abbb33a 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -13,7 +13,8 @@ import ( // Placer implements a placement strategy for calls that are load-balanced // across runners in a pool type Placer interface { - 
PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error + PlaceCall(ctx context.Context, rp RunnerPool, call RunnerCall) error + GetPlacerConfig() PlacerConfig } // RunnerPool is the abstraction for getting an ordered list of runners to try for a call diff --git a/api/server/runner_fninvoke.go b/api/server/runner_fninvoke.go index 5c45d0865..ce3e6e079 100644 --- a/api/server/runner_fninvoke.go +++ b/api/server/runner_fninvoke.go @@ -19,6 +19,12 @@ var ( bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} ) +// ResponseBuffer implements http.ResponseWriter +type ResponseBuffer interface { + http.ResponseWriter + Status() int +} + // implements http.ResponseWriter // this little guy buffers responses from user containers and lets them still // set headers and such without us risking writing partial output [as much, the @@ -34,6 +40,7 @@ var _ http.ResponseWriter = new(syncResponseWriter) // nice compiler errors func (s *syncResponseWriter) Header() http.Header { return s.headers } func (s *syncResponseWriter) WriteHeader(code int) { s.status = code } +func (s *syncResponseWriter) Status() int { return s.status } // handleFnInvokeCall executes the function, for router handlers func (s *Server) handleFnInvokeCall(c *gin.Context) { @@ -49,8 +56,6 @@ func (s *Server) handleFnInvokeCall(c *gin.Context) { // handleTriggerHTTPFunctionCall2 executes the function and returns an error // Requires the following in the context: func (s *Server) handleFnInvokeCall2(c *gin.Context) error { - // log := common.Logger(c.Request.Context()) - fn, err := s.lbReadAccess.GetFnByID(c, c.Param(api.ParamFnID)) if err != nil { return err @@ -73,19 +78,19 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode // buffer the response before writing it out to client to prevent partials from trying to stream buf := bufPool.Get().(*bytes.Buffer) buf.Reset() - writer := syncResponseWriter{ - headers: resp.Header(), - status: 200, - 
Buffer: buf, - } + var writer ResponseBuffer - opts := []agent.CallOpt{ - agent.WithWriter(&writer), // XXX (reed): order matters [for now] - agent.FromHTTPFnRequest(app, fn, req), - } - if trig != nil { - opts = append(opts, agent.WithTrigger(trig)) + isDetached := req.Header.Get("Fn-Invoke-Type") == models.TypeDetached + if isDetached { + writer = agent.NewDetachedResponseWriter(resp.Header(), 202) + } else { + writer = &syncResponseWriter{ + headers: resp.Header(), + status: 200, + Buffer: buf, + } } + opts := getCallOptions(req, app, fn, trig, writer) call, err := s.agent.GetCall(opts...) if err != nil { @@ -102,11 +107,30 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode writer.Header().Add("Fn-Call-Id", call.Model().ID) // XXX(reed): move to before Submit when adding streaming // buffered response writer traps status (so we can add headers), we need to write it still - if writer.status > 0 { - resp.WriteHeader(writer.status) + if writer.Status() > 0 { + resp.WriteHeader(writer.Status()) + } + + if isDetached { + return nil } io.Copy(resp, buf) bufPool.Put(buf) // at this point, submit returned without timing out, so we can re-use this one return nil } + +func getCallOptions(req *http.Request, app *models.App, fn *models.Fn, trig *models.Trigger, rw http.ResponseWriter) []agent.CallOpt { + var opts []agent.CallOpt + opts = append(opts, agent.WithWriter(rw)) // XXX (reed): order matters [for now] + opts = append(opts, agent.FromHTTPFnRequest(app, fn, req)) + + if req.Header.Get("Fn-Invoke-Type") == models.TypeDetached { + opts = append(opts, agent.InvokeDetached()) + } + + if trig != nil { + opts = append(opts, agent.WithTrigger(trig)) + } + return opts +} diff --git a/test/fn-system-tests/exec_fn_test.go b/test/fn-system-tests/exec_fn_test.go index e0da2803b..a19f68164 100644 --- a/test/fn-system-tests/exec_fn_test.go +++ b/test/fn-system-tests/exec_fn_test.go @@ -52,7 +52,7 @@ func TestCanExecuteFunction(t *testing.T) { 
content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { t.Fatalf("Got unexpected error: %v", err) } @@ -80,6 +80,47 @@ func TestCanExecuteFunction(t *testing.T) { } } +func TestCanExecuteDetachedFunction(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + app := &models.App{Name: id.New().String()} + app = ensureApp(t, app) + + fn := &models.Fn{ + AppID: app.ID, + Name: id.New().String(), + Image: image, + ResourceConfig: models.ResourceConfig{ + Memory: memory, + }, + } + fn = ensureFn(t, fn) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "invoke", fn.ID) + + body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` + content := bytes.NewBuffer([]byte(body)) + output := &bytes.Buffer{} + + resp, err := callFN(ctx, u.String(), content, output, models.TypeDetached) + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } +} + func TestCanExecuteBigOutput(t *testing.T) { buf := setLogBuffer() defer func() { @@ -118,7 +159,7 @@ func TestCanExecuteBigOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { t.Fatalf("Got unexpected error: %v", err) } @@ -173,7 +214,7 @@ func TestCanExecuteTooBigOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { 
t.Fatalf("Got unexpected error: %v", err) } @@ -228,7 +269,7 @@ func TestCanExecuteEmptyOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { t.Fatalf("Got unexpected error: %v", err) } @@ -286,7 +327,7 @@ func TestBasicConcurrentExecution(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} <-latch - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { results <- fmt.Errorf("Got unexpected error: %v", err) return @@ -313,3 +354,62 @@ func TestBasicConcurrentExecution(t *testing.T) { } } } + +func TestBasicConcurrentDetachedExecution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + app := &models.App{Name: id.New().String()} + app = ensureApp(t, app) + + fn := &models.Fn{ + AppID: app.ID, + Name: id.New().String(), + Image: image, + ResourceConfig: models.ResourceConfig{ + Memory: memory, + }, + } + fn = ensureFn(t, fn) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "invoke", fn.ID) + + results := make(chan error) + latch := make(chan struct{}) + concurrentFuncs := 10 + for i := 0; i < concurrentFuncs; i++ { + go func() { + body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` + content := bytes.NewBuffer([]byte(body)) + output := &bytes.Buffer{} + <-latch + resp, err := callFN(ctx, u.String(), content, output, models.TypeDetached) + if err != nil { + results <- fmt.Errorf("Got unexpected error: %v", err) + return + } + + if resp.StatusCode != http.StatusAccepted { + results <- fmt.Errorf("StatusCode check failed on %v", resp.StatusCode) + return + } + + results <- nil + 
}() + } + close(latch) + for i := 0; i < concurrentFuncs; i++ { + err := <-results + if err != nil { + t.Fatalf("Error in basic concurrency execution test: %v", err) + } + } +} diff --git a/test/fn-system-tests/exec_runner_status_test.go b/test/fn-system-tests/exec_runner_status_test.go index 6250f4e8c..4247963d1 100644 --- a/test/fn-system-tests/exec_runner_status_test.go +++ b/test/fn-system-tests/exec_runner_status_test.go @@ -16,14 +16,16 @@ import ( "github.com/fnproject/fn/api/runnerpool" ) -func callFN(ctx context.Context, u string, content io.Reader, output io.Writer) (*http.Response, error) { +func callFN(ctx context.Context, u string, content io.Reader, output io.Writer, invokeType string) (*http.Response, error) { method := "POST" req, err := http.NewRequest(method, u, content) + req.Header.Set("Fn-Invoke-Type", invokeType) if err != nil { return nil, fmt.Errorf("error running fn: %s", err) } req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) resp, err := http.DefaultClient.Do(req) @@ -78,7 +80,7 @@ func TestCannotExecuteStatusImage(t *testing.T) { content := bytes.NewBuffer([]byte(`status`)) output := &bytes.Buffer{} - resp, err := callFN(ctx, u.String(), content, output) + resp, err := callFN(ctx, u.String(), content, output, models.TypeSync) if err != nil { t.Fatalf("Got unexpected error: %v", err) } diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 19844af88..5a66e24e9 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -288,6 +288,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, agent.PureRunnerWithAgent(innerAgent), agent.PureRunnerWithStatusImage(StatusImage), + agent.PureRunnerWithDetached(), ) if err != nil { return nil, err