mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: logging/context improvements for runner status calls (#1345)
* fn: logging/context improvements for runner status calls

Avoid blocking inside runStatusCall() so that the gRPC context can still be cancelled or time out. This is unlikely to be an issue in practice, but a runStatusCall() that stays blocked while the gRPC request is being cancelled is a hard case to reason about; the new flow is easier to follow. Also log all error cases in the Status() gRPC entry point, including client-side cancellations.
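The core of the change is a wait-channel pattern: the first caller spawns the status probe in a background goroutine that is detached from the client deadline, later callers simply wait on the same channel, and every caller can still bail out early when its own gRPC context is cancelled. Below is a minimal, self-contained sketch of that pattern, assuming nothing fn-specific; names such as probeCache, statusProbe and run are illustrative, not the actual fn identifiers.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// probeCache holds the last probe result and a wait channel that is non-nil
// while a probe is in flight.
type probeCache struct {
	mu     sync.Mutex
	result string
	wait   chan struct{}
}

// statusProbe never blocks on the probe itself: the probe runs in a background
// goroutine with its own timeout, and the caller only waits on the wait channel
// or its own (cancellable) context.
func (c *probeCache) statusProbe(ctx context.Context, run func(context.Context) string) (string, error) {
	c.mu.Lock()
	wait := c.wait
	spawn := wait == nil
	if spawn {
		wait = make(chan struct{})
		c.wait = wait
	}
	c.mu.Unlock()

	if spawn {
		go func() {
			// Detach from the client deadline so the probe always completes.
			bg, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()
			res := run(bg)

			c.mu.Lock()
			c.result = res
			c.wait = nil
			c.mu.Unlock()
			close(wait) // signal all waiters
		}()
	}

	select {
	case <-ctx.Done():
		// The client went away; report it, the probe keeps running in the background.
		return "", ctx.Err()
	case <-wait:
		c.mu.Lock()
		defer c.mu.Unlock()
		return c.result, nil
	}
}

func main() {
	var c probeCache
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	out, err := c.statusProbe(ctx, func(context.Context) string { return "ok" })
	fmt.Println(out, err)
}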
@@ -746,15 +746,10 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error
 
 // Runs a status call using status image with baked in parameters.
 func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
 
-	// IMPORTANT: We have to use our own context with a set timeout in order
-	// to ignore client timeouts. Original 'ctx' here carries client side deadlines.
-	// Since these deadlines can vary, in order to make sure Status runs predictably we
-	// use a hardcoded preset timeout 'ctxTimeout' instead.
-	// TODO: It would be good to copy original ctx key/value pairs into our new
-	// context for tracking, etc. But go-lang context today does not seem to allow this.
-	execCtx, execCtxCancel := context.WithTimeout(context.Background(), StatusCtxTimeout)
-	defer execCtxCancel()
+	// IMPORTANT: apply an upper bound timeout
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithTimeout(ctx, StatusCtxTimeout)
+	defer cancel()
 
 	result := &runner.RunnerStatus{}
 	log := common.Logger(ctx)
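The hunk above swaps context.Background() for a timeout derived from the incoming context: the upper bound stays hardcoded (StatusCtxTimeout), but client cancellation now propagates into the call instead of being ignored. A tiny sketch of that difference, using only the standard library and nothing fn-specific:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancelParent := context.WithCancel(context.Background())

	// Upper-bound timeout derived from the caller's context, as in the new code.
	ctx, cancel := context.WithTimeout(parent, 5*time.Second)
	defer cancel()

	// Cancelling the parent (e.g. the gRPC client going away) cancels ctx too;
	// a context derived from context.Background() would have kept running.
	cancelParent()

	select {
	case <-ctx.Done():
		fmt.Println("ctx done:", ctx.Err()) // prints: ctx done: context canceled
	case <-time.After(time.Second):
		fmt.Println("still running")
	}
}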
@@ -790,7 +785,7 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
 	agentCall, err := pr.a.GetCall(FromModelAndInput(&c, player),
 		WithLogger(common.NoopReadWriteCloser{}),
 		WithWriter(recorder),
-		WithContext(execCtx),
+		WithContext(ctx),
 	)
 
 	if err == nil {
@@ -836,87 +831,97 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
 	result.Details = string(body)
 	result.Id = c.ID
 
-	if result.Failed {
-		log.Errorf("Status call failure id=%v result=%+v", c.ID, result)
-	} else {
-		log.Debugf("Status call success id=%v result=%+v", c.ID, result)
-	}
+	log.Debugf("Finished status call id=%v result=%+v", c.ID, result)
 	return result
 }
 
+func (pr *pureRunner) spawnStatusCall(ctx context.Context) {
+	go func() {
+		var waitChan chan struct{}
+		// IMPORTANT: We have to strip client timeouts to make sure this completes
+		// in the background even if client cancels/times out.
+		cachePtr := pr.runStatusCall(common.BackgroundContext(ctx))
+		now := time.Now()
+
+		// Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow
+		// copy of 'cache'
+		pr.status.lock.Lock()
+
+		pr.status.cache = cachePtr
+		pr.status.expiry = now.Add(StatusCallCacheDuration)
+		waitChan = pr.status.wait // cannot be null
+		pr.status.wait = nil
+
+		pr.status.lock.Unlock()
+
+		// signal waiters
+		close(waitChan)
+	}()
+}
+
+func (pr *pureRunner) fetchStatusCall(ctx context.Context) (*runner.RunnerStatus, error) {
+	var cacheObj runner.RunnerStatus
+
+	// A shallow copy is sufficient here, as we do not modify nested data in
+	// RunnerStatus in any way.
+	pr.status.lock.Lock()
+
+	cacheObj = *pr.status.cache // cannot be null
+	pr.status.cache.Cached = true
+
+	pr.status.lock.Unlock()
+
+	// The rest of the RunnerStatus fields are not cached and always populated
+	// with latest metrics.
+	cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
+	cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
+	cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
+	cacheObj.KdumpsOnDisk = atomic.LoadUint64(&pr.status.kdumpsOnDisk)
+
+	return &cacheObj, ctx.Err()
+}
+
+func (pr *pureRunner) checkStatusCall(ctx context.Context) (chan struct{}, bool) {
+	now := time.Now()
+
+	pr.status.lock.Lock()
+	defer pr.status.lock.Unlock()
+
+	// cached?
+	if pr.status.expiry.After(now) {
+		return nil, false
+	}
+
+	// already running?
+	if pr.status.wait != nil {
+		return pr.status.wait, false
+	}
+
+	// spawn a new call
+	pr.status.wait = make(chan struct{})
+	return pr.status.wait, true
+}
+
 // Handles a status call concurrency and caching.
 func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatus, error) {
-	var myChan chan struct{}
-	isWaiter := false
-	isCached := false
-	now := time.Now()
-
-	pr.status.lock.Lock()
-
-	if now.Before(pr.status.expiry) {
-		// cache is still valid.
-		isCached = true
-	} else if pr.status.wait != nil {
-		// A wait channel is already installed, we must wait
-		isWaiter = true
-		myChan = pr.status.wait
-	} else {
-		// Wait channel is not present, we install a new one.
-		myChan = make(chan struct{})
-		pr.status.wait = myChan
-	}
-
-	pr.status.lock.Unlock()
-
-	// We either need to wait and/or serve the request from cache
-	if isWaiter || isCached {
-		if isWaiter {
-			select {
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			case <-myChan:
-			}
-		}
-
-		var cacheObj runner.RunnerStatus
-
-		// A shallow copy is sufficient here, as we do not modify nested data in
-		// RunnerStatus in any way.
-		pr.status.lock.Lock()
-
-		cacheObj = *pr.status.cache
-
-		pr.status.lock.Unlock()
-
-		cacheObj.Cached = true
-		cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
-		cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
-		cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
-		return &cacheObj, nil
-	}
-
-	cachePtr := pr.runStatusCall(ctx)
-	cachePtr.Active = atomic.LoadInt32(&pr.status.inflight)
-	cachePtr.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
-	cachePtr.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
-	cachePtr.KdumpsOnDisk = atomic.LoadUint64(&pr.status.kdumpsOnDisk)
-	now = time.Now()
-
-	// Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow
-	// copy of 'cache'
-	pr.status.lock.Lock()
-
-	pr.status.cache = cachePtr
-	pr.status.expiry = now.Add(StatusCallCacheDuration)
-	pr.status.wait = nil
-
-	pr.status.lock.Unlock()
-
-	// signal waiters
-	close(myChan)
-	return cachePtr, nil
+	waitChan, isSpawner := pr.checkStatusCall(ctx)
+
+	// from cache
+	if waitChan == nil {
+		return pr.fetchStatusCall(ctx)
+	}
+
+	if isSpawner {
+		pr.spawnStatusCall(ctx)
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-waitChan:
+		return pr.fetchStatusCall(ctx)
+	}
 }
 
 // implements RunnerProtocolServer
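Note that spawnStatusCall above runs the probe under common.BackgroundContext(ctx), i.e. a context detached from the client deadline (the hard StatusCtxTimeout is then applied inside runStatusCall). One common way to build such a detached context, shown here only as an assumed shape for what common.BackgroundContext does and not as its actual implementation, is a wrapper that keeps Value() lookups (logger, trace ids) from the parent but reports no deadline or cancellation:

package main

import (
	"context"
	"fmt"
	"time"
)

// detachedCtx delegates Value() to its parent but has no deadline and is never
// cancelled.
type detachedCtx struct{ parent context.Context }

func (detachedCtx) Deadline() (time.Time, bool)           { return time.Time{}, false }
func (detachedCtx) Done() <-chan struct{}                 { return nil }
func (detachedCtx) Err() error                            { return nil }
func (d detachedCtx) Value(key interface{}) interface{}   { return d.parent.Value(key) }

func backgroundContext(ctx context.Context) context.Context { return detachedCtx{parent: ctx} }

type ctxKey string

func main() {
	parent, cancel := context.WithCancel(context.WithValue(context.Background(), ctxKey("id"), "42"))
	cancel() // the request context is already cancelled...

	bg := backgroundContext(parent)
	// ...but the detached context is not, and it still carries the value.
	fmt.Println(bg.Err(), bg.Value(ctxKey("id"))) // prints: <nil> 42
}

(Current Go, 1.21+, provides context.WithoutCancel for exactly this; it did not exist when this commit was written, hence the helper.)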
@@ -929,7 +934,11 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error)
 			RequestsHandled: atomic.LoadUint64(&pr.status.requestsHandled),
 		}, nil
 	}
-	return pr.handleStatusCall(ctx)
+	status, err := pr.handleStatusCall(ctx)
+	if err != nil {
+		common.Logger(ctx).WithError(err).Errorf("Status call failed result=%+v", status)
+	}
+	return status, err
 }
 
 // BeforeCall called before a function is executed