diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index fdc6c6646..f15f46751 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/fnproject/fn/api/models" + opentracing "github.com/opentracing/opentracing-go" ) // HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant @@ -22,6 +23,9 @@ type HTTPProtocol struct { func (p *HTTPProtocol) IsStreamable() bool { return true } func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_http") + defer span.Finish() + req := ci.Request() req.RequestURI = ci.RequestURL() // force set to this, for req.Write to use (TODO? still?) @@ -32,18 +36,25 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e req.Header.Set("FN_REQUEST_URL", ci.RequestURL()) req.Header.Set("FN_CALL_ID", ci.CallID()) + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_request") // req.Write handles if the user does not specify content length err := req.Write(h.in) + span.Finish() if err != nil { return err } + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_read_response") resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) + span.Finish() if err != nil { return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid http response from function err: %v", err)) } defer resp.Body.Close() + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_response") + defer span.Finish() + rw, ok := w.(http.ResponseWriter) if !ok { // async / [some] tests go through here. write a full http request to the writer diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 8bf1bdf6d..aaa363e16 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -9,6 +9,7 @@ import ( "net/http" "github.com/fnproject/fn/api/models" + opentracing "github.com/opentracing/opentracing-go" ) // This is sent into the function @@ -189,44 +190,48 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error { } func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { - // write input into container + span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_json") + defer span.Finish() + + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_request") err := h.writeJSONToContainer(ci) + span.Finish() if err != nil { return err } - // now read the container output - jout := new(jsonOut) - dec := json.NewDecoder(h.out) - if err := dec.Decode(jout); err != nil { + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_read_response") + var jout jsonOut + err = json.NewDecoder(h.out).Decode(&jout) + span.Finish() + if err != nil { return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err)) } - if rw, ok := w.(http.ResponseWriter); ok { - // this has to be done for pulling out: - // - status code - // - body - // - headers - if jout.Protocol != nil { - p := jout.Protocol - for k, v := range p.Headers { - for _, vv := range v { - rw.Header().Add(k, vv) // on top of any specified on the route - } - } - if p.StatusCode != 0 { - rw.WriteHeader(p.StatusCode) - } - } - _, err = io.WriteString(rw, jout.Body) - if err != nil { - return err - } - } else { + + span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_response") + defer span.Finish() + + rw, ok := w.(http.ResponseWriter) + if !ok { // logs can just copy the full thing in there, headers and all. - err = json.NewEncoder(w).Encode(jout) - if err != nil { - return err + return json.NewEncoder(w).Encode(jout) + } + + // this has to be done for pulling out: + // - status code + // - body + // - headers + if jout.Protocol != nil { + p := jout.Protocol + for k, v := range p.Headers { + for _, vv := range v { + rw.Header().Add(k, vv) // on top of any specified on the route + } + } + if p.StatusCode != 0 { + rw.WriteHeader(p.StatusCode) } } - return nil + _, err = io.WriteString(rw, jout.Body) + return err } diff --git a/api/server/profile.go b/api/server/profile.go new file mode 100644 index 000000000..ab8c43584 --- /dev/null +++ b/api/server/profile.go @@ -0,0 +1,38 @@ +package server + +import ( + "expvar" + "fmt" + "net/http" + "net/http/pprof" + + "github.com/gin-gonic/gin" +) + +// Replicated from expvar.go as not public. +func expVars(w http.ResponseWriter, r *http.Request) { + first := true + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, "{\n") + expvar.Do(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(w, ",\n") + } + first = false + fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) + }) + fmt.Fprintf(w, "\n}\n") +} + +func profilerSetup(router *gin.Engine, path string) { + engine := router.Group(path) + engine.Any("/vars", gin.WrapF(expVars)) + engine.Any("/pprof/", gin.WrapF(pprof.Index)) + engine.Any("/pprof/cmdline", gin.WrapF(pprof.Cmdline)) + engine.Any("/pprof/profile", gin.WrapF(pprof.Profile)) + engine.Any("/pprof/symbol", gin.WrapF(pprof.Symbol)) + engine.Any("/pprof/block", gin.WrapF(pprof.Handler("block").ServeHTTP)) + engine.Any("/pprof/heap", gin.WrapF(pprof.Handler("heap").ServeHTTP)) + engine.Any("/pprof/goroutine", gin.WrapF(pprof.Handler("goroutine").ServeHTTP)) + engine.Any("/pprof/threadcreate", gin.WrapF(pprof.Handler("threadcreate").ServeHTTP)) +} diff --git a/api/server/server.go b/api/server/server.go index acfa08e31..e4ce49a0b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -305,7 +305,9 @@ func WithTracer(zipkinURL string) ServerOption { if zipkinHTTPEndpoint != "" { // Custom PrometheusCollector and Zipkin HTTPCollector - httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger)) + httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, + zipkintracer.HTTPLogger(logger), zipkintracer.HTTPMaxBacklog(1000), + ) if zipErr != nil { logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector") } @@ -436,6 +438,8 @@ func (s *Server) bindHandlers(ctx context.Context) { engine.GET("/stats", s.handleStats) engine.GET("/metrics", s.handlePrometheusMetrics) + profilerSetup(engine, "/debug") + if s.nodeType != ServerTypeRunner { v1 := engine.Group("/v1") v1.Use(setAppNameInCtx)