fn: agent slot and execution wait correction (#658)

Since by policy we require at least timeout/2 of remaining time
before we can execute the request, we should also
bound the slot wait time by timeout/2 to avoid
waiting for the full timeout in the slot wait phase.
This commit is contained in:
Tolga Ceylan
2018-01-08 12:33:37 -08:00
committed by Reed Allman
parent dba306e3c8
commit 18716911b9
2 changed files with 34 additions and 26 deletions

View File

@@ -213,12 +213,12 @@ func (a *agent) Submit(callI Call) error {
span.SetBaggageItem("fn_path", callI.Model().Path) span.SetBaggageItem("fn_path", callI.Model().Path)
defer span.Finish() defer span.Finish()
// start the timer STAT! TODO add some wiggle room // Start the deadline context for Waiting for Slots
ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second) ctxSlotWait, cancelSlotWait := context.WithDeadline(ctx, call.slotDeadline)
call.req = call.req.WithContext(ctx) call.req = call.req.WithContext(ctxSlotWait)
defer cancel() defer cancelSlotWait()
slot, err := a.getSlot(ctx, call) // find ram available / running slot, err := a.getSlot(ctxSlotWait, call) // find ram available / running
if err != nil { if err != nil {
a.handleStatsDequeue(err, call) a.handleStatsDequeue(err, call)
return transformTimeout(err, true) return transformTimeout(err, true)
@@ -227,17 +227,22 @@ func (a *agent) Submit(callI Call) error {
// to make this remove the container asynchronously? // to make this remove the container asynchronously?
defer slot.Close() // notify our slot is free once we're done defer slot.Close() // notify our slot is free once we're done
// TODO Start is checking the timer now, we could do it here, too. err = call.Start(ctxSlotWait)
err = call.Start(ctx)
if err != nil { if err != nil {
a.handleStatsDequeue(err, call) a.handleStatsDequeue(err, call)
return transformTimeout(err, true) return transformTimeout(err, true)
} }
// Swap deadline contexts for Execution Phase
cancelSlotWait()
ctxExec, cancelExec := context.WithDeadline(ctx, call.execDeadline)
call.req = call.req.WithContext(ctxExec)
defer cancelExec()
// decrement queued count, increment running count // decrement queued count, increment running count
a.stats.DequeueAndStart(callI.Model().AppName, callI.Model().Path) a.stats.DequeueAndStart(callI.Model().AppName, callI.Model().Path)
err = slot.exec(ctx, call) err = slot.exec(ctxExec, call)
// pass this error (nil or otherwise) to end directly, to store status, etc // pass this error (nil or otherwise) to end directly, to store status, etc
// End may rewrite the error or elect to return it // End may rewrite the error or elect to return it

View File

@@ -288,9 +288,16 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
c.w = c.stderr c.w = c.stderr
} }
deadline := strfmt.DateTime(time.Now().Add(time.Duration(c.Call.Timeout) * time.Second)).String() now := time.Now()
c.EnvVars["FN_DEADLINE"] = deadline slotDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second / 2)
c.req.Header.Set("FN_DEADLINE", deadline) execDeadline := now.Add(time.Duration(c.Call.Timeout) * time.Second)
c.slotDeadline = slotDeadline
c.execDeadline = execDeadline
execDeadlineStr := strfmt.DateTime(execDeadline).String()
c.EnvVars["FN_DEADLINE"] = execDeadlineStr
c.req.Header.Set("FN_DEADLINE", execDeadlineStr)
return &c, nil return &c, nil
} }
@@ -298,12 +305,14 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
type call struct { type call struct {
*models.Call *models.Call
da DataAccess da DataAccess
w io.Writer w io.Writer
req *http.Request req *http.Request
stderr io.ReadWriteCloser stderr io.ReadWriteCloser
ct callTrigger ct callTrigger
slots *slotQueue slots *slotQueue
slotDeadline time.Time
execDeadline time.Time
} }
func (c *call) Model() *models.Call { return c.Call } func (c *call) Model() *models.Call { return c.Call }
@@ -312,15 +321,9 @@ func (c *call) Start(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start")
defer span.Finish() defer span.Finish()
// TODO discuss this policy. cold has not yet started the container, // Check context timeouts, errors
// hot just has to dispatch if ctx.Err() != nil {
// return ctx.Err()
// make sure we have at least half our timeout to run, or timeout here
deadline, ok := ctx.Deadline()
need := time.Now().Add(time.Duration(c.Timeout) * time.Second) // > deadline, always
// need.Sub(deadline) = elapsed time
if ok && need.Sub(deadline) > (time.Duration(c.Timeout)*time.Second)/2 {
return context.DeadlineExceeded
} }
c.StartedAt = strfmt.DateTime(time.Now()) c.StartedAt = strfmt.DateTime(time.Now())