diff --git a/api/agent/agent.go b/api/agent/agent.go index 1f1427bf0..022c3fccf 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -23,8 +23,6 @@ import ( // TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution) // TODO async calls need to add route.Headers as well -// TODO need to shut off reads/writes in dispatch to the pipes when call times out so that -// 2 calls don't have the same container's pipes... // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status) // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?) // TODO storing logs / call can push call over the timeout @@ -36,7 +34,6 @@ import ( // dies). need coordination w/ db. // TODO if a cold call times out but container is created but hasn't replied, could // end up that the client doesn't get a reply until long after the timeout (b/c of container removal, async it?) -// TODO between calls, logs and stderr can contain output/ids from previous call. need elegant solution. grossness. // TODO if async would store requests (or interchange format) it would be slick, but // if we're going to store full calls in db maybe we should only queue pointers to ids? // TODO examine cases where hot can't start a container and the user would never see an error @@ -517,9 +514,8 @@ func (s *coldSlot) Close() error { // implements Slot type hotSlot struct { done chan<- struct{} // signal we are done with slot - proto protocol.ContainerIO - errC <-chan error // container error - container *container // TODO mask this + errC <-chan error // container error + container *container // TODO mask this err error } @@ -541,16 +537,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { // link the container id and id in the logs [for us!] common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call") - // swap in the new stderr logger & stat accumulator - oldStderr := s.container.swap(call.stderr, &call.Stats) - defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races + // swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out] + stdinRead, stdinWrite := io.Pipe() + stdoutRead, stdoutWrite := io.Pipe() + defer stdinRead.Close() + defer stdoutWrite.Close() + + proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead) + + swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats) + defer swapBack() // NOTE: it's important this runs before the pipes are closed. errApp := make(chan error, 1) go func() { - // TODO make sure stdin / stdout not blocked if container dies or we leak goroutine - // we have to make sure this gets shut down or 2 threads will be reading/writing in/out ci := protocol.NewCallInfo(call.Call, call.req) - errApp <- s.proto.Dispatch(ctx, ci, call.w) + errApp <- proto.Dispatch(ctx, ci, call.w) }() select { @@ -561,8 +562,6 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { case <-ctx.Done(): // call timeout return ctx.Err() } - - // TODO we REALLY need to wait for dispatch to return before conceding our slot } func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) { @@ -618,31 +617,29 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state defer span.Finish() defer tok.Close() // IMPORTANT: this MUST get called - // TODO we have to make sure we flush these pipes or we will deadlock - stdinRead, stdinWrite := io.Pipe() - stdoutRead, stdoutWrite := io.Pipe() - - proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead) - state.UpdateState(ctx, ContainerStateStart, call.slots) defer state.UpdateState(ctx, ContainerStateDone, call.slots) cid := id.New().String() - // set up the stderr for the first one to capture any logs before the slot is - // executed and between hot functions TODO this is still a little tobias funke + // set up the stderr to capture any logs before the slot is executed and + // between hot functions stderr := newLineWriter(&logWriter{ logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}), }) + // between calls we need a reader that doesn't do anything + stdin := &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)} + defer stdin.Close() + container := &container{ id: cid, // XXX we could just let docker generate ids... image: call.Image, env: map[string]string(call.Config), memory: call.Memory, cpus: uint64(call.CPUs), - stdin: stdinRead, - stdout: stdoutWrite, + stdin: stdin, + stdout: &ghostWriter{inner: stderr}, stderr: &ghostWriter{inner: stderr}, } @@ -684,7 +681,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state done := make(chan struct{}) state.UpdateState(ctx, ContainerStateIdle, call.slots) - s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil}) + s := call.slots.queueSlot(&hotSlot{done, errC, container, nil}) select { case <-s.trigger: @@ -720,7 +717,8 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state if err != nil { errC <- err } else if res.Error() != nil { - errC <- res.Error() + err = res.Error() + errC <- err } logger.WithError(err).Info("hot function terminated") @@ -742,24 +740,34 @@ type container struct { stdout io.Writer stderr io.Writer - // lock protects the swap and any fields that need to be swapped - sync.Mutex - stats *drivers.Stats + // lock protects the stats swapping + statsMu sync.Mutex + stats *drivers.Stats } -func (c *container) swap(stderr io.Writer, cs *drivers.Stats) (old io.Writer) { - c.Lock() - defer c.Unlock() +func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() { + ostdin := c.stdin.(*ghostReader).inner + ostdout := c.stdout.(*ghostWriter).inner + ostderr := c.stderr.(*ghostWriter).inner - // TODO meh, maybe shouldn't bury this - old = c.stderr - gw, ok := c.stderr.(*ghostWriter) - if ok { - old = gw.swap(stderr) - } + // if tests don't catch this, then fuck me + c.stdin.(*ghostReader).swap(stdin) + c.stdout.(*ghostWriter).swap(stdout) + c.stderr.(*ghostWriter).swap(stderr) + c.statsMu.Lock() + ocs := c.stats c.stats = cs - return old + c.statsMu.Unlock() + + return func() { + c.stdin.(*ghostReader).swap(ostdin) + c.stdout.(*ghostWriter).swap(ostdout) + c.stderr.(*ghostWriter).swap(ostderr) + c.statsMu.Lock() + c.stats = ocs + c.statsMu.Unlock() + } } func (c *container) Id() string { return c.id } @@ -788,11 +796,11 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { common.PublishHistograms(ctx, metrics) - c.Lock() - defer c.Unlock() + c.statsMu.Lock() if c.stats != nil { *(c.stats) = append(*(c.stats), stat) } + c.statsMu.Unlock() } //func (c *container) DockerAuth() (docker.AuthConfiguration, error) { @@ -800,8 +808,8 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { // TODO per call could implement this stored somewhere (vs. configured on host) //} -// ghostWriter is a writer who will pass writes to an inner writer -// (that may be changed at will). +// ghostWriter is an io.Writer who will pass writes to an inner writer +// that may be changed at will. it is thread safe to swap or write. type ghostWriter struct { sync.Mutex inner io.Writer @@ -820,5 +828,78 @@ func (g *ghostWriter) Write(b []byte) (int, error) { g.Lock() w := g.inner g.Unlock() - return w.Write(b) + n, err := w.Write(b) + if err == io.ErrClosedPipe { + // NOTE: we need to mask this error so that docker does not get an error + // from writing the output stream and shut down the container. + err = nil + } + return n, err +} + +// ghostReader is an io.ReadCloser who will pass reads to an inner reader +// that may be changed at will. it is thread safe to swap or read. +// Read will wait for a 'real' reader if inner is of type *waitReader. +// Close must be called to prevent any pending readers from leaking. +type ghostReader struct { + cond *sync.Cond + inner io.Reader + closed bool +} + +func (g *ghostReader) swap(r io.Reader) { + g.cond.L.Lock() + g.inner = r + g.cond.L.Unlock() + g.cond.Broadcast() +} + +func (g *ghostReader) Close() { + g.cond.L.Lock() + g.closed = true + g.cond.L.Unlock() + g.cond.Broadcast() +} + +func (g *ghostReader) awaitRealReader() (io.Reader, bool) { + // wait for a real reader + g.cond.L.Lock() + for { + if g.closed { // check this first + g.cond.L.Unlock() + return nil, false + } + if _, ok := g.inner.(*waitReader); ok { + g.cond.Wait() + } else { + break + } + } + + // we don't need to serialize reads but swapping g.inner could be a race if unprotected + r := g.inner + g.cond.L.Unlock() + return r, true +} + +func (g *ghostReader) Read(b []byte) (int, error) { + r, ok := g.awaitRealReader() + if !ok { + return 0, io.EOF + } + + n, err := r.Read(b) + if err == io.ErrClosedPipe { + // NOTE: we need to mask this error so that docker does not get an error + // from reading the input stream and shut down the container. + err = nil + } + return n, err +} + +// waitReader returns io.EOF if anyone calls Read. don't call Read, this is a sentinel type +type waitReader struct{} + +func (e *waitReader) Read([]byte) (int, error) { + panic("read on waitReader should not happen") } diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 08ea74c60..6ac0382e9 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "strconv" "strings" + "sync" "testing" "time" @@ -20,6 +21,17 @@ import ( "github.com/sirupsen/logrus" ) +func init() { + // TODO figure out some sane place to stick this + formatter := &logrus.TextFormatter{ + FullTimestamp: true, + } + logrus.SetFormatter(formatter) + logrus.SetLevel(logrus.DebugLevel) +} + +// TODO need to add at least one test for our cachy cache + func checkExpectedHeaders(t *testing.T, expectedHeaders http.Header, receivedHeaders http.Header) { checkMap := make([]string, 0, len(expectedHeaders)) @@ -79,7 +91,7 @@ func TestCallConfigurationRequest(t *testing.T) { }, nil, ) - a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() w := httptest.NewRecorder() @@ -221,7 +233,7 @@ func TestCallConfigurationModel(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() callI, err := a.GetCall(FromModel(cm)) @@ -291,7 +303,7 @@ func TestAsyncCallHeaders(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() callI, err := a.GetCall(FromModel(cm)) @@ -337,7 +349,7 @@ func TestLoggerIsStringerAndWorks(t *testing.T) { t.Fatal("logs did not match expectations, like being an adult", strGot, str) } - logger.Close() // idk maybe this would panic might as well call this + logger.Close() // idk maybe this would panic might as well ca this // TODO we could check for the toilet to flush here to logrus } @@ -386,7 +398,7 @@ func TestSubmitError(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit(nil, nil, nil) - a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() callI, err := a.GetCall(FromModel(cm)) @@ -412,11 +424,7 @@ func TestSubmitError(t *testing.T) { // a type of reader than NewRequest can identify to set the content length // (important, for some tests) type dummyReader struct { - inner io.Reader -} - -func (d *dummyReader) Read(b []byte) (int, error) { - return d.inner.Read(b) + io.Reader } func TestHTTPWithoutContentLengthWorks(t *testing.T) { @@ -454,7 +462,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { // get a req that uses the dummy reader, so that this can't determine // the size of the body and set content length (user requests may also // forget to do this, and we _should_ read it as chunked without issue). - req, err := http.NewRequest("GET", url, &dummyReader{strings.NewReader(bodOne)}) + req, err := http.NewRequest("GET", url, &dummyReader{Reader: strings.NewReader(bodOne)}) if err != nil { t.Fatal("unexpected error building request", err) } @@ -517,3 +525,319 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { t.Fatal("did not get expected err, got: ", err) } } + +// return a model with all fields filled in with fnproject/sleeper image, change as needed +func testCall() *models.Call { + appName := "myapp" + path := "/sleeper" + image := "fnproject/fn-test-utils:latest" + const timeout = 10 + const idleTimeout = 20 + const memory = 256 + CPUs := models.MilliCPUs(200) + method := "GET" + url := "http://127.0.0.1:8080/r/" + appName + path + payload := "payload" + typ := "sync" + format := "http" + contentType := "suberb_type" + contentLength := strconv.FormatInt(int64(len(payload)), 10) + config := map[string]string{ + "FN_FORMAT": format, + "FN_APP_NAME": appName, + "FN_PATH": path, + "FN_MEMORY": strconv.Itoa(memory), + "FN_CPUS": CPUs.String(), + "FN_TYPE": typ, + "APP_VAR": "FOO", + "ROUTE_VAR": "BAR", + "DOUBLE_VAR": "BIZ, BAZ", + } + headers := map[string][]string{ + // FromRequest would insert these from original HTTP request + "Content-Type": []string{contentType}, + "Content-Length": []string{contentLength}, + } + + return &models.Call{ + Config: config, + Headers: headers, + AppName: appName, + Path: path, + Image: image, + Type: typ, + Format: format, + Timeout: timeout, + IdleTimeout: idleTimeout, + Memory: memory, + CPUs: CPUs, + Payload: payload, + URL: url, + Method: method, + } +} + +func TestPipesAreClear(t *testing.T) { + // The basic idea here is to make a call start a hot container, and the + // first call has a reader that only reads after a delay, which is beyond + // the boundary of the first call's timeout. Then, run a second call + // with a different body that also has a slight delay. make sure the second + // call gets the correct body. This ensures the input paths for calls do not + // overlap into the same container and they don't block past timeout. + // TODO make sure the second call does not get the first call's body if + // we write the first call's body in before the second's (it was tested + // but not put in stone here, the code is ~same). + // + // causal (seconds): + // T1=start task one, T1TO=task one times out, T2=start task two + // T1W=task one writes, T2W=task two writes + // + // + // 1s 2 3 4 5 6 + // --------------------------- + // + // T1-------T1TO-T2-T1W--T2W-- + + ca := testCall() + ca.Type = "sync" + ca.Format = "http" + ca.IdleTimeout = 60 // keep this bad boy alive + ca.Timeout = 4 // short + + // we need to load in app & route so that FromRequest works + ds := datastore.NewMockInit( + []*models.App{ + {Name: ca.AppName}, + }, + []*models.Route{ + { + Path: ca.Path, + AppName: ca.AppName, + Image: ca.Image, + Type: ca.Type, + Format: ca.Format, + Timeout: ca.Timeout, + IdleTimeout: ca.IdleTimeout, + Memory: ca.Memory, + }, + }, nil, + ) + + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + defer a.Close() + + // test read this body after 5s (after call times out) and make sure we don't get yodawg + // TODO could read after 10 seconds, to make sure the 2nd task's input stream isn't blocked + // TODO we need to test broken HTTP output from a task should return a useful error + bodOne := `{"echoContent":"yodawg"}` + delayBodyOne := &delayReader{Reader: strings.NewReader(bodOne), delay: 5 * time.Second} + + req, err := http.NewRequest("GET", ca.URL, delayBodyOne) + if err != nil { + t.Fatal("unexpected error building request", err) + } + // NOTE: using chunked here seems to perplex the go http request reading code, so for + // the purposes of this test, set this. json also works. + req.ContentLength = int64(len(bodOne)) + req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodOne))) + + var outOne bytes.Buffer + callI, err := a.GetCall(FromRequest(ca.AppName, ca.Path, req), WithWriter(&outOne)) + if err != nil { + t.Fatal(err) + } + + // this will time out after 4s, our reader reads after 5s + t.Log("before submit one:", time.Now()) + err = a.Submit(callI) + t.Log("after submit one:", time.Now()) + if err == nil { + t.Error("expected error but got none") + } + t.Log("first guy err:", err) + + if len(outOne.String()) > 0 { + t.Fatal("input should not have been read, producing 0 output, got:", outOne.String()) + } + + // if we submit another call to the hot container, this can be finicky if the + // hot logic simply fails to re-use a container then this will 'just work' + // but at one point this failed. + + // only delay this body 2 seconds, so that we read at 6s (first writes at 5s) before time out + bodTwo := `{"echoContent":"NODAWG"}` + delayBodyTwo := &delayReader{Reader: strings.NewReader(bodTwo), delay: 2 * time.Second} + + req, err = http.NewRequest("GET", ca.URL, delayBodyTwo) + if err != nil { + t.Fatal("unexpected error building request", err) + } + req.ContentLength = int64(len(bodTwo)) + req.Header.Set("Content-Length", fmt.Sprintf("%d", len(bodTwo))) + + var outTwo bytes.Buffer + callI, err = a.GetCall(FromRequest(ca.AppName, ca.Path, req), WithWriter(&outTwo)) + if err != nil { + t.Fatal(err) + } + + t.Log("before submit two:", time.Now()) + err = a.Submit(callI) + t.Log("after submit two:", time.Now()) + if err != nil { + t.Error("got error from submit when task should succeed", err) + } + + body := outTwo.String() + + // we're using http format so this will have written a whole http request + res, err := http.ReadResponse(bufio.NewReader(&outTwo), nil) + if err != nil { + t.Fatalf("error reading body. err: %v body: %s", err, body) + } + defer res.Body.Close() + + // {"request":{"echoContent":"yodawg"}} + var resp struct { + R struct { + Body string `json:"echoContent"` + } `json:"request"` + } + + json.NewDecoder(res.Body).Decode(&resp) + + if resp.R.Body != "NODAWG" { + t.Fatalf("body from second call was not what we wanted. boo. got wrong body: %v wanted: %v", resp.R.Body, "NODAWG") + } + + // NOTE: we need to make sure that 2 containers didn't launch to process + // this. this isn't perfect but we really should be able to run 2 tasks + // sequentially even if the first times out in the same container, so this + // ends up testing hot container management more than anything. i do not like + // digging around in the concrete type of ca for state stats but this seems + // the best way to ensure 2 containers aren't launched. this does have the + // shortcoming that if the first container dies and another launches, we + // don't see it and this passes when it should not. feel free to amend... + callConcrete := callI.(*call) + var count uint64 + for _, up := range callConcrete.slots.getStats().containerStates { + up += count + } + if count > 1 { + t.Fatalf("multiple containers launched to service this test. this shouldn't be. %d", count) + } +} + +type delayReader struct { + once sync.Once + delay time.Duration + + io.Reader +} + +func (r *delayReader) Read(b []byte) (int, error) { + r.once.Do(func() { time.Sleep(r.delay) }) + return r.Reader.Read(b) +} + +func TestPipesDontMakeSpuriousCalls(t *testing.T) { + // if we swap out the pipes between tasks really fast, we need to ensure that + // there are no spurious reads on the container's input that give us a bad + // task output (i.e. 2nd task should succeed). if this test is fussing up, + // make sure input swapping out is not racing, it is very likely not the test + // that is finicky since this is a totally normal happy path (run 2 hot tasks + // in the same container in a row). + + call := testCall() + call.Type = "sync" + call.Format = "http" + call.IdleTimeout = 60 // keep this bad boy alive + call.Timeout = 4 // short + + // we need to load in app & route so that FromRequest works + ds := datastore.NewMockInit( + []*models.App{ + {Name: call.AppName}, + }, + []*models.Route{ + { + Path: call.Path, + AppName: call.AppName, + Image: call.Image, + Type: call.Type, + Format: call.Format, + Timeout: call.Timeout, + IdleTimeout: call.IdleTimeout, + Memory: call.Memory, + }, + }, nil, + ) + + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + defer a.Close() + + bodOne := `{"echoContent":"yodawg"}` + req, err := http.NewRequest("GET", call.URL, strings.NewReader(bodOne)) + if err != nil { + t.Fatal("unexpected error building request", err) + } + + var outOne bytes.Buffer + callI, err := a.GetCall(FromRequest(call.AppName, call.Path, req), WithWriter(&outOne)) + if err != nil { + t.Fatal(err) + } + + // this will time out after 4s, our reader reads after 5s + t.Log("before submit one:", time.Now()) + err = a.Submit(callI) + t.Log("after submit one:", time.Now()) + if err != nil { + t.Error("got error from submit when task should succeed", err) + } + + // if we submit the same ca to the hot container again, + // this can be finicky if the + // hot logic simply fails to re-use a container then this will + // 'just work' but at one point this failed. + + bodTwo := `{"echoContent":"NODAWG"}` + req, err = http.NewRequest("GET", call.URL, strings.NewReader(bodTwo)) + if err != nil { + t.Fatal("unexpected error building request", err) + } + + var outTwo bytes.Buffer + callI, err = a.GetCall(FromRequest(call.AppName, call.Path, req), WithWriter(&outTwo)) + if err != nil { + t.Fatal(err) + } + + t.Log("before submit two:", time.Now()) + err = a.Submit(callI) + t.Log("after submit two:", time.Now()) + if err != nil { + // don't do a Fatal so that we can read the body to see what really happened + t.Error("got error from submit when task should succeed", err) + } + + // we're using http format so this will have written a whole http request + res, err := http.ReadResponse(bufio.NewReader(&outTwo), nil) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + + // {"request":{"echoContent":"yodawg"}} + var resp struct { + R struct { + Body string `json:"echoContent"` + } `json:"request"` + } + + json.NewDecoder(res.Body).Decode(&resp) + + if resp.R.Body != "NODAWG" { + t.Fatalf("body from second call was not what we wanted. boo. got wrong body: %v wanted: %v", resp.R.Body, "NODAWG") + } +} diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index 4016c6fb0..ea2761e0c 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -18,8 +18,6 @@ type HTTPProtocol struct { func (p *HTTPProtocol) IsStreamable() bool { return true } -// TODO handle req.Context better with io.Copy. io.Copy could push us -// over the timeout. func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { req := ci.Request() @@ -37,7 +35,7 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e return err } - resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) // TODO timeout + resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) if err != nil { return err } @@ -46,7 +44,7 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e rw, ok := w.(http.ResponseWriter) if !ok { // async / [some] tests go through here. write a full http request to the writer - resp.Write(w) // TODO timeout + resp.Write(w) return nil } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 20a2a17b8..e601c9675 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -215,7 +215,7 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e rw.WriteHeader(p.StatusCode) } } - _, err = io.WriteString(rw, jout.Body) // TODO timeout + _, err = io.WriteString(rw, jout.Body) if err != nil { return err } diff --git a/api/common/logging.go b/api/common/logging.go index b752ed2c4..8213627c9 100644 --- a/api/common/logging.go +++ b/api/common/logging.go @@ -12,6 +12,12 @@ func SetLogLevel(ll string) { if ll == "" { ll = "info" } + // show full timestamps + formatter := &logrus.TextFormatter{ + FullTimestamp: true, + } + logrus.SetFormatter(formatter) + logrus.WithFields(logrus.Fields{"level": ll}).Info("Setting log level to") logLevel, err := logrus.ParseLevel(ll) if err != nil {