diff --git a/api/agent/agent.go b/api/agent/agent.go
index ce199e31b..3e62bade9 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/fnproject/fn/api/agent/drivers"
@@ -103,7 +102,6 @@ type agent struct {
 	// used to track running calls / safe shutdown
 	shutWg              *common.WaitGroup
 	shutonce            sync.Once
-	callEndCount        int64
 	disableAsyncDequeue bool
 
 	callOverrider CallOverrider
@@ -293,68 +291,28 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 	statsDequeueAndStart(ctx)
 
 	// We are about to execute the function, set container Exec Deadline (call.Timeout)
-	ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
+	slotCtx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
 	defer cancel()
 
 	// Pass this error (nil or otherwise) to end directly, to store status, etc.
-	err = slot.exec(ctx, call)
+	err = slot.exec(slotCtx, call)
 	return a.handleCallEnd(ctx, call, slot, err, true)
 }
 
-func (a *agent) scheduleCallEnd(fn func()) {
-	atomic.AddInt64(&a.callEndCount, 1)
-	go func() {
-		fn()
-		atomic.AddInt64(&a.callEndCount, -1)
-		a.shutWg.DoneSession()
-	}()
-}
+func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isStarted bool) error {
 
-func (a *agent) finalizeCallEnd(ctx context.Context, err error, isRetriable, isScheduled bool) error {
-	// if scheduled in background, let scheduleCallEnd() handle
-	// the shutWg group, otherwise decrement here.
-	if !isScheduled {
-		a.shutWg.DoneSession()
-	}
-	handleStatsEnd(ctx, err)
-	return transformTimeout(err, isRetriable)
-}
-
-func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isCommitted bool) error {
-
-	// For hot-containers, slot close is a simple channel close... No need
-	// to handle it async. Execute it here ASAP
-	if slot != nil && protocol.IsStreamable(protocol.Protocol(call.Format)) {
-		slot.Close(ctx)
-		slot = nil
-	}
-
-	// This means call was routed (executed), in order to reduce latency here
-	// we perform most of these tasks in go-routine asynchronously.
-	if isCommitted {
-		a.scheduleCallEnd(func() {
-			ctx = common.BackgroundContext(ctx)
-			if slot != nil {
-				slot.Close(ctx) // (no timeout)
-			}
-			ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
-			call.End(ctx, err)
-			cancel()
-		})
-		return a.finalizeCallEnd(ctx, err, false, true)
-	}
-
-	// The call did not succeed. And it is retriable. We close the slot
-	// ASAP in the background if we haven't already done so (cold-container case),
-	// in order to keep latency down.
 	if slot != nil {
-		a.scheduleCallEnd(func() {
-			slot.Close(common.BackgroundContext(ctx)) // (no timeout)
-		})
-		return a.finalizeCallEnd(ctx, err, true, true)
+		slot.Close(common.BackgroundContext(ctx))
 	}
 
-	return a.finalizeCallEnd(ctx, err, true, false)
+	// This means call was routed (executed)
+	if isStarted {
+		call.End(ctx, err)
+	}
+
+	handleStatsEnd(ctx, err)
+	a.shutWg.DoneSession()
+	return transformTimeout(err, !isStarted)
 }
 
 func transformTimeout(e error, isRetriable bool) error {
@@ -417,11 +375,6 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	ctx, span := trace.StartSpan(ctx, "agent_get_slot")
 	defer span.End()
 
-	// first check any excess case of call.End() stacking.
-	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
-		return nil, context.DeadlineExceeded
-	}
-
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
diff --git a/api/agent/config.go b/api/agent/config.go
index cd24697c9..0e8650601 100644
--- a/api/agent/config.go
+++ b/api/agent/config.go
@@ -18,8 +18,6 @@ type Config struct {
 	HotPoll            time.Duration `json:"hot_poll_msecs"`
 	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
 	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
-	CallEndTimeout     time.Duration `json:"call_end_timeout"`
-	MaxCallEndStacking uint64        `json:"max_call_end_stacking"`
 	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
 	MaxLogSize         uint64        `json:"max_log_size_bytes"`
 	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
@@ -54,10 +52,6 @@ const (
 	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
 	// EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations
 	EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
-	// EnvCallEndTimeout is the timeout after a call is completed to store information about that invocation
-	EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS"
-	// EnvMaxCallEndStacking is the maximum number of concurrent calls in call.End storing info
-	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
 	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
 	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
 	// EnvMaxLogSize is the maximum size that a function's log may reach
@@ -103,11 +97,10 @@ const (
 
 func NewConfig() (*Config, error) {
 	cfg := &Config{
-		MinDockerVersion:   "17.10.0-ce",
-		MaxLogSize:         1 * 1024 * 1024,
-		MaxCallEndStacking: 8192,
-		PreForkImage:       "busybox",
-		PreForkCmd:         "tail -f /dev/null",
+		MinDockerVersion: "17.10.0-ce",
+		MaxLogSize:       1 * 1024 * 1024,
+		PreForkImage:     "busybox",
+		PreForkCmd:       "tail -f /dev/null",
 	}
 
 	var err error
@@ -117,14 +110,12 @@
 	err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
 	err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
 	err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
-	err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute)
 	err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
 	err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
 	err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
 	err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
 	err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
 	err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
-	err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)
 	err = setEnvStr(err, EnvPreForkImage, &cfg.PreForkImage)
 	err = setEnvStr(err, EnvPreForkCmd, &cfg.PreForkCmd)
 	err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce)
diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go
index d00ad5a48..bfbe64361 100644
--- a/api/agent/lb_agent.go
+++ b/api/agent/lb_agent.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"io"
 	"io/ioutil"
-	"sync/atomic"
 
 	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
@@ -25,7 +24,6 @@ type lbAgent struct {
 	placer        pool.Placer
 	callOverrider CallOverrider
 	shutWg        *common.WaitGroup
-	callEndCount  int64
 }
 
 type LBAgentOption func(*lbAgent) error
@@ -158,11 +156,6 @@ func (a *lbAgent) Submit(callI Call) error {
 
 	statsEnqueue(ctx)
 
-	// first check any excess case of call.End() stacking.
-	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
-		a.handleCallEnd(ctx, call, context.DeadlineExceeded, false)
-	}
-
 	err := call.Start(ctx)
 	if err != nil {
 		return a.handleCallEnd(ctx, call, err, false)
@@ -242,31 +235,17 @@ func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
 	return errors.New("Enqueue not implemented")
 }
 
-func (a *lbAgent) scheduleCallEnd(fn func()) {
-	atomic.AddInt64(&a.callEndCount, 1)
-	go func() {
-		fn()
-		atomic.AddInt64(&a.callEndCount, -1)
-		a.shutWg.DoneSession()
-	}()
-}
-
 func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error {
-	if isStarted {
-		a.scheduleCallEnd(func() {
-			ctx = common.BackgroundContext(ctx)
-			ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
-			call.End(ctx, err)
-			cancel()
-		})
+	if isStarted {
+		call.End(ctx, err)
 
 		handleStatsEnd(ctx, err)
-		return transformTimeout(err, false)
+	} else {
+		handleStatsDequeue(ctx, err)
 	}
 
 	a.shutWg.DoneSession()
-	handleStatsDequeue(ctx, err)
-	return transformTimeout(err, true)
+	return transformTimeout(err, !isStarted)
 }
 
 var _ Agent = &lbAgent{}