diff --git a/api/agent/agent.go b/api/agent/agent.go index a525219f6..52caf1648 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -32,9 +32,7 @@ import ( // TODO herd launch prevention part deux // TODO storing logs / call can push call over the timeout // TODO all Datastore methods need to take unit of tenancy (app or route) at least (e.g. not just call id) -// TODO limit the request body length when making calls // TODO discuss concrete policy for hot launch or timeout / timeout vs time left -// TODO call env need to be map[string][]string to match headers behavior... // TODO it may be nice to have an interchange type for Dispatch that can have // all the info we need to build e.g. http req, grpc req, json, etc. so that // we can easily do e.g. http->grpc, grpc->http, http->json. ofc grpc<->http is @@ -50,7 +48,6 @@ import ( // end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?) // TODO the call api should fill in all the fields // TODO the log api should be plaintext (or at least offer it) -// TODO func logger needs to be hanged, dragged and quartered. in reverse order. // TODO we should probably differentiate ran-but-timeout vs timeout-before-run // TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness. 
// TODO if async would store requests (or interchange format) it would be slick, but @@ -428,7 +425,6 @@ type slot interface { type coldSlot struct { cookie drivers.Cookie tok Token - stderr io.Closer } func (s *coldSlot) exec(ctx context.Context, call *call) error { @@ -459,7 +455,6 @@ func (s *coldSlot) Close() error { s.cookie.Close(context.Background()) // ensure container removal, separate ctx } s.tok.Close() - s.stderr.Close() return nil } @@ -477,20 +472,11 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec") defer span.Finish() - stderr := NewFuncLogger(ctx, call.AppName, call.Path, call.Image, call.ID, call.ds) - if call.w == nil { - // send STDOUT to logs if no writer given (async...) - // TODO fuck func logger, change it to not need a context and make calls - // require providing their own stderr and writer instead of this crap. punting atm. - call.w = stderr - } - // link the container id and id in the logs [for us!] common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call") - // swap in the new id and the new stderr logger - s.container.swap(stderr) - defer stderr.Close() // TODO shove in Close / elsewhere (to upload logs after exec exits) + // swap in the new stderr logger + s.container.swap(call.stderr) errApp := make(chan error, 1) go func() { @@ -541,15 +527,6 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T } func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error { - // TODO dupe stderr code, reduce me - stderr := NewFuncLogger(ctx, call.AppName, call.Path, call.Image, call.ID, call.ds) - if call.w == nil { - // send STDOUT to logs if no writer given (async...) - // TODO fuck func logger, change it to not need a context and make calls - // require providing their own stderr and writer instead of this crap. punting atm. 
- call.w = stderr - } - container := &container{ id: id.New().String(), // XXX we could just let docker generate ids... image: call.Image, @@ -558,7 +535,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails... stdin: call.req.Body, stdout: call.w, - stderr: stderr, + stderr: call.stderr, } // pull & create container before we return a slot, so as to be friendly @@ -569,7 +546,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok return err } - slot := &coldSlot{cookie, tok, stderr} + slot := &coldSlot{cookie, tok} select { case slots <- slot: // TODO need to make sure receiver will be ready (go routine race) default: @@ -600,19 +577,22 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error { ctx, shutdownContainer := context.WithCancel(context.Background()) defer shutdownContainer() // close this if our waiter returns + cid := id.New().String() + // set up the stderr for the first one to capture any logs before the slot is - // executed. - // TODO need to figure out stderr logging for hot functions at a high level - stderr := &ghostWriter{inner: newLineWriter(&logWriter{ctx: ctx, appName: call.AppName, path: call.Path, image: call.Image, reqID: call.ID})} + // executed and between hot functions TODO this is still a little tobias funke + stderr := newLineWriter(&logWriter{ + logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}), + }) container := &container{ - id: id.New().String(), // XXX we could just let docker generate ids... + id: cid, // XXX we could just let docker generate ids... 
image: call.Image, env: call.BaseEnv, // only base env memory: call.Memory, stdin: stdinRead, stdout: stdoutWrite, - stderr: stderr, + stderr: &ghostWriter{inner: stderr}, } logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "format": call.Format, "idle_timeout": call.IdleTimeout}) @@ -663,6 +643,7 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error { // wait for this call to finish // NOTE do NOT select with shutdown / other channels. slot handles this. <-done + container.swap(stderr) // log between tasks } }() diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index d81e013ac..faff2f8e4 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -2,6 +2,7 @@ package agent import ( "bytes" + "fmt" "io" "net/http" "net/http/httptest" @@ -13,6 +14,7 @@ import ( "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/mqs" + "github.com/sirupsen/logrus" ) func TestCallConfigurationRequest(t *testing.T) { @@ -273,3 +275,28 @@ func TestCallConfigurationModel(t *testing.T) { t.Fatal("expected payload to match, but it was a lie") } } + +func TestLoggerIsStringerAndWorks(t *testing.T) { + // TODO test limit writer, logrus writer, etc etc + + loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true}) + logger := setupLogger(loggyloo) + + if _, ok := logger.(fmt.Stringer); !ok { + // NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization... 
+ t.Fatal("you turned the logger into something inefficient and possibly better all at the same time, how dare ye!") + } + + str := "0 line\n1 line\n2 line\n\n4 line" + logger.Write([]byte(str)) + + strGot := logger.(fmt.Stringer).String() + + if strGot != str { + t.Fatal("logs did not match expectations, like being an adult", strGot, str) + } + + logger.Close() // idk maybe this would panic might as well call this + + // TODO we could check for the toilet to flush here to logrus +} diff --git a/api/agent/call.go b/api/agent/call.go index e0ede3e03..3b811c919 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -246,7 +246,6 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } - // TODO move func logger here // TODO add log store interface (yagni?) c.ds = a.ds c.mq = a.mq @@ -255,6 +254,15 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { logrus.Fields{"id": c.ID, "app": c.AppName, "route": c.Path}) c.req = c.req.WithContext(ctx) + // setup stderr logger separate (don't inherit ctx vars) + logger := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": c.AppName, "path": c.Path, "image": c.Image, "call_id": c.ID}) + c.stderr = setupLogger(logger) + if c.w == nil { + // send STDOUT to logs if no writer given (async...) + // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) 
+ c.w = c.stderr + } + return &c, nil } @@ -265,7 +273,7 @@ type call struct { mq models.MessageQueue w io.Writer req *http.Request - stderr io.WriteCloser + stderr io.ReadWriteCloser } func (c *call) Model() *models.Call { return c.Call } @@ -335,8 +343,15 @@ func (c *call) End(ctx context.Context, err error) { // call that ran successfully [by a user's perspective] // TODO: this should be update, really if err := c.ds.InsertCall(ctx, c.Call); err != nil { - logrus.WithError(err).Error("error inserting call into datastore") + common.Logger(ctx).WithError(err).Error("error inserting call into datastore") } + + if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { + common.Logger(ctx).WithError(err).Error("error uploading log") + } + + // NOTE call this after InsertLog or the buffer will get reset + c.stderr.Close() } func fakeHandler(http.ResponseWriter, *http.Request, Params) {} diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index 2d856c8d4..7d2ac3d62 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -2,15 +2,11 @@ package agent import ( "bytes" - "context" "errors" "fmt" "io" "sync" - "github.com/fnproject/fn/api/common" - "github.com/fnproject/fn/api/models" - "github.com/opentracing/opentracing-go" "github.com/sirupsen/logrus" ) @@ -19,13 +15,10 @@ var ( logPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} ) -// TODO we can have different types of these func loggers later -// TODO move this to a different package - -// DefaultFuncLogger returns a WriteCloser that writes STDERR output from a -// container and outputs it in a parsed structured log format to attached -// STDERR as well as writing the log to the db when Close is called. 
-func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logDB models.LogStore) io.WriteCloser { +// setupLogger returns an io.ReadWriteCloser which may write to multiple io.Writer's, +// and may be read from the returned io.Reader (singular). After Close is called, +// the Reader is not safe to read from, nor the Writer to write to. +func setupLogger(logger logrus.FieldLogger) io.ReadWriteCloser { lbuf := bufPool.Get().(*bytes.Buffer) dbuf := logPool.Get().(*bytes.Buffer) @@ -39,34 +32,33 @@ func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logD } // we don't need to limit the log writer, but we do need it to dispense lines - linew := newLineWriterWithBuffer(lbuf, &logWriter{ - ctx: ctx, - appName: appName, - path: path, - image: image, - reqID: reqID, - }) + linew := newLineWriterWithBuffer(lbuf, &logWriter{logger}) const MB = 1 * 1024 * 1024 // pick a number any number.. TODO configurable ? // we don't need to log per line to db, but we do need to limit it - limitw := newLimitWriter(MB, &dbWriter{ - Buffer: dbuf, - db: logDB, - ctx: ctx, - reqID: reqID, - appName: appName, - }) + limitw := &nopCloser{newLimitWriter(MB, dbuf)} // TODO / NOTE: we want linew to be first because limitw may error if limit // is reached but we still want to log. we should probably ignore hitting the // limit error since we really just want to not write too much to db and // that's handled as is. put buffers back last to avoid misuse, if there's // an error they won't get put back and that's really okay too. 
- return multiWriteCloser{linew, limitw, &fCloser{close}} + mw := multiWriteCloser{linew, limitw, &fCloser{close}} + return &rwc{mw, dbuf} } -// implements passthrough Write & arbitrary func close to have a seat at the cool kids lunch table +// implements io.ReadWriteCloser, keeps the buffer for all its handy methods +type rwc struct { + io.WriteCloser + *bytes.Buffer +} + +// these are explicit to override the *bytes.Buffer's methods +func (r *rwc) Write(b []byte) (int, error) { return r.WriteCloser.Write(b) } +func (r *rwc) Close() error { return r.WriteCloser.Close() } + +// implements passthrough Write & closure call in Close type fCloser struct { close func() error } @@ -74,6 +66,12 @@ type fCloser struct { func (f *fCloser) Write(b []byte) (int, error) { return len(b), nil } func (f *fCloser) Close() error { return f.close() } +type nopCloser struct { + io.Writer +} + +func (n *nopCloser) Close() error { return nil } + // multiWriteCloser returns the first write or close that returns a non-nil // err, if no non-nil err is returned, then the returned bytes written will be // from the last call to write. @@ -102,17 +100,11 @@ func (m multiWriteCloser) Close() (err error) { // logWriter will log (to real stderr) every call to Write as a line. it should // be wrapped with a lineWriter so that the output makes sense. type logWriter struct { - ctx context.Context - appName string - path string - image string - reqID string + logrus.FieldLogger } func (l *logWriter) Write(b []byte) (int, error) { - log := common.Logger(l.ctx) - log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID}) - log.Debug(string(b)) + l.Debug(string(b)) return len(b), nil } @@ -171,37 +163,14 @@ func (li *lineWriter) Close() error { return err } -// dbWriter accumulates all calls to Write into an in memory buffer -// and writes them to the database when Close is called, returning -// any error from Close. 
it should be wrapped in a limitWriter to -// prevent blowing out the buffer and bloating the db. -type dbWriter struct { - *bytes.Buffer - - db models.LogStore - ctx context.Context - reqID string - appName string -} - -func (w *dbWriter) Close() error { - span, ctx := opentracing.StartSpanFromContext(context.Background(), "agent_log_write") - defer span.Finish() - return w.db.InsertLog(ctx, w.appName, w.reqID, w.String()) -} - -func (w *dbWriter) Write(b []byte) (int, error) { - return w.Buffer.Write(b) -} - -// overrides Write, keeps Close +// io.Writer that allows limiting bytes written to w type limitWriter struct { n, max int - io.WriteCloser + io.Writer } -func newLimitWriter(max int, w io.WriteCloser) io.WriteCloser { - return &limitWriter{max: max, WriteCloser: w} +func newLimitWriter(max int, w io.Writer) io.Writer { + return &limitWriter{max: max, Writer: w} } func (l *limitWriter) Write(b []byte) (int, error) { @@ -212,11 +181,11 @@ func (l *limitWriter) Write(b []byte) (int, error) { // cut off to prevent gigantic line attack b = b[:l.max-l.n] } - n, err := l.WriteCloser.Write(b) + n, err := l.Writer.Write(b) l.n += n if l.n >= l.max { // write in truncation message to log once - l.WriteCloser.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n"))) + l.Writer.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n", l.max))) } return n, err } diff --git a/api/datastore/internal/datastoreutil/metrics.go b/api/datastore/internal/datastoreutil/metrics.go index cf1c8afbf..34c79f042 100644 --- a/api/datastore/internal/datastoreutil/metrics.go +++ b/api/datastore/internal/datastoreutil/metrics.go @@ -2,6 +2,7 @@ package datastoreutil import ( "context" + "io" "github.com/fnproject/fn/api/models" "github.com/jmoiron/sqlx" @@ -100,7 +101,7 @@ func (m *metricds) GetCalls(ctx context.Context, filter *models.CallFilter) ([]* return m.ds.GetCalls(ctx, filter) } -func (m *metricds) InsertLog(ctx 
context.Context, appName, callID, callLog string) error { +func (m *metricds) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error { span, ctx := opentracing.StartSpanFromContext(ctx, "ds_insert_log") defer span.Finish() return m.ds.InsertLog(ctx, appName, callID, callLog) diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 62f1e0758..1f7fe93b6 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/url" "os" "path/filepath" @@ -600,9 +601,21 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([] return res, nil } -func (ds *sqlStore) InsertLog(ctx context.Context, appName, callID, callLog string) error { +func (ds *sqlStore) InsertLog(ctx context.Context, appName, callID string, logR io.Reader) error { + // coerce this into a string for sql + var log string + if stringer, ok := logR.(fmt.Stringer); ok { + log = stringer.String() + } else { + // TODO we could optimize for Size / buffer pool, but atm we aren't hitting + // this code path anyway (a fallback) + var b bytes.Buffer + io.Copy(&b, logR) + log = b.String() + } + query := ds.db.Rebind(`INSERT INTO logs (id, app_name, log) VALUES (?, ?, ?);`) - _, err := ds.db.ExecContext(ctx, query, callID, appName, callLog) + _, err := ds.db.ExecContext(ctx, query, callID, appName, log) return err } diff --git a/api/logs/mock.go b/api/logs/mock.go index 0a1f298c4..0eba061d4 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -1,7 +1,10 @@ package logs import ( + "bytes" "context" + "io" + "github.com/fnproject/fn/api/models" "github.com/pkg/errors" ) @@ -27,8 +30,10 @@ func (m *mock) SetDatastore(ctx context.Context, ds models.Datastore) { m.ds = ds } -func (m *mock) InsertLog(ctx context.Context, appName, callID, callLog string) error { - m.Logs[callID] = &models.CallLog{CallID: callID, Log: callLog} +func (m *mock) InsertLog(ctx context.Context, appName, callID 
string, callLog io.Reader) error { + var b bytes.Buffer + io.Copy(&b, callLog) + m.Logs[callID] = &models.CallLog{CallID: callID, Log: b.String()} return nil } diff --git a/api/logs/testing/test.go b/api/logs/testing/test.go index 482b7affa..b65dfa2a7 100644 --- a/api/logs/testing/test.go +++ b/api/logs/testing/test.go @@ -44,7 +44,8 @@ func Test(t *testing.T, fnl models.LogStore, ds models.Datastore) { if err != nil { t.Fatalf("Test InsertCall(ctx, &call): unexpected error `%v`", err) } - err = fnl.InsertLog(ctx, call.AppName, call.ID, "test") + log := strings.NewReader("test") + err = fnl.InsertLog(ctx, call.AppName, call.ID, log) if err != nil { t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) } @@ -52,11 +53,12 @@ func Test(t *testing.T, fnl models.LogStore, ds models.Datastore) { t.Run("call-log-insert-get", func(t *testing.T) { call.ID = id.New().String() err := ds.InsertCall(ctx, call) - logText := "test" if err != nil { t.Fatalf("Test InsertCall(ctx, &call): unexpected error `%v`", err) } - err = fnl.InsertLog(ctx, call.AppName, call.ID, logText) + logText := "test" + log := strings.NewReader(logText) + err = fnl.InsertLog(ctx, call.AppName, call.ID, log) if err != nil { t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) } @@ -69,11 +71,12 @@ func Test(t *testing.T, fnl models.LogStore, ds models.Datastore) { t.Run("call-log-insert-get-delete", func(t *testing.T) { call.ID = id.New().String() err := ds.InsertCall(ctx, call) - logText := "test" if err != nil { t.Fatalf("Test InsertCall(ctx, &call): unexpected error `%v`", err) } - err = fnl.InsertLog(ctx, call.AppName, call.ID, logText) + logText := "test" + log := strings.NewReader(logText) + err = fnl.InsertLog(ctx, call.AppName, call.ID, log) if err != nil { t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) } diff --git a/api/models/logs.go b/api/models/logs.go 
index a4cc7da88..03a919a07 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -2,19 +2,17 @@ package models import ( "context" + "io" ) type LogStore interface { - // TODO TODO TODO BAD BUG BUG BUG WILL ROBINSON - // TODO these need to take an app name or users can provide ids for - // other users calls with their own app name and access their logs. - // InsertLog will insert the log at callID, overwriting if it previously // existed. - InsertLog(ctx context.Context, appName, callID string, callLog string) error + InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error // GetLog will return the log at callID, an error will be returned if the log // cannot be found. + // TODO it would be nice if this were an io.Reader... GetLog(ctx context.Context, appName, callID string) (*CallLog, error) // DeleteLog will remove the log at callID, it will not return an error if