mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
clean up agent.Submit (#681)
this was getting bloated with various contexts and spans and stats administrivia that obfuscated what was going on a lot. this makes some helper methods to shove most of that stuff into, and simplifies the context handling around getting a slot by moving it inside of slot acquisition code. also removed most uses of `call.Model()` -- I'll kill this thing some day, but if a reason is needed, then the overhead of dynamic dispatch is unnecessary, we're inside of the implementee for the agent, we don't want to use the interface methods inside of that.
This commit is contained in:
@@ -28,7 +28,6 @@ import (
|
||||
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
|
||||
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
|
||||
// TODO storing logs / call can push call over the timeout
|
||||
// TODO discuss concrete policy for hot launch or timeout / timeout vs time left
|
||||
// TODO if we don't cap the number of any one container we could get into a situation
|
||||
// where the machine is full but all the containers are idle up to the idle timeout. meh.
|
||||
// TODO async is still broken, but way less so. we need to modify mq semantics
|
||||
@@ -37,7 +36,6 @@ import (
|
||||
// dies). need coordination w/ db.
|
||||
// TODO if a cold call times out but container is created but hasn't replied, could
|
||||
// end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?)
|
||||
// TODO the log api should be plaintext (or at least offer it)
|
||||
// TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness.
|
||||
// TODO if async would store requests (or interchange format) it would be slick, but
|
||||
// if we're going to store full calls in db maybe we should only queue pointers to ids?
|
||||
@@ -152,7 +150,6 @@ func (a *agent) Enqueue(ctx context.Context, call *models.Call) error {
|
||||
}
|
||||
|
||||
func (a *agent) Close() error {
|
||||
|
||||
a.shutonce.Do(func() {
|
||||
close(a.shutdown)
|
||||
})
|
||||
@@ -161,6 +158,60 @@ func (a *agent) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *agent) Submit(callI Call) error {
|
||||
a.wg.Add(1)
|
||||
defer a.wg.Done()
|
||||
|
||||
select {
|
||||
case <-a.shutdown:
|
||||
return models.ErrCallTimeoutServerBusy
|
||||
default:
|
||||
}
|
||||
|
||||
call := callI.(*call)
|
||||
|
||||
ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline)
|
||||
call.req = call.req.WithContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
ctx, finish := statSpans(ctx, call)
|
||||
defer finish()
|
||||
|
||||
err := a.submit(ctx, call)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *agent) submit(ctx context.Context, call *call) error {
|
||||
a.stats.Enqueue(ctx, call.AppName, call.Path)
|
||||
|
||||
slot, err := a.getSlot(ctx, call)
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(ctx, call, err)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
defer slot.Close() // notify our slot is free once we're done
|
||||
|
||||
err = call.Start(ctx)
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(ctx, call, err)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
// decrement queued count, increment running count
|
||||
a.stats.DequeueAndStart(ctx, call.AppName, call.Path)
|
||||
|
||||
// pass this error (nil or otherwise) to end directly, to store status, etc
|
||||
err = slot.exec(ctx, call)
|
||||
a.handleStatsEnd(ctx, call, err)
|
||||
|
||||
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
|
||||
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
||||
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
|
||||
err = call.End(ctx, err)
|
||||
return transformTimeout(err, false)
|
||||
}
|
||||
|
||||
func transformTimeout(e error, isRetriable bool) error {
|
||||
if e == context.DeadlineExceeded {
|
||||
if isRetriable {
|
||||
@@ -173,100 +224,54 @@ func transformTimeout(e error, isRetriable bool) error {
|
||||
|
||||
// handleStatsDequeue handles stats for dequeuing for early exit (getSlot or Start)
|
||||
// cases. Only timeouts can be a simple dequeue while other cases are actual errors.
|
||||
func (a *agent) handleStatsDequeue(ctx context.Context, err error, callI Call) {
|
||||
func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) {
|
||||
if err == context.DeadlineExceeded {
|
||||
a.stats.Dequeue(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.Dequeue(ctx, call.AppName, call.Path)
|
||||
} else {
|
||||
a.stats.DequeueAndFail(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.DequeueAndFail(ctx, call.AppName, call.Path)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *agent) Submit(callI Call) error {
|
||||
a.wg.Add(1)
|
||||
defer a.wg.Done()
|
||||
|
||||
select {
|
||||
case <-a.shutdown:
|
||||
return models.ErrCallTimeoutServerBusy
|
||||
default:
|
||||
// handleStatsEnd handles stats for after a call is ran, depending on error.
|
||||
func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) {
|
||||
if err == nil {
|
||||
// decrement running count, increment completed count
|
||||
a.stats.Complete(ctx, call.AppName, call.Path)
|
||||
} else {
|
||||
// decrement running count, increment failed count
|
||||
a.stats.Failed(ctx, call.AppName, call.Path)
|
||||
}
|
||||
}
|
||||
|
||||
call := callI.(*call)
|
||||
ctx := call.req.Context()
|
||||
|
||||
func statSpans(ctx context.Context, call *call) (ctxr context.Context, finish func()) {
|
||||
// agent_submit_global has no parent span because we don't want it to inherit fn_appname or fn_path
|
||||
span_global := opentracing.StartSpan("agent_submit_global")
|
||||
defer span_global.Finish()
|
||||
spanGlobal := opentracing.StartSpan("agent_submit_global")
|
||||
|
||||
// agent_submit_global has no parent span because we don't want it to inherit fn_path
|
||||
span_app := opentracing.StartSpan("agent_submit_app")
|
||||
span_app.SetBaggageItem("fn_appname", callI.Model().AppName)
|
||||
defer span_app.Finish()
|
||||
spanApp := opentracing.StartSpan("agent_submit_app")
|
||||
spanApp.SetBaggageItem("fn_appname", call.AppName)
|
||||
|
||||
// agent_submit has a parent span in the usual way
|
||||
// it doesn't matter if it inherits fn_appname or fn_path (and we set them here in any case)
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_submit")
|
||||
span.SetBaggageItem("fn_appname", callI.Model().AppName)
|
||||
span.SetBaggageItem("fn_path", callI.Model().Path)
|
||||
defer span.Finish()
|
||||
span.SetBaggageItem("fn_appname", call.AppName)
|
||||
span.SetBaggageItem("fn_path", call.Path)
|
||||
|
||||
// Start the deadline context for Waiting for Slots
|
||||
ctxSlotWait, cancelSlotWait := context.WithDeadline(ctx, call.slotDeadline)
|
||||
call.req = call.req.WithContext(ctxSlotWait)
|
||||
defer cancelSlotWait()
|
||||
|
||||
// increment queued count
|
||||
// this is done after setting "fn_appname" and "fn_path"
|
||||
a.stats.Enqueue(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
|
||||
slot, err := a.getSlot(ctxSlotWait, call) // find ram available / running
|
||||
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(ctx, err, call)
|
||||
return transformTimeout(err, true)
|
||||
return ctx, func() {
|
||||
spanGlobal.Finish()
|
||||
spanApp.Finish()
|
||||
span.Finish()
|
||||
}
|
||||
// TODO if the call times out & container is created, we need
|
||||
// to make this remove the container asynchronously?
|
||||
defer slot.Close() // notify our slot is free once we're done
|
||||
|
||||
err = call.Start(ctxSlotWait)
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(ctx, err, call)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
// Swap deadline contexts for Execution Phase
|
||||
cancelSlotWait()
|
||||
ctxExec, cancelExec := context.WithDeadline(ctx, call.execDeadline)
|
||||
call.req = call.req.WithContext(ctxExec)
|
||||
defer cancelExec()
|
||||
|
||||
// decrement queued count, increment running count
|
||||
a.stats.DequeueAndStart(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
|
||||
err = slot.exec(ctxExec, call)
|
||||
// pass this error (nil or otherwise) to end directly, to store status, etc
|
||||
// End may rewrite the error or elect to return it
|
||||
|
||||
if err == nil {
|
||||
// decrement running count, increment completed count
|
||||
a.stats.Complete(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
} else {
|
||||
// decrement running count, increment failed count
|
||||
a.stats.Failed(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
}
|
||||
|
||||
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
|
||||
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
||||
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
||||
err = call.End(ctx, err)
|
||||
return transformTimeout(err, false)
|
||||
}
|
||||
|
||||
// getSlot returns a Slot (or error) for the request to run. Depending on hot/cold
|
||||
// request type, this may launch a new container or wait for other containers to become idle
|
||||
// or it may wait for resources to become available to launch a new container.
|
||||
func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
|
||||
// start the deadline context for waiting for slots
|
||||
ctx, cancel := context.WithDeadline(ctx, call.slotDeadline)
|
||||
defer cancel()
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot")
|
||||
defer span.Finish()
|
||||
|
||||
@@ -508,7 +513,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
go func() {
|
||||
// TODO make sure stdin / stdout not blocked if container dies or we leak goroutine
|
||||
// we have to make sure this gets shut down or 2 threads will be reading/writing in/out
|
||||
ci := protocol.NewCallInfo(call.Model(), call.req)
|
||||
ci := protocol.NewCallInfo(call.Call, call.req)
|
||||
errApp <- s.proto.Dispatch(ctx, ci, call.w)
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user