mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
add traces to the lb
also fixed the broken version checking stuff so this works again
This commit is contained in:
@@ -44,7 +44,7 @@ func NewAllGrouper(conf Config) (Grouper, error) {
|
||||
hcEndpoint: conf.HealthcheckEndpoint,
|
||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
||||
minAPIVersion: conf.MinAPIVersion,
|
||||
minAPIVersion: *conf.MinAPIVersion,
|
||||
|
||||
// for health checks
|
||||
httpClient: &http.Client{Transport: conf.Transport},
|
||||
@@ -86,7 +86,7 @@ type allGrouper struct {
|
||||
hcEndpoint string
|
||||
hcUnhealthy int64
|
||||
hcTimeout time.Duration
|
||||
minAPIVersion *semver.Version
|
||||
minAPIVersion semver.Version
|
||||
}
|
||||
|
||||
// TODO put this somewhere better
|
||||
@@ -288,8 +288,6 @@ type fnVersion struct {
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
var v fnVersion
|
||||
|
||||
func (a *allGrouper) getVersion(urlString string) (string, error) {
|
||||
req, _ := http.NewRequest(http.MethodGet, urlString, nil)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
|
||||
@@ -297,7 +295,6 @@ func (a *allGrouper) getVersion(urlString string) (string, error) {
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := a.httpClient.Do(req)
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -306,8 +303,8 @@ func (a *allGrouper) getVersion(urlString string) (string, error) {
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(v)
|
||||
|
||||
var v fnVersion
|
||||
err = json.NewDecoder(resp.Body).Decode(&v)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -323,9 +320,8 @@ func (a *allGrouper) checkAPIVersion(node string) error {
|
||||
}
|
||||
|
||||
nodeVer := semver.New(version)
|
||||
|
||||
if a.minAPIVersion.Compare(*nodeVer) == -1 {
|
||||
return fmt.Errorf("incompatible API version: %v", a.minAPIVersion)
|
||||
if nodeVer.LessThan(a.minAPIVersion) {
|
||||
return fmt.Errorf("incompatible API version: %v", nodeVer)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,9 @@ import (
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
"github.com/openzipkin/zipkin-go-opentracing"
|
||||
)
|
||||
|
||||
// TODO the load balancers all need to have the same list of nodes. gossip?
|
||||
@@ -29,6 +32,7 @@ import (
|
||||
type Config struct {
|
||||
DBurl string `json:"db_url"`
|
||||
Listen string `json:"port"`
|
||||
ZipkinURL string `json:"zipkin_url"`
|
||||
Nodes []string `json:"nodes"`
|
||||
HealthcheckInterval int `json:"healthcheck_interval"`
|
||||
HealthcheckEndpoint string `json:"healthcheck_endpoint"`
|
||||
@@ -57,7 +61,7 @@ type Router interface {
|
||||
|
||||
// InterceptResponse allows a Router to extract information from proxied
|
||||
// requests so that it might do a better job next time. InterceptResponse
|
||||
// should not modify the Response as it has already been received nore the
|
||||
// should not modify the Response as it has already been received nor the
|
||||
// Request, having already been sent.
|
||||
InterceptResponse(req *http.Request, resp *http.Response)
|
||||
|
||||
@@ -95,6 +99,8 @@ func NewProxy(keyFunc KeyFunc, g Grouper, r Router, conf Config) http.Handler {
|
||||
},
|
||||
}
|
||||
|
||||
setTracer(conf.ZipkinURL)
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
@@ -114,7 +120,57 @@ func newBufferPool() httputil.BufferPool {
|
||||
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
|
||||
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
|
||||
|
||||
func setTracer(zipkinURL string) {
|
||||
var (
|
||||
debugMode = false
|
||||
serviceName = "fnlb"
|
||||
serviceHostPort = "localhost:8080" // meh
|
||||
zipkinHTTPEndpoint = zipkinURL
|
||||
// ex: "http://zipkin:9411/api/v1/spans"
|
||||
)
|
||||
|
||||
if zipkinHTTPEndpoint == "" {
|
||||
return
|
||||
}
|
||||
|
||||
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
|
||||
|
||||
collector, err := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalln("couldn't start trace collector")
|
||||
}
|
||||
tracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
|
||||
zipkintracer.ClientServerSameSpan(true),
|
||||
zipkintracer.TraceID128Bit(true),
|
||||
)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalln("couldn't start tracer")
|
||||
}
|
||||
|
||||
opentracing.SetGlobalTracer(tracer)
|
||||
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
|
||||
}
|
||||
|
||||
func (p *proxy) startSpan(req *http.Request) (opentracing.Span, *http.Request) {
|
||||
// try to grab a span from the request if made from another service, ignore err if not
|
||||
wireContext, _ := opentracing.GlobalTracer().Extract(
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(req.Header))
|
||||
|
||||
// Create the span referring to the RPC client if available.
|
||||
// If wireContext == nil, a root span will be created.
|
||||
// TODO we should add more tags?
|
||||
serverSpan := opentracing.StartSpan("lb_serve", ext.RPCServerOption(wireContext), opentracing.Tag{"path", req.URL.Path})
|
||||
|
||||
ctx := opentracing.ContextWithSpan(req.Context(), serverSpan)
|
||||
req = req.WithContext(ctx)
|
||||
return serverSpan, req
|
||||
}
|
||||
|
||||
func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
span, req := p.startSpan(req)
|
||||
defer span.Finish()
|
||||
|
||||
target, err := p.route(req)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
|
||||
@@ -129,7 +185,17 @@ func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
req.URL.Scheme = "http" // XXX (reed): h2 support
|
||||
req.URL.Host = target
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(req.Context(), "lb_roundtrip")
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
// shove the span into the outbound request
|
||||
opentracing.GlobalTracer().Inject(
|
||||
span.Context(),
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(req.Header))
|
||||
|
||||
resp, err := p.transport.RoundTrip(req)
|
||||
span.Finish()
|
||||
if err == nil {
|
||||
p.router.InterceptResponse(req, resp)
|
||||
}
|
||||
@@ -137,6 +203,10 @@ func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
}
|
||||
|
||||
func (p *proxy) route(req *http.Request) (string, error) {
|
||||
span, ctx := opentracing.StartSpanFromContext(req.Context(), "lb_route")
|
||||
defer span.Finish()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
// TODO errors from this func likely could return 401 or so instead of 503 always
|
||||
key, err := p.keyFunc(req)
|
||||
if err != nil {
|
||||
|
||||
@@ -31,7 +31,7 @@ func main() {
|
||||
flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
|
||||
flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
|
||||
flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
|
||||
|
||||
flag.StringVar(&conf.ZipkinURL, "zipkin", "", "zipkin endpoint to send traces")
|
||||
flag.Parse()
|
||||
|
||||
conf.MinAPIVersion = semver.New(*minAPIVersion)
|
||||
|
||||
Reference in New Issue
Block a user