Merge branch 'fix-logs-reed' into 'master'

Fix logs stuff

See merge request !74
This commit is contained in:
Reed Allman
2017-07-05 10:41:27 -07:00
5 changed files with 309 additions and 67 deletions

View File

@@ -4,72 +4,202 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"github.com/Sirupsen/logrus"
"gitlab-odx.oracle.com/odx/functions/api/models"
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
)
// FuncLogger hands out the writer that a single function call's log output is
// written to.
//
// TODO kind of no reason to have FuncLogger interface... we can just do the thing.
type FuncLogger interface {
// Writer returns an io.WriteCloser for the call identified by appName,
// path, image and reqID. The caller is responsible for calling Close on
// the returned writer once the call has finished.
Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser
}
// FuncLogger reads STDERR output from a container and outputs it in a parsed structured log format, see: https://github.com/treeder/functions/issues/76
type DefaultFuncLogger struct {
logDB models.FnLog
}
// NewFuncLogger returns a FuncLogger backed by logDB.
//
// Both buffer pools are initialised here; Writer calls Get on them
// unconditionally, so a DefaultFuncLogger built without them would panic on
// first use.
func NewFuncLogger(logDB models.FnLog) FuncLogger {
	// TODO we should probably make it somehow configurable to log to stderr and/or db but meh
	return &DefaultFuncLogger{
		logDB:   logDB,
		bufPool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }},
		logPool: &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }},
	}
}
type writer struct {
bytes.Buffer
stderr bytes.Buffer // for logging to stderr
db models.FnLog
ctx context.Context
reqID string
appName string
image string
path string
// DefaultFuncLogger is a FuncLogger whose Writer method returns a WriteCloser
// that takes STDERR output from a container, emits it line by line through
// the structured logger, and also accumulates it (size-limited) so the whole
// log is written to the db when Close is called.
type DefaultFuncLogger struct {
logDB models.FnLog
bufPool *sync.Pool // these are usually small, for buffering lines
logPool *sync.Pool // these are usually large, for buffering whole logs
}
func (w *writer) Close() error {
w.flush()
return w.db.InsertLog(context.TODO(), w.reqID, w.String())
// Writer builds the log sink for one call. Everything written to the result is
// fanned out to two destinations:
//
//   - a line writer in front of a logWriter, so each complete line of output
//     is logged as it arrives, and
//   - a size-limited dbWriter that accumulates the log and stores it under
//     reqID when the result is closed.
//
// Both destinations borrow their buffers from the logger's pools; the buffers
// are handed back by the last closer in the chain.
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser {
	// cap on what is accumulated for the db. pick a number any number.. TODO configurable ?
	const maxLogBytes = 1 << 20

	lineBuf := l.bufPool.Get().(*bytes.Buffer)
	logBuf := l.logPool.Get().(*bytes.Buffer)

	// the log writer needs no size limit, but it must be fed whole lines
	lineSink := &logWriter{
		ctx:     ctx,
		appName: appName,
		path:    path,
		image:   image,
		reqID:   reqID,
	}

	// the db writer does not care about lines, but it must be size-limited
	dbSink := &dbWriter{
		Buffer: logBuf,
		db:     l.logDB,
		ctx:    ctx,
		reqID:  reqID,
	}

	// TODO we may want to toss out buffers that grow to grotesque size but meh they will prob get GC'd
	recycle := &fCloser{close: func() error {
		lineBuf.Reset()
		logBuf.Reset()
		l.bufPool.Put(lineBuf)
		l.logPool.Put(logBuf)
		return nil
	}}

	// TODO / NOTE: the line writer goes first because the limit writer may
	// return an error once the limit is reached and we still want the line to
	// be logged. we should probably ignore the limit error altogether, since
	// the only goal is to not write too much to the db and that is already
	// taken care of. the recycling closer goes last so the buffers are only
	// handed back after the other writers have been closed.
	return multiWriteCloser{
		newLineWriterWithBuffer(lineBuf, lineSink),
		newLimitWriter(maxLogBytes, dbSink),
		recycle,
	}
}
func (w *writer) Write(b []byte) (int, error) {
n, err := w.Buffer.Write(b)
// fCloser wraps an arbitrary close function so that it can be used wherever an
// io.WriteCloser is expected (to have a seat at the cool kids lunch table).
type fCloser struct {
// close is invoked by Close and its error is returned to the caller.
close func() error
}
// temp or should move to another FuncLogger implementation
w.writeStdErr(b)
// Write discards b and reports every byte as written.
func (f *fCloser) Write(b []byte) (int, error) {
	accepted := len(b)
	return accepted, nil
}

// Close runs the wrapped close function and hands back whatever it returns.
func (f *fCloser) Close() error {
	closeFn := f.close
	return closeFn()
}
// multiWriteCloser fans a write out to a list of writers, in order. The first
// writer to fail stops the fan-out and its result is returned as is; when no
// writer fails, the byte count reported is the one from the last writer in
// the list.
type multiWriteCloser []io.WriteCloser

// Write passes b to each writer in turn, stopping at the first error.
func (m multiWriteCloser) Write(b []byte) (n int, err error) {
	for i := 0; i < len(m); i++ {
		if n, err = m[i].Write(b); err != nil {
			return n, err
		}
	}
	return n, nil
}
func (w *writer) writeStdErr(b []byte) {
// for now, also write to stderr so we can debug quick ;)
// TODO this should be a separate FuncLogger but time is running short !
endLine := bytes.IndexByte(b, '\n')
if endLine < 0 {
w.stderr.Write(b)
return
func (m multiWriteCloser) Close() (err error) {
for _, mw := range m {
err = mw.Close()
if err != nil {
return err
}
}
// we have a new line, so:
w.stderr.Write(b[0:endLine])
w.flush()
w.writeStdErr(b[endLine+1:])
return err
}
func (w *writer) flush() {
log := common.Logger(w.ctx)
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": w.appName, "path": w.path, "image": w.image, "call_id": w.reqID})
log.Println(w.stderr.String())
w.stderr.Reset()
// logWriter will log (to real stderr) every call to Write as a line. it should
// be wrapped with a lineWriter so that the output makes sense.
type logWriter struct {
ctx context.Context
appName string
path string
image string
reqID string
}
func (l *logWriter) Write(b []byte) (int, error) {
log := common.Logger(l.ctx)
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID})
log.Println(string(b))
return len(b), nil
}
// lineWriter buffers all calls to Write and calls Write on the underlying
// writer once per complete line (the trailing '\n' is included). Close must
// be called to ensure that a trailing partial line is flushed; Close appends
// a newline to it if none is present.
type lineWriter struct {
	b *bytes.Buffer // bytes received but not yet forwarded
	w io.Writer     // destination, receives one line per Write call
}

// newLineWriter returns a line-splitting writer in front of w that uses a
// freshly allocated buffer.
func newLineWriter(w io.Writer) io.WriteCloser {
	return &lineWriter{b: new(bytes.Buffer), w: w}
}

// newLineWriterWithBuffer is like newLineWriter but buffers into b, which lets
// the caller recycle buffers. b must not be used by anyone else until the
// returned writer has been closed.
func newLineWriterWithBuffer(b *bytes.Buffer, w io.Writer) io.WriteCloser {
	return &lineWriter{b: b, w: w}
}

// Write buffers ogb and forwards every complete line now in the buffer to the
// underlying writer. Since ogb is always buffered in full, the returned count
// is always len(ogb); a non-nil error means forwarding a line failed, in which
// case that line and everything after it stay buffered.
func (li *lineWriter) Write(ogb []byte) (int, error) {
	li.b.Write(ogb) // bytes.Buffer is guaranteed, read it!

	for {
		b := li.b.Bytes()
		i := bytes.IndexByte(b, '\n')
		if i < 0 {
			break // no more newlines in buffer
		}

		// write in this line and advance buffer past it
		l := b[:i+1]
		if _, err := li.w.Write(l); err != nil {
			// the count from the underlying writer refers to the line, which
			// may be longer than ogb; io.Writer requires n <= len(ogb).
			return len(ogb), err
		}
		li.b.Next(len(l))
	}

	// technically we wrote all the bytes, so make things appear normal
	return len(ogb), nil
}

// Close flushes whatever is left in the buffer to the underlying writer,
// terminating it with a newline if needed, and leaves the buffer empty so
// that closing again does not emit the same bytes twice.
func (li *lineWriter) Close() error {
	if li.b.Len() == 0 {
		return nil
	}

	if b := li.b.Bytes(); b[len(b)-1] != '\n' {
		// grow the buffer itself rather than appending to the slice that
		// aliases its internal storage.
		li.b.WriteByte('\n')
	}

	_, err := li.w.Write(li.b.Bytes())
	li.b.Reset()
	return err
}
// dbWriter collects everything written to it in the embedded in-memory buffer
// and stores the collected text in the log db, keyed by reqID, when Close is
// called; the error from that insert is what Close returns. It places no bound
// on how much it collects, so it should be wrapped in a limitWriter to keep
// the buffer and the db from being blown out.
type dbWriter struct {
	*bytes.Buffer

	db    models.FnLog
	ctx   context.Context
	reqID string
}

// Close inserts the accumulated log into the db and returns the insert error.
func (w *dbWriter) Close() error {
	logText := w.Buffer.String()
	return w.db.InsertLog(w.ctx, w.reqID, logText)
}

// Write appends b to the in-memory buffer.
func (w *dbWriter) Write(b []byte) (int, error) {
	n, err := w.Buffer.Write(b)
	return n, err
}
// overrides Write, keeps Close
@@ -83,22 +213,18 @@ func newLimitWriter(max int, w io.WriteCloser) io.WriteCloser {
}
func (l *limitWriter) Write(b []byte) (int, error) {
if l.n > l.max {
if l.n >= l.max {
return 0, errors.New("max log size exceeded, truncating log")
}
if l.n+len(b) >= l.max {
// cut off to prevent gigantic line attack
b = b[:l.max-l.n]
}
n, err := l.WriteCloser.Write(b)
l.n += n
if l.n >= l.max {
// write in truncation message to log once
l.WriteCloser.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n")))
}
return n, err
}
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser {
const MB = 1 * 1024 * 1024
return newLimitWriter(MB, &writer{
db: l.logDB,
ctx: ctx,
appName: appName,
path: path,
image: image,
reqID: reqID,
})
}

View File

@@ -0,0 +1,76 @@
package runner
import (
"bytes"
"io"
"testing"
)
// nopCloser turns any io.Writer into an io.WriteCloser whose Close does
// nothing.
type nopCloser struct {
	io.Writer
}

// Close is a no-op and always succeeds.
func (nopCloser) Close() error {
	return nil
}
// TestLimitWriter checks that a limit writer passes writes through while under
// the limit, cuts the write that crosses the limit down to what still fits,
// rejects writes after that, and appends its truncation notice.
func TestLimitWriter(t *testing.T) {
	var b bytes.Buffer
	const max = 5
	lw := newLimitWriter(max, nopCloser{&b})

	lw.Write([]byte("yo"))

	if b.Len() != 2 {
		t.Fatal("expected 2 bytes in buffer, got:", b.Len())
	}

	n, _ := lw.Write([]byte("dawg"))

	// can't check b.Len() really since the overage message is written in
	if n != 3 {
		t.Fatalf("limit writer allowed writing over the limit or n was wrong. n: %d", n)
	}

	n, err := lw.Write([]byte("yodawg"))

	if n != 0 || err == nil {
		t.Fatalf("limit writer wrote after limit exceeded, n > 0 or err is nil. n: %d err: %v", n, err)
	}

	// yes should const this. yes i'm wrong. yes you're wrong. no it doesn't matter.
	if !bytes.HasPrefix(b.Bytes(), []byte("yodaw\n-----max")) {
		// the payload is cut at 5 bytes, so 'yodaw' (not 'yodawg') is expected
		t.Fatal("expected buffer to start with 'yodaw' followed by the truncation notice, got:", b.String())
	}
}
// TestLineWriter checks that a line writer holds back a partial line, forwards
// each line as soon as its newline arrives, and that Close flushes the
// remainder terminated by a newline.
func TestLineWriter(t *testing.T) {
	var out bytes.Buffer
	w := newLineWriter(&out)

	// no newline yet: nothing may reach the underlying buffer
	w.Write([]byte("yo"))
	if got := out.Len(); got != 0 {
		t.Fatal("expected no bytes to be written, got bytes")
	}

	// completes "yo\n"; "dawg" stays pending
	w.Write([]byte("\ndawg"))
	if got := out.Len(); got != 3 {
		t.Fatal("expected 3 bytes to be written in, got:", out.Len())
	}

	// completes "dawg\n" and "dawgy\n"; the last "dawg" stays pending
	w.Write([]byte("\ndawgy\ndawg"))
	if got := out.Len(); got != 14 {
		t.Fatal("expected 14 bytes to be written in, got:", out.Len())
	}

	// close flushes the pending "dawg" plus an added newline
	w.Close()
	if got := out.Len(); got != 19 {
		t.Fatal("expected 19 bytes to be written in, got:", out.Len())
	}

	if flushed := out.Bytes(); !bytes.HasSuffix(flushed, []byte("\n")) {
		t.Fatal("line writer close is broked, expected new line")
	}
}

View File

@@ -172,9 +172,8 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult,
cfg.Memory = 128
}
cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
defer cfg.Stderr.Close() // TODO we should prob log this err but hey
if cfg.Stdout == nil {
// TODO why? async?
cfg.Stdout = cfg.Stderr
}

View File

@@ -8,11 +8,11 @@ import (
"testing"
"time"
"gitlab-odx.oracle.com/odx/functions/api/id"
"gitlab-odx.oracle.com/odx/functions/api/datastore"
"gitlab-odx.oracle.com/odx/functions/api/id"
"gitlab-odx.oracle.com/odx/functions/api/logs"
"gitlab-odx.oracle.com/odx/functions/api/models"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
"gitlab-odx.oracle.com/odx/functions/api/logs"
)
func TestRunnerHello(t *testing.T) {
@@ -28,7 +28,6 @@ func TestRunnerHello(t *testing.T) {
t.Fatalf("Test error during New() - %s", err)
}
for i, test := range []struct {
route *models.Route
payload string
@@ -49,7 +48,7 @@ func TestRunnerHello(t *testing.T) {
Stdin: strings.NewReader(test.payload),
AppName: test.route.AppName,
Stdout: &stdout,
Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID),
Stderr: nopCloser{&stderr},
}
result, err := runner.run(ctx, cfg)
@@ -107,7 +106,7 @@ func TestRunnerError(t *testing.T) {
Ready: make(chan struct{}),
Stdin: strings.NewReader(test.payload),
Stdout: &stdout,
Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID),
Stderr: nopCloser{&stderr},
}
result, err := runner.run(ctx, cfg)

View File

@@ -255,16 +255,46 @@ func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func
}
}
// ghostWriter forwards writes to an inner writer that can be replaced at any
// time through swap.
type ghostWriter struct {
	sync.Mutex
	inner io.Writer
}

// swap makes w the destination of all subsequent writes.
func (g *ghostWriter) swap(w io.Writer) {
	g.Lock()
	defer g.Unlock()
	g.inner = w
}

// Write sends b to whichever inner writer is current when the call starts.
func (g *ghostWriter) Write(b []byte) (int, error) {
	// the lock only guards reading g.inner against a concurrent swap; the
	// write itself happens outside the lock, so writes are not serialized.
	g.Lock()
	current := g.inner
	g.Unlock()

	return current.Write(b)
}

// Close does nothing and always returns nil.
func (g *ghostWriter) Close() error {
	return nil
}
func (hc *htfn) serve(ctx context.Context) {
lctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cfg := *hc.cfg
logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout})
// TODO go through FuncLogger probably
// if there's no active call, log any errors to stderr (for debugging issues)
bwLog := newLineWriter(&logWriter{ctx: ctx, appName: cfg.AppName, path: cfg.Path, image: cfg.Image, reqID: hc.id})
defer bwLog.Close()
stderr := &ghostWriter{inner: bwLog}
go func() {
for {
select {
case <-lctx.Done():
case <-ctx.Done():
case <-cfg.Ready:
// on first execution, wait before starting idle timeout / stopping wait time clock,
// since docker pull / container create need to happen.
@@ -272,21 +302,28 @@ func (hc *htfn) serve(ctx context.Context) {
}
select {
case <-lctx.Done():
case <-ctx.Done():
return
case <-time.After(cfg.IdleTimeout):
logger.Info("Canceling inactive hot function")
cancel()
case t := <-hc.tasks:
// swap logs to log to the task logger instead of stderr
tlog := hc.rnr.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
stderr.swap(tlog)
start := time.Now()
err := hc.proto.Dispatch(lctx, t)
err := hc.proto.Dispatch(ctx, t)
status := "success"
if err != nil {
status = "error"
logrus.WithField("ctx", lctx).Info("task failed")
logrus.WithField("ctx", ctx).Info("task failed")
}
hc.once()
stderr.swap(bwLog) // swap back out before flush
tlog.Close() // write to db/flush
t.Response <- task.Response{
Result: &runResult{start: start, status: status, error: err},
Err: err,
@@ -299,9 +336,9 @@ func (hc *htfn) serve(ctx context.Context) {
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
cfg.Stdin = hc.containerIn
cfg.Stdout = hc.containerOut
// NOTE: cfg.Stderr is overwritten in rnr.Run()
cfg.Stderr = stderr
result, err := hc.rnr.run(lctx, &cfg)
result, err := hc.rnr.run(ctx, &cfg)
if err != nil {
logger.WithError(err).Error("hot function failure detected")
}
@@ -310,6 +347,11 @@ func (hc *htfn) serve(ctx context.Context) {
// TODO make Default protocol a real thing and get rid of this in favor of Dispatch
func runTaskReq(rnr *Runner, t task.Request) {
// TODO this will not be such a shit storm after the above TODO is TODONE
cfg := t.Config
t.Config.Stderr = rnr.flog.Writer(t.Ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
defer t.Config.Stderr.Close()
result, err := rnr.run(t.Ctx, t.Config)
select {
case t.Response <- task.Response{result, err}: