move the lb stuff around in lego form

this structure should allow us to keep the consistent hash code and just use
consistent hashing on a subset of nodes, then in order to satisfy the oracle
service stuff in functions-service we can just implement a different "Grouper"
that does vm allocation and whatever other magic we need to manage nodes and
poop out sets of nodes based on tenant id / func.

for the suga... see main.go and proxy.go, the rest is basically renaming /
moving stuff (not easy to follow changes, nature of the beast).

the only 'issues' i can think of is that down in the ch stuff (or Router) we
will need a back channel to tell the 'Grouper' to add a node (i.e. all nodes for
that shard are currently loaded) which isn't great and also the grouper has no
way of knowing that a node in the given set may not be being used anymore.
still thinking about how to couple those two. basically don't want to have to
just copy that consistent hash code but after munging with stuff i'm almost at
'fuck it' level and maybe it's worth it to just copy and hack it up in
functions-service for what we need. we'll also need to have different key
funcs for groupers and routers eventually (grouper wants tenant id, router
needs tenant id + router). anyway, open to any ideas, i haven't come up with
anything great. feedback on interface would be great

after this can plumb the datastore stuff into the allGrouper pretty easily
This commit is contained in:
Reed Allman
2017-06-10 01:57:28 -07:00
parent b45ad6d5f9
commit 398ecc388e
10 changed files with 839 additions and 822 deletions

View File

@@ -1,213 +0,0 @@
package main
import (
"errors"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/dchest/siphash"
)
// consistentHash will maintain a list of strings which can be accessed by
// keying them with a separate group of strings
type consistentHash struct {
// protects nodes
sync.RWMutex
nodes []string
loadMu sync.RWMutex
load map[string]*int64
rng *rand.Rand
}
func newCH() *consistentHash {
return &consistentHash{
rng: rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
load: make(map[string]*int64),
}
}
type lockedSource struct {
lk sync.Mutex
src rand.Source64
}
func (r *lockedSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return n
}
func (r *lockedSource) Uint64() (n uint64) {
r.lk.Lock()
n = r.src.Uint64()
r.lk.Unlock()
return n
}
func (r *lockedSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
func (ch *consistentHash) add(newb string) {
ch.Lock()
defer ch.Unlock()
// filter dupes, under lock. sorted, so binary search
i := sort.SearchStrings(ch.nodes, newb)
if i < len(ch.nodes) && ch.nodes[i] == newb {
return
}
ch.nodes = append(ch.nodes, newb)
// need to keep in sorted order so that hash index works across nodes
sort.Sort(sort.StringSlice(ch.nodes))
}
func (ch *consistentHash) remove(ded string) {
ch.Lock()
i := sort.SearchStrings(ch.nodes, ded)
if i < len(ch.nodes) && ch.nodes[i] == ded {
ch.nodes = append(ch.nodes[:i], ch.nodes[i+1:]...)
}
ch.Unlock()
}
// return a copy
func (ch *consistentHash) list() []string {
ch.RLock()
ret := make([]string, len(ch.nodes))
copy(ret, ch.nodes)
ch.RUnlock()
return ret
}
func (ch *consistentHash) get(key string) (string, error) {
// crc not unique enough & sha is too slow, it's 1 import
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
ch.RLock()
defer ch.RUnlock()
i := int(jumpConsistentHash(sum64, int32(len(ch.nodes))))
return ch.besti(key, i)
}
// A Fast, Minimal Memory, Consistent Hash Algorithm:
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
var b, j int64 = -1, 0
for j < int64(num_buckets) {
b = j
key = key*2862933555777941757 + 1
j = (b + 1) * int64((1<<31)/(key>>33)+1)
}
return int32(b)
}
// tracks last 10 samples (very fast)
const DECAY = 0.1
func ewma(old, new int64) int64 {
// TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
}
func (ch *consistentHash) setLoad(key string, load int64) {
ch.loadMu.RLock()
l, ok := ch.load[key]
ch.loadMu.RUnlock()
if ok {
// this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of sample
prev := atomic.LoadInt64(l)
atomic.StoreInt64(l, ewma(prev, load))
} else {
ch.loadMu.Lock()
if _, ok := ch.load[key]; !ok {
ch.load[key] = &load
}
ch.loadMu.Unlock()
}
}
var (
ErrNoNodes = errors.New("no nodes available")
)
func loadKey(node, key string) string {
return node + "\x00" + key
}
// XXX (reed): push down fails / load into ch
func (ch *consistentHash) besti(key string, i int) (string, error) {
ch.RLock()
defer ch.RUnlock()
if len(ch.nodes) < 1 {
return "", ErrNoNodes
}
f := func(n string) string {
var load time.Duration
ch.loadMu.RLock()
loadPtr := ch.load[loadKey(n, key)]
ch.loadMu.RUnlock()
if loadPtr != nil {
load = time.Duration(atomic.LoadInt64(loadPtr))
}
const (
lowerLat = 500 * time.Millisecond
upperLat = 2 * time.Second
)
// TODO flesh out these values.
// if we send < 50% of traffic off to other nodes when loaded
// then as function scales nodes will get flooded, need to be careful.
//
// back off loaded node/function combos slightly to spread load
// TODO do we need a kind of ref counter as well so as to send functions
// to a different node while there's an outstanding call to another?
if load < lowerLat {
return n
} else if load > upperLat {
// really loaded
if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
return n
}
} else {
// 10 < x < 40, as load approaches upperLat, x decreases [linearly]
x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
if ch.rng.Intn(100) < x {
return n
}
}
// return invalid node to try next node
return ""
}
for ; ; i++ {
// theoretically this could take infinite time, but practically improbable...
node := f(ch.nodes[i])
if node != "" {
return node, nil
} else if i == len(ch.nodes)-1 {
i = -1 // reset i to 0
}
}
panic("strange things are afoot at the circle k")
}
func translate(val, inFrom, inTo, outFrom, outTo int64) int {
outRange := outTo - outFrom
inRange := inTo - inFrom
inVal := val - inFrom
// we want the number to be lower as intensity increases
return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
}

