diff --git a/api/agent/agent.go b/api/agent/agent.go
index e8eed5a55..0e9c4b112 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -5,6 +5,7 @@ import (
 	"io"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/fnproject/fn/api/agent/drivers"
@@ -111,9 +112,10 @@ type agent struct {
 	resources ResourceTracker
 
 	// used to track running calls / safe shutdown
-	wg       sync.WaitGroup // TODO rename
-	shutonce sync.Once
-	shutdown chan struct{}
+	wg           sync.WaitGroup // TODO rename
+	shutonce     sync.Once
+	shutdown     chan struct{}
+	callEndCount int64
 }
 
 // New creates an Agent that executes functions locally as Docker containers.
@@ -250,14 +252,23 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 	// pass this error (nil or otherwise) to end directly, to store status, etc
 	err = slot.exec(ctx, call)
 	handleStatsEnd(ctx, err)
-
-	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
-	// but this could put us over the timeout if the call did not reply yet (need better policy).
-	ctx = common.BackgroundContext(ctx)
-	err = call.End(ctx, err)
+	a.handleCallEnd(ctx, call, err)
 
 	return transformTimeout(err, false)
 }
 
+func (a *agent) handleCallEnd(ctx context.Context, call *call, err error) {
+	a.wg.Add(1)
+	atomic.AddInt64(&a.callEndCount, 1)
+	go func() {
+		ctx = common.BackgroundContext(ctx)
+		ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
+		call.End(ctx, err)
+		cancel()
+		atomic.AddInt64(&a.callEndCount, -1)
+		a.wg.Done()
+	}()
+}
+
 func transformTimeout(e error, isRetriable bool) error {
 	if e == context.DeadlineExceeded {
 		if isRetriable {
@@ -308,6 +319,11 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	ctx, span := trace.StartSpan(ctx, "agent_get_slot")
 	defer span.End()
 
+	// first check any excess case of call.End() stacking.
+	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
+		return nil, context.DeadlineExceeded
+	}
+
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
diff --git a/api/agent/config.go b/api/agent/config.go
index 63021aaab..32ad1138b 100644
--- a/api/agent/config.go
+++ b/api/agent/config.go
@@ -15,6 +15,8 @@ type AgentConfig struct {
 	HotPoll            time.Duration `json:"hot_poll_msecs"`
 	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
 	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
+	CallEndTimeout     time.Duration `json:"call_end_timeout"`
+	MaxCallEndStacking uint64        `json:"max_call_end_stacking"`
 	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
 	MaxLogSize         uint64        `json:"max_log_size_bytes"`
 	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
@@ -31,6 +33,8 @@ const (
 	EnvHotPoll            = "FN_HOT_POLL_MSECS"
 	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
 	EnvAsyncChewPoll      = "FN_ASYNC_CHEW_POLL_MSECS"
+	EnvCallEndTimeout     = "FN_CALL_END_TIMEOUT_MSECS"
+	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
 	EnvMaxResponseSize    = "FN_MAX_RESPONSE_SIZE"
 	EnvMaxLogSize         = "FN_MAX_LOG_SIZE_BYTES"
 	EnvMaxTotalCPU        = "FN_MAX_TOTAL_CPU_MCPUS"
@@ -46,8 +50,9 @@ const (
 
 func NewAgentConfig() (*AgentConfig, error) {
 	cfg := &AgentConfig{
-		MinDockerVersion: "17.10.0-ce",
-		MaxLogSize:       1 * 1024 * 1024,
+		MinDockerVersion:   "17.10.0-ce",
+		MaxLogSize:         1 * 1024 * 1024,
+		MaxCallEndStacking: 8192,
 	}
 
 	var err error
@@ -57,12 +62,14 @@ func NewAgentConfig() (*AgentConfig, error) {
 	err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond)
 	err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
 	err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
+	err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute)
 	err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
 	err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
 	err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
 	err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
 	err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
 	err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
+	err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)
 
 	if err != nil {
 		return cfg, err
diff --git a/api/server/runner_test.go b/api/server/runner_test.go
index 8ee500ade..5a6ae5553 100644
--- a/api/server/runner_test.go
+++ b/api/server/runner_test.go
@@ -193,7 +193,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 
 	containerIds := make([]string, 0)
 
-	for i, test := range []struct {
+	testCases := []struct {
 		path               string
 		body               string
 		method             string
@@ -235,7 +235,11 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 
 		// CASE IV: should land on CASE III container
 		{"/r/zoo/http/", ok, "GET", http.StatusOK, "", nil, 0},
-	} {
+	}
+
+	callIds := make([]string, len(testCases))
+
+	for i, test := range testCases {
 		body := strings.NewReader(test.body)
 		_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
 		respBytes, _ := ioutil.ReadAll(rec.Body)
@@ -246,6 +250,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 		}
 
 		containerIds = append(containerIds, "N/A")
+		callIds[i] = rec.Header().Get("Fn_call_id")
 
 		if rec.Code != test.expectedCode {
 			isFailure = true
@@ -260,13 +265,6 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 		}
 
-		if test.expectedLogsSubStr != nil {
-			callID := rec.Header().Get("Fn_call_id")
-			if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) {
-				isFailure = true
-			}
-		}
-
 		if rec.Code == http.StatusOK {
 			dockerId, err := getDockerId(respBytes)
 			if err != nil {
@@ -281,6 +279,14 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 		time.Sleep(test.sleepAmount)
 	}
 
+	for i, test := range testCases {
+		if test.expectedLogsSubStr != nil {
+			if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) {
+				isFailure = true
+			}
+		}
+	}
+
 	jsonIds := containerIds[0:5]
 
 	// now cross check JSON container ids:
@@ -374,7 +380,7 @@ func TestRouteRunnerExecution(t *testing.T) {
 	bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
 	smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}`  // 1 trailer < 2K
 
-	for i, test := range []struct {
+	testCases := []struct {
 		path               string
 		body               string
 		method             string
@@ -413,7 +419,11 @@ func TestRouteRunnerExecution(t *testing.T) {
 		{"/r/myapp/mybigoutputhttp", smalloutput, "GET", http.StatusOK, nil, "", nil},
 		{"/r/myapp/mybigoutputcold", bigoutput, "GET", http.StatusBadGateway, nil, "", nil},
 		{"/r/myapp/mybigoutputcold", smalloutput, "GET", http.StatusOK, nil, "", nil},
-	} {
+	}
+
+	callIds := make([]string, len(testCases))
+
+	for i, test := range testCases {
 		trx := fmt.Sprintf("_trx_%d_", i)
 		body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
 		_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
@@ -424,6 +434,8 @@ func TestRouteRunnerExecution(t *testing.T) {
 			maxBody = 1024
 		}
 
+		callIds[i] = rec.Header().Get("Fn_call_id")
+
 		if rec.Code != test.expectedCode {
 			isFailure = true
 			t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
@@ -453,10 +465,11 @@ func TestRouteRunnerExecution(t *testing.T) {
 				}
 			}
 		}
+	}
 
+	for i, test := range testCases {
 		if test.expectedLogsSubStr != nil {
-			callID := rec.Header().Get("Fn_call_id")
-			if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) {
+			if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) {
 				isFailure = true
 			}
 		}
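
Usage note (a sketch, not part of the patch above): the two new knobs are read by NewAgentConfig, so they can be set in the environment before the agent starts. The example below assumes the agent package is importable as github.com/fnproject/fn/api/agent, that this standalone main.go is hypothetical, and that setEnvMsecs interprets the value as milliseconds, as its name and the other *_MSECS variables suggest.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/fnproject/fn/api/agent"
)

func main() {
	// Bound each background call.End() goroutine; defaults to 10 minutes when unset.
	os.Setenv("FN_CALL_END_TIMEOUT_MSECS", "120000") // 2 minutes, in milliseconds

	// Cap the number of outstanding call.End() goroutines before getSlot starts
	// rejecting new slot requests; defaults to 8192 when unset.
	os.Setenv("FN_MAX_CALL_END_STACKING", "1024")

	cfg, err := agent.NewAgentConfig()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.CallEndTimeout)     // 2m0s
	fmt.Println(cfg.MaxCallEndStacking) // 1024
}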