From b5333508554ac825201ba2065feb3bbb65c073df Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Fri, 28 Jul 2017 23:31:16 -0700 Subject: [PATCH] add traces to the lb also fixed the broken version checking stuff so this works again --- fnlb/lb/allgrouper.go | 16 ++++------ fnlb/lb/proxy.go | 72 ++++++++++++++++++++++++++++++++++++++++++- fnlb/main.go | 2 +- 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/fnlb/lb/allgrouper.go b/fnlb/lb/allgrouper.go index 6bed22ef7..1dfb6e252 100644 --- a/fnlb/lb/allgrouper.go +++ b/fnlb/lb/allgrouper.go @@ -44,7 +44,7 @@ func NewAllGrouper(conf Config) (Grouper, error) { hcEndpoint: conf.HealthcheckEndpoint, hcUnhealthy: int64(conf.HealthcheckUnhealthy), hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second, - minAPIVersion: conf.MinAPIVersion, + minAPIVersion: *conf.MinAPIVersion, // for health checks httpClient: &http.Client{Transport: conf.Transport}, @@ -86,7 +86,7 @@ type allGrouper struct { hcEndpoint string hcUnhealthy int64 hcTimeout time.Duration - minAPIVersion *semver.Version + minAPIVersion semver.Version } // TODO put this somewhere better @@ -288,8 +288,6 @@ type fnVersion struct { Version string `json:"version"` } -var v fnVersion - func (a *allGrouper) getVersion(urlString string) (string, error) { req, _ := http.NewRequest(http.MethodGet, urlString, nil) ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout) @@ -297,7 +295,6 @@ func (a *allGrouper) getVersion(urlString string) (string, error) { req = req.WithContext(ctx) resp, err := a.httpClient.Do(req) - if err != nil { return "", err } @@ -306,8 +303,8 @@ func (a *allGrouper) getVersion(urlString string) (string, error) { resp.Body.Close() }() - err = json.NewDecoder(resp.Body).Decode(v) - + var v fnVersion + err = json.NewDecoder(resp.Body).Decode(&v) if err != nil { return "", err } @@ -323,9 +320,8 @@ func (a *allGrouper) checkAPIVersion(node string) error { } nodeVer := semver.New(version) - - if a.minAPIVersion.Compare(*nodeVer) == -1 { - return fmt.Errorf("incompatible API version: %v", a.minAPIVersion) + if nodeVer.LessThan(a.minAPIVersion) { + return fmt.Errorf("incompatible API version: %v", nodeVer) } return nil } diff --git a/fnlb/lb/proxy.go b/fnlb/lb/proxy.go index 57f42e8f0..fb44a9e7c 100644 --- a/fnlb/lb/proxy.go +++ b/fnlb/lb/proxy.go @@ -9,6 +9,9 @@ import ( "github.com/Sirupsen/logrus" "github.com/coreos/go-semver/semver" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/openzipkin/zipkin-go-opentracing" ) // TODO the load balancers all need to have the same list of nodes. gossip? @@ -29,6 +32,7 @@ import ( type Config struct { DBurl string `json:"db_url"` Listen string `json:"port"` + ZipkinURL string `json:"zipkin_url"` Nodes []string `json:"nodes"` HealthcheckInterval int `json:"healthcheck_interval"` HealthcheckEndpoint string `json:"healthcheck_endpoint"` @@ -57,7 +61,7 @@ type Router interface { // InterceptResponse allows a Router to extract information from proxied // requests so that it might do a better job next time. InterceptResponse - // should not modify the Response as it has already been received nore the + // should not modify the Response as it has already been received nor the // Request, having already been sent. InterceptResponse(req *http.Request, resp *http.Response) @@ -95,6 +99,8 @@ func NewProxy(keyFunc KeyFunc, g Grouper, r Router, conf Config) http.Handler { }, } + setTracer(conf.ZipkinURL) + return p } @@ -114,7 +120,57 @@ func newBufferPool() httputil.BufferPool { func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) } func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) } +func setTracer(zipkinURL string) { + var ( + debugMode = false + serviceName = "fnlb" + serviceHostPort = "localhost:8080" // meh + zipkinHTTPEndpoint = zipkinURL + // ex: "http://zipkin:9411/api/v1/spans" + ) + + if zipkinHTTPEndpoint == "" { + return + } + + logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil }) + + collector, err := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger)) + if err != nil { + logrus.WithError(err).Fatalln("couldn't start trace collector") + } + tracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName), + zipkintracer.ClientServerSameSpan(true), + zipkintracer.TraceID128Bit(true), + ) + if err != nil { + logrus.WithError(err).Fatalln("couldn't start tracer") + } + + opentracing.SetGlobalTracer(tracer) + logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer") +} + +func (p *proxy) startSpan(req *http.Request) (opentracing.Span, *http.Request) { + // try to grab a span from the request if made from another service, ignore err if not + wireContext, _ := opentracing.GlobalTracer().Extract( + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + + // Create the span referring to the RPC client if available. + // If wireContext == nil, a root span will be created. + // TODO we should add more tags? + serverSpan := opentracing.StartSpan("lb_serve", ext.RPCServerOption(wireContext), opentracing.Tag{"path", req.URL.Path}) + + ctx := opentracing.ContextWithSpan(req.Context(), serverSpan) + req = req.WithContext(ctx) + return serverSpan, req +} + func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) { + span, req := p.startSpan(req) + defer span.Finish() + target, err := p.route(req) if err != nil { logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed") @@ -129,7 +185,17 @@ func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) { req.URL.Scheme = "http" // XXX (reed): h2 support req.URL.Host = target + span, ctx := opentracing.StartSpanFromContext(req.Context(), "lb_roundtrip") + req = req.WithContext(ctx) + + // shove the span into the outbound request + opentracing.GlobalTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header)) + resp, err := p.transport.RoundTrip(req) + span.Finish() if err == nil { p.router.InterceptResponse(req, resp) } @@ -137,6 +203,10 @@ func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) { } func (p *proxy) route(req *http.Request) (string, error) { + span, ctx := opentracing.StartSpanFromContext(req.Context(), "lb_route") + defer span.Finish() + req = req.WithContext(ctx) + // TODO errors from this func likely could return 401 or so instead of 503 always key, err := p.keyFunc(req) if err != nil { diff --git a/fnlb/main.go b/fnlb/main.go index 3d2f06721..5edb5b289 100644 --- a/fnlb/main.go +++ b/fnlb/main.go @@ -31,7 +31,7 @@ func main() { flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health") flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy") flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds") - + flag.StringVar(&conf.ZipkinURL, "zipkin", "", "zipkin endpoint to send traces") flag.Parse() conf.MinAPIVersion = semver.New(*minAPIVersion)