From d8ada599112e0732f352d2eb94384db0f5ecf64c Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Mon, 22 May 2017 10:53:53 -0700 Subject: [PATCH] in a reasonable unworking state --- lb/dash.js | 123 +++++++++++++++++++++++++------------------ lb/lb.go | 149 ++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 198 insertions(+), 74 deletions(-) diff --git a/lb/dash.js b/lb/dash.js index fc3b672c0..0604ed9e5 100644 --- a/lb/dash.js +++ b/lb/dash.js @@ -1,71 +1,96 @@ +jQuery.noConflict(); +var example = 'dynamic-update', + theme = 'default'; +(function($){ // encapsulate jQuery -Highcharts.setOptions({ + Highcharts.setOptions({ global: { - useUTC: false + useUTC: false } -}); + }); -// Create the chart -Highcharts.stockChart('container', { + // Create the chart + Highcharts.stockChart('container', { chart: { - events: { - load: function () { + events: { + load: function () { + setInterval(function () { + var xmlhttp = new XMLHttpRequest(); + var url = "/1/lb/stats"; - // set up the updating of the chart each second - var series = this.series[0]; - setInterval(function () { - //var x = (new Date()).getTime(), // current time - //y = Math.round(Math.random() * 100); - //series.addPoint([x, y], true, true); + xmlhttp.onreadystatechange = function() { + if (this.readyState == 4 && this.status == 200) { + var jason = JSON.parse(this.responseText); - series.addPoint([x, y], true, true); + if (!jason["stats"] || jason["stats"].length == 0) { + // XXX (reed): using server timestamps for real data this can drift easily + // XXX (reed): uh how to insert empty data point w/o node data? enum all series names? + //series.addPoint([(new Date()).getTime(), 0], true, shift); + //series: [{ + //name: 'Random data', + //data: [] + //}] + return + } - }, 1000); - } + for (var i = 0; i < jason["stats"].length; i++) { + // set up the updating of the chart each second + + var node = jason["stats"] + var series = chart.get(node) + if (!series) { + this.addSeries({name: node, data: []}) + series = chart.get(node) // XXX (reed): meh + } + + shift = series.data.length > 20; + + + stat = jason["stats"][i]; + timestamp = Date.parse(stat["timestamp"]); + // series.addPoint([timestamp, stat["tp"]], true, shift); + console.log(stat["node"]); + series.addPoint({ + name: stat["node"], + data: {x: timestamp, y: stat["tp"]} + }, true, shift); + } + } + }; + xmlhttp.open("GET", url, true); + xmlhttp.send(); + }, 1000); } + } }, rangeSelector: { - buttons: [{ - count: 1, - type: 'minute', - text: '1M' - }, { - count: 5, - type: 'minute', - text: '5M' - }, { - type: 'all', - text: 'All' - }], - inputEnabled: false, - selected: 0 + buttons: [{ + count: 1, + type: 'minute', + text: '1M' + }, { + count: 5, + type: 'minute', + text: '5M' + }, { + type: 'all', + text: 'All' + }], + inputEnabled: false, + selected: 0 }, title: { - text: 'Live random data' + text: 'lb data' }, exporting: { - enabled: false + enabled: false }, - series: [{ - name: 'Random data', - data: (function () { - // generate an array of random data - var data = [], - time = (new Date()).getTime(), - i; + series: [] + }); - for (i = -999; i <= 0; i += 1) { - data.push([ - time + i * 1000, - Math.round(Math.random() * 100) - ]); - } - return data; - }()) - }] -}); +})(jQuery); diff --git a/lb/lb.go b/lb/lb.go index 60eb597f2..d81feb9f9 100644 --- a/lb/lb.go +++ b/lb/lb.go @@ -31,11 +31,18 @@ import ( // maybe it's good enough to just ch(x) + 1 if ch(x) is marked as "hot"? // TODO the load balancers all need to have the same list of nodes. gossip? -// also gossip would handle failure detection instead of elb style +// also gossip would handle failure detection instead of elb style. or it can +// be pluggable and then we can read from where bmc is storing them and use that +// or some OSS alternative // TODO when adding nodes we should health check them once before adding them // TODO when node goes offline should try to redirect request instead of 5xxing +// TODO we could add some kind of pre-warming call to the functions server where +// the lb could send an image to it to download before the lb starts sending traffic +// there, otherwise when load starts expanding a few functions are going to eat +// the pull time + // TODO config // TODO TLS @@ -89,15 +96,20 @@ type chProxy struct { transport http.RoundTripper } +// TODO should prob track per f(x) per node type stat struct { - tim time.Time - latency time.Duration - host string - code uint64 + timestamp time.Time + latency time.Duration + node string + code int } func (ch *chProxy) addStat(s *stat) { ch.statsMu.Lock() + // delete last 1 minute of data if nobody is watching + for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ { + ch.stats = ch.stats[:i] + } ch.stats = append(ch.stats, s) ch.statsMu.Unlock() } @@ -109,6 +121,7 @@ func (ch *chProxy) getStats() []*stat { ch.statsMu.Unlock() // XXX (reed): down sample to per second + return stats } func newProxy(conf config) *chProxy { @@ -151,7 +164,7 @@ func newProxy(conf config) *chProxy { ch.proxy = &httputil.ReverseProxy{ Director: director, - Transport: tranny, + Transport: ch, BufferPool: newBufferPool(), } @@ -171,24 +184,28 @@ func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) { return nil, ErrNoNodes } + then := time.Now() resp, err := ch.transport.RoundTrip(req) - ch.intercept(req, resp) + if err == nil { + ch.intercept(req, resp, time.Since(then)) + } return resp, err } -func (ch *chProxy) intercept(req *http.Request, resp *http.Response) { +func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) { // XXX (reed): give f(x) nodes ability to send back wait time in response // XXX (reed): we should prob clear this from user response load, _ := strconv.Atoi(resp.Header.Get("XXX-FXLB-WAIT")) // XXX (reed): need to validate these prob ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load)) - // XXX (reed): stats data - //ch.statsMu.Lock() - //ch.stats = append(ch.stats, &stat{ - //host: r.URL.Host, - //} - //ch.stats = r.URL.Host + ch.addStat(&stat{ + timestamp: time.Now(), + latency: latency, + node: req.URL.Host, + code: resp.StatusCode, + // XXX (reed): function + }) } type bufferPool struct { @@ -263,7 +280,9 @@ func (ch *chProxy) alive(node string) { } func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/1/lb/nodes" { + switch r.URL.Path { + // XXX (reed): probably do these on a separate port to avoid conflicts + case "/1/lb/nodes": switch r.Method { case "PUT": ch.addNode(w, r) @@ -275,14 +294,59 @@ func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ch.listNodes(w, r) return } - - // XXX (reed): stats? - // XXX (reed): probably do these on a separate port to avoid conflicts + case "/1/lb/stats": + ch.statsGet(w, r) + return + case "/1/lb/dash": + ch.dash(w, r) + return } ch.proxy.ServeHTTP(w, r) } +func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) { + stats := ch.getStats() + + type st struct { + Timestamp time.Time `json:"timestamp"` + Throughput int `json:"tp"` + Node string `json:"node"` + } + var sts []st + + aggs := make(map[string][]*stat) + for _, s := range stats { + // roll up and calculate throughput per second. idk why i hate myself + if t := aggs[s.node]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) { + sts = append(sts, st{ + Timestamp: t[0].timestamp, + Throughput: len(t), + Node: s.node, + }) + + aggs[s.node] = append(aggs[s.node][:0], s) + } else { + aggs[s.node] = append(aggs[s.node], s) + } + + } + + for node, t := range aggs { + sts = append(sts, st{ + Timestamp: t[0].timestamp, + Throughput: len(t), + Node: node, + }) + } + + json.NewEncoder(w).Encode(struct { + Stats []st `json:"stats"` + }{ + Stats: sts, + }) +} + func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) { var bod struct { Node string `json:"node"` @@ -335,13 +399,46 @@ func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) { }) } -func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) { - _ = ` - - +// XXX (reed): clean up mess +var dashPage []byte -
- ` +func init() { + jsb, err := ioutil.ReadFile("dash.js") + if err != nil { + logrus.WithError(err).Fatal("couldn't open dash.js file") + } + + dashPage = []byte(fmt.Sprintf(dashStr, string(jsb))) +} + +var dashStr = ` + + + +lb dash + + + + + + + + + + + +
+ + + + + +` + +func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) { + w.Write(dashPage) } func (ch *chProxy) isDead(node string) bool { @@ -505,7 +602,7 @@ func (ch *consistentHash) besti(key string, i int) (string, error) { f := func(n string) string { var load int64 ch.loadMu.RLock() - loadPtr := ch.load[loadKey(node, key)] + loadPtr := ch.load[loadKey(n, key)] ch.loadMu.RUnlock() if loadPtr != nil { load = atomic.LoadInt64(loadPtr) @@ -516,6 +613,8 @@ func (ch *consistentHash) besti(key string, i int) (string, error) { // then as function scales nodes will get flooded, need to be careful. // // back off loaded node/function combos slightly to spread load + // TODO do we need a kind of ref counter as well so as to send functions + // to a different node while there's an outstanding call to another? if load < 70 { return n } else if load > 90 {