mirror of https://github.com/fnproject/fn.git
* add spans to async
* clean up / add spans to agent
* there were a few methods which had multiple contexts existing in the same scope (this doesn't end well, usually), flattened those out
* loop-bound context cancels now rely on defer (the old way was brittle too)
* runHot had a lot of ctx shuffling, flattened that
* added some additional spans in certain paths for added granularity
* linked up the hot launcher / run hot / wait hot to _a_ root span; the first 2 are follows-from spans, but at least we can see the source of these and can also see the containers launched over a hot launcher's lifetime

  I left a TODO around the FollowsFrom because OpenCensus doesn't, at least at the moment, appear to have any notion of FollowsFrom, and it was an extra OpenTracing method (we have to get the span out, start a new span with the option, then add it to the context... some shuffling required; a rough sketch of that shuffling follows this list). anyway, I was on the fence about adding it.

* resource waiters need to manage their own goroutine lifecycle
* if we get an impossible memory request, bail instead of looping forever
* handle a slippery timeout case
* still sucks, but hotLauncher doesn't leak anything, even the time.After timer goroutines (the timer-reuse idiom is sketched below)
* simplify GetResourceToken

  GetCall can guard against impossible-to-allocate resource tasks entering the system by erroring instead of doling them out (also sketched below). this makes the GetResourceToken logic more straightforward for callers, who now simply have the contract that they won't ever get a token if they let tasks into the agent that can't run (but GetCall guards this, and there's a test for it). sorry, I was going to make this change only do that, but when I went to fix up the tests, my last patch went haywire, so I fixed that too.

  this also at least tries to simplify the hotLaunch loop, which will now no longer leak time.After timers (they were long, and with the signaller there were many -- I got a stack trace :). this breaks the bottom half of the logic (the check for whether we need to launch) out into its own function, and handles the cleanup duties only in the caller instead of in 2 different select statements. played with this a bit, no doubt further cleaning could be done, but this _seems_ better.

* fix vet
* add units to exported method contract docs
* oops
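A rough sketch of the FollowsFrom shuffling described above, using plain OpenTracing. This is illustrative only (the helper name is made up, and nothing below is code from this repo); it just shows the get-the-span-out / start-with-the-option / put-it-back-in-a-fresh-context dance:

```go
package sketch

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
)

// detachWithFollowsFrom is a hypothetical helper: it starts a span that
// follows-from whatever span is in ctx (if any) and returns a fresh
// Background context carrying the new span, so no parent deadline leaks in.
func detachWithFollowsFrom(ctx context.Context, operation string) (opentracing.Span, context.Context) {
	var opts []opentracing.StartSpanOption
	if parent := opentracing.SpanFromContext(ctx); parent != nil {
		// FollowsFrom records the causal link without a parent/child relationship
		opts = append(opts, opentracing.FollowsFrom(parent.Context()))
	}
	span := opentracing.GlobalTracer().StartSpan(operation, opts...)
	return span, opentracing.ContextWithSpan(context.Background(), span)
}
```

asyncRun below could, in principle, use something like this in place of the ContextWithSpan + StartSpanFromContext pair it currently does, which is roughly what its TODO is pointing at.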
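On the time.After leak: each time.After call allocates a timer that isn't reclaimed until it fires, so calling it on every iteration of a long-lived select loop piles timers up. The fix described above amounts to reusing one timer; a generic sketch of that idiom (this is not the repo's hotLauncher, just the pattern, with made-up names):

```go
package sketch

import (
	"context"
	"time"
)

// waitLoop is an illustrative long-lived select loop that reuses a single
// timer instead of calling time.After on every iteration.
func waitLoop(ctx context.Context, idle time.Duration, work <-chan struct{}) {
	timer := time.NewTimer(idle)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-work:
			// handle the work item...
		case <-timer.C:
			// idle timeout path...
		}

		// stop the timer, drain its channel if it already fired, then reset it
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(idle)
	}
}
```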
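And the GetCall guard boils down to rejecting a call whose memory request can never be satisfied, rather than admitting it and letting it wait forever for a resource token. Reduced to its core idea (names and units here are illustrative, not fnproject/fn's actual fields):

```go
package sketch

import "fmt"

// validateMemory is a hypothetical stand-in for the guard described above:
// error out up front instead of admitting a task that can never get a token.
func validateMemory(requestedMB, agentTotalMB uint64) error {
	if requestedMB > agentTotalMB {
		return fmt.Errorf("call requests %dMB but the agent only has %dMB", requestedMB, agentTotalMB)
	}
	return nil
}
```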
package agent

import (
	"context"
	"time"

	"github.com/fnproject/fn/api/models"
	opentracing "github.com/opentracing/opentracing-go"
	"github.com/sirupsen/logrus"
)

func (a *agent) asyncDequeue() {
	defer a.wg.Done() // we can treat this thread like one big task and get safe shutdown for free

	// this is just so we can hang up the dequeue request if we get shut down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// parent span here so that we can see how many async calls are running
	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_dequeue")
	defer span.Finish()

	for {
		select {
		case <-a.shutdown:
			return
		case <-a.resources.WaitAsyncResource(ctx):
			// TODO we _could_ return a token here to reserve the ram so that there's
			// not a race between here and Submit, but we're single threaded
			// dequeueing and retries are handled gracefully inside of Submit if we
			// run out of RAM so..
		}

		// we think we can get a cookie now, so go get a cookie
		select {
		case <-a.shutdown:
			return
		case model, ok := <-a.asyncChew(ctx):
			if ok {
				a.wg.Add(1) // need to add 1 in this thread to ensure safe shutdown
				go func(model *models.Call) {
					a.asyncRun(ctx, model)
					a.wg.Done() // can shed it after this is done, Submit will add 1 too but it's fine
				}(model)
			}
		}
	}
}

func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call {
	ch := make(chan *models.Call, 1)

	go func() {
		ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
		defer cancel()

		call, err := a.da.Dequeue(ctx)
		if call != nil {
			ch <- call
		} else { // call is nil / error
			if err != nil && err != context.DeadlineExceeded {
				logrus.WithError(err).Error("error fetching queued calls")
			}
			// queue may be empty / unavailable
			time.Sleep(1 * time.Second) // back off a little before sending the no-cookie message
			close(ch)
		}
	}()

	return ch
}

func (a *agent) asyncRun(ctx context.Context, model *models.Call) {
	// IMPORTANT: get a context that has a child span but NO timeout (Submit imposes timeout)
	// TODO this is a 'FollowsFrom'
	ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))

	// additional enclosing context here since this isn't spawned from an http request
	span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_run")
	defer span.Finish()

	call, err := a.GetCall(
		FromModel(model),
		WithContext(ctx), // NOTE: order is important
	)
	if err != nil {
		logrus.WithError(err).Error("error getting async call")
		return
	}

	// TODO if the task is cold and doesn't require reading STDIN, it could
	// run but we may not listen for output since the task timed out. these
	// are at-least-once semantics, which is really preferable to at-most-once,
	// so let's do it for now

	err = a.Submit(call)
	if err != nil {
		// NOTE: these could be errors / timeouts from the call that we're
		// logging here (i.e. not our fault), but it's likely better to log
		// these than to suppress them so...
		id := call.Model().ID
		logrus.WithFields(logrus.Fields{"id": id}).WithError(err).Error("error running async call")
	}
}