View File

@@ -1,311 +0,0 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
)
// TODO the load balancers all need to have the same list of nodes. gossip?
// also gossip would handle failure detection instead of elb style. or it can
// be pluggable and then we can read from where bmc is storing them and use that
// or some OSS alternative
// TODO when node goes offline should try to redirect request instead of 5xxing
// TODO we could add some kind of pre-warming call to the functions server where
// the lb could send an image to it to download before the lb starts sending traffic
// there, otherwise when load starts expanding a few functions are going to eat
// the pull time
// TODO config
// TODO TLS
func main() {
// XXX (reed): normalize
fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
var conf config
flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
flag.Parse()
conf.Nodes = strings.Split(*fnodes, ",")
ch := newProxy(conf)
err := serve(conf.Listen, ch)
if err != nil {
logrus.WithError(err).Error("server error")
}
}
func serve(addr string, handler http.Handler) error {
server := &http.Server{Addr: addr, Handler: handler}
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
go func() {
defer wg.Done()
for sig := range ch {
logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
server.Shutdown(context.Background()) // safe shutdown
return
}
}()
return server.ListenAndServe()
}
type config struct {
Listen string `json:"port"`
Nodes []string `json:"nodes"`
HealthcheckInterval int `json:"healthcheck_interval"`
HealthcheckEndpoint string `json:"healthcheck_endpoint"`
HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
HealthcheckTimeout int `json:"healthcheck_timeout"`
}
// XXX (reed): clean up mess
var dashPage []byte
func init() {
jsb, err := ioutil.ReadFile("dash.js")
if err != nil {
logrus.WithError(err).Fatal("couldn't open dash.js file")
}
dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
}
func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
// XXX (reed): probably do these on a separate port to avoid conflicts
case "/1/lb/nodes":
switch r.Method {
case "PUT":
ch.addNode(w, r)
return
case "DELETE":
ch.removeNode(w, r)
return
case "GET":
ch.listNodes(w, r)
return
}
case "/1/lb/stats":
ch.statsGet(w, r)
return
case "/1/lb/dash":
ch.dash(w, r)
return
}
ch.proxy.ServeHTTP(w, r)
}
func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
stats := ch.getStats()
type st struct {
Timestamp time.Time `json:"timestamp"`
Throughput int `json:"tp"`
Node string `json:"node"`
Func string `json:"func"`
Wait float64 `json:"wait"` // seconds
}
var sts []st
// roll up and calculate throughput per second. idk why i hate myself
aggs := make(map[string][]*stat)
for _, s := range stats {
key := s.node + "/" + s.fx
if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: t[0].node,
Func: t[0].fx,
Wait: avgWait(t),
})
aggs[key] = append(aggs[key][:0], s)
} else {
aggs[key] = append(aggs[key], s)
}
}
// leftovers
for _, t := range aggs {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: t[0].node,
Func: t[0].fx,
Wait: avgWait(t),
})
}
json.NewEncoder(w).Encode(struct {
Stats []st `json:"stats"`
}{
Stats: sts,
})
}
func avgWait(stats []*stat) float64 {
var sum time.Duration
for _, s := range stats {
sum += s.wait
}
return (sum / time.Duration(len(stats))).Seconds()
}
func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) {
var bod struct {
Node string `json:"node"`
}
err := json.NewDecoder(r.Body).Decode(&bod)
if err != nil {
sendError(w, http.StatusBadRequest, err.Error())
return
}
ch.ch.add(bod.Node)
sendSuccess(w, "node added")
}
func (ch *chProxy) removeNode(w http.ResponseWriter, r *http.Request) {
var bod struct {
Node string `json:"node"`
}
err := json.NewDecoder(r.Body).Decode(&bod)
if err != nil {
sendError(w, http.StatusBadRequest, err.Error())
return
}
ch.ch.remove(bod.Node)
sendSuccess(w, "node deleted")
}
func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) {
nodes := ch.ch.list()
dead := ch.dead()
out := make(map[string]string, len(nodes)+len(dead))
for _, n := range nodes {
if ch.isDead(n) {
out[n] = "offline"
} else {
out[n] = "online"
}
}
for _, n := range dead {
out[n] = "offline"
}
sendValue(w, struct {
Nodes map[string]string `json:"nodes"`
}{
Nodes: out,
})
}
func (ch *chProxy) isDead(node string) bool {
ch.RLock()
val, ok := ch.ded[node]
ch.RUnlock()
return ok && val >= ch.hcUnhealthy
}
func (ch *chProxy) dead() []string {
ch.RLock()
defer ch.RUnlock()
nodes := make([]string, 0, len(ch.ded))
for n, val := range ch.ded {
if val >= ch.hcUnhealthy {
nodes = append(nodes, n)
}
}
return nodes
}
var dashStr = `<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>lb dash</title>
<script type="text/javascript" src="https://code.jquery.com/jquery-1.10.1.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/highstock.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/modules/exporting.js"></script>
<script>
%s
</script>
</head>
<body>
<div id="throughput_chart" style="height: 400px; min-width: 310px"></div>
<div id="wait_chart" style="height: 400px; min-width: 310px"></div>
</body>
</html>
`
func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
w.Write(dashPage)
}
func sendValue(w http.ResponseWriter, v interface{}) {
err := json.NewEncoder(w).Encode(v)
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}
func sendSuccess(w http.ResponseWriter, msg string) {
err := json.NewEncoder(w).Encode(struct {
Msg string `json:"msg"`
}{
Msg: msg,
})
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}
func sendError(w http.ResponseWriter, code int, msg string) {
w.WriteHeader(code)
err := json.NewEncoder(w).Encode(struct {
Msg string `json:"msg"`
}{
Msg: msg,
})
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}

243
fnlb/lb/allgrouper.go Normal file
View File

@@ -0,0 +1,243 @@
package lb
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"sort"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// NewAllGrouper returns a Grouper that will return the entire list of nodes
// that are being maintained, regardless of key. An 'AllGrouper' will health
// check servers at a specified interval, taking them in and out as they
// pass/fail and exposes endpoints for adding, removing and listing nodes.
func NewAllGrouper(conf Config) Grouper {
a := &allGrouper{
ded: make(map[string]int64),
// XXX (reed): need to be reconfigurable at some point
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
hcEndpoint: conf.HealthcheckEndpoint,
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
// for health checks
httpClient: &http.Client{Transport: conf.Transport},
}
for _, n := range conf.Nodes {
a.add(n)
}
go a.healthcheck()
return a
}
// TODO
type allGrouper struct {
// protects nodes & ded
sync.RWMutex
nodes []string
ded map[string]int64 // [node] -> failedCount
httpClient *http.Client
hcInterval time.Duration
hcEndpoint string
hcUnhealthy int64
hcTimeout time.Duration
}
func (a *allGrouper) add(newb string) {
a.Lock()
a.addNoLock(newb)
a.Unlock()
}
func (a *allGrouper) addNoLock(newb string) {
// filter dupes, under lock. sorted, so binary search
i := sort.SearchStrings(a.nodes, newb)
if i < len(a.nodes) && a.nodes[i] == newb {
return
}
a.nodes = append(a.nodes, newb)
// need to keep in sorted order so that hash index works across nodes
sort.Sort(sort.StringSlice(a.nodes))
}
func (a *allGrouper) remove(ded string) {
a.Lock()
a.removeNoLock(ded)
a.Unlock()
}
func (a *allGrouper) removeNoLock(ded string) {
i := sort.SearchStrings(a.nodes, ded)
if i < len(a.nodes) && a.nodes[i] == ded {
a.nodes = append(a.nodes[:i], a.nodes[i+1:]...)
}
}
// return a copy
func (a *allGrouper) List(string) ([]string, error) {
a.RLock()
ret := make([]string, len(a.nodes))
copy(ret, a.nodes)
a.RUnlock()
var err error
if len(ret) == 0 {
err = ErrNoNodes
}
return ret, err
}
func (a *allGrouper) healthcheck() {
for range time.Tick(a.hcInterval) {
nodes, _ := a.List("")
nodes = append(nodes, a.dead()...)
for _, n := range nodes {
go a.ping(n)
}
}
}
func (a *allGrouper) ping(node string) {
req, _ := http.NewRequest("GET", "http://"+node+a.hcEndpoint, nil)
ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
defer cancel()
req = req.WithContext(ctx)
resp, err := a.httpClient.Do(req)
if resp != nil && resp.Body != nil {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
a.fail(node)
} else {
a.alive(node)
}
}
func (a *allGrouper) fail(node string) {
// shouldn't be a hot path so shouldn't be too contended on since health
// checks are infrequent
a.Lock()
a.ded[node]++
failed := a.ded[node]
if failed >= a.hcUnhealthy {
a.removeNoLock(node)
}
a.Unlock()
}
func (a *allGrouper) alive(node string) {
a.RLock()
_, ok := a.ded[node]
a.RUnlock()
if ok {
a.Lock()
delete(a.ded, node)
a.addNoLock(node)
a.Unlock()
}
}
func (a *allGrouper) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
// XXX (reed): probably do these on a separate port to avoid conflicts
case "/1/lb/nodes":
switch r.Method {
case "PUT":
a.addNode(w, r)
return
case "DELETE":
a.removeNode(w, r)
return
case "GET":
a.listNodes(w, r)
return
}
}
next.ServeHTTP(w, r)
})
}
func (a *allGrouper) addNode(w http.ResponseWriter, r *http.Request) {
var bod struct {
Node string `json:"node"`
}
err := json.NewDecoder(r.Body).Decode(&bod)
if err != nil {
sendError(w, http.StatusBadRequest, err.Error())
return
}
a.add(bod.Node)
sendSuccess(w, "node added")
}
func (a *allGrouper) removeNode(w http.ResponseWriter, r *http.Request) {
var bod struct {
Node string `json:"node"`
}
err := json.NewDecoder(r.Body).Decode(&bod)
if err != nil {
sendError(w, http.StatusBadRequest, err.Error())
return
}
a.remove(bod.Node)
sendSuccess(w, "node deleted")
}
func (a *allGrouper) listNodes(w http.ResponseWriter, r *http.Request) {
nodes, _ := a.List("")
dead := a.dead()
out := make(map[string]string, len(nodes)+len(dead))
for _, n := range nodes {
if a.isDead(n) {
out[n] = "offline"
} else {
out[n] = "online"
}
}
for _, n := range dead {
out[n] = "offline"
}
sendValue(w, struct {
Nodes map[string]string `json:"nodes"`
}{
Nodes: out,
})
}
func (a *allGrouper) isDead(node string) bool {
a.RLock()
val, ok := a.ded[node]
a.RUnlock()
return ok && val >= a.hcUnhealthy
}
func (a *allGrouper) dead() []string {
a.RLock()
defer a.RUnlock()
nodes := make([]string, 0, len(a.ded))
for n, val := range a.ded {
if val >= a.hcUnhealthy {
nodes = append(nodes, n)
}
}
return nodes
}

299
fnlb/lb/ch.go Normal file
View File

@@ -0,0 +1,299 @@
package lb
import (
"encoding/json"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/dchest/siphash"
)
func NewConsistentRouter(conf Config) Router {
return &chRouter{
rng: rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
load: make(map[string]*int64),
}
}
type chRouter struct {
// XXX (reed): right now this only supports one client basically ;) use some real stat backend
statsMu sync.Mutex
stats []*stat
loadMu sync.RWMutex
load map[string]*int64
rng *rand.Rand
}
type stat struct {
timestamp time.Time
latency time.Duration
node string
code int
fx string
wait time.Duration
}
func (ch *chRouter) addStat(s *stat) {
ch.statsMu.Lock()
// delete last 1 minute of data if nobody is watching
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
ch.stats = ch.stats[:i]
}
ch.stats = append(ch.stats, s)
ch.statsMu.Unlock()
}
func (ch *chRouter) getStats() []*stat {
ch.statsMu.Lock()
stats := ch.stats
ch.stats = ch.stats[:0]
ch.statsMu.Unlock()
return stats
}
type lockedSource struct {
lk sync.Mutex
src rand.Source64
}
func (r *lockedSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return n
}
func (r *lockedSource) Uint64() (n uint64) {
r.lk.Lock()
n = r.src.Uint64()
r.lk.Unlock()
return n
}
func (r *lockedSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
// Route in this form relies on the nodes being in sorted order so
// that the output will be consistent (yes, slightly unfortunate).
func (ch *chRouter) Route(nodes []string, key string) (string, error) {
// crc not unique enough & sha is too slow, it's 1 import
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
i := int(jumpConsistentHash(sum64, int32(len(nodes))))
return ch.besti(key, i, nodes)
}
func (ch *chRouter) InterceptResponse(req *http.Request, resp *http.Response) {
load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
// XXX (reed): we should prob clear this from user response?
// resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
// XXX (reed): need to validate these prob
ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
ch.addStat(&stat{
timestamp: time.Now(),
//latency: latency, // XXX (reed): plumb
node: req.URL.Host,
code: resp.StatusCode,
fx: req.URL.Path,
wait: load,
})
}
// A Fast, Minimal Memory, Consistent Hash Algorithm:
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
var b, j int64 = -1, 0
for j < int64(num_buckets) {
b = j
key = key*2862933555777941757 + 1
j = (b + 1) * int64((1<<31)/(key>>33)+1)
}
return int32(b)
}
// tracks last 10 samples (very fast)
const DECAY = 0.1
func ewma(old, new int64) int64 {
// TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
}
func (ch *chRouter) setLoad(key string, load int64) {
ch.loadMu.RLock()
l, ok := ch.load[key]
ch.loadMu.RUnlock()
if ok {
// this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of sample
prev := atomic.LoadInt64(l)
atomic.StoreInt64(l, ewma(prev, load))
} else {
ch.loadMu.Lock()
if _, ok := ch.load[key]; !ok {
ch.load[key] = &load
}
ch.loadMu.Unlock()
}
}
func loadKey(node, key string) string {
return node + "\x00" + key
}
func (ch *chRouter) besti(key string, i int, nodes []string) (string, error) {
if len(nodes) < 1 {
// supposed to be caught in grouper, but double check
return "", ErrNoNodes
}
// XXX (reed): trash the closure
f := func(n string) string {
var load time.Duration
ch.loadMu.RLock()
loadPtr := ch.load[loadKey(n, key)]
ch.loadMu.RUnlock()
if loadPtr != nil {
load = time.Duration(atomic.LoadInt64(loadPtr))
}
const (
// TODO we should probably use deltas rather than fixed wait times. for 'cold'
// functions these could always trigger. i.e. if wait time increased 5x over last
// 100 data points, point the cannon elsewhere (we'd have to track 2 numbers but meh)
lowerLat = 500 * time.Millisecond
upperLat = 2 * time.Second
)
// TODO flesh out these values.
// if we send < 50% of traffic off to other nodes when loaded
// then as function scales nodes will get flooded, need to be careful.
//
// back off loaded node/function combos slightly to spread load
if load < lowerLat {
return n
} else if load > upperLat {
// really loaded
if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
return n
}
} else {
// 10 < x < 40, as load approaches upperLat, x decreases [linearly]
x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
if ch.rng.Intn(100) < x {
return n
}
}
// return invalid node to try next node
return ""
}
for ; ; i++ {
// theoretically this could take infinite time, but practically improbable...
node := f(nodes[i])
if node != "" {
return node, nil
} else if i == len(nodes)-1 {
i = -1 // reset i to 0
}
}
// TODO we need a way to add a node for a given key from down here if a node is overloaded.
panic("strange things are afoot at the circle k")
}
func translate(val, inFrom, inTo, outFrom, outTo int64) int {
outRange := outTo - outFrom
inRange := inTo - inFrom
inVal := val - inFrom
// we want the number to be lower as intensity increases
return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
}
func (ch *chRouter) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
// XXX (reed): probably do these on a separate port to avoid conflicts
case "/1/lb/stats":
ch.statsGet(w, r)
return
case "/1/lb/dash":
ch.dash(w, r)
return
}
next.ServeHTTP(w, r)
})
}
func (ch *chRouter) statsGet(w http.ResponseWriter, r *http.Request) {
stats := ch.getStats()
type st struct {
Timestamp time.Time `json:"timestamp"`
Throughput int `json:"tp"`
Node string `json:"node"`
Func string `json:"func"`
Wait float64 `json:"wait"` // seconds
}
var sts []st
// roll up and calculate throughput per second. idk why i hate myself
aggs := make(map[string][]*stat)
for _, s := range stats {
key := s.node + "/" + s.fx
if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: t[0].node,
Func: t[0].fx,
Wait: avgWait(t),
})
aggs[key] = append(aggs[key][:0], s)
} else {
aggs[key] = append(aggs[key], s)
}
}
// leftovers
for _, t := range aggs {
sts = append(sts, st{
Timestamp: t[0].timestamp,
Throughput: len(t),
Node: t[0].node,
Func: t[0].fx,
Wait: avgWait(t),
})
}
json.NewEncoder(w).Encode(struct {
Stats []st `json:"stats"`
}{
Stats: sts,
})
}
func avgWait(stats []*stat) float64 {
var sum time.Duration
for _, s := range stats {
sum += s.wait
}
return (sum / time.Duration(len(stats))).Seconds()
}
func (ch *chRouter) dash(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(dashPage))
}

