diff --git a/api/runner/runner.go b/api/runner/runner.go index fad4ef251..228158ac6 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -172,9 +172,8 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, cfg.Memory = 128 } - cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) - defer cfg.Stderr.Close() // TODO we should prob log this err but hey if cfg.Stdout == nil { + // TODO why? async? cfg.Stdout = cfg.Stderr } diff --git a/api/runner/worker.go b/api/runner/worker.go index 7c7e0c1e3..c0be3f740 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -255,16 +255,46 @@ func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func } } +// ghostWriter is a writer who will pass writes to an inner writer +// (that may be changed at will). +type ghostWriter struct { + sync.Mutex + inner io.Writer +} + +func (g *ghostWriter) swap(w io.Writer) { + g.Lock() + g.inner = w + g.Unlock() +} + +func (g *ghostWriter) Write(b []byte) (int, error) { + // we don't need to serialize writes but swapping g.inner could be a race if unprotected + g.Lock() + w := g.inner + g.Unlock() + return w.Write(b) +} + +func (g *ghostWriter) Close() error { return nil } + func (hc *htfn) serve(ctx context.Context) { - lctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() cfg := *hc.cfg logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout}) + // TODO go through FuncLogger probably + // if there's no active call, log any errors to stderr (for debugging issues) + bwLog := newLineWriter(&logWriter{ctx: ctx, appName: cfg.AppName, path: cfg.Path, image: cfg.Image, reqID: hc.id}) + defer bwLog.Close() + + stderr := &ghostWriter{inner: bwLog} + go func() { for { select { - case <-lctx.Done(): + case <-ctx.Done(): case <-cfg.Ready: // on first execution, wait before starting idle timeout / stopping wait time clock, // since docker pull / container create need to happen. @@ -272,21 +302,28 @@ func (hc *htfn) serve(ctx context.Context) { } select { - case <-lctx.Done(): + case <-ctx.Done(): return case <-time.After(cfg.IdleTimeout): logger.Info("Canceling inactive hot function") cancel() case t := <-hc.tasks: + // swap logs to log to the task logger instead of stderr + tlog := hc.rnr.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) + stderr.swap(tlog) + start := time.Now() - err := hc.proto.Dispatch(lctx, t) + err := hc.proto.Dispatch(ctx, t) status := "success" if err != nil { status = "error" - logrus.WithField("ctx", lctx).Info("task failed") + logrus.WithField("ctx", ctx).Info("task failed") } hc.once() + stderr.swap(bwLog) // swap back out before flush + tlog.Close() // write to db/flush + t.Response <- task.Response{ Result: &runResult{start: start, status: status, error: err}, Err: err, @@ -299,9 +336,9 @@ func (hc *htfn) serve(ctx context.Context) { cfg.Timeout = 0 // add a timeout to simulate ab.end. failure. cfg.Stdin = hc.containerIn cfg.Stdout = hc.containerOut - // NOTE: cfg.Stderr is overwritten in rnr.Run() + cfg.Stderr = stderr - result, err := hc.rnr.run(lctx, &cfg) + result, err := hc.rnr.run(ctx, &cfg) if err != nil { logger.WithError(err).Error("hot function failure detected") } @@ -310,6 +347,11 @@ func (hc *htfn) serve(ctx context.Context) { // TODO make Default protocol a real thing and get rid of this in favor of Dispatch func runTaskReq(rnr *Runner, t task.Request) { + // TODO this will not be such a shit storm after the above TODO is TODONE + cfg := t.Config + t.Config.Stderr = rnr.flog.Writer(t.Ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) + defer t.Config.Stderr.Close() + result, err := rnr.run(t.Ctx, t.Config) select { case t.Response <- task.Response{result, err}: