in a reasonable unworking state

This commit is contained in:
Reed Allman
2017-05-22 10:53:53 -07:00
parent b25e1e20ae
commit d8ada59911
2 changed files with 198 additions and 74 deletions

View File

@@ -1,4 +1,8 @@
jQuery.noConflict();
var example = 'dynamic-update',
theme = 'default';
(function($){ // encapsulate jQuery
Highcharts.setOptions({
global: {
@@ -11,16 +15,51 @@ Highcharts.stockChart('container', {
chart: {
events: {
load: function () {
// set up the updating of the chart each second
var series = this.series[0];
setInterval(function () {
//var x = (new Date()).getTime(), // current time
//y = Math.round(Math.random() * 100);
//series.addPoint([x, y], true, true);
var xmlhttp = new XMLHttpRequest();
var url = "/1/lb/stats";
series.addPoint([x, y], true, true);
xmlhttp.onreadystatechange = function() {
if (this.readyState == 4 && this.status == 200) {
var jason = JSON.parse(this.responseText);
if (!jason["stats"] || jason["stats"].length == 0) {
// XXX (reed): using server timestamps for real data this can drift easily
// XXX (reed): uh how to insert empty data point w/o node data? enum all series names?
//series.addPoint([(new Date()).getTime(), 0], true, shift);
//series: [{
//name: 'Random data',
//data: []
//}]
return
}
for (var i = 0; i < jason["stats"].length; i++) {
// set up the updating of the chart each second
var node = jason["stats"]
var series = chart.get(node)
if (!series) {
this.addSeries({name: node, data: []})
series = chart.get(node) // XXX (reed): meh
}
shift = series.data.length > 20;
stat = jason["stats"][i];
timestamp = Date.parse(stat["timestamp"]);
// series.addPoint([timestamp, stat["tp"]], true, shift);
console.log(stat["node"]);
series.addPoint({
name: stat["node"],
data: {x: timestamp, y: stat["tp"]}
}, true, shift);
}
}
};
xmlhttp.open("GET", url, true);
xmlhttp.send();
}, 1000);
}
}
@@ -44,28 +83,14 @@ Highcharts.stockChart('container', {
},
title: {
text: 'Live random data'
text: 'lb data'
},
exporting: {
enabled: false
},
series: [{
name: 'Random data',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -999; i <= 0; i += 1) {
data.push([
time + i * 1000,
Math.round(Math.random() * 100)
]);
}
return data;
}())
}]
series: []
});
})(jQuery);

143
lb/lb.go
View File

@@ -31,11 +31,18 @@ import (
// maybe it's good enough to just ch(x) + 1 if ch(x) is marked as "hot"?
// TODO the load balancers all need to have the same list of nodes. gossip?
// also gossip would handle failure detection instead of elb style
// also gossip would handle failure detection instead of elb style. or it can
// be pluggable and then we can read from where bmc is storing them and use that
// or some OSS alternative
// TODO when adding nodes we should health check them once before adding them
// TODO when node goes offline should try to redirect request instead of 5xxing
// TODO we could add some kind of pre-warming call to the functions server where
// the lb could send an image to it to download before the lb starts sending traffic
// there, otherwise when load starts expanding a few functions are going to eat
// the pull time
// TODO config
// TODO TLS
@@ -89,15 +96,20 @@ type chProxy struct {
transport http.RoundTripper
}
// TODO should prob track per f(x) per node
type stat struct {
tim time.Time
timestamp time.Time
latency time.Duration
host string
code uint64
node string
code int
}
func (ch *chProxy) addStat(s *stat) {
ch.statsMu.Lock()
// delete last 1 minute of data if nobody is watching
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
ch.stats = ch.stats[:i]
}
ch.stats = append(ch.stats, s)
ch.statsMu.Unlock()
}
@@ -109,6 +121,7 @@ func (ch *chProxy) getStats() []*stat {
ch.statsMu.Unlock()
// XXX (reed): down sample to per second
return stats
}
func newProxy(conf config) *chProxy {
@@ -151,7 +164,7 @@ func newProxy(conf config) *chProxy {
ch.proxy = &httputil.ReverseProxy{
Director: director,
Transport: tranny,
Transport: ch,
BufferPool: newBufferPool(),
}
@@ -171,24 +184,28 @@ func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, ErrNoNodes
}
then := time.Now()
resp, err := ch.transport.RoundTrip(req)
ch.intercept(req, resp)
if err == nil {
ch.intercept(req, resp, time.Since(then))
}
return resp, err
}
func (ch *chProxy) intercept(req *http.Request, resp *http.Response) {
func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
// XXX (reed): give f(x) nodes ability to send back wait time in response
// XXX (reed): we should prob clear this from user response
load, _ := strconv.Atoi(resp.Header.Get("XXX-FXLB-WAIT"))
// XXX (reed): need to validate these prob
ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
// XXX (reed): stats data
//ch.statsMu.Lock()
//ch.stats = append(ch.stats, &stat{
//host: r.URL.Host,
//}
//ch.stats = r.URL.Host
ch.addStat(&stat{
timestamp: time.Now(),
latency: latency,
node: req.URL.Host,
code: resp.StatusCode,
// XXX (reed): function
})
}
type bufferPool struct {
@@ -263,7 +280,9 @@ func (ch *chProxy) alive(node string) {
}
func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/1/lb/nodes" {
switch r.URL.Path {
// XXX (reed): probably do these on a separate port to avoid conflicts
case "/1/lb/nodes":
switch r.Method {
case "PUT":
ch.addNode(w, r)
@@ -275,14 +294,59 @@ func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ch.listNodes(w, r)
return
}
// XXX (reed): stats?
// XXX (reed): probably do these on a separate port to avoid conflicts
case "/1/lb/stats":
ch.statsGet(w, r)
return
case "/1/lb/dash":
ch.dash(w, r)
return
}
ch.proxy.ServeHTTP(w, r)
}
func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
stats := ch.getStats()
type st struct {
Timestamp time.Time `json:"timestamp"`
Throughput int `json:"tp"`
Node string `json:"node"`
}
var sts []st
aggs := make(map[string][]*stat)
for _, s := range stats {
// roll up and calculate throughput per second. idk why i hate myself
if t := aggs[s.node]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: s.node,
})
aggs[s.node] = append(aggs[s.node][:0], s)
} else {
aggs[s.node] = append(aggs[s.node], s)
}
}
for node, t := range aggs {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: node,
})
}
json.NewEncoder(w).Encode(struct {
Stats []st `json:"stats"`
}{
Stats: sts,
})
}
func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) {
var bod struct {
Node string `json:"node"`
@@ -335,13 +399,46 @@ func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) {
})
}
func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
_ = `
<script src="https://code.highcharts.com/stock/highstock.js"></script>
<script src="https://code.highcharts.com/stock/modules/exporting.js"></script>
// XXX (reed): clean up mess
var dashPage []byte
func init() {
jsb, err := ioutil.ReadFile("dash.js")
if err != nil {
logrus.WithError(err).Fatal("couldn't open dash.js file")
}
dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
}
var dashStr = `<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>lb dash</title>
<!-- 1. Add these JavaScript inclusions in the head of your page -->
<script type="text/javascript" src="https://code.jquery.com/jquery-3.2.1.min.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/highstock.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/modules/exporting.js"></script>
<!-- 2. Add the JavaScript to initialize the chart on document ready -->
</head>
<body>
<!-- 3. Add the container -->
<div id="container" style="height: 400px; min-width: 310px"></div>
<script>
%s
</script>
</body>
</html>
`
func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
w.Write(dashPage)
}
func (ch *chProxy) isDead(node string) bool {
@@ -505,7 +602,7 @@ func (ch *consistentHash) besti(key string, i int) (string, error) {
f := func(n string) string {
var load int64
ch.loadMu.RLock()
loadPtr := ch.load[loadKey(node, key)]
loadPtr := ch.load[loadKey(n, key)]
ch.loadMu.RUnlock()
if loadPtr != nil {
load = atomic.LoadInt64(loadPtr)
@@ -516,6 +613,8 @@ func (ch *consistentHash) besti(key string, i int) (string, error) {
// then as function scales nodes will get flooded, need to be careful.
//
// back off loaded node/function combos slightly to spread load
// TODO do we need a kind of ref counter as well so as to send functions
// to a different node while there's an outstanding call to another?
if load < 70 {
return n
} else if load > 90 {