mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
make hot functions actually have logs now
This commit is contained in:
@@ -172,9 +172,8 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult,
|
|||||||
cfg.Memory = 128
|
cfg.Memory = 128
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
|
|
||||||
defer cfg.Stderr.Close() // TODO we should prob log this err but hey
|
|
||||||
if cfg.Stdout == nil {
|
if cfg.Stdout == nil {
|
||||||
|
// TODO why? async?
|
||||||
cfg.Stdout = cfg.Stderr
|
cfg.Stdout = cfg.Stderr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -255,16 +255,46 @@ func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ghostWriter is a writer who will pass writes to an inner writer
|
||||||
|
// (that may be changed at will).
|
||||||
|
type ghostWriter struct {
|
||||||
|
sync.Mutex
|
||||||
|
inner io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *ghostWriter) swap(w io.Writer) {
|
||||||
|
g.Lock()
|
||||||
|
g.inner = w
|
||||||
|
g.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *ghostWriter) Write(b []byte) (int, error) {
|
||||||
|
// we don't need to serialize writes but swapping g.inner could be a race if unprotected
|
||||||
|
g.Lock()
|
||||||
|
w := g.inner
|
||||||
|
g.Unlock()
|
||||||
|
return w.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *ghostWriter) Close() error { return nil }
|
||||||
|
|
||||||
func (hc *htfn) serve(ctx context.Context) {
|
func (hc *htfn) serve(ctx context.Context) {
|
||||||
lctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
cfg := *hc.cfg
|
cfg := *hc.cfg
|
||||||
logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout})
|
logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout})
|
||||||
|
|
||||||
|
// TODO go through FuncLogger probably
|
||||||
|
// if there's no active call, log any errors to stderr (for debugging issues)
|
||||||
|
bwLog := newLineWriter(&logWriter{ctx: ctx, appName: cfg.AppName, path: cfg.Path, image: cfg.Image, reqID: hc.id})
|
||||||
|
defer bwLog.Close()
|
||||||
|
|
||||||
|
stderr := &ghostWriter{inner: bwLog}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-lctx.Done():
|
case <-ctx.Done():
|
||||||
case <-cfg.Ready:
|
case <-cfg.Ready:
|
||||||
// on first execution, wait before starting idle timeout / stopping wait time clock,
|
// on first execution, wait before starting idle timeout / stopping wait time clock,
|
||||||
// since docker pull / container create need to happen.
|
// since docker pull / container create need to happen.
|
||||||
@@ -272,21 +302,28 @@ func (hc *htfn) serve(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-lctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-time.After(cfg.IdleTimeout):
|
case <-time.After(cfg.IdleTimeout):
|
||||||
logger.Info("Canceling inactive hot function")
|
logger.Info("Canceling inactive hot function")
|
||||||
cancel()
|
cancel()
|
||||||
case t := <-hc.tasks:
|
case t := <-hc.tasks:
|
||||||
|
// swap logs to log to the task logger instead of stderr
|
||||||
|
tlog := hc.rnr.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
|
||||||
|
stderr.swap(tlog)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := hc.proto.Dispatch(lctx, t)
|
err := hc.proto.Dispatch(ctx, t)
|
||||||
status := "success"
|
status := "success"
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status = "error"
|
status = "error"
|
||||||
logrus.WithField("ctx", lctx).Info("task failed")
|
logrus.WithField("ctx", ctx).Info("task failed")
|
||||||
}
|
}
|
||||||
hc.once()
|
hc.once()
|
||||||
|
|
||||||
|
stderr.swap(bwLog) // swap back out before flush
|
||||||
|
tlog.Close() // write to db/flush
|
||||||
|
|
||||||
t.Response <- task.Response{
|
t.Response <- task.Response{
|
||||||
Result: &runResult{start: start, status: status, error: err},
|
Result: &runResult{start: start, status: status, error: err},
|
||||||
Err: err,
|
Err: err,
|
||||||
@@ -299,9 +336,9 @@ func (hc *htfn) serve(ctx context.Context) {
|
|||||||
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
|
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
|
||||||
cfg.Stdin = hc.containerIn
|
cfg.Stdin = hc.containerIn
|
||||||
cfg.Stdout = hc.containerOut
|
cfg.Stdout = hc.containerOut
|
||||||
// NOTE: cfg.Stderr is overwritten in rnr.Run()
|
cfg.Stderr = stderr
|
||||||
|
|
||||||
result, err := hc.rnr.run(lctx, &cfg)
|
result, err := hc.rnr.run(ctx, &cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Error("hot function failure detected")
|
logger.WithError(err).Error("hot function failure detected")
|
||||||
}
|
}
|
||||||
@@ -310,6 +347,11 @@ func (hc *htfn) serve(ctx context.Context) {
|
|||||||
|
|
||||||
// TODO make Default protocol a real thing and get rid of this in favor of Dispatch
|
// TODO make Default protocol a real thing and get rid of this in favor of Dispatch
|
||||||
func runTaskReq(rnr *Runner, t task.Request) {
|
func runTaskReq(rnr *Runner, t task.Request) {
|
||||||
|
// TODO this will not be such a shit storm after the above TODO is TODONE
|
||||||
|
cfg := t.Config
|
||||||
|
t.Config.Stderr = rnr.flog.Writer(t.Ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
|
||||||
|
defer t.Config.Stderr.Close()
|
||||||
|
|
||||||
result, err := rnr.run(t.Ctx, t.Config)
|
result, err := rnr.run(t.Ctx, t.Config)
|
||||||
select {
|
select {
|
||||||
case t.Response <- task.Response{result, err}:
|
case t.Response <- task.Response{result, err}:
|
||||||
|
|||||||
Reference in New Issue
Block a user