From 9cbe4ea5366fddfec9c52eae6df5e4d49dce5b67 Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Tue, 13 Feb 2018 20:01:41 -0800 Subject: [PATCH] add pprof endpoints, additional spans (#770) i would split this commit in two if i were a good dev. the pprof stuff is really useful and this only samples when called. this is pretty standard go service stuff. expvar is cool, too. the additional spannos have turned up some interesting tid bits... gonna slide em in --- api/agent/protocol/http.go | 11 +++++++ api/agent/protocol/json.go | 65 ++++++++++++++++++++------------------ api/server/profile.go | 38 ++++++++++++++++++++++ api/server/server.go | 6 +++- 4 files changed, 89 insertions(+), 31 deletions(-) create mode 100644 api/server/profile.go diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index fdc6c6646..f15f46751 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/fnproject/fn/api/models" + opentracing "github.com/opentracing/opentracing-go" ) // HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant @@ -22,6 +23,9 @@ type HTTPProtocol struct { func (p *HTTPProtocol) IsStreamable() bool { return true } func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_http") + defer span.Finish() + req := ci.Request() req.RequestURI = ci.RequestURL() // force set to this, for req.Write to use (TODO? still?) @@ -32,18 +36,25 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e req.Header.Set("FN_REQUEST_URL", ci.RequestURL()) req.Header.Set("FN_CALL_ID", ci.CallID()) + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_request") // req.Write handles if the user does not specify content length err := req.Write(h.in) + span.Finish() if err != nil { return err } + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_read_response") resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) + span.Finish() if err != nil { return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid http response from function err: %v", err)) } defer resp.Body.Close() + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_response") + defer span.Finish() + rw, ok := w.(http.ResponseWriter) if !ok { // async / [some] tests go through here. write a full http request to the writer diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 8bf1bdf6d..aaa363e16 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -9,6 +9,7 @@ import ( "net/http" "github.com/fnproject/fn/api/models" + opentracing "github.com/opentracing/opentracing-go" ) // This is sent into the function @@ -189,44 +190,48 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error { } func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { - // write input into container + span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_json") + defer span.Finish() + + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_request") err := h.writeJSONToContainer(ci) + span.Finish() if err != nil { return err } - // now read the container output - jout := new(jsonOut) - dec := json.NewDecoder(h.out) - if err := dec.Decode(jout); err != nil { + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_read_response") + var jout jsonOut + err = json.NewDecoder(h.out).Decode(&jout) + span.Finish() + if err != nil { return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err)) } - if rw, ok := w.(http.ResponseWriter); ok { - // this has to be done for pulling out: - // - status code - // - body - // - headers - if jout.Protocol != nil { - p := jout.Protocol - for k, v := range p.Headers { - for _, vv := range v { - rw.Header().Add(k, vv) // on top of any specified on the route - } - } - if p.StatusCode != 0 { - rw.WriteHeader(p.StatusCode) - } - } - _, err = io.WriteString(rw, jout.Body) - if err != nil { - return err - } - } else { + + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_response") + defer span.Finish() + + rw, ok := w.(http.ResponseWriter) + if !ok { // logs can just copy the full thing in there, headers and all. - err = json.NewEncoder(w).Encode(jout) - if err != nil { - return err + return json.NewEncoder(w).Encode(jout) + } + + // this has to be done for pulling out: + // - status code + // - body + // - headers + if jout.Protocol != nil { + p := jout.Protocol + for k, v := range p.Headers { + for _, vv := range v { + rw.Header().Add(k, vv) // on top of any specified on the route + } + } + if p.StatusCode != 0 { + rw.WriteHeader(p.StatusCode) } } - return nil + _, err = io.WriteString(rw, jout.Body) + return err } diff --git a/api/server/profile.go b/api/server/profile.go new file mode 100644 index 000000000..ab8c43584 --- /dev/null +++ b/api/server/profile.go @@ -0,0 +1,38 @@ +package server + +import ( + "expvar" + "fmt" + "net/http" + "net/http/pprof" + + "github.com/gin-gonic/gin" +) + +// Replicated from expvar.go as not public. +func expVars(w http.ResponseWriter, r *http.Request) { + first := true + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, "{\n") + expvar.Do(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(w, ",\n") + } + first = false + fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) + }) + fmt.Fprintf(w, "\n}\n") +} + +func profilerSetup(router *gin.Engine, path string) { + engine := router.Group(path) + engine.Any("/vars", gin.WrapF(expVars)) + engine.Any("/pprof/", gin.WrapF(pprof.Index)) + engine.Any("/pprof/cmdline", gin.WrapF(pprof.Cmdline)) + engine.Any("/pprof/profile", gin.WrapF(pprof.Profile)) + engine.Any("/pprof/symbol", gin.WrapF(pprof.Symbol)) + engine.Any("/pprof/block", gin.WrapF(pprof.Handler("block").ServeHTTP)) + engine.Any("/pprof/heap", gin.WrapF(pprof.Handler("heap").ServeHTTP)) + engine.Any("/pprof/goroutine", gin.WrapF(pprof.Handler("goroutine").ServeHTTP)) + engine.Any("/pprof/threadcreate", gin.WrapF(pprof.Handler("threadcreate").ServeHTTP)) +} diff --git a/api/server/server.go b/api/server/server.go index acfa08e31..e4ce49a0b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -305,7 +305,9 @@ func WithTracer(zipkinURL string) ServerOption { if zipkinHTTPEndpoint != "" { // Custom PrometheusCollector and Zipkin HTTPCollector - httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger)) + httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, + zipkintracer.HTTPLogger(logger), zipkintracer.HTTPMaxBacklog(1000), + ) if zipErr != nil { logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector") } @@ -436,6 +438,8 @@ func (s *Server) bindHandlers(ctx context.Context) { engine.GET("/stats", s.handleStats) engine.GET("/metrics", s.handlePrometheusMetrics) + profilerSetup(engine, "/debug") + if s.nodeType != ServerTypeRunner { v1 := engine.Group("/v1") v1.Use(setAppNameInCtx)