diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go
index 9aaa39a11..d5c43d740 100644
--- a/api/agent/pure_runner.go
+++ b/api/agent/pure_runner.go
@@ -746,15 +746,10 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
 
 // Runs a status call using status image with baked in parameters.
 func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
-
-	// IMPORTANT: We have to use our own context with a set timeout in order
-	// to ignore client timeouts. Original 'ctx' here carries client side deadlines.
-	// Since these deadlines can vary, in order to make sure Status runs predictably we
-	// use a hardcoded preset timeout 'ctxTimeout' instead.
-	// TODO: It would be good to copy original ctx key/value pairs into our new
-	// context for tracking, etc. But go-lang context today does not seem to allow this.
-	execCtx, execCtxCancel := context.WithTimeout(context.Background(), StatusCtxTimeout)
-	defer execCtxCancel()
+	// IMPORTANT: apply an upper bound timeout so a status call completes predictably
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithTimeout(ctx, StatusCtxTimeout)
+	defer cancel()
 
 	result := &runner.RunnerStatus{}
 	log := common.Logger(ctx)
@@ -790,7 +785,7 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
 	agentCall, err := pr.a.GetCall(FromModelAndInput(&c, player),
 		WithLogger(common.NoopReadWriteCloser{}),
 		WithWriter(recorder),
-		WithContext(execCtx),
+		WithContext(ctx),
 	)
 
 	if err == nil {
@@ -836,87 +831,97 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {
 
 	result.Details = string(body)
 	result.Id = c.ID
 
-	if result.Failed {
-		log.Errorf("Status call failure id=%v result=%+v", c.ID, result)
-	} else {
-		log.Debugf("Status call success id=%v result=%+v", c.ID, result)
+	log.Debugf("Finished status call id=%v result=%+v", c.ID, result)
+	return result
+}
+
+func (pr *pureRunner) spawnStatusCall(ctx context.Context) {
+	go func() {
+		var waitChan chan struct{}
+		// IMPORTANT: We have to strip client timeouts to make sure this completes
+		// in the background even if client cancels/times out.
+		cachePtr := pr.runStatusCall(common.BackgroundContext(ctx))
+		now := time.Now()
+
+		// Pointer store of 'cachePtr' is sufficient here as fetchStatusCall performs a shallow
+		// copy of 'cache'
+		pr.status.lock.Lock()
+
+		pr.status.cache = cachePtr
+		pr.status.expiry = now.Add(StatusCallCacheDuration)
+		waitChan = pr.status.wait // cannot be nil
+		pr.status.wait = nil
+
+		pr.status.lock.Unlock()
+
+		// signal waiters
+		close(waitChan)
+	}()
+}
+
+func (pr *pureRunner) fetchStatusCall(ctx context.Context) (*runner.RunnerStatus, error) {
+	var cacheObj runner.RunnerStatus
+
+	// A shallow copy is sufficient here, as we do not modify nested data in
+	// RunnerStatus in any way.
+	pr.status.lock.Lock()
+
+	cacheObj = *pr.status.cache // cannot be nil
+	pr.status.cache.Cached = true
+
+	pr.status.lock.Unlock()
+
+	// The rest of the RunnerStatus fields are not cached and always populated
+	// with latest metrics.
+	cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
+	cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
+	cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
+	cacheObj.KdumpsOnDisk = atomic.LoadUint64(&pr.status.kdumpsOnDisk)
+
+	return &cacheObj, ctx.Err()
+}
+
+func (pr *pureRunner) checkStatusCall(ctx context.Context) (chan struct{}, bool) {
+	now := time.Now()
+
+	pr.status.lock.Lock()
+	defer pr.status.lock.Unlock()
+
+	// cached?
+	if pr.status.expiry.After(now) {
+		return nil, false
 	}
 
-	return result
-}
+	// already running?
+	if pr.status.wait != nil {
+		return pr.status.wait, false
+	}
+
+	// spawn a new call
+	pr.status.wait = make(chan struct{})
+	return pr.status.wait, true
+}
 
 // Handles a status call concurrency and caching.
 func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatus, error) {
-	var myChan chan struct{}
-	isWaiter := false
-	isCached := false
-	now := time.Now()
+	waitChan, isSpawner := pr.checkStatusCall(ctx)
 
-	pr.status.lock.Lock()
-
-	if now.Before(pr.status.expiry) {
-		// cache is still valid.
-		isCached = true
-	} else if pr.status.wait != nil {
-		// A wait channel is already installed, we must wait
-		isWaiter = true
-		myChan = pr.status.wait
-	} else {
-		// Wait channel is not present, we install a new one.
-		myChan = make(chan struct{})
-		pr.status.wait = myChan
+	// from cache
+	if waitChan == nil {
+		return pr.fetchStatusCall(ctx)
 	}
 
-	pr.status.lock.Unlock()
-
-	// We either need to wait and/or serve the request from cache
-	if isWaiter || isCached {
-		if isWaiter {
-			select {
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			case <-myChan:
-			}
-		}
-
-		var cacheObj runner.RunnerStatus
-
-		// A shallow copy is sufficient here, as we do not modify nested data in
-		// RunnerStatus in any way.
-		pr.status.lock.Lock()
-
-		cacheObj = *pr.status.cache
-
-		pr.status.lock.Unlock()
-
-		cacheObj.Cached = true
-		cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
-		cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
-		cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
-		return &cacheObj, nil
+	if isSpawner {
+		pr.spawnStatusCall(ctx)
 	}
 
-	cachePtr := pr.runStatusCall(ctx)
-	cachePtr.Active = atomic.LoadInt32(&pr.status.inflight)
-	cachePtr.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
-	cachePtr.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
-	cachePtr.KdumpsOnDisk = atomic.LoadUint64(&pr.status.kdumpsOnDisk)
-	now = time.Now()
-
-	// Pointer store of 'cachePtr' is sufficient here as isWaiter/isCached above perform a shallow
-	// copy of 'cache'
-	pr.status.lock.Lock()
-
-	pr.status.cache = cachePtr
-	pr.status.expiry = now.Add(StatusCallCacheDuration)
-	pr.status.wait = nil
-
-	pr.status.lock.Unlock()
-
-	// signal waiters
-	close(myChan)
-	return cachePtr, nil
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-waitChan:
+		return pr.fetchStatusCall(ctx)
+	}
 }
 
 // implements RunnerProtocolServer
@@ -929,7 +934,11 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.Runne
 			RequestsHandled: atomic.LoadUint64(&pr.status.requestsHandled),
 		}, nil
 	}
-	return pr.handleStatusCall(ctx)
+	status, err := pr.handleStatusCall(ctx)
+	if err != nil {
+		common.Logger(ctx).WithError(err).Errorf("Status call failed result=%+v", status)
+	}
+	return status, err
 }
 
 // BeforeCall called before a function is executed