mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
add jaeger support, link hot container & req span (#840)
* add jaeger support, link hot container & req span * adds jaeger support now with FN_JAEGER_URL, there's a simple tutorial in the operating/metrics.md file now and it's pretty easy to get up and running. * links a hot request span to a hot container span. when we change this to sample at a lower ratio we'll need to finagle the hot container span to always sample or something, otherwise we'll hide that info. at least, since we're sampling at 100% for now if this is flipped on, can see freeze/unfreeze etc. if they hit. this is useful for debugging. note that zipkin's exporter does not follow the link at all, hence jaeger... and they're backed by the Cloud Empire now (CNCF) so we'll probably use it anyway. * vendor: add thrift for jaeger
This commit is contained in:
@@ -492,11 +492,12 @@ func (s *coldSlot) Close(ctx context.Context) error {
|
||||
|
||||
// implements Slot
|
||||
type hotSlot struct {
|
||||
done chan struct{} // signal we are done with slot
|
||||
errC <-chan error // container error
|
||||
container *container // TODO mask this
|
||||
maxRespSize uint64 // TODO boo.
|
||||
fatalErr error
|
||||
done chan struct{} // signal we are done with slot
|
||||
errC <-chan error // container error
|
||||
container *container // TODO mask this
|
||||
maxRespSize uint64 // TODO boo.
|
||||
fatalErr error
|
||||
containerSpan trace.SpanContext
|
||||
}
|
||||
|
||||
func (s *hotSlot) Close(ctx context.Context) error {
|
||||
@@ -523,6 +524,13 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
// link the container id and id in the logs [for us!]
|
||||
common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
|
||||
|
||||
// link the container span to ours for additional context (start/freeze/etc.)
|
||||
span.AddLink(trace.Link{
|
||||
TraceID: s.containerSpan.TraceID,
|
||||
SpanID: s.containerSpan.SpanID,
|
||||
Type: trace.LinkTypeChild,
|
||||
})
|
||||
|
||||
// swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out]
|
||||
stdinRead, stdinWrite := io.Pipe()
|
||||
stdoutRead, stdoutWritePipe := io.Pipe()
|
||||
@@ -663,7 +671,13 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
default: // ok
|
||||
}
|
||||
|
||||
slot := &hotSlot{done: make(chan struct{}), errC: errC, container: container, maxRespSize: a.cfg.MaxResponseSize}
|
||||
slot := &hotSlot{
|
||||
done: make(chan struct{}),
|
||||
errC: errC,
|
||||
container: container,
|
||||
maxRespSize: a.cfg.MaxResponseSize,
|
||||
containerSpan: trace.FromContext(ctx).SpanContext(),
|
||||
}
|
||||
if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
|
||||
promclient "github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/exporter/jaeger"
|
||||
"go.opencensus.io/exporter/prometheus"
|
||||
"go.opencensus.io/exporter/zipkin"
|
||||
"go.opencensus.io/plugin/ochttp"
|
||||
@@ -56,6 +57,7 @@ const (
|
||||
EnvGRPCPort = "FN_GRPC_PORT"
|
||||
EnvAPICORS = "FN_API_CORS"
|
||||
EnvZipkinURL = "FN_ZIPKIN_URL"
|
||||
EnvJaegerURL = "FN_JAEGER_URL"
|
||||
// Certificates to communicate with other FN nodes
|
||||
EnvCert = "FN_NODE_CERT"
|
||||
EnvCertKey = "FN_NODE_CERT_KEY"
|
||||
@@ -148,7 +150,9 @@ func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
|
||||
opts = append(opts, WithGRPCPort(getEnvInt(EnvGRPCPort, DefaultGRPCPort)))
|
||||
opts = append(opts, WithLogLevel(getEnv(EnvLogLevel, DefaultLogLevel)))
|
||||
opts = append(opts, WithLogDest(getEnv(EnvLogDest, DefaultLogDest), getEnv(EnvLogPrefix, "")))
|
||||
opts = append(opts, WithTracer(getEnv(EnvZipkinURL, ""))) // do this early on, so below can use these
|
||||
opts = append(opts, WithZipkin(getEnv(EnvZipkinURL, "")))
|
||||
opts = append(opts, WithJaeger(getEnv(EnvJaegerURL, "")))
|
||||
opts = append(opts, WithPrometheus()) // TODO option to turn this off?
|
||||
opts = append(opts, WithDBURL(getEnv(EnvDBURL, defaultDB)))
|
||||
opts = append(opts, WithMQURL(getEnv(EnvMQURL, defaultMQ)))
|
||||
opts = append(opts, WithLogURL(getEnv(EnvLOGDBURL, "")))
|
||||
@@ -521,28 +525,8 @@ func New(ctx context.Context, opts ...ServerOption) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// TODO this should be a 'plugin' most likely
|
||||
func WithTracer(zipkinURL string) ServerOption {
|
||||
func WithPrometheus() ServerOption {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
var (
|
||||
// TODO add server identifier to this crap
|
||||
//debugMode = false
|
||||
//serviceName = "fnserver"
|
||||
//serviceHostPort = "localhost:8080" // meh
|
||||
zipkinHTTPEndpoint = zipkinURL
|
||||
// ex: "http://zipkin:9411/api/v2/spans"
|
||||
)
|
||||
|
||||
if zipkinHTTPEndpoint != "" {
|
||||
reporter := zipkinhttp.NewReporter(zipkinURL, zipkinhttp.MaxBacklog(10000))
|
||||
exporter := zipkin.NewExporter(reporter, nil)
|
||||
trace.RegisterExporter(exporter)
|
||||
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("exporting spans to zipkin")
|
||||
|
||||
// TODO don't do this. testing parity.
|
||||
trace.SetDefaultSampler(trace.AlwaysSample())
|
||||
}
|
||||
|
||||
reg := promclient.NewRegistry()
|
||||
reg.MustRegister(promclient.NewProcessCollector(os.Getpid(), "fn"),
|
||||
promclient.NewProcessCollectorPIDFn(dockerPid(), "dockerd"),
|
||||
@@ -555,11 +539,52 @@ func WithTracer(zipkinURL string) ServerOption {
|
||||
OnError: func(err error) { logrus.WithError(err).Error("opencensus prometheus exporter err") },
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
return fmt.Errorf("error starting prometheus exporter: %v", err)
|
||||
}
|
||||
s.promExporter = exporter
|
||||
view.RegisterExporter(exporter)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithJaeger(jaegerURL string) ServerOption {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
// ex: "http://localhost:14268"
|
||||
if jaegerURL == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
exporter, err := jaeger.NewExporter(jaeger.Options{
|
||||
Endpoint: jaegerURL,
|
||||
ServiceName: "fn",
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error connecting to jaeger: %v", err)
|
||||
}
|
||||
trace.RegisterExporter(exporter)
|
||||
logrus.WithFields(logrus.Fields{"url": jaegerURL}).Info("exporting spans to jaeger")
|
||||
|
||||
// TODO don't do this. testing parity.
|
||||
trace.SetDefaultSampler(trace.AlwaysSample())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithZipkin(zipkinURL string) ServerOption {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
// ex: "http://zipkin:9411/api/v2/spans"
|
||||
|
||||
if zipkinURL == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
reporter := zipkinhttp.NewReporter(zipkinURL, zipkinhttp.MaxBacklog(10000))
|
||||
exporter := zipkin.NewExporter(reporter, nil)
|
||||
trace.RegisterExporter(exporter)
|
||||
logrus.WithFields(logrus.Fields{"url": zipkinURL}).Info("exporting spans to zipkin")
|
||||
|
||||
// TODO don't do this. testing parity.
|
||||
trace.SetDefaultSampler(trace.AlwaysSample())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user