From 0bde666395552a736ec48b0102cc76995464be47 Mon Sep 17 00:00:00 2001
From: Reed Allman
Date: Fri, 12 Jan 2018 13:56:17 -0800
Subject: [PATCH] clean up agent.Submit (#681)

this was getting bloated with various contexts and spans and stats
administrivia that obfuscated what was going on a lot. this makes some
helper methods to shove most of that stuff into, and simplifies the context
handling around getting a slot by moving it inside of slot acquisition code.

also removed most uses of `call.Model()` -- I'll kill this thing some day,
but if a reason is needed, then the overhead of dynamic dispatch is
unnecessary, we're inside of the implementee for the agent, we don't want to
use the interface methods inside of that.
---
 api/agent/agent.go | 157 +++++++++++++++++++++++----------------------
 1 file changed, 81 insertions(+), 76 deletions(-)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index a7b4a0f20..910be422f 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -28,7 +28,6 @@ import (
 // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
 // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
 // TODO storing logs / call can push call over the timeout
-// TODO discuss concrete policy for hot launch or timeout / timeout vs time left
 // TODO if we don't cap the number of any one container we could get into a situation
 // where the machine is full but all the containers are idle up to the idle timeout. meh.
 // TODO async is still broken, but way less so. we need to modify mq semantics
@@ -37,7 +36,6 @@ import (
 // dies). need coordination w/ db.
 // TODO if a cold call times out but container is created but hasn't replied, could
 // end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?)
-// TODO the log api should be plaintext (or at least offer it)
 // TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness.
 // TODO if async would store requests (or interchange format) it would be slick, but
 // if we're going to store full calls in db maybe we should only queue pointers to ids?
@@ -152,7 +150,6 @@ func (a *agent) Enqueue(ctx context.Context, call *models.Call) error {
 }
 
 func (a *agent) Close() error {
-
 	a.shutonce.Do(func() {
 		close(a.shutdown)
 	})
@@ -161,6 +158,60 @@ func (a *agent) Close() error {
 	return nil
 }
 
+func (a *agent) Submit(callI Call) error {
+	a.wg.Add(1)
+	defer a.wg.Done()
+
+	select {
+	case <-a.shutdown:
+		return models.ErrCallTimeoutServerBusy
+	default:
+	}
+
+	call := callI.(*call)
+
+	ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline)
+	call.req = call.req.WithContext(ctx)
+	defer cancel()
+
+	ctx, finish := statSpans(ctx, call)
+	defer finish()
+
+	err := a.submit(ctx, call)
+	return err
+}
+
+func (a *agent) submit(ctx context.Context, call *call) error {
+	a.stats.Enqueue(ctx, call.AppName, call.Path)
+
+	slot, err := a.getSlot(ctx, call)
+	if err != nil {
+		a.handleStatsDequeue(ctx, call, err)
+		return transformTimeout(err, true)
+	}
+
+	defer slot.Close() // notify our slot is free once we're done
+
+	err = call.Start(ctx)
+	if err != nil {
+		a.handleStatsDequeue(ctx, call, err)
+		return transformTimeout(err, true)
+	}
+
+	// decrement queued count, increment running count
+	a.stats.DequeueAndStart(ctx, call.AppName, call.Path)
+
+	// pass this error (nil or otherwise) to end directly, to store status, etc
+	err = slot.exec(ctx, call)
+	a.handleStatsEnd(ctx, call, err)
+
+	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
+	// but this could put us over the timeout if the call did not reply yet (need better policy).
+	ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
+	err = call.End(ctx, err)
+	return transformTimeout(err, false)
+}
+
 func transformTimeout(e error, isRetriable bool) error {
 	if e == context.DeadlineExceeded {
 		if isRetriable {
@@ -173,100 +224,54 @@ func transformTimeout(e error, isRetriable bool) error {
 
 // handleStatsDequeue handles stats for dequeuing for early exit (getSlot or Start)
 // cases. Only timeouts can be a simple dequeue while other cases are actual errors.
-func (a *agent) handleStatsDequeue(ctx context.Context, err error, callI Call) {
+func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) {
 	if err == context.DeadlineExceeded {
-		a.stats.Dequeue(ctx, callI.Model().AppName, callI.Model().Path)
+		a.stats.Dequeue(ctx, call.AppName, call.Path)
 	} else {
-		a.stats.DequeueAndFail(ctx, callI.Model().AppName, callI.Model().Path)
+		a.stats.DequeueAndFail(ctx, call.AppName, call.Path)
 	}
 }
 
-func (a *agent) Submit(callI Call) error {
-	a.wg.Add(1)
-	defer a.wg.Done()
-
-	select {
-	case <-a.shutdown:
-		return models.ErrCallTimeoutServerBusy
-	default:
+// handleStatsEnd handles stats for after a call is ran, depending on error.
+func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) {
+	if err == nil {
+		// decrement running count, increment completed count
+		a.stats.Complete(ctx, call.AppName, call.Path)
+	} else {
+		// decrement running count, increment failed count
+		a.stats.Failed(ctx, call.AppName, call.Path)
 	}
+}
 
-	call := callI.(*call)
-	ctx := call.req.Context()
-
+func statSpans(ctx context.Context, call *call) (ctxr context.Context, finish func()) {
 	// agent_submit_global has no parent span because we don't want it to inherit fn_appname or fn_path
-	span_global := opentracing.StartSpan("agent_submit_global")
-	defer span_global.Finish()
+	spanGlobal := opentracing.StartSpan("agent_submit_global")
 
 	// agent_submit_global has no parent span because we don't want it to inherit fn_path
-	span_app := opentracing.StartSpan("agent_submit_app")
-	span_app.SetBaggageItem("fn_appname", callI.Model().AppName)
-	defer span_app.Finish()
+	spanApp := opentracing.StartSpan("agent_submit_app")
+	spanApp.SetBaggageItem("fn_appname", call.AppName)
 
 	// agent_submit has a parent span in the usual way
 	// it doesn't matter if it inherits fn_appname or fn_path (and we set them here in any case)
 	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_submit")
-	span.SetBaggageItem("fn_appname", callI.Model().AppName)
-	span.SetBaggageItem("fn_path", callI.Model().Path)
-	defer span.Finish()
+	span.SetBaggageItem("fn_appname", call.AppName)
+	span.SetBaggageItem("fn_path", call.Path)
 
-	// Start the deadline context for Waiting for Slots
-	ctxSlotWait, cancelSlotWait := context.WithDeadline(ctx, call.slotDeadline)
-	call.req = call.req.WithContext(ctxSlotWait)
-	defer cancelSlotWait()
-
-	// increment queued count
-	// this is done after setting "fn_appname" and "fn_path"
-	a.stats.Enqueue(ctx, callI.Model().AppName, callI.Model().Path)
-
-	slot, err := a.getSlot(ctxSlotWait, call) // find ram available / running
-
-	if err != nil {
-		a.handleStatsDequeue(ctx, err, call)
-		return transformTimeout(err, true)
+	return ctx, func() {
+		spanGlobal.Finish()
+		spanApp.Finish()
+		span.Finish()
 	}
-	// TODO if the call times out & container is created, we need
-	// to make this remove the container asynchronously?
-	defer slot.Close() // notify our slot is free once we're done
-
-	err = call.Start(ctxSlotWait)
-	if err != nil {
-		a.handleStatsDequeue(ctx, err, call)
-		return transformTimeout(err, true)
-	}
-
-	// Swap deadline contexts for Execution Phase
-	cancelSlotWait()
-	ctxExec, cancelExec := context.WithDeadline(ctx, call.execDeadline)
-	call.req = call.req.WithContext(ctxExec)
-	defer cancelExec()
-
-	// decrement queued count, increment running count
-	a.stats.DequeueAndStart(ctx, callI.Model().AppName, callI.Model().Path)
-
-	err = slot.exec(ctxExec, call)
-	// pass this error (nil or otherwise) to end directly, to store status, etc
-	// End may rewrite the error or elect to return it
-
-	if err == nil {
-		// decrement running count, increment completed count
-		a.stats.Complete(ctx, callI.Model().AppName, callI.Model().Path)
-	} else {
-		// decrement running count, increment failed count
-		a.stats.Failed(ctx, callI.Model().AppName, callI.Model().Path)
-	}
-
-	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
-	// but this could put us over the timeout if the call did not reply yet (need better policy).
-	ctx = opentracing.ContextWithSpan(context.Background(), span)
-	err = call.End(ctx, err)
-	return transformTimeout(err, false)
 }
 
 // getSlot returns a Slot (or error) for the request to run. Depending on hot/cold
 // request type, this may launch a new container or wait for other containers to become idle
 // or it may wait for resources to become available to launch a new container.
 func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
+	// start the deadline context for waiting for slots
+	ctx, cancel := context.WithDeadline(ctx, call.slotDeadline)
+	defer cancel()
+
 	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot")
 	defer span.Finish()
 
@@ -508,7 +513,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
 	go func() {
 		// TODO make sure stdin / stdout not blocked if container dies or we leak goroutine
 		// we have to make sure this gets shut down or 2 threads will be reading/writing in/out
-		ci := protocol.NewCallInfo(call.Model(), call.req)
+		ci := protocol.NewCallInfo(call.Call, call.req)
 		errApp <- s.proto.Dispatch(ctx, ci, call.w)
 	}()
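
Reviewer note (not part of the patch): the shape of the cleanup is "derive one deadline for the whole call, bundle the span/stats bookkeeping behind a returned finish func, and translate deadline errors on the way out." The sketch below is a minimal, standalone illustration of that pattern using only the standard library; names like startTimer, submit, and work are placeholders for this note, not code from this repo, and the error strings stand in for the models.ErrCallTimeout* values the real code returns.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// startTimer stands in for statSpans: start whatever bookkeeping is needed and
// hand back a single finish func, so the caller writes one `defer finish()`
// instead of juggling several defers for spans and stats.
func startTimer(ctx context.Context, name string) (context.Context, func()) {
	start := time.Now()
	return ctx, func() {
		fmt.Printf("%s finished after %v\n", name, time.Since(start))
	}
}

// translateTimeout mirrors the patch's transformTimeout idea: map a context
// deadline error to a caller-facing error, retriable or not. Error values here
// are placeholders.
func translateTimeout(err error, retriable bool) error {
	if errors.Is(err, context.DeadlineExceeded) {
		if retriable {
			return errors.New("server busy, retry later")
		}
		return errors.New("call timed out")
	}
	return err
}

// submit sketches the simplified flow of agent.Submit: one deadline covering
// the whole call, bookkeeping wrapped in a finish func, timeout translation
// at the boundary.
func submit(parent context.Context, execDeadline time.Time, work func(context.Context) error) error {
	ctx, cancel := context.WithDeadline(parent, execDeadline)
	defer cancel()

	ctx, finish := startTimer(ctx, "submit")
	defer finish()

	return translateTimeout(work(ctx), true)
}

func main() {
	// a "call" that takes longer than its deadline
	err := submit(context.Background(), time.Now().Add(20*time.Millisecond), func(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println("result:", err)
}

The benefit of returning finish rather than deferring each span.Finish() inline is that Submit stays a short, readable wrapper while submit works against a single ctx carrying one deadline, which is what the patch achieves by moving the slot-wait deadline down into getSlot.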