View File

@@ -1,4 +1,15 @@
package lb
const dashPage = `<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>lb dash</title>
<script type="text/javascript" src="https://code.jquery.com/jquery-1.10.1.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/highstock.js"></script>
<script type="text/javascript" src="https://code.highcharts.com/stock/modules/exporting.js"></script>
<script>
var example = 'dynamic-update',
theme = 'default';
@@ -156,3 +167,14 @@ $(document).ready(function() {
series: []
});
});
</script>
</head>
<body>
<div id="throughput_chart" style="height: 400px; min-width: 310px"></div>
<div id="wait_chart" style="height: 400px; min-width: 310px"></div>
</body>
</html>
`

147
fnlb/lb/proxy.go Normal file
View File

@@ -0,0 +1,147 @@
package lb
import (
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
"sync"
"github.com/Sirupsen/logrus"
)
// TODO the load balancers all need to have the same list of nodes. gossip?
// also gossip would handle failure detection instead of elb style. or it can
// be pluggable and then we can read from where bmc is storing them and use that
// or some OSS alternative
// TODO when node goes offline should try to redirect request instead of 5xxing
// TODO we could add some kind of pre-warming call to the functions server where
// the lb could send an image to it to download before the lb starts sending traffic
// there, otherwise when load starts expanding a few functions are going to eat
// the pull time
// TODO config
// TODO TLS
type Config struct {
Listen string `json:"port"`
Nodes []string `json:"nodes"`
HealthcheckInterval int `json:"healthcheck_interval"`
HealthcheckEndpoint string `json:"healthcheck_endpoint"`
HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
HealthcheckTimeout int `json:"healthcheck_timeout"`
Transport *http.Transport
}
type Grouper interface {
// List returns a set of hosts that may be used to route a request
// for a given key.
List(key string) ([]string, error)
// Wrap allows adding middleware to the provided http.Handler.
Wrap(http.Handler) http.Handler
}
type Router interface {
// TODO we could probably expose this just as some kind of http.RoundTripper
// but I can't think of anything elegant so here this is.
// Route will pick a node from the given set of nodes.
Route(nodes []string, key string) (string, error)
// InterceptResponse allows a Router to extract information from proxied
// requests so that it might do a better job next time. InterceptResponse
// should not modify the Response as it has already been received nore the
// Request, having already been sent.
InterceptResponse(req *http.Request, resp *http.Response)
// Wrap allows adding middleware to the provided http.Handler.
Wrap(http.Handler) http.Handler
}
// KeyFunc maps a request to a shard key, it may return an error
// if there are issues locating the shard key.
type KeyFunc func(req *http.Request) (string, error)
type proxy struct {
keyFunc KeyFunc
grouper Grouper
router Router
transport http.RoundTripper
// embed for lazy ServeHTTP mostly
*httputil.ReverseProxy
}
// NewProxy will marry the given parameters into an able proxy.
func NewProxy(keyFunc KeyFunc, g Grouper, r Router, conf Config) http.Handler {
p := new(proxy)
*p = proxy{
keyFunc: keyFunc,
grouper: g,
router: r,
transport: conf.Transport,
ReverseProxy: &httputil.ReverseProxy{
Director: func(*http.Request) { /* in RoundTrip so we can error out */ },
Transport: p,
BufferPool: newBufferPool(),
},
}
return p
}
type bufferPool struct {
bufs *sync.Pool
}
func newBufferPool() httputil.BufferPool {
return &bufferPool{
bufs: &sync.Pool{
// 32KB is what the proxy would've used without recycling them
New: func() interface{} { return make([]byte, 32*1024) },
},
}
}
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
target, err := p.route(req)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
if req.Body != nil {
io.Copy(ioutil.Discard, req.Body)
req.Body.Close()
}
// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
return nil, ErrNoNodes
}
req.URL.Scheme = "http" // XXX (reed): h2 support
req.URL.Host = target
resp, err := p.transport.RoundTrip(req)
if err == nil {
p.router.InterceptResponse(req, resp)
}
return resp, err
}
func (p *proxy) route(req *http.Request) (string, error) {
// TODO errors from this func likely could return 401 or so instead of 503 always
key, err := p.keyFunc(req)
if err != nil {
return "", err
}
list, err := p.grouper.List(key)
if err != nil {
return "", err
}
return p.router.Route(list, key)
}

47
fnlb/lb/util.go Normal file
View File

@@ -0,0 +1,47 @@
package lb
import (
"encoding/json"
"errors"
"net/http"
"github.com/Sirupsen/logrus"
)
var (
ErrNoNodes = errors.New("no nodes available")
)
func sendValue(w http.ResponseWriter, v interface{}) {
err := json.NewEncoder(w).Encode(v)
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}
func sendSuccess(w http.ResponseWriter, msg string) {
err := json.NewEncoder(w).Encode(struct {
Msg string `json:"msg"`
}{
Msg: msg,
})
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}
func sendError(w http.ResponseWriter, code int, msg string) {
w.WriteHeader(code)
err := json.NewEncoder(w).Encode(struct {
Msg string `json:"msg"`
}{
Msg: msg,
})
if err != nil {
logrus.WithError(err).Error("error writing response response")
}
}

View File

@@ -1,75 +0,0 @@
package main
import "testing"
func TestCHAdd(t *testing.T) {
var ch consistentHash
nodes := []string{"1", "2", "3"}
for _, n := range nodes {
ch.add(n)
}
if len(ch.nodes) != 3 {
t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
}
// test dupes don't get added
for _, n := range nodes {
ch.add(n)
}
if len(ch.nodes) != 3 {
t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
}
}
func TestCHRemove(t *testing.T) {
var ch consistentHash
nodes := []string{"1", "2", "3"}
for _, n := range nodes {
ch.add(n)
}
if len(ch.nodes) != 3 {
t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
}
ch.remove("4")
if len(ch.nodes) != 3 {
t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
}
ch.remove("3")
if len(ch.nodes) != 2 {
t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
}
ch.remove("3")
if len(ch.nodes) != 2 {
t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
}
}
func TestCHGet(t *testing.T) {
var ch consistentHash
nodes := []string{"1", "2", "3"}
for _, n := range nodes {
ch.add(n)
}
if len(ch.nodes) != 3 {
t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
}
keys := []string{"a", "b", "c"}
for _, k := range keys {
_, err := ch.get(k)
if err != nil {
t.Fatal("CHGet returned an error: ", err)
}
// testing this doesn't panic basically? could test distro but meh
}
}

81
fnlb/main.go Normal file
View File

@@ -0,0 +1,81 @@
package main
import (
"context"
"crypto/tls"
"flag"
"net"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"gitlab-odx.oracle.com/odx/functions/fnlb/lb"
)
func main() {
// XXX (reed): normalize
fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
var conf lb.Config
flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
flag.Parse()
conf.Nodes = strings.Split(*fnodes, ",")
conf.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 120 * time.Second,
}).Dial,
MaxIdleConnsPerHost: 512,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
},
}
g := lb.NewAllGrouper(conf)
r := lb.NewConsistentRouter(conf)
k := func(r *http.Request) (string, error) {
return r.URL.Path, nil
}
h := lb.NewProxy(k, g, r, conf)
h = g.Wrap(h) // add/del/list endpoints
h = r.Wrap(h) // stats / dash endpoint
err := serve(conf.Listen, h)
if err != nil {
logrus.WithError(err).Error("server error")
}
}
func serve(addr string, handler http.Handler) error {
server := &http.Server{Addr: addr, Handler: handler}
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
go func() {
defer wg.Done()
for sig := range ch {
logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
server.Shutdown(context.Background()) // safe shutdown
return
}
}()
return server.ListenAndServe()
}

View File

@@ -1,223 +0,0 @@
package main
import (
"context"
"crypto/tls"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
type chProxy struct {
ch *consistentHash
sync.RWMutex
// TODO map[string][]time.Time
ded map[string]int64
hcInterval time.Duration
hcEndpoint string
hcUnhealthy int64
hcTimeout time.Duration
// XXX (reed): right now this only supports one client basically ;) use some real stat backend
statsMu sync.Mutex
stats []*stat
proxy *httputil.ReverseProxy
httpClient *http.Client
transport http.RoundTripper
}
type stat struct {
timestamp time.Time
latency time.Duration
node string
code int
fx string
wait time.Duration
}
func (ch *chProxy) addStat(s *stat) {
ch.statsMu.Lock()
// delete last 1 minute of data if nobody is watching
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
ch.stats = ch.stats[:i]
}
ch.stats = append(ch.stats, s)
ch.statsMu.Unlock()
}
func (ch *chProxy) getStats() []*stat {
ch.statsMu.Lock()
stats := ch.stats
ch.stats = ch.stats[:0]
ch.statsMu.Unlock()
return stats
}
func newProxy(conf config) *chProxy {
tranny := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 120 * time.Second,
}).Dial,
MaxIdleConnsPerHost: 512,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
},
}
ch := &chProxy{
ded: make(map[string]int64),
// XXX (reed): need to be reconfigurable at some point
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
hcEndpoint: conf.HealthcheckEndpoint,
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
httpClient: &http.Client{Transport: tranny},
transport: tranny,
ch: newCH(),
}
director := func(req *http.Request) {
target, err := ch.ch.get(req.URL.Path)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
target = "error"
}
req.URL.Scheme = "http" // XXX (reed): h2 support
req.URL.Host = target
}
ch.proxy = &httputil.ReverseProxy{
Director: director,
Transport: ch,
BufferPool: newBufferPool(),
}
for _, n := range conf.Nodes {
ch.ch.add(n)
}
go ch.healthcheck()
return ch
}
type bufferPool struct {
bufs *sync.Pool
}
func newBufferPool() httputil.BufferPool {
return &bufferPool{
bufs: &sync.Pool{
// 32KB is what the proxy would've used without recycling them
New: func() interface{} { return make([]byte, 32*1024) },
},
}
}
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
if req != nil && req.URL.Host == "error" {
if req.Body != nil {
io.Copy(ioutil.Discard, req.Body)
req.Body.Close()
}
// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
return nil, ErrNoNodes
}
then := time.Now()
resp, err := ch.transport.RoundTrip(req)
if err == nil {
ch.intercept(req, resp, time.Since(then))
}
return resp, err
}
func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
// XXX (reed): we should prob clear this from user response?
// resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
// XXX (reed): need to validate these prob
ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
ch.addStat(&stat{
timestamp: time.Now(),
latency: latency,
node: req.URL.Host,
code: resp.StatusCode,
fx: req.URL.Path,
wait: load,
})
}
func (ch *chProxy) healthcheck() {
for range time.Tick(ch.hcInterval) {
nodes := ch.ch.list()
nodes = append(nodes, ch.dead()...)
for _, n := range nodes {
go ch.ping(n)
}
}
}
func (ch *chProxy) ping(node string) {
req, _ := http.NewRequest("GET", "http://"+node+ch.hcEndpoint, nil)
ctx, cancel := context.WithTimeout(context.Background(), ch.hcTimeout)
defer cancel()
req = req.WithContext(ctx)
resp, err := ch.httpClient.Do(req)
if resp != nil && resp.Body != nil {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
logrus.WithFields(logrus.Fields{"node": node}).Error("health check failed")
ch.fail(node)
} else {
ch.alive(node)
}
}
func (ch *chProxy) fail(node string) {
// shouldn't be a hot path so shouldn't be too contended on since health
// checks are infrequent
ch.Lock()
ch.ded[node]++
failed := ch.ded[node]
ch.Unlock()
if failed >= ch.hcUnhealthy {
ch.ch.remove(node) // TODO under lock?
}
}
func (ch *chProxy) alive(node string) {
ch.RLock()
_, ok := ch.ded[node]
ch.RUnlock()
if ok {
ch.Lock()
delete(ch.ded, node)
ch.Unlock()
ch.ch.add(node) // TODO under lock?
}
}