Merge pull request #447 from fnproject/tracing_to_prometheus

Tracing to prometheus
This commit is contained in:
Reed Allman
2017-10-25 10:58:07 -07:00
committed by GitHub
7 changed files with 500 additions and 11 deletions

View File

@@ -188,6 +188,8 @@ func (a *agent) Submit(callI Call) error {
ctx := call.req.Context()
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_submit")
span.SetBaggageItem("fn_appname", callI.Model().AppName)
span.SetBaggageItem("fn_path", callI.Model().Path)
defer span.Finish()
// start the timer STAT! TODO add some wiggle room

60
api/server/fntracer.go Normal file
View File

@@ -0,0 +1,60 @@
package server
import (
"github.com/opentracing/opentracing-go"
"strings"
)
// FnTracer is a custom Tracer which wraps another another tracer
// its main purpose is to wrap the underlying Span in a FnSpan,
// which adds some extra behaviour required for sending tracing spans to prometheus
type FnTracer struct {
opentracing.Tracer
}
// NewFnTracer returns a new FnTracer which wraps the specified Tracer
func NewFnTracer(t opentracing.Tracer) opentracing.Tracer {
return &FnTracer{t}
}
// FnTracer implements opentracing.Tracer
// Override StartSpan to wrap the returned Span in a FnSpan
func (fnt FnTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
return NewFnSpan(fnt.Tracer.StartSpan(operationName, opts...))
}
// FnSpan is a custom Span that wraps another span
// which adds some extra behaviour required for sending tracing spans to prometheus
type FnSpan struct {
opentracing.Span
}
// NewFnSpan returns a new FnSpan which wraps the specified Span
func NewFnSpan(s opentracing.Span) opentracing.Span {
return &FnSpan{s}
}
// FnSpan implements opentracing.Span
func (fns FnSpan) Finish() {
fns.copyBaggageItemsToTags()
fns.Span.Finish()
}
// FnSpan implements opentracing.Span
func (fns FnSpan) FinishWithOptions(opts opentracing.FinishOptions) {
fns.copyBaggageItemsToTags()
fns.Span.FinishWithOptions(opts)
}
func (fns FnSpan) copyBaggageItemsToTags() {
// copy baggage items (which are inherited from the parent) with keys starting with "fn" to tags
// the PrometheusCollector will send these to Prometheus
// need to do this because the collector can't access baggage items, but it can access tags
// whereas here we can access the parent's baggage items, but not its tags
fns.Context().ForeachBaggageItem(func(k, v string) bool {
if strings.HasPrefix(k, "fn") {
fns.SetTag(k, v)
}
return true
})
}

View File

@@ -0,0 +1,77 @@
package server
import (
"github.com/openzipkin/zipkin-go-opentracing"
"github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
"github.com/prometheus/client_golang/prometheus"
"strings"
"time"
)
// PrometheusCollector is a custom Collector
// which sends ZipKin traces to Prometheus
type PrometheusCollector struct {
// Each span name is published as a separate Histogram metric
// Using metric names of the form fn_span_<span-name>_duration_seconds
histogramVecMap map[string]*prometheus.HistogramVec
}
// NewPrometheusCollector returns a new PrometheusCollector
func NewPrometheusCollector() (zipkintracer.Collector, error) {
pc := &PrometheusCollector{make(map[string]*prometheus.HistogramVec)}
return pc, nil
}
// Return the HistogramVec corresponding to the specified spanName.
// If a HistogramVec does not already exist for specified spanName then one is created and configured with the specified labels
// otherwise the labels parameter is ignored.
func (pc PrometheusCollector) getHistogramVecForSpanName(spanName string, labels []string) *prometheus.HistogramVec {
thisHistogramVec, found := pc.histogramVecMap[spanName]
if !found {
thisHistogramVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "fn_span_" + spanName + "_duration_seconds",
Help: "Span " + spanName + " duration, by span name",
},
labels,
)
pc.histogramVecMap[spanName] = thisHistogramVec
prometheus.MustRegister(thisHistogramVec)
}
return thisHistogramVec
}
// PrometheusCollector implements Collector.
func (pc PrometheusCollector) Collect(span *zipkincore.Span) error {
// extract any label values from the span
labelKeys, labelValueMap := getLabels(span)
pc.getHistogramVecForSpanName(span.GetName(), labelKeys).With(labelValueMap).Observe((time.Duration(span.GetDuration()) * time.Microsecond).Seconds())
return nil
}
// extract from the specified span the key/value pairs that we want to add as labels to the Prometheus metric for this span
// returns an array of keys, and a map of key-value pairs
func getLabels(span *zipkincore.Span) ([]string, map[string]string) {
var keys []string
labelMap := make(map[string]string)
// extract any tags whose key starts with "fn" from the span
binaryAnnotations := span.GetBinaryAnnotations()
for _, thisBinaryAnnotation := range binaryAnnotations {
key := thisBinaryAnnotation.GetKey()
if thisBinaryAnnotation.GetAnnotationType() == zipkincore.AnnotationType_STRING && strings.HasPrefix(key, "fn") {
keys = append(keys, key)
value := string(thisBinaryAnnotation.GetValue()[:])
labelMap[key] = value
}
}
return keys, labelMap
}
// PrometheusCollector implements Collector.
func (PrometheusCollector) Close() error { return nil }

View File

@@ -86,7 +86,7 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB
}
setMachineId()
setTracer()
s.setTracer()
s.Router.Use(loggerWrap, traceWrap, panicWrap)
s.bindHandlers(ctx)
@@ -110,6 +110,8 @@ func traceWrap(c *gin.Context) {
// If wireContext == nil, a root span will be created.
// TODO we should add more tags?
serverSpan := opentracing.StartSpan("serve_http", ext.RPCServerOption(wireContext), opentracing.Tag{Key: "path", Value: c.Request.URL.Path})
serverSpan.SetBaggageItem("fn_appname", c.Param(api.CApp))
serverSpan.SetBaggageItem("fn_path", c.Param(api.CRoute))
defer serverSpan.Finish()
ctx := opentracing.ContextWithSpan(c.Request.Context(), serverSpan)
@@ -117,7 +119,7 @@ func traceWrap(c *gin.Context) {
c.Next()
}
func setTracer() {
func (s *Server) setTracer() {
var (
debugMode = false
serviceName = "fn-server"
@@ -126,17 +128,29 @@ func setTracer() {
// ex: "http://zipkin:9411/api/v1/spans"
)
if zipkinHTTPEndpoint == "" {
return
var collector zipkintracer.Collector
// custom Zipkin collector to send tracing spans to Prometheus
promCollector, promErr := NewPrometheusCollector()
if promErr != nil {
logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector")
}
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
collector, err := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if err != nil {
logrus.WithError(err).Fatalln("couldn't start trace collector")
if zipkinHTTPEndpoint != "" {
// Custom PrometheusCollector and Zipkin HTTPCollector
httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if zipErr != nil {
logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector")
}
collector = zipkintracer.MultiCollector{httpCollector, promCollector}
} else {
// Custom PrometheusCollector only
collector = promCollector
}
tracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
zipkintracer.ClientServerSameSpan(true),
zipkintracer.TraceID128Bit(true),
)
@@ -144,7 +158,10 @@ func setTracer() {
logrus.WithError(err).Fatalln("couldn't start tracer")
}
opentracing.SetGlobalTracer(tracer)
// wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus
fntracer := NewFnTracer(ziptracer)
opentracing.SetGlobalTracer(fntracer)
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
}