diff --git a/api/agent/agent.go b/api/agent/agent.go
index 3c65874f4..fde19a6b5 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -24,7 +24,6 @@ import (
 // TODO async calls need to add route.Headers as well
 // TODO need to shut off reads/writes in dispatch to the pipes when call times out so that
 // 2 calls don't have the same container's pipes...
-// TODO add spans back around container launching for hot (follows from?) + other more granular spans
 // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
 // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
 // TODO storing logs / call can push call over the timeout
@@ -251,7 +250,7 @@ func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) {
 	}
 }
 
-func statSpans(ctx context.Context, call *call) (ctxr context.Context, finish func()) {
+func statSpans(ctx context.Context, call *call) (_ context.Context, finish func()) {
 	// agent_submit_global has no parent span because we don't want it to inherit fn_appname or fn_path
 	spanGlobal := opentracing.StartSpan("agent_submit_global")
 
@@ -305,81 +304,77 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 // hotLauncher is spawned in a go routine for each slot queue to monitor stats and launch hot
 // containers if needed. Upon shutdown or activity timeout, hotLauncher exits and during exit,
 // it destroys the slot queue.
-func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
-
+func (a *agent) hotLauncher(ctx context.Context, call *call) {
 	// Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
 	// whichever is longer. If in this time, there's no activity, then
 	// we destroy the hot queue.
 	timeout := time.Duration(60) * time.Minute
-	idleTimeout := time.Duration(callObj.IdleTimeout) * time.Second * 2
+	idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
 	if timeout < idleTimeout {
 		timeout = idleTimeout
 	}
 
 	logger := common.Logger(ctx)
 	logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting")
 
-	isAsync := callObj.Type == models.TypeAsync
+
+	// IMPORTANT: get a context that has a child span / logger but NO timeout
+	// TODO this is a 'FollowsFrom'
+	ctx = opentracing.ContextWithSpan(common.WithLogger(context.Background(), logger), opentracing.SpanFromContext(ctx))
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_launcher")
+	defer span.Finish()
 
 	for {
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		a.checkLaunch(ctx, call)
+
 		select {
 		case <-a.shutdown: // server shutdown
+			cancel()
 			return
-		case <-time.After(timeout):
-			if a.slotMgr.deleteSlotQueue(callObj.slots) {
+		case <-ctx.Done(): // timed out
+			cancel()
+			if a.slotMgr.deleteSlotQueue(call.slots) {
 				logger.Info("Hot function launcher timed out")
 				return
 			}
-		case <-callObj.slots.signaller:
+		case <-call.slots.signaller:
+			cancel()
 		}
+	}
+}
 
-		curStats := callObj.slots.getStats()
-		isNeeded := isNewContainerNeeded(&curStats)
-		logger.WithFields(logrus.Fields{
-			"currentStats": curStats,
-			"isNeeded":     isNeeded,
-		}).Debug("Hot function launcher stats")
-		if !isNeeded {
-			continue
-		}
+func (a *agent) checkLaunch(ctx context.Context, call *call) {
+	curStats := call.slots.getStats()
+	isAsync := call.Type == models.TypeAsync
+	isNeeded := isNewContainerNeeded(&curStats)
+	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Debug("Hot function launcher stats")
+	if !isNeeded {
+		return
+	}
+	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
 
-		ctxResource, cancelResource := context.WithCancel(context.Background())
-		logger.WithFields(logrus.Fields{
-			"currentStats": curStats,
-			"isNeeded":     isNeeded,
-		}).Info("Hot function launcher starting hot container")
-
-		select {
-		case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, callObj.Memory, uint64(callObj.CPUs), isAsync):
-			cancelResource()
-			if isOpen {
-				a.wg.Add(1)
-				go func(ctx context.Context, call *call, tok ResourceToken) {
-					a.runHot(ctx, call, tok)
-					a.wg.Done()
-				}(ctx, callObj, tok)
-			} else {
-				// this means the resource was impossible to reserve (eg. memory size we can never satisfy)
-				callObj.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: models.ErrCallTimeoutServerBusy})
-			}
-		case <-time.After(timeout):
-			cancelResource()
-			if a.slotMgr.deleteSlotQueue(callObj.slots) {
-				logger.Info("Hot function launcher timed out")
-				return
-			}
-		case <-a.shutdown: // server shutdown
-			cancelResource()
-			return
-		}
+	select {
+	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
+		a.wg.Add(1) // add waiter in this thread
+		go func() {
+			// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
+			a.runHot(ctx, call, tok)
+			a.wg.Done()
+		}()
+	case <-ctx.Done(): // timeout
+	case <-a.shutdown: // server shutdown
 	}
 }
 
 // waitHot pings and waits for a hot container from the slot queue
 func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_wait_hot")
+	defer span.Finish()
 
-	ctxDequeuer, cancelDequeuer := context.WithCancel(ctx)
-	defer cancelDequeuer()
-	ch := call.slots.startDequeuer(ctxDequeuer)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel() // shut down dequeuer if we grab a slot
+
+	ch := call.slots.startDequeuer(ctx)
 
 	// 1) if we can get a slot immediately, grab it.
 	// 2) if we don't, send a signaller every 200ms until we do.
@@ -417,24 +412,19 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 // launchCold waits for necessary resources to launch a new container, then
 // returns the slot for that new container to run the request on.
 func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
-	isAsync := call.Type == models.TypeAsync
 	ch := make(chan Slot)
-	ctxResource, cancelResource := context.WithCancel(ctx)
-	defer cancelResource()
+
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_launch_cold")
+	defer span.Finish()
 
 	select {
-	case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, call.Memory, uint64(call.CPUs), isAsync):
-		if !isOpen {
-			return nil, models.ErrCallTimeoutServerBusy
-		}
+	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
 		go a.prepCold(ctx, call, tok, ch)
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	}
 
-	cancelResource()
-
 	// wait for launch err or a slot to open up
 	select {
 	case s := <-ch:
@@ -518,9 +508,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
 	common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
 
 	start := time.Now()
-	defer func() {
-		call.slots.recordLatency(SlotQueueRunner, uint64(time.Now().Sub(start).Seconds()*1000))
-	}()
+	defer func() { call.slots.recordLatency(SlotQueueRunner, uint64(time.Now().Sub(start).Seconds()*1000)) }()
 
 	// swap in the new stderr logger & stat accumulator
 	oldStderr := s.container.swap(call.stderr, &call.Stats)
@@ -551,6 +539,9 @@ func specialHeader(k string) bool {
 }
 
 func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold")
+	defer span.Finish()
+
 	// add additional headers to the config to shove everything into env vars for cold
 	for k, v := range call.Headers {
 		if !specialHeader(k) {
@@ -585,14 +576,13 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
 	}
 }
 
-func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
-	// We must be careful to only use ctxArg for logs/spans
-
-	// create a span from ctxArg but ignore the new Context - instead
-	// we will create a new Context below and explicitly set its span
-	span, _ := opentracing.StartSpanFromContext(ctxArg, "docker_run_hot")
+func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) {
+	// IMPORTANT: get a context that has a child span / logger but NO timeout
+	// TODO this is a 'FollowsFrom'
+	ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_run_hot")
 	defer span.Finish()
-	defer tok.Close()
+	defer tok.Close() // IMPORTANT: this MUST get called
 
 	// TODO we have to make sure we flush these pipes or we will deadlock
 	stdinRead, stdinWrite := io.Pipe()
@@ -600,17 +590,6 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
 
 	proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
 
-	// we don't want to timeout in here. this is inside of a goroutine and the
-	// caller can timeout this Call appropriately. e.g. w/ hot if it takes 20
-	// minutes to pull, then timing out calls for 20 minutes and eventually
-	// having the image is ideal vs. never getting the image pulled.
-	// TODO this ctx needs to inherit logger, etc
-	ctx, shutdownContainer := context.WithCancel(context.Background())
-	defer shutdownContainer() // close this if our waiter returns
-
-	// add the span we created above to the new Context
-	ctx = opentracing.ContextWithSpan(ctx, span)
-
 	start := time.Now()
 	call.slots.enterState(SlotQueueStarter)
@@ -659,13 +638,16 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
 	// buffered, in case someone has slot when waiter returns but isn't yet listening
 	errC := make(chan error, 1)
 
+	ctx, shutdownContainer := context.WithCancel(ctx)
+	defer shutdownContainer() // close this if our waiter returns, to call off slots
 	go func() {
+		defer shutdownContainer() // also close if we get an agent shutdown / idle timeout
+
 		for {
 			select { // make sure everything is up before trying to send slot
 			case <-ctx.Done(): // container shutdown
 				return
 			case <-a.shutdown: // server shutdown
-				shutdownContainer()
 				return
 			default: // ok
 			}
@@ -682,7 +664,6 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
 			if call.slots.ejectSlot(s) {
 				call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
 				logger.Info("Canceling inactive hot function")
-				shutdownContainer()
 				return
 			}
 		case <-ctx.Done(): // container shutdown
@@ -693,7 +674,6 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
 		case <-a.shutdown: // server shutdown
 			if call.slots.ejectSlot(s) {
 				call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
-				shutdownContainer()
 				return
 			}
 		}
diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go
index 6366ac140..46220b860 100644
--- a/api/agent/agent_test.go
+++ b/api/agent/agent_test.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math"
 	"net/http"
 	"net/http/httptest"
 	"strconv"
@@ -495,3 +496,27 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) {
 		no Content-Length is set. also make sure the body makes it (and the image hasn't changed)); GLHF, got:`, resp.R.Body)
 	}
 }
+
+func TestGetCallReturnsResourceImpossibility(t *testing.T) {
+	call := &models.Call{
+		AppName:     "yo",
+		Path:        "/yoyo",
+		Image:       "fnproject/fn-test-utils",
+		Type:        "sync",
+		Format:      "http",
+		Timeout:     1,
+		IdleTimeout: 2,
+		Memory:      math.MaxUint64,
+	}
+
+	// FromModel doesn't need a datastore, for now...
+	ds := datastore.NewMockInit(nil, nil, nil)
+
+	a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock))))
+	defer a.Close()
+
+	_, err := a.GetCall(FromModel(call))
+	if err != models.ErrCallTimeoutServerBusy {
+		t.Fatal("did not get expected err, got: ", err)
+	}
+}
diff --git a/api/agent/async.go b/api/agent/async.go
index f31e19685..e76ad8688 100644
--- a/api/agent/async.go
+++ b/api/agent/async.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/fnproject/fn/api/models"
+	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/sirupsen/logrus"
 )
 
@@ -15,14 +16,15 @@ func (a *agent) asyncDequeue() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	// parent span here so that we can see how many async calls are running
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_dequeue")
+	defer span.Finish()
+
 	for {
-		ctxResource, cancelResource := context.WithCancel(context.Background())
 		select {
 		case <-a.shutdown:
-			cancelResource()
 			return
-		case <-a.resources.WaitAsyncResource(ctxResource):
-			cancelResource()
+		case <-a.resources.WaitAsyncResource(ctx):
 			// TODO we _could_ return a token here to reserve the ram so that there's
 			// not a race between here and Submit but we're single threaded
 			// dequeueing and retries handled gracefully inside of Submit if we run
@@ -37,7 +39,7 @@ func (a *agent) asyncDequeue() {
 			if ok {
 				a.wg.Add(1) // need to add 1 in this thread to ensure safe shutdown
 				go func(model *models.Call) {
-					a.asyncRun(model)
+					a.asyncRun(ctx, model)
 					a.wg.Done() // can shed it after this is done, Submit will add 1 too but it's fine
 				}(model)
 			}
@@ -68,9 +70,19 @@ func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call {
 	return ch
 }
 
-func (a *agent) asyncRun(model *models.Call) {
-	// TODO output / logger should be here too...
-	call, err := a.GetCall(FromModel(model))
+func (a *agent) asyncRun(ctx context.Context, model *models.Call) {
+	// IMPORTANT: get a context that has a child span but NO timeout (Submit imposes timeout)
+	// TODO this is a 'FollowsFrom'
+	ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
+
+	// additional enclosing context here since this isn't spawned from an http request
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_run")
+	defer span.Finish()
+
+	call, err := a.GetCall(
+		FromModel(model),
+		WithContext(ctx), // NOTE: order is important
+	)
 	if err != nil {
 		logrus.WithError(err).Error("error getting async call")
 		return
diff --git a/api/agent/call.go b/api/agent/call.go
index b8a2b40f2..5bbbe43cd 100644
--- a/api/agent/call.go
+++ b/api/agent/call.go
@@ -180,6 +180,13 @@ func WithWriter(w io.Writer) CallOpt {
 	}
 }
 
+func WithContext(ctx context.Context) CallOpt {
+	return func(a *agent, c *call) error {
+		c.req = c.req.WithContext(ctx)
+		return nil
+	}
+}
+
 // GetCall builds a Call that can be used to submit jobs to the agent.
 //
 // TODO where to put this? async and sync both call this
@@ -198,6 +205,11 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
 		return nil, errors.New("no model or request provided for call")
 	}
 
+	if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) {
+		// if we're not going to be able to run this call on this machine, bail here.
+		return nil, models.ErrCallTimeoutServerBusy
+	}
+
 	c.da = a.da
 	c.ct = a
diff --git a/api/agent/resource.go b/api/agent/resource.go
index 62fbb8874..a22319c7e 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -13,6 +13,7 @@ import (
 	"strings"
 	"sync"
 
+	opentracing "github.com/opentracing/opentracing-go"
 	"github.com/sirupsen/logrus"
 )
 
@@ -24,9 +25,21 @@ const (
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
+	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
+	// resource levels to run an async task, it is up to the implementer to create policy here.
 	WaitAsyncResource(ctx context.Context) chan struct{}
-	// returns a closed channel if the resource can never me met.
-	GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken
+
+	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
+	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
+	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
+	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
+	// Memory is expected to be provided in MB units.
+	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync bool) <-chan ResourceToken
+
+	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
+	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
+	// Memory is expected to be provided in MB units.
+	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
 }
 
 type resourceTracker struct {
@@ -100,7 +113,9 @@ func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint
 }
 
 // is this request possible to meet? If no, fail quick
-func (a *resourceTracker) isResourcePossible(memory uint64, cpuQuota uint64, isAsync bool) bool {
+func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isAsync bool) bool {
+	memory = memory * Mem1MB
+
 	if isAsync {
 		return memory <= a.ramAsyncTotal && cpuQuota <= a.cpuAsyncTotal
 	} else {
@@ -111,17 +126,20 @@ func (a *resourceTracker) isResourcePossible(memory uint64, cpuQuota uint64, isA
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
 func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
-
-	memory = memory * Mem1MB
+	ch := make(chan ResourceToken)
+	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
+		// return the channel, but never send anything.
+		return ch
+	}
 
 	c := a.cond
 	isWaiting := false
-	ch := make(chan ResourceToken)
-	if !a.isResourcePossible(memory, cpuQuota, isAsync) {
-		close(ch)
-		return ch
-	}
+	memory = memory * Mem1MB
+
+	// if we find a resource token, shut down the thread waiting on ctx finish.
+	// alternatively, if the ctx is done, wake up the cond loop.
+	ctx, cancel := context.WithCancel(ctx)
 
 	go func() {
 		<-ctx.Done()
@@ -132,7 +150,10 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 		c.L.Unlock()
 	}()
 
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_resource_token")
 	go func() {
+		defer span.Finish()
+		defer cancel()
 		c.L.Lock()
 		isWaiting = true
@@ -202,6 +223,10 @@ func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
 	isWaiting := false
 	c := a.cond
 
+	// if we find a resource token, shut down the thread waiting on ctx finish.
+	// alternatively, if the ctx is done, wake up the cond loop.
+	ctx, cancel := context.WithCancel(ctx)
+
 	go func() {
 		<-ctx.Done()
 		c.L.Lock()
@@ -211,7 +236,10 @@ func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
 		c.L.Unlock()
 	}()
 
+	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_wait_async_resource")
 	go func() {
+		defer span.Finish()
+		defer cancel()
 		c.L.Lock()
 		isWaiting = true
 		for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go
index aa61c6fba..17a4cf44c 100644
--- a/api/agent/resource_test.go
+++ b/api/agent/resource_test.go
@@ -238,8 +238,9 @@ func TestResourceGetCombo(t *testing.T) {
 	// impossible request
 	ctx, cancel := context.WithCancel(context.Background())
 	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false)
-	if !isClosed(ch) {
-		t.Fatalf("impossible request should return closed channel")
+	_, err := fetchToken(ch)
+	if err == nil {
+		t.Fatalf("impossible request should never return (error here)")
 	}
 	cancel()
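
Note on the revised ResourceTracker contract above: GetResourceToken no longer closes its channel for impossible requests, so a caller that skips IsResourcePossible can block forever. A minimal caller sketch under that assumption, using the api/agent types in this diff (ResourceTracker, ResourceToken, models); acquireToken is an illustrative helper, not part of this change:

```go
// acquireToken is a hypothetical wrapper showing the intended call order for
// the new interface: feasibility check first, then wait for a token or ctx.
func acquireToken(ctx context.Context, rt ResourceTracker, memMB, cpuQuota uint64, isAsync bool) (ResourceToken, error) {
	// For impossible requests the token channel never receives (it is no
	// longer closed), so bail out before waiting on it.
	if !rt.IsResourcePossible(memMB, cpuQuota, isAsync) {
		return nil, models.ErrCallTimeoutServerBusy
	}

	select {
	case tok := <-rt.GetResourceToken(ctx, memMB, cpuQuota, isAsync):
		// Hand tok to whatever launches the container; per the comment in
		// resource.go, the launch path (runHot/prepCold) is what closes it.
		return tok, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
```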
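The hotLauncher, runHot, and asyncRun changes all repeat the detach-the-deadline move flagged by the 'FollowsFrom' TODOs: keep the span (and logger) from the request context but drop its cancelation so background work survives the originating request. A small sketch of that pattern using the opentracing-go and common packages already imported above; detachSpan is an illustrative name, not part of this diff, and a true FollowsFrom reference (opentracing.FollowsFrom) would be the eventual replacement:

```go
// detachSpan returns a child span plus a context that carries that span and
// the request logger, but none of the parent context's deadline or cancelation.
func detachSpan(ctx context.Context, operation string) (opentracing.Span, context.Context) {
	logger := common.Logger(ctx)
	base := common.WithLogger(context.Background(), logger)
	base = opentracing.ContextWithSpan(base, opentracing.SpanFromContext(ctx))
	return opentracing.StartSpanFromContext(base, operation)
}
```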