diff --git a/api/agent/agent.go b/api/agent/agent.go index 6d12c3eae..1d6b5a28a 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -228,10 +228,7 @@ func (a *agent) Submit(callI Call) error { call := callI.(*call) - ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline) - call.req = call.req.WithContext(ctx) - defer cancel() - + ctx := call.req.Context() ctx, span := trace.StartSpan(ctx, "agent_submit") defer span.End() @@ -277,7 +274,11 @@ func (a *agent) submit(ctx context.Context, call *call) error { statsDequeueAndStart(ctx) - // pass this error (nil or otherwise) to end directly, to store status, etc + // We are about to execute the function, set container Exec Deadline (call.Timeout) + ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second) + defer cancel() + + // Pass this error (nil or otherwise) to end directly, to store status, etc. err = slot.exec(ctx, call) return a.handleCallEnd(ctx, call, slot, err, true) } @@ -383,9 +384,17 @@ func handleStatsEnd(ctx context.Context, err error) { // request type, this may launch a new container or wait for other containers to become idle // or it may wait for resources to become available to launch a new container. func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) { - // start the deadline context for waiting for slots - ctx, cancel := context.WithDeadline(ctx, call.slotDeadline) - defer cancel() + if call.Type == models.TypeAsync { + // *) for async, slot deadline is also call.Timeout. This is because we would like to + // allocate enough time for docker-pull, slot-wait, docker-start, etc. + // and also make sure we have call.Timeout inside the container. Total time + // to run an async becomes 2 * call.Timeout. + // *) for sync, there's no slot deadline, the timeout is controlled by http-client + // context (or runner gRPC context) + tmp, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second) + ctx = tmp + defer cancel() + } ctx, span := trace.StartSpan(ctx, "agent_get_slot") defer span.End() @@ -721,7 +730,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { errApp := make(chan error, 1) go func() { - ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req) + ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req.WithContext(ctx)) errApp <- proto.Dispatch(ctx, ci, call.w) }() @@ -752,8 +761,10 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch call.containerState.UpdateState(ctx, ContainerStateStart, call.slots) + deadline := time.Now().Add(time.Duration(call.Timeout) * time.Second) + // add Fn-specific information to the config to shove everything into env vars for cold - call.Config["FN_DEADLINE"] = strfmt.DateTime(call.execDeadline).String() + call.Config["FN_DEADLINE"] = strfmt.DateTime(deadline).String() call.Config["FN_METHOD"] = call.Model().Method call.Config["FN_REQUEST_URL"] = call.Model().URL call.Config["FN_CALL_ID"] = call.Model().ID diff --git a/api/agent/call.go b/api/agent/call.go index 4a39dd264..6febd115e 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -254,40 +254,36 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { } mem := c.Memory + uint64(c.TmpFsSize) - if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) { // if we're not going to be able to run this call on this machine, bail here. return nil, models.ErrCallTimeoutServerBusy } + err := setMaxBodyLimit(&a.cfg, &c) if err != nil { return nil, err } + setupCtx(&c) + c.da = a.da c.ct = a - - ctx, _ := common.LoggerWithFields(c.req.Context(), - logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path}) - c.req = c.req.WithContext(ctx) - - c.stderr = setupLogger(ctx, a.cfg.MaxLogSize, c.Call) + c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, c.Call) if c.w == nil { // send STDOUT to logs if no writer given (async...) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) c.w = c.stderr } - now := time.Now() - slotDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second / 2) - execDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second) - - c.slotDeadline = slotDeadline - c.execDeadline = execDeadline - return &c, nil } +func setupCtx(c *call) { + ctx, _ := common.LoggerWithFields(c.req.Context(), + logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path}) + c.req = c.req.WithContext(ctx) +} + func setMaxBodyLimit(cfg *AgentConfig, c *call) error { if cfg.MaxRequestSize > 0 && c.req.ContentLength > 0 && uint64(c.req.ContentLength) > cfg.MaxRequestSize { return models.ErrRequestContentTooBig @@ -310,16 +306,10 @@ type call struct { stderr io.ReadWriteCloser ct callTrigger slots *slotQueue - slotDeadline time.Time - lbDeadline time.Time - execDeadline time.Time requestState RequestState containerState ContainerState slotHashId string -} - -func (c *call) LbDeadline() time.Time { - return c.lbDeadline + isLB bool } func (c *call) SlotHashId() string { @@ -358,8 +348,7 @@ func (c *call) Start(ctx context.Context) error { c.StartedAt = strfmt.DateTime(time.Now()) c.Status = "running" - // Do not write this header if lb-agent - if c.lbDeadline.IsZero() { + if !c.isLB { if rw, ok := c.w.(http.ResponseWriter); ok { // TODO need to figure out better way to wire response headers in rw.Header().Set("XXX-FXLB-WAIT", time.Time(c.StartedAt).Sub(time.Time(c.CreatedAt)).String()) } diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 29efa5a6e..6b889f5e7 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "sync/atomic" - "time" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -104,20 +103,18 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { if c.req == nil || c.Call == nil { return nil, errors.New("no model or request provided for call") } + err := setMaxBodyLimit(&a.cfg, &c) if err != nil { return nil, err } + setupCtx(&c) + + c.isLB = true c.da = a.da c.ct = a c.stderr = &nullReadWriter{} - - ctx, _ := common.LoggerWithFields(c.req.Context(), - logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path}) - c.req = c.req.WithContext(ctx) - - c.lbDeadline = time.Now().Add(time.Duration(c.Call.Timeout) * time.Second) c.slotHashId = getSlotQueueKey(&c) return &c, nil @@ -157,12 +154,7 @@ func (a *lbAgent) Submit(callI Call) error { } call := callI.(*call) - - ctx, cancel := context.WithDeadline(call.req.Context(), call.lbDeadline) - call.req = call.req.WithContext(ctx) - defer cancel() - - ctx, span := trace.StartSpan(ctx, "agent_submit") + ctx, span := trace.StartSpan(call.req.Context(), "agent_submit") defer span.End() statsEnqueue(ctx) diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 72084a823..62e45ac1f 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -117,7 +117,6 @@ func (r *mockRunner) Address() string { } type mockRunnerCall struct { - lbDeadline time.Time r *http.Request rw http.ResponseWriter stdErr io.ReadWriteCloser @@ -125,10 +124,6 @@ type mockRunnerCall struct { slotHashId string } -func (c *mockRunnerCall) LbDeadline() time.Time { - return c.lbDeadline -} - func (c *mockRunnerCall) SlotHashId() string { return c.slotHashId } @@ -157,8 +152,10 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC func TestOneRunner(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} - err := placer.PlaceCall(rp, context.Background(), call) + call := &mockRunnerCall{} + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) + defer cancel() + err := placer.PlaceCall(rp, ctx, call) if err != nil { t.Fatalf("Failed to place call on runner %v", err) } @@ -167,7 +164,7 @@ func TestOneRunner(t *testing.T) { func TestEnforceTimeoutFromContext(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) - call := &mockRunnerCall{lbDeadline: time.Now().Add(1 * time.Second)} + call := &mockRunnerCall{} ctx, cancel := context.WithDeadline(context.Background(), time.Now()) defer cancel() err := placer.PlaceCall(rp, ctx, call) @@ -187,8 +184,10 @@ func TestRRRunner(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} - err := placer.PlaceCall(rp, context.Background(), call) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) + defer cancel() + call := &mockRunnerCall{} + err := placer.PlaceCall(rp, ctx, call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) } @@ -218,8 +217,11 @@ func TestEnforceLbTimeout(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - call := &mockRunnerCall{lbDeadline: time.Now().Add(10 * time.Millisecond)} - err := placer.PlaceCall(rp, context.Background(), call) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) + defer cancel() + + call := &mockRunnerCall{} + err := placer.PlaceCall(rp, ctx, call) if err != nil { failures <- fmt.Errorf("Timed out call %d", i) } diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index be3abdea9..5284881ae 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -5,7 +5,6 @@ import ( "errors" "io" "net/http" - "time" "github.com/fnproject/fn/api/models" "github.com/go-openapi/strfmt" @@ -79,16 +78,8 @@ func (ci callInfoImpl) Input() io.Reader { func (ci callInfoImpl) Deadline() strfmt.DateTime { deadline, ok := ci.req.Context().Deadline() if !ok { - // In theory deadline must have been set here, but if it wasn't then - // at this point it is already too late to raise an error. Set it to - // something meaningful. - // This assumes StartedAt was set to something other than the default. - // If that isn't set either, then how many things have gone wrong? - if ci.call.StartedAt == strfmt.NewDateTime() { - // We just panic if StartedAt is the default (i.e. not set) - panic("No context deadline and zero-value StartedAt - this should never happen") - } - deadline = ((time.Time)(ci.call.StartedAt)).Add(time.Duration(ci.call.Timeout) * time.Second) + // In theory deadline must have been set here + panic("No context deadline is set in protocol, should never happen") } return strfmt.DateTime(deadline) } diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index 1fb490222..07838ff07 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -2,12 +2,14 @@ package protocol import ( "bytes" + "context" "encoding/json" "io" "io/ioutil" "net/http" "net/url" "testing" + "time" "github.com/fnproject/fn/api/models" ) @@ -16,7 +18,7 @@ type RequestData struct { A string `json:"a"` } -func setupRequest(data interface{}) *callInfoImpl { +func setupRequest(data interface{}) (*callInfoImpl, context.CancelFunc) { req := &http.Request{ Method: http.MethodPost, URL: &url.URL{ @@ -46,12 +48,14 @@ func setupRequest(data interface{}) *callInfoImpl { // fixup URL in models.Call call.URL = req.URL.String() - ci := &callInfoImpl{call: call, req: req} - return ci + ctx, cancel := context.WithTimeout(req.Context(), 1*time.Second) + ci := &callInfoImpl{call: call, req: req.WithContext(ctx)} + return ci, cancel } func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) { - ci := setupRequest(nil) + ci, cancel := setupRequest(nil) + defer cancel() r, w := io.Pipe() proto := JSONProtocol{w, r} go func() { @@ -88,7 +92,8 @@ func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) { func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { rDataBefore := RequestData{A: "a"} - ci := setupRequest(rDataBefore) + ci, cancel := setupRequest(rDataBefore) + defer cancel() r, w := io.Pipe() proto := JSONProtocol{w, r} go func() { @@ -133,7 +138,8 @@ func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { } func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) { - ci := setupRequest(nil) + ci, cancel := setupRequest(nil) + defer cancel() r, w := io.Pipe() proto := JSONProtocol{w, r} go func() { @@ -177,7 +183,8 @@ func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) { } func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) { - ci := setupRequest(nil) + ci, cancel := setupRequest(nil) + defer cancel() r, w := io.Pipe() proto := JSONProtocol{w, r} go func() { diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 00973eeee..cd13e1ce8 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -67,8 +67,7 @@ type callHandle struct { c *call // the agent's version of call // Timings, for metrics: - receivedTime strfmt.DateTime // When was the call received? - allocatedTime strfmt.DateTime // When did we finish allocating capacity? + receivedTime strfmt.DateTime // When was the call received? // For implementing http.ResponseWriter: headers http.Header @@ -530,7 +529,7 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error return err } - agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state)) + agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state), WithContext(state.ctx)) if err != nil { state.enqueueCallResponse(err) return err @@ -545,7 +544,6 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error } state.c.slotHashId = string(hashId[:]) } - state.allocatedTime = strfmt.DateTime(time.Now()) pr.spawnSubmit(state) return nil diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index a5442e109..7ed197754 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -31,7 +31,7 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall // The key is just the path in this case key := call.Model().Path sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) - timeout := time.After(call.LbDeadline().Sub(time.Now())) + for { runners, err := rp.Runners(call) if err != nil { @@ -43,8 +43,6 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy default: } @@ -69,8 +67,6 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy case <-time.After(p.rrInterval): } } diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index d0059340a..42bca9d35 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -25,7 +25,6 @@ func NewNaivePlacer() Placer { } func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - timeout := time.After(call.LbDeadline().Sub(time.Now())) for { runners, err := rp.Runners(call) @@ -37,8 +36,6 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy default: } @@ -62,8 +59,6 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy case <-time.After(sp.rrInterval): } } diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 0b2b862af..c5ecdb21d 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -4,7 +4,6 @@ import ( "context" "io" "net/http" - "time" "github.com/fnproject/fn/api/models" ) @@ -42,7 +41,6 @@ type Runner interface { // processed by a RunnerPool type RunnerCall interface { SlotHashId() string - LbDeadline() time.Time RequestBody() io.ReadCloser ResponseWriter() http.ResponseWriter StdErr() io.ReadWriteCloser diff --git a/test/fn-api-tests/calls_test.go b/test/fn-api-tests/calls_test.go index 2bf8155e6..1de2b57c3 100644 --- a/test/fn-api-tests/calls_test.go +++ b/test/fn-api-tests/calls_test.go @@ -60,7 +60,7 @@ func TestGetExactCall(t *testing.T) { } u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) - callID := CallAsync(t, u, &bytes.Buffer{}) + callID := CallAsync(t, s.Context, u, &bytes.Buffer{}) cfg := &call.GetAppsAppCallsCallParams{ Call: callID, diff --git a/test/fn-api-tests/exec_test.go b/test/fn-api-tests/exec_test.go index 9198b077e..40884d13a 100644 --- a/test/fn-api-tests/exec_test.go +++ b/test/fn-api-tests/exec_test.go @@ -2,6 +2,7 @@ package tests import ( "bytes" + "context" "encoding/json" "io" "net/url" @@ -15,9 +16,9 @@ import ( "github.com/fnproject/fn_go/models" ) -func CallAsync(t *testing.T, u url.URL, content io.Reader) string { +func CallAsync(t *testing.T, ctx context.Context, u url.URL, content io.Reader) string { output := &bytes.Buffer{} - _, err := CallFN(u.String(), content, output, "POST", []string{}) + _, err := CallFN(ctx, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -41,9 +42,9 @@ func CallAsync(t *testing.T, u url.URL, content io.Reader) string { return callID.CallID } -func CallSync(t *testing.T, u url.URL, content io.Reader) string { +func CallSync(t *testing.T, ctx context.Context, u url.URL, content io.Reader) string { output := &bytes.Buffer{} - resp, err := CallFN(u.String(), content, output, "POST", []string{}) + resp, err := CallFN(ctx, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -75,7 +76,7 @@ func TestCanCallfunction(t *testing.T) { content := &bytes.Buffer{} output := &bytes.Buffer{} - _, err := CallFN(u.String(), content, output, "POST", []string{}) + _, err := CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -104,7 +105,7 @@ func TestCallOutputMatch(t *testing.T) { Name string }{Name: "John"}) output := &bytes.Buffer{} - _, err := CallFN(u.String(), content, output, "POST", []string{}) + _, err := CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -134,7 +135,7 @@ func TestCanCallAsync(t *testing.T) { Type: newRouteType, }) - CallAsync(t, u, &bytes.Buffer{}) + CallAsync(t, s.Context, u, &bytes.Buffer{}) } func TestCanGetAsyncState(t *testing.T) { @@ -157,7 +158,7 @@ func TestCanGetAsyncState(t *testing.T) { Type: newRouteType, }) - callID := CallAsync(t, u, &bytes.Buffer{}) + callID := CallAsync(t, s.Context, u, &bytes.Buffer{}) cfg := &call.GetAppsAppCallsCallParams{ Call: callID, App: s.AppName, @@ -221,7 +222,7 @@ func TestCanCauseTimeout(t *testing.T) { }{Seconds: 11}) output := &bytes.Buffer{} - resp, _ := CallFN(u.String(), content, output, "POST", []string{}) + resp, _ := CallFN(s.Context, u.String(), content, output, "POST", []string{}) if !strings.Contains(output.String(), "Timed out") { t.Errorf("Must fail because of timeout, but got error message: %v", output.String()) @@ -270,7 +271,7 @@ func TestCallResponseHeadersMatch(t *testing.T) { u.Path = path.Join(u.Path, "r", s.AppName, rt.Path) content := &bytes.Buffer{} output := &bytes.Buffer{} - CallFN(u.String(), content, output, "POST", + CallFN(s.Context, u.String(), content, output, "POST", []string{ "ACCEPT: application/xml", "ACCEPT: application/json; q=0.2", @@ -305,7 +306,7 @@ func TestCanWriteLogs(t *testing.T) { Size int }{Size: 20}) - callID := CallSync(t, u, content) + callID := CallSync(t, s.Context, u, content) cfg := &operations.GetAppsAppCallsCallLogParams{ Call: callID, @@ -353,7 +354,7 @@ func TestOversizedLog(t *testing.T) { Size int }{Size: size}) //exceeding log by 1 symbol - callID := CallSync(t, u, content) + callID := CallSync(t, s.Context, u, content) cfg := &operations.GetAppsAppCallsCallLogParams{ Call: callID, diff --git a/test/fn-api-tests/formats_test.go b/test/fn-api-tests/formats_test.go index 6f461b3f5..21a4ec688 100644 --- a/test/fn-api-tests/formats_test.go +++ b/test/fn-api-tests/formats_test.go @@ -41,7 +41,7 @@ func TestFnJSONFormats(t *testing.T) { }) content := bytes.NewBuffer(b) output := &bytes.Buffer{} - resp, err := CallFN(u.String(), content, output, "POST", []string{}) + resp, err := CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } diff --git a/test/fn-api-tests/utils.go b/test/fn-api-tests/utils.go index b64bf1bc0..48b9da12d 100644 --- a/test/fn-api-tests/utils.go +++ b/test/fn-api-tests/utils.go @@ -136,6 +136,8 @@ func (s *TestHarness) Cleanup() { for app, _ := range s.createdApps { safeDeleteApp(ctx, s.Client, app) } + + s.Cancel() } func EnvAsHeader(req *http.Request, selectedEnv []string) { @@ -151,7 +153,7 @@ func EnvAsHeader(req *http.Request, selectedEnv []string) { } } -func CallFN(u string, content io.Reader, output io.Writer, method string, env []string) (*http.Response, error) { +func CallFN(ctx context.Context, u string, content io.Reader, output io.Writer, method string, env []string) (*http.Response, error) { if method == "" { if content == nil { method = "GET" @@ -164,8 +166,8 @@ func CallFN(u string, content io.Reader, output io.Writer, method string, env [] if err != nil { return nil, fmt.Errorf("error running route: %s", err) } - req.Header.Set("Content-Type", "application/json") + req = req.WithContext(ctx) if len(env) > 0 { EnvAsHeader(req, env) diff --git a/test/fn-system-tests/exec_test.go b/test/fn-system-tests/exec_test.go index ef7358d81..7ce27c7b8 100644 --- a/test/fn-system-tests/exec_test.go +++ b/test/fn-system-tests/exec_test.go @@ -2,6 +2,7 @@ package tests import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -10,8 +11,8 @@ import ( "path" "strings" "testing" + "time" - apimodels "github.com/fnproject/fn/api/models" apiutils "github.com/fnproject/fn/test/fn-api-tests" sdkmodels "github.com/fnproject/fn_go/models" ) @@ -75,7 +76,7 @@ func TestCanExecuteFunction(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -118,7 +119,7 @@ func TestCanExecuteBigOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -163,7 +164,7 @@ func TestCanExecuteTooBigOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -208,7 +209,7 @@ func TestCanExecuteEmptyOutput(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -256,7 +257,7 @@ func TestBasicConcurrentExecution(t *testing.T) { body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) if err != nil { results <- fmt.Errorf("Got unexpected error: %v", err) return @@ -288,10 +289,14 @@ func TestSaturatedSystem(t *testing.T) { s := apiutils.SetupHarness() + // override default 60 secs with shorter. + s.Cancel() + s.Context, s.Cancel = context.WithTimeout(context.Background(), 4*time.Second) + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) defer s.Cleanup() - timeout := int32(5) + timeout := int32(1) rt := s.BasicRoute() rt.Image = "fnproject/fn-test-utils" @@ -316,28 +321,8 @@ func TestSaturatedSystem(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) - if err != nil { - if err != apimodels.ErrCallTimeoutServerBusy { - t.Errorf("Got unexpected error: %v", err) - } - } - - // LB may respond either with: - // timeout: a timeout during a call to a runner - // too busy: a timeout during LB retry loop - exp1 := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n" - exp2 := "{\"error\":{\"message\":\"Timed out\"}}\n" - - actual := output.String() - - if strings.Contains(exp1, actual) && len(exp1) == len(actual) { - } else if strings.Contains(exp2, actual) && len(exp2) == len(actual) { - } else { - t.Errorf("Assertion error.\n\tExpected: %v or %v\n\tActual: %v", exp1, exp2, output.String()) - } - - if resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusGatewayTimeout { - t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + resp, err := apiutils.CallFN(s.Context, u.String(), content, output, "POST", []string{}) + if resp != nil || err == nil || s.Context.Err() == nil { + t.Fatalf("Expected response: %v err:%v", resp, err) } }