diff --git a/api/runner/func_logger.go b/api/runner/func_logger.go index 2376df7c4..1c8594784 100644 --- a/api/runner/func_logger.go +++ b/api/runner/func_logger.go @@ -4,72 +4,202 @@ import ( "bytes" "context" "errors" + "fmt" "io" + "sync" "github.com/Sirupsen/logrus" "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/runner/common" ) +// TODO kind of no reason to have FuncLogger interface... we can just do the thing. + type FuncLogger interface { Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser } -// FuncLogger reads STDERR output from a container and outputs it in a parsed structured log format, see: https://github.com/treeder/functions/issues/76 -type DefaultFuncLogger struct { - logDB models.FnLog -} - func NewFuncLogger(logDB models.FnLog) FuncLogger { - return &DefaultFuncLogger{logDB} + // TODO we should probably make it somehow configurable to log to stderr and/or db but meh + return &DefaultFuncLogger{ + logDB: logDB, + bufPool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}, + logPool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}, + } } -type writer struct { - bytes.Buffer - - stderr bytes.Buffer // for logging to stderr - db models.FnLog - ctx context.Context - reqID string - appName string - image string - path string +// DefaultFuncLogger returns a WriteCloser that writes STDERR output from a +// container and outputs it in a parsed structured log format to attached +// STDERR as well as writing the log to the db when Close is called. 
+type DefaultFuncLogger struct { + logDB models.FnLog + bufPool *sync.Pool // these are usually small, for buffering lines + logPool *sync.Pool // these are usually large, for buffering whole logs } -func (w *writer) Close() error { - w.flush() - return w.db.InsertLog(context.TODO(), w.reqID, w.String()) +func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser { + lbuf := l.bufPool.Get().(*bytes.Buffer) + dbuf := l.logPool.Get().(*bytes.Buffer) + + close := func() error { + // TODO we may want to toss out buffers that grow to grotesque size but meh they will prob get GC'd + lbuf.Reset() + dbuf.Reset() + l.bufPool.Put(lbuf) + l.logPool.Put(dbuf) + return nil + } + + // we don't need to limit the log writer, but we do need it to dispense lines + linew := newLineWriterWithBuffer(lbuf, &logWriter{ + ctx: ctx, + appName: appName, + path: path, + image: image, + reqID: reqID, + }) + + const MB = 1 * 1024 * 1024 // pick a number any number.. TODO configurable ? + + // we don't need to log per line to db, but we do need to limit it + limitw := newLimitWriter(MB, &dbWriter{ + Buffer: dbuf, + db: l.logDB, + ctx: ctx, + reqID: reqID, + }) + + // TODO / NOTE: we want linew to be first because limitw may error if limit + // is reached but we still want to log. we should probably ignore hitting the + // limit error since we really just want to not write too much to db and + // that's handled as is. put buffers back last to avoid misuse, if there's + // an error they won't get put back and that's really okay too. 
+ return multiWriteCloser{linew, limitw, &fCloser{close}} } -func (w *writer) Write(b []byte) (int, error) { - n, err := w.Buffer.Write(b) +// implements passthrough Write & arbitrary func close to have a seat at the cool kids lunch table +type fCloser struct { + close func() error +} - // temp or should move to another FuncLogger implementation - w.writeStdErr(b) +func (f *fCloser) Write(b []byte) (int, error) { return len(b), nil } +func (f *fCloser) Close() error { return f.close() } +// multiWriteCloser returns the first write or close that returns a non-nil +// err, if no non-nil err is returned, then the returned bytes written will be +// from the last call to write. +type multiWriteCloser []io.WriteCloser + +func (m multiWriteCloser) Write(b []byte) (n int, err error) { + for _, mw := range m { + n, err = mw.Write(b) + if err != nil { + return n, err + } + } return n, err } -func (w *writer) writeStdErr(b []byte) { - // for now, also write to stderr so we can debug quick ;) - // TODO this should be a separate FuncLogger but time is running short ! - endLine := bytes.IndexByte(b, '\n') - if endLine < 0 { - w.stderr.Write(b) - return +func (m multiWriteCloser) Close() (err error) { + for _, mw := range m { + err = mw.Close() + if err != nil { + return err + } } - // we have a new line, so: - w.stderr.Write(b[0:endLine]) - w.flush() - w.writeStdErr(b[endLine+1:]) - + return err } -func (w *writer) flush() { - log := common.Logger(w.ctx) - log = log.WithFields(logrus.Fields{"user_log": true, "app_name": w.appName, "path": w.path, "image": w.image, "call_id": w.reqID}) - log.Println(w.stderr.String()) - w.stderr.Reset() +// logWriter will log (to real stderr) every call to Write as a line. it should +// be wrapped with a lineWriter so that the output makes sense. 
+type logWriter struct { + ctx context.Context + appName string + path string + image string + reqID string +} + +func (l *logWriter) Write(b []byte) (int, error) { + log := common.Logger(l.ctx) + log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID}) + log.Println(string(b)) + return len(b), nil +} + +// lineWriter buffers all calls to Write and will call Write +// on the underlying writer once per new line. Close must +// be called to ensure that the buffer is flushed, and a newline +// will be appended in Close if none is present. +type lineWriter struct { + b *bytes.Buffer + w io.Writer +} + +func newLineWriter(w io.Writer) io.WriteCloser { + return &lineWriter{b: new(bytes.Buffer), w: w} +} + +func newLineWriterWithBuffer(b *bytes.Buffer, w io.Writer) io.WriteCloser { + return &lineWriter{b: b, w: w} +} + +func (li *lineWriter) Write(ogb []byte) (int, error) { + li.b.Write(ogb) // bytes.Buffer is guaranteed, read it! + + for { + b := li.b.Bytes() + i := bytes.IndexByte(b, '\n') + if i < 0 { + break // no more newlines in buffer + } + + // write in this line and advance buffer past it + l := b[:i+1] + ns, err := li.w.Write(l) + if err != nil { + return ns, err + } + li.b.Next(len(l)) + } + + // technically we wrote all the bytes, so make things appear normal + return len(ogb), nil +} + +func (li *lineWriter) Close() error { + // flush the remaining bytes in the buffer to underlying writer, adding a + // newline if needed + b := li.b.Bytes() + if len(b) == 0 { + return nil + } + + if b[len(b)-1] != '\n' { + b = append(b, '\n') + } + _, err := li.w.Write(b) + return err +} + +// dbWriter accumulates all calls to Write into an in memory buffer +// and writes them to the database when Close is called, returning +// any error from Close. it should be wrapped in a limitWriter to +// prevent blowing out the buffer and bloating the db. 
+type dbWriter struct { + *bytes.Buffer + + db models.FnLog + ctx context.Context + reqID string +} + +func (w *dbWriter) Close() error { + return w.db.InsertLog(w.ctx, w.reqID, w.String()) +} + +func (w *dbWriter) Write(b []byte) (int, error) { + return w.Buffer.Write(b) } // overrides Write, keeps Close @@ -83,22 +213,18 @@ func newLimitWriter(max int, w io.WriteCloser) io.WriteCloser { } func (l *limitWriter) Write(b []byte) (int, error) { - if l.n > l.max { + if l.n >= l.max { return 0, errors.New("max log size exceeded, truncating log") } + if l.n+len(b) >= l.max { + // cut off to prevent gigantic line attack + b = b[:l.max-l.n] + } n, err := l.WriteCloser.Write(b) l.n += n + if l.n >= l.max { + // write in truncation message to log once + l.WriteCloser.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n", l.max))) + } return n, err } - -func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser { - const MB = 1 * 1024 * 1024 - return newLimitWriter(MB, &writer{ - db: l.logDB, - ctx: ctx, - appName: appName, - path: path, - image: image, - reqID: reqID, - }) -} diff --git a/api/runner/func_logger_test.go b/api/runner/func_logger_test.go new file mode 100644 index 000000000..ed9c8858b --- /dev/null +++ b/api/runner/func_logger_test.go @@ -0,0 +1,76 @@ +package runner + +import ( + "bytes" + "io" + "testing" +) + +type nopCloser struct { + io.Writer +} + +func (n nopCloser) Close() error { return nil } + +func TestLimitWriter(t *testing.T) { + var b bytes.Buffer + const max = 5 + lw := newLimitWriter(max, nopCloser{&b}) + + lw.Write([]byte("yo")) + + if b.Len() != 2 { + t.Fatal("expected 2 bytes in buffer, got:", b.Len()) + } + + n, _ := lw.Write([]byte("dawg")) + + // can't check b.Len() really since the overage message is written in + if n != 3 { + t.Fatalf("limit writer allowed writing over the limit or n was wrong. 
n: %d", n) + } + + n, err := lw.Write([]byte("yodawg")) + + if n != 0 || err == nil { + t.Fatalf("limit writer wrote after limit exceeded, n > 0 or err is nil. n: %d err: %v", n, err) + } + + // yes should const this. yes i'm wrong. yes you're wrong. no it doesn't matter. + if !bytes.HasPrefix(b.Bytes(), []byte("yodaw\n-----max")) { + t.Fatal("expected buffer to be 'yodawg', got:", b.String()) + } +} + +func TestLineWriter(t *testing.T) { + var b bytes.Buffer + lw := newLineWriter(&b) + + lw.Write([]byte("yo")) + + if b.Len() != 0 { + t.Fatal("expected no bytes to be written, got bytes") + } + + lw.Write([]byte("\ndawg")) + + if b.Len() != 3 { + t.Fatal("expected 3 bytes to be written in, got:", b.Len()) + } + + lw.Write([]byte("\ndawgy\ndawg")) + + if b.Len() != 14 { + t.Fatal("expected 14 bytes to be written in, got:", b.Len()) + } + + lw.Close() + + if b.Len() != 19 { + t.Fatal("expected 19 bytes to be written in, got:", b.Len()) + } + + if !bytes.HasSuffix(b.Bytes(), []byte("\n")) { + t.Fatal("line writer close is broked, expected new line") + } +} diff --git a/api/runner/runner.go b/api/runner/runner.go index fad4ef251..228158ac6 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -172,9 +172,8 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, cfg.Memory = 128 } - cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) - defer cfg.Stderr.Close() // TODO we should prob log this err but hey if cfg.Stdout == nil { + // TODO why? async? 
cfg.Stdout = cfg.Stderr } diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 9f37c8def..7f69fda1c 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" - "gitlab-odx.oracle.com/odx/functions/api/id" "gitlab-odx.oracle.com/odx/functions/api/datastore" + "gitlab-odx.oracle.com/odx/functions/api/id" + "gitlab-odx.oracle.com/odx/functions/api/logs" "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/runner/task" - "gitlab-odx.oracle.com/odx/functions/api/logs" ) func TestRunnerHello(t *testing.T) { @@ -28,7 +28,6 @@ func TestRunnerHello(t *testing.T) { t.Fatalf("Test error during New() - %s", err) } - for i, test := range []struct { route *models.Route payload string @@ -49,7 +48,7 @@ func TestRunnerHello(t *testing.T) { Stdin: strings.NewReader(test.payload), AppName: test.route.AppName, Stdout: &stdout, - Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID), + Stderr: nopCloser{&stderr}, } result, err := runner.run(ctx, cfg) @@ -107,7 +106,7 @@ func TestRunnerError(t *testing.T) { Ready: make(chan struct{}), Stdin: strings.NewReader(test.payload), Stdout: &stdout, - Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID), + Stderr: nopCloser{&stderr}, } result, err := runner.run(ctx, cfg) diff --git a/api/runner/worker.go b/api/runner/worker.go index 7c7e0c1e3..c0be3f740 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -255,16 +255,46 @@ func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func } } +// ghostWriter is a writer who will pass writes to an inner writer +// (that may be changed at will). 
+type ghostWriter struct { + sync.Mutex + inner io.Writer +} + +func (g *ghostWriter) swap(w io.Writer) { + g.Lock() + g.inner = w + g.Unlock() +} + +func (g *ghostWriter) Write(b []byte) (int, error) { + // we don't need to serialize writes but swapping g.inner could be a race if unprotected + g.Lock() + w := g.inner + g.Unlock() + return w.Write(b) +} + +func (g *ghostWriter) Close() error { return nil } + func (hc *htfn) serve(ctx context.Context) { - lctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() cfg := *hc.cfg logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout}) + // TODO go through FuncLogger probably + // if there's no active call, log any errors to stderr (for debugging issues) + bwLog := newLineWriter(&logWriter{ctx: ctx, appName: cfg.AppName, path: cfg.Path, image: cfg.Image, reqID: hc.id}) + defer bwLog.Close() + + stderr := &ghostWriter{inner: bwLog} + go func() { for { select { - case <-lctx.Done(): + case <-ctx.Done(): case <-cfg.Ready: // on first execution, wait before starting idle timeout / stopping wait time clock, // since docker pull / container create need to happen. 
@@ -272,21 +302,28 @@ func (hc *htfn) serve(ctx context.Context) { } select { - case <-lctx.Done(): + case <-ctx.Done(): return case <-time.After(cfg.IdleTimeout): logger.Info("Canceling inactive hot function") cancel() case t := <-hc.tasks: + // swap logs to log to the task logger instead of stderr + tlog := hc.rnr.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) + stderr.swap(tlog) + start := time.Now() - err := hc.proto.Dispatch(lctx, t) + err := hc.proto.Dispatch(ctx, t) status := "success" if err != nil { status = "error" - logrus.WithField("ctx", lctx).Info("task failed") + logrus.WithField("ctx", ctx).Info("task failed") } hc.once() + stderr.swap(bwLog) // swap back out before flush + tlog.Close() // write to db/flush + t.Response <- task.Response{ Result: &runResult{start: start, status: status, error: err}, Err: err, @@ -299,9 +336,9 @@ func (hc *htfn) serve(ctx context.Context) { cfg.Timeout = 0 // add a timeout to simulate ab.end. failure. cfg.Stdin = hc.containerIn cfg.Stdout = hc.containerOut - // NOTE: cfg.Stderr is overwritten in rnr.Run() + cfg.Stderr = stderr - result, err := hc.rnr.run(lctx, &cfg) + result, err := hc.rnr.run(ctx, &cfg) if err != nil { logger.WithError(err).Error("hot function failure detected") } @@ -310,6 +347,11 @@ func (hc *htfn) serve(ctx context.Context) { // TODO make Default protocol a real thing and get rid of this in favor of Dispatch func runTaskReq(rnr *Runner, t task.Request) { + // TODO this will not be such a shit storm after the above TODO is TODONE + cfg := t.Config + t.Config.Stderr = rnr.flog.Writer(t.Ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) + defer t.Config.Stderr.Close() + result, err := rnr.run(t.Ctx, t.Config) select { case t.Response <- task.Response{result, err}: