mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)

Merge branch 'master' of https://gitlab-odx.oracle.com/odx/functions
@@ -1,4 +1,4 @@
-FROM treeder/dind
+FROM funcy/dind
 
 WORKDIR /app
 
Makefile | 4
@@ -29,10 +29,10 @@ docker-dep:
 docker-build:
 	docker pull funcy/go:dev
 	docker run --rm -v ${CURDIR}:/go/src/gitlab-odx.oracle.com/odx/functions -w /go/src/gitlab-odx.oracle.com/odx/functions funcy/go:dev go build -o functions-alpine
-	docker build --build-arg HTTP_PROXY -t treeder/functions:latest .
+	docker build --build-arg HTTP_PROXY -t funcy/functions:latest .
 
 docker-run: docker-build
-	docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v ${CURDIR}/data:/app/data -p 8080:8080 treeder/functions
+	docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v ${CURDIR}/data:/app/data -p 8080:8080 funcy/functions
 
 docker-test:
 	docker run -ti --privileged --rm -e LOG_LEVEL=debug \
@@ -218,8 +218,7 @@ func (s *Server) startGears(ctx context.Context) {
  / __/ / /_/ / / / / /__/ /_/ / /_/ / / / (__  )
 /_/    \____/_/ /_/\___/\__/_/\____/_/ /_/____/
 `
-
-	fmt.Println(runHeader)
+	logrus.Infof(runHeader)
 	logrus.Infof("Serving Functions API on address `%s`", listen)
 
 	svr := &supervisor.Supervisor{
@@ -20,7 +20,3 @@ release:
 	GOOS=darwin go build -o fn_mac
 	GOOS=windows go build -o fn.exe
 	docker run --rm -v ${PWD}:/go/src/gitlab-odx.oracle.com/odx/functions/fn -w /go/src/gitlab-odx.oracle.com/odx/functions/fn funcy/go:dev go build -o fn_alpine
-
-# install locally
-install: build
-	sudo mv fn /usr/local/bin/
@@ -30,7 +30,8 @@ func start(c *cli.Context) error {
 	if c.String("log-level") != "" {
 		denvs = append(denvs, "GIN_MODE="+c.String("log-level"))
 	}
-	// docker run --rm -it --name functions -v ${PWD}/data:/app/data -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 treeder/functions
+	// Socket mount: docker run --rm -it --name functions -v ${PWD}/data:/app/data -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 funcy/functions
+	// OR dind: docker run --rm -it --name functions -v ${PWD}/data:/app/data --privileged -p 8080:8080 funcy/functions
 	wd, err := os.Getwd()
 	if err != nil {
 		logrus.WithError(err).Fatalln("Getwd failed")
fnlb/ch.go | 213 (deleted)
@@ -1,213 +0,0 @@
-package main
-
-import (
-	"errors"
-	"math/rand"
-	"sort"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/dchest/siphash"
-)
-
-// consistentHash will maintain a list of strings which can be accessed by
-// keying them with a separate group of strings
-type consistentHash struct {
-	// protects nodes
-	sync.RWMutex
-	nodes []string
-
-	loadMu sync.RWMutex
-	load   map[string]*int64
-	rng    *rand.Rand
-}
-
-func newCH() *consistentHash {
-	return &consistentHash{
-		rng:  rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
-		load: make(map[string]*int64),
-	}
-}
-
-type lockedSource struct {
-	lk  sync.Mutex
-	src rand.Source64
-}
-
-func (r *lockedSource) Int63() (n int64) {
-	r.lk.Lock()
-	n = r.src.Int63()
-	r.lk.Unlock()
-	return n
-}
-
-func (r *lockedSource) Uint64() (n uint64) {
-	r.lk.Lock()
-	n = r.src.Uint64()
-	r.lk.Unlock()
-	return n
-}
-
-func (r *lockedSource) Seed(seed int64) {
-	r.lk.Lock()
-	r.src.Seed(seed)
-	r.lk.Unlock()
-}
-
-func (ch *consistentHash) add(newb string) {
-	ch.Lock()
-	defer ch.Unlock()
-
-	// filter dupes, under lock. sorted, so binary search
-	i := sort.SearchStrings(ch.nodes, newb)
-	if i < len(ch.nodes) && ch.nodes[i] == newb {
-		return
-	}
-	ch.nodes = append(ch.nodes, newb)
-	// need to keep in sorted order so that hash index works across nodes
-	sort.Sort(sort.StringSlice(ch.nodes))
-}
-
-func (ch *consistentHash) remove(ded string) {
-	ch.Lock()
-	i := sort.SearchStrings(ch.nodes, ded)
-	if i < len(ch.nodes) && ch.nodes[i] == ded {
-		ch.nodes = append(ch.nodes[:i], ch.nodes[i+1:]...)
-	}
-	ch.Unlock()
-}
-
-// return a copy
-func (ch *consistentHash) list() []string {
-	ch.RLock()
-	ret := make([]string, len(ch.nodes))
-	copy(ret, ch.nodes)
-	ch.RUnlock()
-	return ret
-}
-
-func (ch *consistentHash) get(key string) (string, error) {
-	// crc not unique enough & sha is too slow, it's 1 import
-	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
-
-	ch.RLock()
-	defer ch.RUnlock()
-	i := int(jumpConsistentHash(sum64, int32(len(ch.nodes))))
-	return ch.besti(key, i)
-}
-
-// A Fast, Minimal Memory, Consistent Hash Algorithm:
-// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
-func jumpConsistentHash(key uint64, num_buckets int32) int32 {
-	var b, j int64 = -1, 0
-	for j < int64(num_buckets) {
-		b = j
-		key = key*2862933555777941757 + 1
-		j = (b + 1) * int64((1<<31)/(key>>33)+1)
-	}
-	return int32(b)
-}
-
-// tracks last 10 samples (very fast)
-const DECAY = 0.1
-
-func ewma(old, new int64) int64 {
-	// TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
-	return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
-}
-
-func (ch *consistentHash) setLoad(key string, load int64) {
-	ch.loadMu.RLock()
-	l, ok := ch.load[key]
-	ch.loadMu.RUnlock()
-	if ok {
-		// this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of sample
-		prev := atomic.LoadInt64(l)
-		atomic.StoreInt64(l, ewma(prev, load))
-	} else {
-		ch.loadMu.Lock()
-		if _, ok := ch.load[key]; !ok {
-			ch.load[key] = &load
-		}
-		ch.loadMu.Unlock()
-	}
-}
-
-var (
-	ErrNoNodes = errors.New("no nodes available")
-)
-
-func loadKey(node, key string) string {
-	return node + "\x00" + key
-}
-
-// XXX (reed): push down fails / load into ch
-func (ch *consistentHash) besti(key string, i int) (string, error) {
-	ch.RLock()
-	defer ch.RUnlock()
-
-	if len(ch.nodes) < 1 {
-		return "", ErrNoNodes
-	}
-
-	f := func(n string) string {
-		var load time.Duration
-		ch.loadMu.RLock()
-		loadPtr := ch.load[loadKey(n, key)]
-		ch.loadMu.RUnlock()
-		if loadPtr != nil {
-			load = time.Duration(atomic.LoadInt64(loadPtr))
-		}
-
-		const (
-			lowerLat = 500 * time.Millisecond
-			upperLat = 2 * time.Second
-		)
-
-		// TODO flesh out these values.
-		// if we send < 50% of traffic off to other nodes when loaded
-		// then as function scales nodes will get flooded, need to be careful.
-		//
-		// back off loaded node/function combos slightly to spread load
-		// TODO do we need a kind of ref counter as well so as to send functions
-		// to a different node while there's an outstanding call to another?
-		if load < lowerLat {
-			return n
-		} else if load > upperLat {
-			// really loaded
-			if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
-				return n
-			}
-		} else {
-			// 10 < x < 40, as load approaches upperLat, x decreases [linearly]
-			x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
-			if ch.rng.Intn(100) < x {
-				return n
-			}
-		}
-
-		// return invalid node to try next node
-		return ""
-	}
-
-	for ; ; i++ {
-		// theoretically this could take infinite time, but practically improbable...
-		node := f(ch.nodes[i])
-		if node != "" {
-			return node, nil
-		} else if i == len(ch.nodes)-1 {
-			i = -1 // reset i to 0
-		}
-	}
-
-	panic("strange things are afoot at the circle k")
-}
-
-func translate(val, inFrom, inTo, outFrom, outTo int64) int {
-	outRange := outTo - outFrom
-	inRange := inTo - inFrom
-	inVal := val - inFrom
-	// we want the number to be lower as intensity increases
-	return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
-}
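Note: the DECAY constant in the file above (carried over into fnlb/lb/ch.go later in this commit) gives the moving average a time constant of roughly 1/DECAY = 10 samples, which is what the "tracks last 10 samples" comment means. A minimal, self-contained sketch of that smoothing behavior, with made-up latency numbers:

```go
package main

import "fmt"

const decay = 0.1 // mirrors DECAY in fnlb/ch.go

// ewma mirrors the smoothing function from the diff above.
func ewma(old, new int64) int64 {
	return int64(float64(new)*decay + float64(old)*(1-decay))
}

func main() {
	load := int64(500)      // illustrative steady-state wait, in ms
	load = ewma(load, 5000) // one slow call pulls the average up to 950
	for i := 0; i < 10; i++ {
		load = ewma(load, 500) // normal samples decay the spike away
	}
	fmt.Println("smoothed load after ten normal samples:", load) // trending back toward 500
}
```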
fnlb/lb.go | 311 (deleted)
@@ -1,311 +0,0 @@
-package main
-
-import (
-	"context"
-	"encoding/json"
-	"flag"
-	"fmt"
-	"io/ioutil"
-	"net/http"
-	"os"
-	"os/signal"
-	"strings"
-	"sync"
-	"syscall"
-	"time"
-
-	"github.com/Sirupsen/logrus"
-)
-
-// TODO the load balancers all need to have the same list of nodes. gossip?
-// also gossip would handle failure detection instead of elb style. or it can
-// be pluggable and then we can read from where bmc is storing them and use that
-// or some OSS alternative
-
-// TODO when node goes offline should try to redirect request instead of 5xxing
-
-// TODO we could add some kind of pre-warming call to the functions server where
-// the lb could send an image to it to download before the lb starts sending traffic
-// there, otherwise when load starts expanding a few functions are going to eat
-// the pull time
-
-// TODO config
-// TODO TLS
-
-func main() {
-	// XXX (reed): normalize
-	fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
-
-	var conf config
-	flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
-	flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
-	flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
-	flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
-	flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
-	flag.Parse()
-
-	conf.Nodes = strings.Split(*fnodes, ",")
-
-	ch := newProxy(conf)
-
-	err := serve(conf.Listen, ch)
-	if err != nil {
-		logrus.WithError(err).Error("server error")
-	}
-}
-
-func serve(addr string, handler http.Handler) error {
-	server := &http.Server{Addr: addr, Handler: handler}
-
-	var wg sync.WaitGroup
-	wg.Add(1)
-	defer wg.Wait()
-
-	ch := make(chan os.Signal, 1)
-	signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
-	go func() {
-		defer wg.Done()
-		for sig := range ch {
-			logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
-			server.Shutdown(context.Background()) // safe shutdown
-			return
-		}
-	}()
-	return server.ListenAndServe()
-}
-
-type config struct {
-	Listen               string   `json:"port"`
-	Nodes                []string `json:"nodes"`
-	HealthcheckInterval  int      `json:"healthcheck_interval"`
-	HealthcheckEndpoint  string   `json:"healthcheck_endpoint"`
-	HealthcheckUnhealthy int      `json:"healthcheck_unhealthy"`
-	HealthcheckTimeout   int      `json:"healthcheck_timeout"`
-}
-
-// XXX (reed): clean up mess
-var dashPage []byte
-
-func init() {
-	jsb, err := ioutil.ReadFile("dash.js")
-	if err != nil {
-		logrus.WithError(err).Fatal("couldn't open dash.js file")
-	}
-
-	dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
-}
-
-func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	switch r.URL.Path {
-	// XXX (reed): probably do these on a separate port to avoid conflicts
-	case "/1/lb/nodes":
-		switch r.Method {
-		case "PUT":
-			ch.addNode(w, r)
-			return
-		case "DELETE":
-			ch.removeNode(w, r)
-			return
-		case "GET":
-			ch.listNodes(w, r)
-			return
-		}
-	case "/1/lb/stats":
-		ch.statsGet(w, r)
-		return
-	case "/1/lb/dash":
-		ch.dash(w, r)
-		return
-	}
-
-	ch.proxy.ServeHTTP(w, r)
-}
-
-func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
-	stats := ch.getStats()
-
-	type st struct {
-		Timestamp  time.Time `json:"timestamp"`
-		Throughput int       `json:"tp"`
-		Node       string    `json:"node"`
-		Func       string    `json:"func"`
-		Wait       float64   `json:"wait"` // seconds
-	}
-	var sts []st
-
-	// roll up and calculate throughput per second. idk why i hate myself
-	aggs := make(map[string][]*stat)
-	for _, s := range stats {
-		key := s.node + "/" + s.fx
-		if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
-			sts = append(sts, st{
-				Timestamp:  t[0].timestamp,
-				Throughput: len(t),
-				Node:       t[0].node,
-				Func:       t[0].fx,
-				Wait:       avgWait(t),
-			})
-
-			aggs[key] = append(aggs[key][:0], s)
-		} else {
-			aggs[key] = append(aggs[key], s)
-		}
-	}
-
-	// leftovers
-	for _, t := range aggs {
-		sts = append(sts, st{
-			Timestamp:  t[0].timestamp,
-			Throughput: len(t),
-			Node:       t[0].node,
-			Func:       t[0].fx,
-			Wait:       avgWait(t),
-		})
-	}
-
-	json.NewEncoder(w).Encode(struct {
-		Stats []st `json:"stats"`
-	}{
-		Stats: sts,
-	})
-}
-
-func avgWait(stats []*stat) float64 {
-	var sum time.Duration
-	for _, s := range stats {
-		sum += s.wait
-	}
-	return (sum / time.Duration(len(stats))).Seconds()
-}
-
-func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) {
-	var bod struct {
-		Node string `json:"node"`
-	}
-	err := json.NewDecoder(r.Body).Decode(&bod)
-	if err != nil {
-		sendError(w, http.StatusBadRequest, err.Error())
-		return
-	}
-
-	ch.ch.add(bod.Node)
-	sendSuccess(w, "node added")
-}
-
-func (ch *chProxy) removeNode(w http.ResponseWriter, r *http.Request) {
-	var bod struct {
-		Node string `json:"node"`
-	}
-	err := json.NewDecoder(r.Body).Decode(&bod)
-	if err != nil {
-		sendError(w, http.StatusBadRequest, err.Error())
-		return
-	}
-
-	ch.ch.remove(bod.Node)
-	sendSuccess(w, "node deleted")
-}
-
-func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) {
-	nodes := ch.ch.list()
-	dead := ch.dead()
-
-	out := make(map[string]string, len(nodes)+len(dead))
-	for _, n := range nodes {
-		if ch.isDead(n) {
-			out[n] = "offline"
-		} else {
-			out[n] = "online"
-		}
-	}
-
-	for _, n := range dead {
-		out[n] = "offline"
-	}
-
-	sendValue(w, struct {
-		Nodes map[string]string `json:"nodes"`
-	}{
-		Nodes: out,
-	})
-}
-
-func (ch *chProxy) isDead(node string) bool {
-	ch.RLock()
-	val, ok := ch.ded[node]
-	ch.RUnlock()
-	return ok && val >= ch.hcUnhealthy
-}
-
-func (ch *chProxy) dead() []string {
-	ch.RLock()
-	defer ch.RUnlock()
-	nodes := make([]string, 0, len(ch.ded))
-	for n, val := range ch.ded {
-		if val >= ch.hcUnhealthy {
-			nodes = append(nodes, n)
-		}
-	}
-	return nodes
-}
-
-var dashStr = `<!DOCTYPE html>
-<html>
-<head>
-<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
-<title>lb dash</title>
-
-<script type="text/javascript" src="https://code.jquery.com/jquery-1.10.1.js"></script>
-<script type="text/javascript" src="https://code.highcharts.com/stock/highstock.js"></script>
-<script type="text/javascript" src="https://code.highcharts.com/stock/modules/exporting.js"></script>
-<script>
-%s
-</script>
-
-</head>
-<body>
-
-<div id="throughput_chart" style="height: 400px; min-width: 310px"></div>
-<div id="wait_chart" style="height: 400px; min-width: 310px"></div>
-
-</body>
-</html>
-`
-
-func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
-	w.Write(dashPage)
-}
-
-func sendValue(w http.ResponseWriter, v interface{}) {
-	err := json.NewEncoder(w).Encode(v)
-
-	if err != nil {
-		logrus.WithError(err).Error("error writing response response")
-	}
-}
-
-func sendSuccess(w http.ResponseWriter, msg string) {
-	err := json.NewEncoder(w).Encode(struct {
-		Msg string `json:"msg"`
-	}{
-		Msg: msg,
-	})
-
-	if err != nil {
-		logrus.WithError(err).Error("error writing response response")
-	}
-}
-
-func sendError(w http.ResponseWriter, code int, msg string) {
-	w.WriteHeader(code)
-
-	err := json.NewEncoder(w).Encode(struct {
-		Msg string `json:"msg"`
-	}{
-		Msg: msg,
-	})
-
-	if err != nil {
-		logrus.WithError(err).Error("error writing response response")
-	}
-}
fnlb/lb/allgrouper.go | 243 (new file)
@@ -0,0 +1,243 @@
+package lb
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+)
+
+// NewAllGrouper returns a Grouper that will return the entire list of nodes
+// that are being maintained, regardless of key. An 'AllGrouper' will health
+// check servers at a specified interval, taking them in and out as they
+// pass/fail and exposes endpoints for adding, removing and listing nodes.
+func NewAllGrouper(conf Config) Grouper {
+	a := &allGrouper{
+		ded: make(map[string]int64),
+
+		// XXX (reed): need to be reconfigurable at some point
+		hcInterval:  time.Duration(conf.HealthcheckInterval) * time.Second,
+		hcEndpoint:  conf.HealthcheckEndpoint,
+		hcUnhealthy: int64(conf.HealthcheckUnhealthy),
+		hcTimeout:   time.Duration(conf.HealthcheckTimeout) * time.Second,
+
+		// for health checks
+		httpClient: &http.Client{Transport: conf.Transport},
+	}
+	for _, n := range conf.Nodes {
+		a.add(n)
+	}
+	go a.healthcheck()
+	return a
+}
+
+// TODO
+type allGrouper struct {
+	// protects nodes & ded
+	sync.RWMutex
+	nodes []string
+	ded   map[string]int64 // [node] -> failedCount
+
+	httpClient *http.Client
+
+	hcInterval  time.Duration
+	hcEndpoint  string
+	hcUnhealthy int64
+	hcTimeout   time.Duration
+}
+
+func (a *allGrouper) add(newb string) {
+	a.Lock()
+	a.addNoLock(newb)
+	a.Unlock()
+}
+
+func (a *allGrouper) addNoLock(newb string) {
+	// filter dupes, under lock. sorted, so binary search
+	i := sort.SearchStrings(a.nodes, newb)
+	if i < len(a.nodes) && a.nodes[i] == newb {
+		return
+	}
+	a.nodes = append(a.nodes, newb)
+	// need to keep in sorted order so that hash index works across nodes
+	sort.Sort(sort.StringSlice(a.nodes))
+}
+
+func (a *allGrouper) remove(ded string) {
+	a.Lock()
+	a.removeNoLock(ded)
+	a.Unlock()
+}
+
+func (a *allGrouper) removeNoLock(ded string) {
+	i := sort.SearchStrings(a.nodes, ded)
+	if i < len(a.nodes) && a.nodes[i] == ded {
+		a.nodes = append(a.nodes[:i], a.nodes[i+1:]...)
+	}
+}
+
+// return a copy
+func (a *allGrouper) List(string) ([]string, error) {
+	a.RLock()
+	ret := make([]string, len(a.nodes))
+	copy(ret, a.nodes)
+	a.RUnlock()
+	var err error
+	if len(ret) == 0 {
+		err = ErrNoNodes
+	}
+	return ret, err
+}
+
+func (a *allGrouper) healthcheck() {
+	for range time.Tick(a.hcInterval) {
+		nodes, _ := a.List("")
+		nodes = append(nodes, a.dead()...)
+		for _, n := range nodes {
+			go a.ping(n)
+		}
+	}
+}
+
+func (a *allGrouper) ping(node string) {
+	req, _ := http.NewRequest("GET", "http://"+node+a.hcEndpoint, nil)
+	ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
+	defer cancel()
+	req = req.WithContext(ctx)
+
+	resp, err := a.httpClient.Do(req)
+	if resp != nil && resp.Body != nil {
+		io.Copy(ioutil.Discard, resp.Body)
+		resp.Body.Close()
+	}
+
+	if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
+		logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
+		a.fail(node)
+	} else {
+		a.alive(node)
+	}
+}
+
+func (a *allGrouper) fail(node string) {
+	// shouldn't be a hot path so shouldn't be too contended on since health
+	// checks are infrequent
+	a.Lock()
+	a.ded[node]++
+	failed := a.ded[node]
+	if failed >= a.hcUnhealthy {
+		a.removeNoLock(node)
+	}
+	a.Unlock()
+}
+
+func (a *allGrouper) alive(node string) {
+	a.RLock()
+	_, ok := a.ded[node]
+	a.RUnlock()
+	if ok {
+		a.Lock()
+		delete(a.ded, node)
+		a.addNoLock(node)
+		a.Unlock()
+	}
+}
+
+func (a *allGrouper) Wrap(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		// XXX (reed): probably do these on a separate port to avoid conflicts
+		case "/1/lb/nodes":
+			switch r.Method {
+			case "PUT":
+				a.addNode(w, r)
+				return
+			case "DELETE":
+				a.removeNode(w, r)
+				return
+			case "GET":
+				a.listNodes(w, r)
+				return
+			}
+		}
+
+		next.ServeHTTP(w, r)
+	})
+}
+
+func (a *allGrouper) addNode(w http.ResponseWriter, r *http.Request) {
+	var bod struct {
+		Node string `json:"node"`
+	}
+	err := json.NewDecoder(r.Body).Decode(&bod)
+	if err != nil {
+		sendError(w, http.StatusBadRequest, err.Error())
+		return
+	}
+
+	a.add(bod.Node)
+	sendSuccess(w, "node added")
+}
+
+func (a *allGrouper) removeNode(w http.ResponseWriter, r *http.Request) {
+	var bod struct {
+		Node string `json:"node"`
+	}
+	err := json.NewDecoder(r.Body).Decode(&bod)
+	if err != nil {
+		sendError(w, http.StatusBadRequest, err.Error())
+		return
+	}
+
+	a.remove(bod.Node)
+	sendSuccess(w, "node deleted")
+}
+
+func (a *allGrouper) listNodes(w http.ResponseWriter, r *http.Request) {
+	nodes, _ := a.List("")
+	dead := a.dead()
+
+	out := make(map[string]string, len(nodes)+len(dead))
+	for _, n := range nodes {
+		if a.isDead(n) {
+			out[n] = "offline"
+		} else {
+			out[n] = "online"
+		}
+	}
+
+	for _, n := range dead {
+		out[n] = "offline"
+	}
+
+	sendValue(w, struct {
+		Nodes map[string]string `json:"nodes"`
+	}{
+		Nodes: out,
+	})
+}
+
+func (a *allGrouper) isDead(node string) bool {
+	a.RLock()
+	val, ok := a.ded[node]
+	a.RUnlock()
+	return ok && val >= a.hcUnhealthy
+}
+
+func (a *allGrouper) dead() []string {
+	a.RLock()
+	defer a.RUnlock()
+	nodes := make([]string, 0, len(a.ded))
+	for n, val := range a.ded {
+		if val >= a.hcUnhealthy {
+			nodes = append(nodes, n)
+		}
+	}
+	return nodes
+}
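Note: a minimal sketch of standing the new AllGrouper up on its own (the import path follows this repository; node addresses and intervals are illustrative values, not part of the commit):

```go
package main

import (
	"fmt"
	"net/http"

	"gitlab-odx.oracle.com/odx/functions/fnlb/lb"
)

func main() {
	conf := lb.Config{
		Nodes:                []string{"127.0.0.1:8080", "127.0.0.1:8081"}, // illustrative
		HealthcheckInterval:  3, // seconds between pings
		HealthcheckEndpoint:  "/version",
		HealthcheckUnhealthy: 2, // failed checks before a node is pulled
		HealthcheckTimeout:   5, // seconds
		Transport:            &http.Transport{},
	}

	g := lb.NewAllGrouper(conf) // also starts the background health checker
	nodes, err := g.List("")    // an AllGrouper ignores the key
	fmt.Println(nodes, err)
}
```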
fnlb/lb/ch.go | 299 (new file)
@@ -0,0 +1,299 @@
+package lb
+
+import (
+	"encoding/json"
+	"math/rand"
+	"net/http"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/dchest/siphash"
+)
+
+func NewConsistentRouter(conf Config) Router {
+	return &chRouter{
+		rng:  rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
+		load: make(map[string]*int64),
+	}
+}
+
+type chRouter struct {
+	// XXX (reed): right now this only supports one client basically ;) use some real stat backend
+	statsMu sync.Mutex
+	stats   []*stat
+
+	loadMu sync.RWMutex
+	load   map[string]*int64
+	rng    *rand.Rand
+}
+
+type stat struct {
+	timestamp time.Time
+	latency   time.Duration
+	node      string
+	code      int
+	fx        string
+	wait      time.Duration
+}
+
+func (ch *chRouter) addStat(s *stat) {
+	ch.statsMu.Lock()
+	// delete last 1 minute of data if nobody is watching
+	for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
+		ch.stats = ch.stats[:i]
+	}
+	ch.stats = append(ch.stats, s)
+	ch.statsMu.Unlock()
+}
+
+func (ch *chRouter) getStats() []*stat {
+	ch.statsMu.Lock()
+	stats := ch.stats
+	ch.stats = ch.stats[:0]
+	ch.statsMu.Unlock()
+
+	return stats
+}
+
+type lockedSource struct {
+	lk  sync.Mutex
+	src rand.Source64
+}
+
+func (r *lockedSource) Int63() (n int64) {
+	r.lk.Lock()
+	n = r.src.Int63()
+	r.lk.Unlock()
+	return n
+}
+
+func (r *lockedSource) Uint64() (n uint64) {
+	r.lk.Lock()
+	n = r.src.Uint64()
+	r.lk.Unlock()
+	return n
+}
+
+func (r *lockedSource) Seed(seed int64) {
+	r.lk.Lock()
+	r.src.Seed(seed)
+	r.lk.Unlock()
+}
+
+// Route in this form relies on the nodes being in sorted order so
+// that the output will be consistent (yes, slightly unfortunate).
+func (ch *chRouter) Route(nodes []string, key string) (string, error) {
+	// crc not unique enough & sha is too slow, it's 1 import
+	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
+
+	i := int(jumpConsistentHash(sum64, int32(len(nodes))))
+	return ch.besti(key, i, nodes)
+}
+
+func (ch *chRouter) InterceptResponse(req *http.Request, resp *http.Response) {
+	load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
+	// XXX (reed): we should prob clear this from user response?
+	// resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
+
+	// XXX (reed): need to validate these prob
+	ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
+
+	ch.addStat(&stat{
+		timestamp: time.Now(),
+		//latency: latency, // XXX (reed): plumb
+		node: req.URL.Host,
+		code: resp.StatusCode,
+		fx:   req.URL.Path,
+		wait: load,
+	})
+}
+
+// A Fast, Minimal Memory, Consistent Hash Algorithm:
+// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
+func jumpConsistentHash(key uint64, num_buckets int32) int32 {
+	var b, j int64 = -1, 0
+	for j < int64(num_buckets) {
+		b = j
+		key = key*2862933555777941757 + 1
+		j = (b + 1) * int64((1<<31)/(key>>33)+1)
+	}
+	return int32(b)
+}
+
+// tracks last 10 samples (very fast)
+const DECAY = 0.1
+
+func ewma(old, new int64) int64 {
+	// TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
+	return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
+}
+
+func (ch *chRouter) setLoad(key string, load int64) {
+	ch.loadMu.RLock()
+	l, ok := ch.load[key]
+	ch.loadMu.RUnlock()
+	if ok {
+		// this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of sample
+		prev := atomic.LoadInt64(l)
+		atomic.StoreInt64(l, ewma(prev, load))
+	} else {
+		ch.loadMu.Lock()
+		if _, ok := ch.load[key]; !ok {
+			ch.load[key] = &load
+		}
+		ch.loadMu.Unlock()
+	}
+}
+
+func loadKey(node, key string) string {
+	return node + "\x00" + key
+}
+
+func (ch *chRouter) besti(key string, i int, nodes []string) (string, error) {
+	if len(nodes) < 1 {
+		// supposed to be caught in grouper, but double check
+		return "", ErrNoNodes
+	}
+
+	// XXX (reed): trash the closure
+	f := func(n string) string {
+		var load time.Duration
+		ch.loadMu.RLock()
+		loadPtr := ch.load[loadKey(n, key)]
+		ch.loadMu.RUnlock()
+		if loadPtr != nil {
+			load = time.Duration(atomic.LoadInt64(loadPtr))
+		}
+
+		const (
+			// TODO we should probably use deltas rather than fixed wait times. for 'cold'
+			// functions these could always trigger. i.e. if wait time increased 5x over last
+			// 100 data points, point the cannon elsewhere (we'd have to track 2 numbers but meh)
+			lowerLat = 500 * time.Millisecond
+			upperLat = 2 * time.Second
+		)
+
+		// TODO flesh out these values.
+		// if we send < 50% of traffic off to other nodes when loaded
+		// then as function scales nodes will get flooded, need to be careful.
+		//
+		// back off loaded node/function combos slightly to spread load
+		if load < lowerLat {
+			return n
+		} else if load > upperLat {
+			// really loaded
+			if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
+				return n
+			}
+		} else {
+			// 10 < x < 40, as load approaches upperLat, x decreases [linearly]
+			x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
+			if ch.rng.Intn(100) < x {
+				return n
+			}
+		}
+
+		// return invalid node to try next node
+		return ""
+	}
+
+	for ; ; i++ {
+		// theoretically this could take infinite time, but practically improbable...
+		node := f(nodes[i])
+		if node != "" {
+			return node, nil
+		} else if i == len(nodes)-1 {
+			i = -1 // reset i to 0
+		}
+	}
+
+	// TODO we need a way to add a node for a given key from down here if a node is overloaded.
+
+	panic("strange things are afoot at the circle k")
+}
+
+func translate(val, inFrom, inTo, outFrom, outTo int64) int {
+	outRange := outTo - outFrom
+	inRange := inTo - inFrom
+	inVal := val - inFrom
+	// we want the number to be lower as intensity increases
+	return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
+}
+
+func (ch *chRouter) Wrap(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch r.URL.Path {
+		// XXX (reed): probably do these on a separate port to avoid conflicts
+		case "/1/lb/stats":
+			ch.statsGet(w, r)
+			return
+		case "/1/lb/dash":
+			ch.dash(w, r)
+			return
+		}
+
+		next.ServeHTTP(w, r)
+	})
+}
+
+func (ch *chRouter) statsGet(w http.ResponseWriter, r *http.Request) {
+	stats := ch.getStats()
+
+	type st struct {
+		Timestamp  time.Time `json:"timestamp"`
+		Throughput int       `json:"tp"`
+		Node       string    `json:"node"`
+		Func       string    `json:"func"`
+		Wait       float64   `json:"wait"` // seconds
+	}
+	var sts []st
+
+	// roll up and calculate throughput per second. idk why i hate myself
+	aggs := make(map[string][]*stat)
+	for _, s := range stats {
+		key := s.node + "/" + s.fx
+		if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
+			sts = append(sts, st{
+				Timestamp:  t[0].timestamp,
+				Throughput: len(t),
+				Node:       t[0].node,
+				Func:       t[0].fx,
+				Wait:       avgWait(t),
+			})
+
+			aggs[key] = append(aggs[key][:0], s)
+		} else {
+			aggs[key] = append(aggs[key], s)
+		}
+	}
+
+	// leftovers
+	for _, t := range aggs {
+		sts = append(sts, st{
+			Timestamp:  t[0].timestamp,
+			Throughput: len(t),
+			Node:       t[0].node,
+			Func:       t[0].fx,
+			Wait:       avgWait(t),
+		})
+	}
+
+	json.NewEncoder(w).Encode(struct {
+		Stats []st `json:"stats"`
+	}{
+		Stats: sts,
+	})
+}
+
+func avgWait(stats []*stat) float64 {
+	var sum time.Duration
+	for _, s := range stats {
+		sum += s.wait
+	}
+	return (sum / time.Duration(len(stats))).Seconds()
+}
+
+func (ch *chRouter) dash(w http.ResponseWriter, r *http.Request) {
+	w.Write([]byte(dashPage))
+}
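Note: the Route comment above depends on every load balancer computing the same bucket index for a key, which is the jump-consistent-hash property from the linked paper: growing from n to n+1 buckets relocates only roughly 1/(n+1) of keys, and a relocated key can only move to the new bucket. A standalone sketch (the hash body mirrors the function added above):

```go
package main

import "fmt"

// jumpConsistentHash mirrors the function added in fnlb/lb/ch.go.
func jumpConsistentHash(key uint64, numBuckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = (b + 1) * int64((1<<31)/(key>>33)+1)
	}
	return int32(b)
}

func main() {
	moved := 0
	for key := uint64(0); key < 1000; key++ {
		if jumpConsistentHash(key, 9) != jumpConsistentHash(key, 10) {
			moved++ // only keys reassigned to the new 10th bucket move
		}
	}
	fmt.Printf("%d of 1000 keys moved going from 9 to 10 buckets\n", moved) // ~100 expected
}
```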
@@ -1,4 +1,15 @@
+package lb
+
+const dashPage = `<!DOCTYPE html>
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+<title>lb dash</title>
+
+<script type="text/javascript" src="https://code.jquery.com/jquery-1.10.1.js"></script>
+<script type="text/javascript" src="https://code.highcharts.com/stock/highstock.js"></script>
+<script type="text/javascript" src="https://code.highcharts.com/stock/modules/exporting.js"></script>
+<script>
 var example = 'dynamic-update',
     theme = 'default';
 
@@ -156,3 +167,14 @@ $(document).ready(function() {
         series: []
     });
 });
+</script>
+
+</head>
+<body>
+
+<div id="throughput_chart" style="height: 400px; min-width: 310px"></div>
+<div id="wait_chart" style="height: 400px; min-width: 310px"></div>
+
+</body>
+</html>
+`
fnlb/lb/proxy.go | 147 (new file)
@@ -0,0 +1,147 @@
+package lb
+
+import (
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/http/httputil"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+)
+
+// TODO the load balancers all need to have the same list of nodes. gossip?
+// also gossip would handle failure detection instead of elb style. or it can
+// be pluggable and then we can read from where bmc is storing them and use that
+// or some OSS alternative
+
+// TODO when node goes offline should try to redirect request instead of 5xxing
+
+// TODO we could add some kind of pre-warming call to the functions server where
+// the lb could send an image to it to download before the lb starts sending traffic
+// there, otherwise when load starts expanding a few functions are going to eat
+// the pull time
+
+// TODO config
+// TODO TLS
+
+type Config struct {
+	Listen               string   `json:"port"`
+	Nodes                []string `json:"nodes"`
+	HealthcheckInterval  int      `json:"healthcheck_interval"`
+	HealthcheckEndpoint  string   `json:"healthcheck_endpoint"`
+	HealthcheckUnhealthy int      `json:"healthcheck_unhealthy"`
+	HealthcheckTimeout   int      `json:"healthcheck_timeout"`
+
+	Transport *http.Transport
+}
+
+type Grouper interface {
+	// List returns a set of hosts that may be used to route a request
+	// for a given key.
+	List(key string) ([]string, error)
+
+	// Wrap allows adding middleware to the provided http.Handler.
+	Wrap(http.Handler) http.Handler
+}
+
+type Router interface {
+	// TODO we could probably expose this just as some kind of http.RoundTripper
+	// but I can't think of anything elegant so here this is.
+
+	// Route will pick a node from the given set of nodes.
+	Route(nodes []string, key string) (string, error)
+
+	// InterceptResponse allows a Router to extract information from proxied
+	// requests so that it might do a better job next time. InterceptResponse
+	// should not modify the Response as it has already been received nore the
+	// Request, having already been sent.
+	InterceptResponse(req *http.Request, resp *http.Response)
+
+	// Wrap allows adding middleware to the provided http.Handler.
+	Wrap(http.Handler) http.Handler
+}
+
+// KeyFunc maps a request to a shard key, it may return an error
+// if there are issues locating the shard key.
+type KeyFunc func(req *http.Request) (string, error)
+
+type proxy struct {
+	keyFunc KeyFunc
+	grouper Grouper
+	router  Router
+
+	transport http.RoundTripper
+
+	// embed for lazy ServeHTTP mostly
+	*httputil.ReverseProxy
+}
+
+// NewProxy will marry the given parameters into an able proxy.
+func NewProxy(keyFunc KeyFunc, g Grouper, r Router, conf Config) http.Handler {
+	p := new(proxy)
+	*p = proxy{
+		keyFunc:   keyFunc,
+		grouper:   g,
+		router:    r,
+		transport: conf.Transport,
+		ReverseProxy: &httputil.ReverseProxy{
+			Director:   func(*http.Request) { /* in RoundTrip so we can error out */ },
+			Transport:  p,
+			BufferPool: newBufferPool(),
+		},
+	}
+
+	return p
+}
+
+type bufferPool struct {
+	bufs *sync.Pool
+}
+
+func newBufferPool() httputil.BufferPool {
+	return &bufferPool{
+		bufs: &sync.Pool{
+			// 32KB is what the proxy would've used without recycling them
+			New: func() interface{} { return make([]byte, 32*1024) },
+		},
+	}
+}
+
+func (b *bufferPool) Get() []byte  { return b.bufs.Get().([]byte) }
+func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
+
+func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
+	target, err := p.route(req)
+	if err != nil {
+		logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
+		if req.Body != nil {
+			io.Copy(ioutil.Discard, req.Body)
+			req.Body.Close()
+		}
+		// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
+		return nil, ErrNoNodes
+	}
+
+	req.URL.Scheme = "http" // XXX (reed): h2 support
+	req.URL.Host = target
+
+	resp, err := p.transport.RoundTrip(req)
+	if err == nil {
+		p.router.InterceptResponse(req, resp)
+	}
+	return resp, err
+}
+
+func (p *proxy) route(req *http.Request) (string, error) {
+	// TODO errors from this func likely could return 401 or so instead of 503 always
+	key, err := p.keyFunc(req)
+	if err != nil {
+		return "", err
+	}
+	list, err := p.grouper.List(key)
+	if err != nil {
+		return "", err
+	}
+	return p.router.Route(list, key)
+}
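Note: KeyFunc is the seam for choosing what to shard on; main.go below shards on the request path. A hedged sketch of an alternative that shards on a header instead (the "X-App-Name" header is hypothetical, not part of this change):

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// headerKey satisfies the lb.KeyFunc signature: it shards on a request
// header rather than on the URL path. The header name is purely illustrative.
func headerKey(req *http.Request) (string, error) {
	app := req.Header.Get("X-App-Name")
	if app == "" {
		return "", errors.New("no app name on request")
	}
	return app, nil
}

func main() {
	req, _ := http.NewRequest("GET", "http://example.com/r/app/hello", nil)
	req.Header.Set("X-App-Name", "myapp")
	key, err := headerKey(req)
	fmt.Println(key, err)
}
```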
fnlb/lb/util.go | 47 (new file)
@@ -0,0 +1,47 @@
+package lb
+
+import (
+	"encoding/json"
+	"errors"
+	"net/http"
+
+	"github.com/Sirupsen/logrus"
+)
+
+var (
+	ErrNoNodes = errors.New("no nodes available")
+)
+
+func sendValue(w http.ResponseWriter, v interface{}) {
+	err := json.NewEncoder(w).Encode(v)
+
+	if err != nil {
+		logrus.WithError(err).Error("error writing response response")
+	}
+}
+
+func sendSuccess(w http.ResponseWriter, msg string) {
+	err := json.NewEncoder(w).Encode(struct {
+		Msg string `json:"msg"`
+	}{
+		Msg: msg,
+	})
+
+	if err != nil {
+		logrus.WithError(err).Error("error writing response response")
+	}
+}
+
+func sendError(w http.ResponseWriter, code int, msg string) {
+	w.WriteHeader(code)
+
+	err := json.NewEncoder(w).Encode(struct {
+		Msg string `json:"msg"`
+	}{
+		Msg: msg,
+	})
+
+	if err != nil {
+		logrus.WithError(err).Error("error writing response response")
+	}
+}
@@ -1,75 +0,0 @@
-package main
-
-import "testing"
-
-func TestCHAdd(t *testing.T) {
-	var ch consistentHash
-	nodes := []string{"1", "2", "3"}
-	for _, n := range nodes {
-		ch.add(n)
-	}
-
-	if len(ch.nodes) != 3 {
-		t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
-	}
-
-	// test dupes don't get added
-	for _, n := range nodes {
-		ch.add(n)
-	}
-
-	if len(ch.nodes) != 3 {
-		t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
-	}
-}
-
-func TestCHRemove(t *testing.T) {
-	var ch consistentHash
-	nodes := []string{"1", "2", "3"}
-	for _, n := range nodes {
-		ch.add(n)
-	}
-
-	if len(ch.nodes) != 3 {
-		t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
-	}
-
-	ch.remove("4")
-
-	if len(ch.nodes) != 3 {
-		t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
-	}
-
-	ch.remove("3")
-
-	if len(ch.nodes) != 2 {
-		t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
-	}
-
-	ch.remove("3")
-
-	if len(ch.nodes) != 2 {
-		t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
-	}
-}
-
-func TestCHGet(t *testing.T) {
-	var ch consistentHash
-	nodes := []string{"1", "2", "3"}
-	for _, n := range nodes {
-		ch.add(n)
-	}
-
-	if len(ch.nodes) != 3 {
-		t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
-	}
-
-	keys := []string{"a", "b", "c"}
-	for _, k := range keys {
-		_, err := ch.get(k)
-		if err != nil {
-			t.Fatal("CHGet returned an error: ", err)
-		}
-		// testing this doesn't panic basically? could test distro but meh
-	}
-}
fnlb/main.go | 81 (new file)
@@ -0,0 +1,81 @@
+package main
+
+import (
+	"context"
+	"crypto/tls"
+	"flag"
+	"net"
+	"net/http"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"gitlab-odx.oracle.com/odx/functions/fnlb/lb"
+)
+
+func main() {
+	// XXX (reed): normalize
+	fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
+
+	var conf lb.Config
+	flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
+	flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
+	flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
+	flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
+	flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
+	flag.Parse()
+
+	conf.Nodes = strings.Split(*fnodes, ",")
+
+	conf.Transport = &http.Transport{
+		Proxy: http.ProxyFromEnvironment,
+		Dial: (&net.Dialer{
+			Timeout:   10 * time.Second,
+			KeepAlive: 120 * time.Second,
+		}).Dial,
+		MaxIdleConnsPerHost: 512,
+		TLSHandshakeTimeout: 10 * time.Second,
+		TLSClientConfig: &tls.Config{
+			ClientSessionCache: tls.NewLRUClientSessionCache(4096),
+		},
+	}
+
+	g := lb.NewAllGrouper(conf)
+	r := lb.NewConsistentRouter(conf)
+	k := func(r *http.Request) (string, error) {
+		return r.URL.Path, nil
+	}
+
+	h := lb.NewProxy(k, g, r, conf)
+	h = g.Wrap(h) // add/del/list endpoints
+	h = r.Wrap(h) // stats / dash endpoint
+
+	err := serve(conf.Listen, h)
+	if err != nil {
+		logrus.WithError(err).Error("server error")
+	}
+}
+
+func serve(addr string, handler http.Handler) error {
+	server := &http.Server{Addr: addr, Handler: handler}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	defer wg.Wait()
+
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
+	go func() {
+		defer wg.Done()
+		for sig := range ch {
+			logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
+			server.Shutdown(context.Background()) // safe shutdown
+			return
+		}
+	}()
+	return server.ListenAndServe()
+}
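Note: the g.Wrap middleware above exposes the node-management endpoints on the same listener, so registering a node with a running balancer is a single PUT. A small sketch (addresses are illustrative; the body shape matches the addNode handler in this commit):

```go
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	// The addNode handler decodes {"node": "<host:port>"}.
	body, _ := json.Marshal(map[string]string{"node": "127.0.0.1:8082"})

	req, err := http.NewRequest(http.MethodPut, "http://127.0.0.1:8081/1/lb/nodes", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.Status) // handler replies {"msg": "node added"}
}
```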
fnlb/proxy.go | 223 (deleted)
@@ -1,223 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httputil"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
type chProxy struct {
|
|
||||||
ch *consistentHash
|
|
||||||
|
|
||||||
sync.RWMutex
|
|
||||||
// TODO map[string][]time.Time
|
|
||||||
ded map[string]int64
|
|
||||||
|
|
||||||
hcInterval time.Duration
|
|
||||||
hcEndpoint string
|
|
||||||
hcUnhealthy int64
|
|
||||||
hcTimeout time.Duration
|
|
||||||
|
|
||||||
// XXX (reed): right now this only supports one client basically ;) use some real stat backend
|
|
||||||
statsMu sync.Mutex
|
|
||||||
stats []*stat
|
|
||||||
|
|
||||||
proxy *httputil.ReverseProxy
|
|
||||||
httpClient *http.Client
|
|
||||||
transport http.RoundTripper
|
|
||||||
}
|
|
||||||
|
|
||||||
type stat struct {
|
|
||||||
timestamp time.Time
|
|
||||||
latency time.Duration
|
|
||||||
node string
|
|
||||||
code int
|
|
||||||
fx string
|
|
||||||
wait time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ch *chProxy) addStat(s *stat) {
|
|
||||||
ch.statsMu.Lock()
|
|
||||||
// delete last 1 minute of data if nobody is watching
|
|
||||||
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
|
|
||||||
ch.stats = ch.stats[:i]
|
|
||||||
}
|
|
||||||
ch.stats = append(ch.stats, s)
|
|
||||||
ch.statsMu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ch *chProxy) getStats() []*stat {
|
|
||||||
ch.statsMu.Lock()
|
|
||||||
stats := ch.stats
|
|
||||||
ch.stats = ch.stats[:0]
|
|
||||||
ch.statsMu.Unlock()
|
|
||||||
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
|
|
||||||
func newProxy(conf config) *chProxy {
|
|
||||||
tranny := &http.Transport{
|
|
||||||
Proxy: http.ProxyFromEnvironment,
|
|
||||||
Dial: (&net.Dialer{
|
|
||||||
Timeout: 10 * time.Second,
|
|
||||||
KeepAlive: 120 * time.Second,
|
|
||||||
}).Dial,
|
|
||||||
MaxIdleConnsPerHost: 512,
|
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
|
||||||
TLSClientConfig: &tls.Config{
|
|
||||||
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := &chProxy{
|
|
||||||
ded: make(map[string]int64),
|
|
||||||
|
|
||||||
// XXX (reed): need to be reconfigurable at some point
|
|
||||||
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
|
|
||||||
hcEndpoint: conf.HealthcheckEndpoint,
|
|
||||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
|
||||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
|
||||||
httpClient: &http.Client{Transport: tranny},
|
|
||||||
transport: tranny,
|
|
||||||
|
|
||||||
ch: newCH(),
|
|
||||||
}
|
|
||||||
|
|
||||||
director := func(req *http.Request) {
|
|
||||||
target, err := ch.ch.get(req.URL.Path)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
|
|
||||||
target = "error"
|
|
||||||
}
|
|
||||||
|
|
||||||
req.URL.Scheme = "http" // XXX (reed): h2 support
|
|
||||||
req.URL.Host = target
|
|
||||||
}
|
|
||||||
|
|
||||||
ch.proxy = &httputil.ReverseProxy{
|
|
||||||
Director: director,
|
|
||||||
Transport: ch,
|
|
||||||
BufferPool: newBufferPool(),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, n := range conf.Nodes {
|
|
||||||
ch.ch.add(n)
|
|
||||||
}
|
|
||||||
go ch.healthcheck()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
type bufferPool struct {
|
|
||||||
bufs *sync.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBufferPool() httputil.BufferPool {
|
|
||||||
return &bufferPool{
|
|
||||||
bufs: &sync.Pool{
|
|
||||||
// 32KB is what the proxy would've used without recycling them
|
|
||||||
New: func() interface{} { return make([]byte, 32*1024) },
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
|
|
||||||
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }

func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
	if req != nil && req.URL.Host == "error" {
		if req.Body != nil {
			io.Copy(ioutil.Discard, req.Body)
			req.Body.Close()
		}
		// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
		return nil, ErrNoNodes
	}

	then := time.Now()
	resp, err := ch.transport.RoundTrip(req)
	if err == nil {
		ch.intercept(req, resp, time.Since(then))
	}
	return resp, err
}

func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
	load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
	// XXX (reed): we should prob clear this from user response?
	// resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user

	// XXX (reed): need to validate these prob
	ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))

	ch.addStat(&stat{
		timestamp: time.Now(),
		latency:   latency,
		node:      req.URL.Host,
		code:      resp.StatusCode,
		fx:        req.URL.Path,
		wait:      load,
	})
}
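
// The header contract, for intuition (a sketch under assumptions, since the
// node side lives elsewhere): a node would report its current wait as a
// time.Duration string, e.g.
//
//	w.Header().Set("XXX-FXLB-WAIT", waitDuration.String())
//
// which round-trips through time.ParseDuration above; a missing or malformed
// header simply parses as a zero load here.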

// healthcheck pings every known node, live or dead, once per hcInterval for
// the life of the process.
func (ch *chProxy) healthcheck() {
	for range time.Tick(ch.hcInterval) {
		nodes := ch.ch.list()
		nodes = append(nodes, ch.dead()...)
		for _, n := range nodes {
			go ch.ping(n)
		}
	}
}

// ping GETs the healthcheck endpoint on a node, draining and closing the
// response body so the underlying connection can be reused; any transport
// error or non-2xx status counts as a failed check.
func (ch *chProxy) ping(node string) {
	req, _ := http.NewRequest("GET", "http://"+node+ch.hcEndpoint, nil)
	ctx, cancel := context.WithTimeout(context.Background(), ch.hcTimeout)
	defer cancel()
	req = req.WithContext(ctx)

	resp, err := ch.httpClient.Do(req)
	if resp != nil && resp.Body != nil {
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}

	if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
		logrus.WithFields(logrus.Fields{"node": node}).Error("health check failed")
		ch.fail(node)
	} else {
		ch.alive(node)
	}
}

// fail records one failed health check against a node; after hcUnhealthy
// consecutive failures the node is removed from the hash ring.
func (ch *chProxy) fail(node string) {
	// shouldn't be a hot path so shouldn't be too contended on since health
	// checks are infrequent
	ch.Lock()
	ch.ded[node]++
	failed := ch.ded[node]
	ch.Unlock()

	if failed >= ch.hcUnhealthy {
		ch.ch.remove(node) // TODO under lock?
	}
}

// alive clears a node's failure count on a successful check and, if the node
// had previously failed, puts it back into the hash ring: a single success
// resurrects a node, while removal takes hcUnhealthy consecutive failures.
func (ch *chProxy) alive(node string) {
	ch.RLock()
	_, ok := ch.ded[node]
	ch.RUnlock()
	if ok {
		ch.Lock()
		delete(ch.ded, node)
		ch.Unlock()
		ch.ch.add(node) // TODO under lock?
	}
}
@@ -1,11 +1,13 @@
-FROM docker:1.13.1-dind
+FROM docker:edge-dind
 
-RUN apk update && apk upgrade && apk add --no-cache ca-certificates
+RUN apk add --no-cache ca-certificates
 
-COPY entrypoint.sh /usr/local/bin/
-COPY dind.sh /usr/local/bin/
+# cleanup warning: https://github.com/docker-library/docker/issues/55
+RUN addgroup -g 2999 docker
 
-ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
+COPY preentry.sh /usr/local/bin/
 
-# USAGE: Add a CMD to your own Dockerfile to use this (NOT an ENTRYPOINT, so that this is called)
+ENTRYPOINT ["preentry.sh"]
+
+# USAGE: Add a CMD to your own Dockerfile to use this (NOT an ENTRYPOINT), eg:
 # CMD ["./runner"]
@@ -1,2 +1,4 @@
-This is the base image for all Titan's docker-in-docker images.
+This is the base image for all docker-in-docker images.
+
+The difference between this and the official `docker` images is that this one chooses the best
+filesystem automatically. The official ones default to `vfs` (slow) unless you pass in a flag.
@@ -1,3 +1,3 @@
 set -ex
 
-docker build -t treeder/dind:latest .
+docker build --build-arg HTTP_PROXY -t funcy/dind:latest .
@@ -1,28 +0,0 @@
-#!/bin/sh
-
-set -ex
-# modified from: https://github.com/docker-library/docker/blob/866c3fbd87e8eeed524fdf19ba2d63288ad49cd2/1.11/dind/dockerd-entrypoint.sh
-# this will run either overlay or aufs as the docker fs driver, if the OS has both, overlay is preferred.
-# rewrite overlay to use overlay2 (docker 1.12, linux >=4.x required), see https://docs.docker.com/engine/userguide/storagedriver/selectadriver/#overlay-vs-overlay2
-
-fsdriver=$(grep -Eh -w -m1 "overlay|aufs" /proc/filesystems | cut -f2)
-
-if [ $fsdriver == "overlay" ]; then
-	fsdriver="overlay2"
-fi
-
-cmd="dockerd \
-	--host=unix:///var/run/docker.sock \
-	--host=tcp://0.0.0.0:2375 \
-	--storage-driver=$fsdriver"
-
-# nanny and restart on crashes
-until eval $cmd; do
-	echo "Docker crashed with exit code $?. Respawning.." >&2
-	# if we just restart it won't work, so start it (it wedges up) and
-	# then kill the wedgie and restart it again and ta da... yea, seriously
-	pidfile=/var/run/docker/libcontainerd/docker-containerd.pid
-	kill -9 $(cat $pidfile)
-	rm $pidfile
-	sleep 1
-done
@@ -1,10 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-/usr/local/bin/dind.sh &
-
-# wait for daemon to initialize
-sleep 3
-
-exec "$@"
images/dind/preentry.sh (new executable file)
@@ -0,0 +1,14 @@
+#!/bin/sh
+set -e
+
+fsdriver=$(grep -Eh -w -m1 "overlay|aufs" /proc/filesystems | cut -f2)
+if [ "$fsdriver" = "overlay" ]; then
+	fsdriver="overlay2"
+fi
+
+dockerd-entrypoint.sh --storage-driver=$fsdriver &
+
+# give docker a few seconds
+sleep 3
+
+exec "$@"
@@ -6,7 +6,7 @@ docker run --rm -v "$PWD":/app treeder/bump patch
 version=`cat VERSION`
 echo "version $version"
 
-docker tag treeder/dind:latest treeder/dind:$version
+docker tag funcy/dind:latest funcy/dind:$version
 
-docker push treeder/dind:latest
-docker push treeder/dind:$version
+docker push funcy/dind:latest
+docker push funcy/dind:$version
@@ -1,7 +1,7 @@
 #!/bin/bash
 set -ex
 
-user="treeder"
+user="funcy"
 service="functions"
 tag="latest"
 
@@ -34,11 +34,10 @@ git tag -f -a "$version" -m "version $version"
 git push
 git push origin $version
 
-# TODO: Where to push these?
 # Finally tag and push docker images
-# docker tag $user/$service:$tag $user/$service:$version
-# docker push $user/$service:$version
-# docker push $user/$service:$tag
+docker tag $user/$service:$tag $user/$service:$version
+docker push $user/$service:$version
+docker push $user/$service:$tag
 
 cd fn
 ./release.sh $version