diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go
index bf45652aa..714a4f9d8 100644
--- a/api/runner/async_runner.go
+++ b/api/runner/async_runner.go
@@ -47,6 +47,7 @@ func getCfg(t *models.Task) *task.Config {
 		ID:      t.ID,
 		AppName: t.AppName,
 		Env:     t.EnvVars,
+		Ready:   make(chan struct{}),
 	}
 	if t.Timeout == nil || *t.Timeout <= 0 {
 		cfg.Timeout = DefaultTimeout
diff --git a/api/runner/drivers/docker/docker.go b/api/runner/drivers/docker/docker.go
index 8dd3569e7..56211c66e 100644
--- a/api/runner/drivers/docker/docker.go
+++ b/api/runner/drivers/docker/docker.go
@@ -60,7 +60,8 @@ type Auther interface {
 
 type runResult struct {
 	error
-	StatusValue string
+	status string
+	start  time.Time
 }
 
 func (r *runResult) Error() string {
@@ -70,8 +71,9 @@ func (r *runResult) Error() string {
 	return r.error.Error()
 }
 
-func (r *runResult) Status() string    { return r.StatusValue }
-func (r *runResult) UserVisible() bool { return common.IsUserVisibleError(r.error) }
+func (r *runResult) Status() string       { return r.status }
+func (r *runResult) UserVisible() bool    { return common.IsUserVisibleError(r.error) }
+func (r *runResult) StartTime() time.Time { return r.start }
 
 type DockerDriver struct {
 	conf drivers.Config
@@ -409,6 +411,8 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers
 		return nil, err
 	}
 
+	start := time.Now()
+
 	err = drv.startTask(ctx, container)
 	if err != nil {
 		return nil, err
@@ -429,8 +433,9 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers
 
 	status, err := drv.status(ctx, container)
 	return &runResult{
-		StatusValue: status,
-		error:       err,
+		start:  start,
+		status: status,
+		error:  err,
 	}, nil
 }
diff --git a/api/runner/drivers/driver.go b/api/runner/drivers/driver.go
index 0b20e3c9b..364a85934 100644
--- a/api/runner/drivers/driver.go
+++ b/api/runner/drivers/driver.go
@@ -52,6 +52,11 @@ type RunResult interface {
 	// Status should return the current status of the task.
 	// Only valid options are {"error", "success", "timeout", "killed", "cancelled"}.
 	Status() string
+
+	// StartTime returns the time recorded just before the task began executing;
+	// everything before it (e.g. pulling the container image and any other
+	// setup) counts toward wait time. It must not include the container's execution duration.
+	StartTime() time.Time
 }
 
 // The ContainerTask interface guides task execution across a wide variety of
diff --git a/api/runner/drivers/mock/mocker.go b/api/runner/drivers/mock/mocker.go
index ab8766074..301c37852 100644
--- a/api/runner/drivers/mock/mocker.go
+++ b/api/runner/drivers/mock/mocker.go
@@ -3,6 +3,7 @@ package mock
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
 )
@@ -31,16 +32,17 @@ func (c *cookie) Run(ctx context.Context) (drivers.RunResult, error) {
 		return nil, fmt.Errorf("Mocker error! Bad.")
Bad.") } return &runResult{ - error: nil, - StatusValue: "success", + error: nil, + status: "success", + start: time.Now(), }, nil } type runResult struct { error - StatusValue string + status string + start time.Time } -func (runResult *runResult) Status() string { - return runResult.StatusValue -} +func (r *runResult) Status() string { return r.status } +func (r *runResult) StartTime() time.Time { return r.start } diff --git a/api/runner/runner.go b/api/runner/runner.go index 598b33022..432b99621 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -211,6 +211,12 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, } defer cookie.Close() + select { + case <-cfg.Ready: + default: + close(cfg.Ready) + } + metricStart := time.Now() result, err := cookie.Run(ctx) diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 51edc81e4..d176721b1 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -37,6 +37,7 @@ func TestRunnerHello(t *testing.T) { ID: fmt.Sprintf("hello-%d-%d", i, time.Now().Unix()), Image: test.route.Image, Timeout: 10 * time.Second, + Ready: make(chan struct{}), Stdin: strings.NewReader(test.payload), Stdout: &stdout, Stderr: &stderr, @@ -90,6 +91,7 @@ func TestRunnerError(t *testing.T) { ID: fmt.Sprintf("err-%d-%d", i, time.Now().Unix()), Image: test.route.Image, Timeout: 10 * time.Second, + Ready: make(chan struct{}), Stdin: strings.NewReader(test.payload), Stdout: &stdout, Stderr: &stderr, diff --git a/api/runner/task/task.go b/api/runner/task/task.go index 4cb8bbbcb..ed5b9c768 100644 --- a/api/runner/task/task.go +++ b/api/runner/task/task.go @@ -9,15 +9,18 @@ import ( ) type Config struct { - ID string - Path string - Image string - Timeout time.Duration - IdleTimeout time.Duration - AppName string - Memory uint64 - Env map[string]string - Format string + ID string + Path string + Image string + Timeout time.Duration + IdleTimeout time.Duration + AppName string + Memory uint64 + Env map[string]string + Format string + ReceivedTime time.Time + // Ready is used to await the first pull + Ready chan struct{} Stdin io.Reader Stdout io.Writer diff --git a/api/runner/worker.go b/api/runner/worker.go index a9da88baa..77522684e 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -94,6 +94,7 @@ func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunRe } else { tasks <- treq } + resp := <-treq.Response return resp.Result, resp.Err } @@ -256,6 +257,14 @@ func (hc *htfn) serve(ctx context.Context) { go func() { for { + select { + case <-lctx.Done(): + case <-cfg.Ready: + // on first execution, wait before starting idle timeout / stopping wait time clock, + // since docker pull / container create need to happen. + // XXX (reed): should we still obey the task timeout? docker image could be 8GB... 
diff --git a/api/runner/worker.go b/api/runner/worker.go
index a9da88baa..77522684e 100644
--- a/api/runner/worker.go
+++ b/api/runner/worker.go
@@ -94,6 +94,7 @@ func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunRe
 	} else {
 		tasks <- treq
 	}
+
 	resp := <-treq.Response
 	return resp.Result, resp.Err
 }
@@ -256,6 +257,14 @@ func (hc *htfn) serve(ctx context.Context) {
 	go func() {
 		for {
+			select {
+			case <-lctx.Done():
+			case <-cfg.Ready:
+				// On the first execution, wait here before starting the idle-timeout /
+				// wait-time clock, since the docker pull / container create still has to happen.
+				// XXX (reed): should we still obey the task timeout? docker image could be 8GB...
+			}
+
 			select {
 			case <-lctx.Done():
 				return
@@ -263,6 +272,7 @@ func (hc *htfn) serve(ctx context.Context) {
 				logger.Info("Canceling inactive hot function")
 				cancel()
 			case t := <-hc.tasks:
+				start := time.Now()
 				err := hc.proto.Dispatch(lctx, t)
 				status := "success"
 				if err != nil {
@@ -272,8 +282,8 @@ func (hc *htfn) serve(ctx context.Context) {
 				hc.once()
 
 				t.Response <- task.Response{
-					&runResult{StatusValue: status, error: err},
-					err,
+					Result: &runResult{start: start, status: status, error: err},
+					Err:    err,
 				}
 			}
 		}
@@ -304,7 +314,8 @@ func runTaskReq(rnr *Runner, t task.Request) {
 
 type runResult struct {
 	error
-	StatusValue string
+	status string
+	start  time.Time
 }
 
 func (r *runResult) Error() string {
@@ -314,4 +325,5 @@ func (r *runResult) Error() string {
 	return r.error.Error()
 }
 
-func (r *runResult) Status() string { return r.StatusValue }
+func (r *runResult) Status() string       { return r.status }
+func (r *runResult) StartTime() time.Time { return r.start }
diff --git a/api/server/runner.go b/api/server/runner.go
index a305cb2f5..a80e05df7 100644
--- a/api/server/runner.go
+++ b/api/server/runner.go
@@ -193,17 +193,19 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
 	}
 
 	cfg := &task.Config{
-		AppName:     appName,
-		Path:        found.Path,
-		Env:         envVars,
-		Format:      found.Format,
-		ID:          reqID,
-		Image:       found.Image,
-		Memory:      found.Memory,
-		Stdin:       payload,
-		Stdout:      &stdout,
-		Timeout:     time.Duration(found.Timeout) * time.Second,
-		IdleTimeout: time.Duration(found.IdleTimeout) * time.Second,
+		AppName:      appName,
+		Path:         found.Path,
+		Env:          envVars,
+		Format:       found.Format,
+		ID:           reqID,
+		Image:        found.Image,
+		Memory:       found.Memory,
+		Stdin:        payload,
+		Stdout:       &stdout,
+		Timeout:      time.Duration(found.Timeout) * time.Second,
+		IdleTimeout:  time.Duration(found.IdleTimeout) * time.Second,
+		ReceivedTime: time.Now(),
+		Ready:        make(chan struct{}),
 	}
 
 	// ensure valid values
@@ -244,6 +246,11 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
 	default:
 		result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg, s.Datastore)
+		if result != nil {
+			waitTime := result.StartTime().Sub(cfg.ReceivedTime)
+			c.Header("XXX-FXLB-WAIT", waitTime.String())
+		}
+
 		if err != nil {
 			c.JSON(http.StatusInternalServerError, runnerResponse{
 				RequestID: cfg.ID,
@@ -254,6 +261,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
 			log.WithError(err).Error("Failed to run task")
 			break
 		}
+
 		for k, v := range found.Headers {
 			c.Header(k, v[0])
 		}
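
The wait time surfaced in XXX-FXLB-WAIT above is StartTime() minus ReceivedTime: queueing plus image pull and container setup, but not execution. time.Duration.String() yields values like "1.5s", which the balancer parses back with time.ParseDuration (see fnlb/proxy.go below). A small sketch with hypothetical timings:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	received := time.Now()
    	// ... task queues, image pulls, container gets created ...
    	start := received.Add(1500 * time.Millisecond) // stand-in for result.StartTime()

    	wait := start.Sub(received)
    	fmt.Println(wait.String()) // "1.5s" -- the value sent in XXX-FXLB-WAIT

    	parsed, err := time.ParseDuration("1.5s") // what the balancer does on intercept
    	fmt.Println(parsed, err)                  // 1.5s <nil>
    }
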
diff --git a/lb/README.md b/fnlb/README.md
similarity index 100%
rename from lb/README.md
rename to fnlb/README.md
diff --git a/lb/ch.go b/fnlb/ch.go
similarity index 60%
rename from lb/ch.go
rename to fnlb/ch.go
index c4a62a6ce..103dade29 100644
--- a/lb/ch.go
+++ b/fnlb/ch.go
@@ -25,11 +25,36 @@ type consistentHash struct {
 
 func newCH() *consistentHash {
 	return &consistentHash{
-		rng:  rand.New(rand.NewSource(time.Now().Unix())),
+		rng:  rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
 		load: make(map[string]*int64),
 	}
 }
 
+type lockedSource struct {
+	lk  sync.Mutex
+	src rand.Source64
+}
+
+func (r *lockedSource) Int63() (n int64) {
+	r.lk.Lock()
+	n = r.src.Int63()
+	r.lk.Unlock()
+	return n
+}
+
+func (r *lockedSource) Uint64() (n uint64) {
+	r.lk.Lock()
+	n = r.src.Uint64()
+	r.lk.Unlock()
+	return n
+}
+
+func (r *lockedSource) Seed(seed int64) {
+	r.lk.Lock()
+	r.src.Seed(seed)
+	r.lk.Unlock()
+}
+
 func (ch *consistentHash) add(newb string) {
 	ch.Lock()
 	defer ch.Unlock()
@@ -84,12 +109,22 @@ func jumpConsistentHash(key uint64, num_buckets int32) int32 {
 	return int32(b)
 }
 
+// DECAY of 0.1 weights roughly the last 10 samples (a fast-moving average)
+const DECAY = 0.1
+
+func ewma(old, new int64) int64 {
+	// TODO could 'warm' it up and drop the first few samples, since we'll have docker pulls / hot starts
+	return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
+}
+
 func (ch *consistentHash) setLoad(key string, load int64) {
 	ch.loadMu.RLock()
 	l, ok := ch.load[key]
 	ch.loadMu.RUnlock()
 	if ok {
-		atomic.StoreInt64(l, load)
+		// this EWMA update is lossy with or without CAS, but if things are moving fast we have plenty of samples
+		prev := atomic.LoadInt64(l)
+		atomic.StoreInt64(l, ewma(prev, load))
 	} else {
 		ch.loadMu.Lock()
 		if _, ok := ch.load[key]; !ok {
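
With DECAY = 0.1, each new sample contributes 10% and an old sample's weight decays as 0.9^n, so roughly the last ten samples dominate the average — hence the comment above. A worked example of how fast the average chases a load jump:

    package main

    import "fmt"

    const decay = 0.1

    func ewma(old, new int64) int64 {
    	return int64((float64(new) * decay) + (float64(old) * (1 - decay)))
    }

    func main() {
    	// a node's reported wait jumps from a steady 500ms to 2s (in nanoseconds)
    	load := int64(500e6)
    	for i := 1; i <= 10; i++ {
    		load = ewma(load, int64(2000e6))
    		fmt.Printf("sample %2d: %dms\n", i, load/1e6)
    	}
    	// after 10 samples the average has covered ~65% of the jump (~1477ms),
    	// since 0.9^10 ≈ 0.35 of the old value still lingers
    }
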
@@ -117,50 +152,62 @@ func (ch *consistentHash) besti(key string, i int) (string, error) {
 	}
 
 	f := func(n string) string {
-		var load int64
+		var load time.Duration
 		ch.loadMu.RLock()
 		loadPtr := ch.load[loadKey(n, key)]
 		ch.loadMu.RUnlock()
 		if loadPtr != nil {
-			load = atomic.LoadInt64(loadPtr)
+			load = time.Duration(atomic.LoadInt64(loadPtr))
 		}
 
-		// TODO flesh out these values. should be wait times.
+		const (
+			lowerLat = 500 * time.Millisecond
+			upperLat = 2 * time.Second
+		)
+
+		// TODO flesh out these values.
 		// if we send < 50% of traffic off to other nodes when loaded
 		// then as function scales nodes will get flooded, need to be careful.
 		//
 		// back off loaded node/function combos slightly to spread load
 		// TODO do we need a kind of ref counter as well so as to send functions
 		// to a different node while there's an outstanding call to another?
-		if load < 70 {
+		if load < lowerLat {
 			return n
-		} else if load > 90 {
-			if ch.rng.Intn(100) < 60 {
+		} else if load > upperLat {
+			// really loaded
+			if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x)?
 				return n
 			}
-		} else if load > 70 {
-			if ch.rng.Float64() < 80 {
+		} else {
+			// x ranges linearly from 40 down to 10 as load climbs from lowerLat to upperLat
+			x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
+			if ch.rng.Intn(100) < x {
 				return n
 			}
 		}
-		// otherwise loop until we find a sufficiently unloaded node or a lucky coin flip
+
+		// return the sentinel so the caller tries the next node
 		return ""
 	}
 
-	for _, n := range ch.nodes[i:] {
-		node := f(n)
+	for ; ; i++ {
+		// theoretically this could take infinite time, but it's practically improbable...
+		node := f(ch.nodes[i])
 		if node != "" {
 			return node, nil
+		} else if i == len(ch.nodes)-1 {
+			i = -1 // reset i to 0 on the next increment
 		}
 	}
 
-	// try the other half of the ring
-	for _, n := range ch.nodes[:i] {
-		node := f(n)
-		if node != "" {
-			return node, nil
-		}
-	}
-
-	return "", ErrNoNodes
+	panic("strange things are afoot at the circle k")
 }
+
+func translate(val, inFrom, inTo, outFrom, outTo int64) int {
+	outRange := outTo - outFrom
+	inRange := inTo - inFrom
+	inVal := val - inFrom
+	// we want the number to be lower as intensity increases
+	return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
+}
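
Two notes on the hunk above. First, besti's coin flips now run on every proxied request, which is why newCH wraps the shared source in lockedSource — a bare rand.Rand is not safe for concurrent use. Second, translate linearly maps a load in [lowerLat, upperLat] onto an acceptance percentage from 40 down to 10. Worked values:

    package main

    import (
    	"fmt"
    	"time"
    )

    func translate(val, inFrom, inTo, outFrom, outTo int64) int {
    	outRange := outTo - outFrom
    	inRange := inTo - inFrom
    	inVal := val - inFrom
    	return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
    }

    func main() {
    	lower := int64(500 * time.Millisecond)
    	upper := int64(2 * time.Second)
    	for _, d := range []time.Duration{500 * time.Millisecond, time.Second, 1500 * time.Millisecond, 2 * time.Second} {
    		fmt.Printf("%v -> keep node %d%% of the time\n", d, translate(int64(d), lower, upper, 10, 40))
    	}
    	// 500ms -> 40%, 1s -> 30%, 1.5s -> 20%, 2s -> 10%; below 500ms the node
    	// is always kept, and above 2s the flat 10% branch applies instead
    }
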
diff --git a/lb/dash.js b/fnlb/dash.js
similarity index 51%
rename from lb/dash.js
rename to fnlb/dash.js
index 2ee0bcddb..b40707285 100644
--- a/lb/dash.js
+++ b/fnlb/dash.js
@@ -3,7 +3,7 @@ var example = 'dynamic-update',
     theme = 'default';
 
 var chart; // global
-var seriesMapper = {};
+var seriesMapper = {}; // map by node+func name to chart[i] index
 
 Highcharts.setOptions({
     global: {
@@ -15,12 +15,8 @@ function requestData() {
     $.ajax({
         url: '/1/lb/stats',
         success: function(point) {
-            console.log(point)
             var jason = JSON.parse(point);
 
-            //var series = chart.series[0],
-                //shift = series.data.length > 20;
-
             if (!jason["stats"] || jason["stats"].length == 0) {
                 // XXX (reed): using server timestamps for real data this can drift easily
                 // XXX (reed): uh how to insert empty data point w/o node data? enum all series names?
@@ -29,39 +25,40 @@ function requestData() {
                 //name: 'Random data',
                 //data: []
                 //}]
+                setTimeout(requestData, 1000);
                 return
             }
 
             for (var i = 0; i < jason["stats"].length; i++) {
                 stat = jason["stats"][i];
                 var node = stat["node"];
+                var func = stat["func"];
+                var key = node + func;
 
-                console.log("before", seriesMapper[node])
-                if (seriesMapper[node] == null) {
-                    console.log("yodawg")
-                    chart.addSeries({name: node, data: []})
-                    seriesMapper[node] = chart.series.length - 1
-                    chart.redraw();
+                if (seriesMapper[key] == null) {
+                    chart.addSeries({name: key, data: []});
+                    waitChart.addSeries({name: key, data: []});
+                    seriesMapper[key] = chart.series.length - 1;
                 }
-                console.log("done", seriesMapper[node])
 
-                series = chart.series[seriesMapper[node]]
-                //series = chart.series[0]
+                series = chart.series[seriesMapper[key]];
+                waitSeries = waitChart.series[seriesMapper[key]];
 
+                // XXX (reed): hack
                 shift = series.data.length > 20 && i == jason["stats"].length + 1;
 
                 timestamp = Date.parse(stat["timestamp"]);
-                console.log(series.data.length, timestamp, stat["tp"])
-                series.addPoint([timestamp, stat["tp"]], true, shift);
-                //series.addPoint({
-                    //name: node,
-                    //data: {x: timestamp, y: stat["tp"]}
-                //}, true, shift);
+                console.log(series.data.length, timestamp, stat["tp"], stat["wait"]);
+                series.addPoint([timestamp, stat["tp"]], false, shift);
+                waitSeries.addPoint([timestamp, stat["wait"]], false, shift);
+            }
+            if (jason["stats"].length > 0) {
+                chart.redraw();
+                waitChart.redraw();
             }
 
             // call it again after one second
-            // XXX (reed): this won't work cuz if the endpoint fails then we won't ask for more datas
+            // XXX (reed): this won't work perfectly cuz if the endpoint fails then we won't ask for more datas
             setTimeout(requestData, 1000);
         },
         cache: false
@@ -71,7 +68,7 @@ function requestData() {
 $(document).ready(function() {
     chart = new Highcharts.Chart({
         chart: {
-            renderTo: 'container',
+            renderTo: 'throughput_chart',
             events: {
                 load: requestData
             }
@@ -107,7 +104,52 @@ $(document).ready(function() {
             minPadding: 0.2,
             maxPadding: 0.2,
             title: {
-                text: 'Value',
+                text: 'throughput (/s)',
                 margin: 80
             }
         },
         series: []
     });
+
+    waitChart = new Highcharts.Chart({
+        chart: {
+            renderTo: 'wait_chart',
+            events: {
+                load: requestData
+            }
+        },
+        rangeSelector: {
+            buttons: [{
+                count: 1,
+                type: 'minute',
+                text: '1M'
+            }, {
+                count: 5,
+                type: 'minute',
+                text: '5M'
+            }, {
+                type: 'all',
+                text: 'All'
+            }],
+            //inputEnabled: false,
+            selected: 0
+        },
+        title: {
+            text: 'lb data'
+        },
+        exporting: {
+            enabled: false
+        },
+        xAxis: {
+            type: 'datetime',
+            tickPixelInterval: 150,
+            maxZoom: 20 * 1000
+        },
+        yAxis: {
+            minPadding: 0.2,
+            maxPadding: 0.2,
+            title: {
+                text: 'wait time (seconds)',
+                margin: 80
+            }
+        },
diff --git a/lb/lb.go b/fnlb/lb.go
similarity index 78%
rename from lb/lb.go
rename to fnlb/lb.go
index e669e10ef..144b053aa 100644
--- a/lb/lb.go
+++ b/fnlb/lb.go
@@ -1,29 +1,27 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"flag"
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"os"
+	"os/signal"
 	"strings"
+	"sync"
+	"syscall"
 	"time"
 
 	"github.com/Sirupsen/logrus"
 )
 
-// TODO: consistent hashing is nice to get a cheap way to place nodes but it
-// doesn't account well for certain functions that may be 'hotter' than others.
-// we should very likely keep a load ordered list and distribute based on that.
-// if we can get some kind of feedback from the f(x) nodes, we can use that.
-// maybe it's good enough to just ch(x) + 1 if ch(x) is marked as "hot"?
-
 // TODO the load balancers all need to have the same list of nodes. gossip?
 // also gossip would handle failure detection instead of elb style. or it can
 // be pluggable and then we can read from where bmc is storing them and use that
 // or some OSS alternative
-// TODO when adding nodes we should health check them once before adding them
 
 // TODO when node goes offline should try to redirect request instead of 5xxing
 // TODO we could add some kind of pre-warming call to the functions server where
@@ -39,7 +37,7 @@ func main() {
 	fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
 
 	var conf config
-	flag.IntVar(&conf.Port, "port", 8081, "port to run on")
+	flag.StringVar(&conf.Listen, "listen", ":8081", "address (host:port) to listen on")
 	flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
 	flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
 	flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
@@ -50,12 +48,34 @@ func main() {
 
 	ch := newProxy(conf)
 
-	// XXX (reed): safe shutdown
-	fmt.Println(http.ListenAndServe(":8081", ch))
+	err := serve(conf.Listen, ch)
+	if err != nil {
+		logrus.WithError(err).Error("server error")
+	}
+}
+
+func serve(addr string, handler http.Handler) error {
+	server := &http.Server{Addr: addr, Handler: handler}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	defer wg.Wait()
+
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
+	go func() {
+		defer wg.Done()
+		for sig := range ch {
+			logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
+			server.Shutdown(context.Background()) // safe shutdown
+			return
+		}
+	}()
+	return server.ListenAndServe()
 }
 
 type config struct {
-	Port   int      `json:"port"`
+	Listen string   `json:"port"`
 	Nodes  []string `json:"nodes"`
 	HealthcheckInterval int    `json:"healthcheck_interval"`
 	HealthcheckEndpoint string `json:"healthcheck_endpoint"`
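
serve() above ties SIGINT/SIGQUIT to http.Server.Shutdown, which stops accepting new connections and drains in-flight requests; ListenAndServe then returns http.ErrServerClosed. The same shape in isolation (a sketch assuming the default mux; pass a deadline context to bound the drain):

    package main

    import (
    	"context"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    )

    func main() {
    	srv := &http.Server{Addr: ":8081", Handler: http.DefaultServeMux}

    	ch := make(chan os.Signal, 1)
    	signal.Notify(ch, syscall.SIGINT, syscall.SIGQUIT)
    	go func() {
    		sig := <-ch
    		log.Printf("received %v, shutting down", sig)
    		srv.Shutdown(context.Background()) // drains in-flight requests
    	}()

    	// Shutdown makes this return http.ErrServerClosed rather than blocking forever
    	log.Println(srv.ListenAndServe())
    }
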
@@ -108,31 +128,38 @@ func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
 		Timestamp  time.Time `json:"timestamp"`
 		Throughput int       `json:"tp"`
 		Node       string    `json:"node"`
+		Func       string    `json:"func"`
+		Wait       float64   `json:"wait"` // seconds
 	}
 
 	var sts []st
+	// roll up and calculate throughput per second. idk why i hate myself
 	aggs := make(map[string][]*stat)
 	for _, s := range stats {
-		// roll up and calculate throughput per second. idk why i hate myself
-		if t := aggs[s.node]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
+		key := s.node + "/" + s.fx
+		if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
 			sts = append(sts, st{
 				Timestamp:  t[0].timestamp,
 				Throughput: len(t),
-				Node:       s.node,
+				Node:       t[0].node,
+				Func:       t[0].fx,
+				Wait:       avgWait(t),
 			})
-			aggs[s.node] = append(aggs[s.node][:0], s)
+			aggs[key] = append(aggs[key][:0], s)
 		} else {
-			aggs[s.node] = append(aggs[s.node], s)
+			aggs[key] = append(aggs[key], s)
 		}
 	}
 
-	for node, t := range aggs {
+	// leftovers
+	for _, t := range aggs {
 		sts = append(sts, st{
 			Timestamp:  t[0].timestamp,
 			Throughput: len(t),
-			Node:       node,
+			Node:       t[0].node,
+			Func:       t[0].fx,
+			Wait:       avgWait(t),
 		})
 	}
 
@@ -143,6 +170,14 @@ func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
 	})
 }
 
+func avgWait(stats []*stat) float64 {
+	var sum time.Duration
+	for _, s := range stats {
+		sum += s.wait
+	}
+	return (sum / time.Duration(len(stats))).Seconds()
+}
+
 func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) {
 	var bod struct {
 		Node string `json:"node"`
@@ -220,7 +255,6 @@ var dashStr = `
 <title>lb dash</title>
-
@@ -228,12 +262,11 @@ var dashStr = `
 %s
-
-<div id="container"></div>
-
+<div id="throughput_chart"></div>
+<div id="wait_chart"></div>
diff --git a/lb/lb_test.go b/fnlb/lb_test.go
similarity index 100%
rename from lb/lb_test.go
rename to fnlb/lb_test.go
diff --git a/lb/proxy.go b/fnlb/proxy.go
similarity index 88%
rename from lb/proxy.go
rename to fnlb/proxy.go
index 547f5b3ce..47e88237a 100644
--- a/lb/proxy.go
+++ b/fnlb/proxy.go
@@ -8,7 +8,6 @@ import (
 	"net"
 	"net/http"
 	"net/http/httputil"
-	"strconv"
 	"sync"
 	"time"
 
@@ -36,12 +35,13 @@ type chProxy struct {
 	transport http.RoundTripper
 }
 
-// TODO should prob track per f(x) per node
 type stat struct {
 	timestamp time.Time
 	latency   time.Duration
 	node      string
 	code      int
+	fx        string
+	wait      time.Duration
 }
 
 func (ch *chProxy) addStat(s *stat) {
@@ -60,7 +60,6 @@ func (ch *chProxy) getStats() []*stat {
 	ch.stats = ch.stats[:0]
 	ch.statsMu.Unlock()
 
-	// XXX (reed): down sample to per second
 	return stats
 }
 
@@ -95,6 +94,7 @@ func newProxy(conf config) *chProxy {
 	director := func(req *http.Request) {
 		target, err := ch.ch.get(req.URL.Path)
 		if err != nil {
+			logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
 			target = "error"
 		}
 
@@ -109,7 +109,6 @@ func newProxy(conf config) *chProxy {
 	}
 
 	for _, n := range conf.Nodes {
-		// XXX (reed): need to health check these
 		ch.ch.add(n)
 	}
 	go ch.healthcheck()
@@ -123,6 +122,7 @@ type bufferPool struct {
 func newBufferPool() httputil.BufferPool {
 	return &bufferPool{
 		bufs: &sync.Pool{
+			// 32KB is what the proxy would've used without recycling buffers
 			New: func() interface{} { return make([]byte, 32*1024) },
 		},
 	}
@@ -132,9 +132,11 @@ func (b *bufferPool) Get() []byte  { return b.bufs.Get().([]byte) }
 func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
 
 func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
-	if req.URL.Host == "error" {
-		io.Copy(ioutil.Discard, req.Body)
-		req.Body.Close()
+	if req != nil && req.URL.Host == "error" {
+		if req.Body != nil {
+			io.Copy(ioutil.Discard, req.Body)
+			req.Body.Close()
+		}
 		// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
 		return nil, ErrNoNodes
 	}
@@ -148,9 +150,10 @@ func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
 }
 
 func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
-	// XXX (reed): give f(x) nodes ability to send back wait time in response
-	// XXX (reed): we should prob clear this from user response
-	load, _ := strconv.Atoi(resp.Header.Get("XXX-FXLB-WAIT"))
+	load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
+	// XXX (reed): we should prob clear this from user response?
+	// resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
+	// XXX (reed): prob need to validate these
 
 	ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
 
@@ -159,7 +162,8 @@ func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency tim
 		latency: latency,
 		node:    req.URL.Host,
 		code:    resp.StatusCode,
-		// XXX (reed): function
+		fx:      req.URL.Path,
+		wait:    load,
 	})
 }
 
 func (ch *chProxy) healthcheck() {
 	for range time.Tick(ch.hcInterval) {
 		nodes := ch.ch.list()
 		nodes = append(nodes, ch.dead()...)
-		// XXX (reed): need to figure out elegant adding / removing better
 		for _, n := range nodes {
 			go ch.ping(n)
 		}
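
A note on intercept above: the error from time.ParseDuration is deliberately dropped, so a missing or malformed XXX-FXLB-WAIT header reads as zero load — the node simply looks unloaded rather than failing the request. For example:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	for _, h := range []string{"1.5s", "750ms", "", "garbage"} {
    		load, err := time.ParseDuration(h)
    		// proxy.go ignores err, so the last two cases count as 0 load
    		fmt.Printf("%-9q -> %v (err: %v)\n", h, load, err)
    	}
    }
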