From 18716911b97bffcc5ac099678756e8348ba9d079 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 8 Jan 2018 12:33:37 -0800 Subject: [PATCH] fn: agent slot and execution wait correction (#658) Since by policy we require timeout/2 remaining time before we can execute the request, we should also bound the slot wait time by timeout/2 to avoid waiting for full timeout in slot wait phase. --- api/agent/agent.go | 21 +++++++++++++-------- api/agent/call.go | 39 +++++++++++++++++++++------------------ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index f588fb7ef..ae19c80a2 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -213,12 +213,12 @@ func (a *agent) Submit(callI Call) error { span.SetBaggageItem("fn_path", callI.Model().Path) defer span.Finish() - // start the timer STAT! TODO add some wiggle room - ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second) - call.req = call.req.WithContext(ctx) - defer cancel() + // Start the deadline context for Waiting for Slots + ctxSlotWait, cancelSlotWait := context.WithDeadline(ctx, call.slotDeadline) + call.req = call.req.WithContext(ctxSlotWait) + defer cancelSlotWait() - slot, err := a.getSlot(ctx, call) // find ram available / running + slot, err := a.getSlot(ctxSlotWait, call) // find ram available / running if err != nil { a.handleStatsDequeue(err, call) return transformTimeout(err, true) @@ -227,17 +227,22 @@ func (a *agent) Submit(callI Call) error { // to make this remove the container asynchronously? defer slot.Close() // notify our slot is free once we're done - // TODO Start is checking the timer now, we could do it here, too. - err = call.Start(ctx) + err = call.Start(ctxSlotWait) if err != nil { a.handleStatsDequeue(err, call) return transformTimeout(err, true) } + // Swap deadline contexts for Execution Phase + cancelSlotWait() + ctxExec, cancelExec := context.WithDeadline(ctx, call.execDeadline) + call.req = call.req.WithContext(ctxExec) + defer cancelExec() + // decrement queued count, increment running count a.stats.DequeueAndStart(callI.Model().AppName, callI.Model().Path) - err = slot.exec(ctx, call) + err = slot.exec(ctxExec, call) // pass this error (nil or otherwise) to end directly, to store status, etc // End may rewrite the error or elect to return it diff --git a/api/agent/call.go b/api/agent/call.go index ee37772e8..31a1e8275 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -288,9 +288,16 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { c.w = c.stderr } - deadline := strfmt.DateTime(time.Now().Add(time.Duration(c.Call.Timeout) * time.Second)).String() - c.EnvVars["FN_DEADLINE"] = deadline - c.req.Header.Set("FN_DEADLINE", deadline) + now := time.Now() + slotDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second / 2) + execDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second) + + c.slotDeadline = slotDeadline + c.execDeadline = execDeadline + + execDeadlineStr := strfmt.DateTime(execDeadline).String() + c.EnvVars["FN_DEADLINE"] = execDeadlineStr + c.req.Header.Set("FN_DEADLINE", execDeadlineStr) return &c, nil } @@ -298,12 +305,14 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { type call struct { *models.Call - da DataAccess - w io.Writer - req *http.Request - stderr io.ReadWriteCloser - ct callTrigger - slots *slotQueue + da DataAccess + w io.Writer + req *http.Request + stderr io.ReadWriteCloser + ct callTrigger + slots *slotQueue + slotDeadline time.Time + execDeadline time.Time } func (c *call) Model() *models.Call { return c.Call } @@ -312,15 +321,9 @@ func (c *call) Start(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start") defer span.Finish() - // TODO discuss this policy. cold has not yet started the container, - // hot just has to dispatch - // - // make sure we have at least half our timeout to run, or timeout here - deadline, ok := ctx.Deadline() - need := time.Now().Add(time.Duration(c.Timeout) * time.Second) // > deadline, always - // need.Sub(deadline) = elapsed time - if ok && need.Sub(deadline) > (time.Duration(c.Timeout)*time.Second)/2 { - return context.DeadlineExceeded + // Check context timeouts, errors + if ctx.Err() != nil { + return ctx.Err() } c.StartedAt = strfmt.DateTime(time.Now())