diff --git a/Dockerfile b/Dockerfile
index b6dd065d8..e4374806e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM treeder/dind
+FROM funcy/dind
WORKDIR /app
diff --git a/Makefile b/Makefile
index e0c7f103a..ec083ab63 100644
--- a/Makefile
+++ b/Makefile
@@ -29,10 +29,10 @@ docker-dep:
docker-build:
docker pull funcy/go:dev
docker run --rm -v ${CURDIR}:/go/src/gitlab-odx.oracle.com/odx/functions -w /go/src/gitlab-odx.oracle.com/odx/functions funcy/go:dev go build -o functions-alpine
- docker build --build-arg HTTP_PROXY -t treeder/functions:latest .
+ docker build --build-arg HTTP_PROXY -t funcy/functions:latest .
docker-run: docker-build
- docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v ${CURDIR}/data:/app/data -p 8080:8080 treeder/functions
+ docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v ${CURDIR}/data:/app/data -p 8080:8080 funcy/functions
docker-test:
docker run -ti --privileged --rm -e LOG_LEVEL=debug \
diff --git a/api/server/server.go b/api/server/server.go
index 0ce248442..d2b622cb2 100644
--- a/api/server/server.go
+++ b/api/server/server.go
@@ -208,18 +208,17 @@ func (s *Server) startGears(ctx context.Context) {
}
const runHeader = `
- ____ __
- / __ \_________ ______/ /__
- / / / / ___/ __ / ___/ / _ \
- / /_/ / / / /_/ / /__/ / __/
- \_________ \__,_/\___/_/\____
- / ____/_ __ ___ _____/ /_( )___ ____ _____
- / /_ / / / / __ \/ ___/ __/ / __ \/ __ \/ ___/
- / __/ / /_/ / / / / /__/ /_/ / /_/ / / / (__ )
- /_/ \____/_/ /_/\___/\__/_/\____/_/ /_/____/
-`
-
- logrus.Infof(runHeader)
+ ____ __
+ / __ \_________ ______/ /__
+ / / / / ___/ __ / ___/ / _ \
+ / /_/ / / / /_/ / /__/ / __/
+ \_________ \__,_/\___/_/\____
+ / ____/_ __ ___ _____/ /_( )___ ____ _____
+ / /_ / / / / __ \/ ___/ __/ / __ \/ __ \/ ___/
+ / __/ / /_/ / / / / /__/ /_/ / /_/ / / / (__ )
+ /_/ \____/_/ /_/\___/\__/_/\____/_/ /_/____/
+ `
+ fmt.Println(runHeader)
logrus.Infof("Serving Functions API on address `%s`", listen)
svr := &supervisor.Supervisor{
diff --git a/fn/Makefile b/fn/Makefile
index 6abd33734..ad8b5e512 100644
--- a/fn/Makefile
+++ b/fn/Makefile
@@ -20,7 +20,3 @@ release:
GOOS=darwin go build -o fn_mac
GOOS=windows go build -o fn.exe
docker run --rm -v ${PWD}:/go/src/gitlab-odx.oracle.com/odx/functions/fn -w /go/src/gitlab-odx.oracle.com/odx/functions/fn funcy/go:dev go build -o fn_alpine
-
-# install locally
-install: build
- sudo mv fn /usr/local/bin/
diff --git a/fn/start.go b/fn/start.go
index 4fb157d5e..f53404c1e 100644
--- a/fn/start.go
+++ b/fn/start.go
@@ -30,7 +30,8 @@ func start(c *cli.Context) error {
if c.String("log-level") != "" {
denvs = append(denvs, "GIN_MODE="+c.String("log-level"))
}
- // docker run --rm -it --name functions -v ${PWD}/data:/app/data -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 treeder/functions
+ // Socket mount: docker run --rm -it --name functions -v ${PWD}/data:/app/data -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 funcy/functions
+ // OR dind: docker run --rm -it --name functions -v ${PWD}/data:/app/data --privileged -p 8080:8080 funcy/functions
wd, err := os.Getwd()
if err != nil {
logrus.WithError(err).Fatalln("Getwd failed")
diff --git a/fnlb/ch.go b/fnlb/ch.go
deleted file mode 100644
index 103dade29..000000000
--- a/fnlb/ch.go
+++ /dev/null
@@ -1,213 +0,0 @@
-package main
-
-import (
- "errors"
- "math/rand"
- "sort"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/dchest/siphash"
-)
-
-// consistentHash will maintain a list of strings which can be accessed by
-// keying them with a separate group of strings
-type consistentHash struct {
- // protects nodes
- sync.RWMutex
- nodes []string
-
- loadMu sync.RWMutex
- load map[string]*int64
- rng *rand.Rand
-}
-
-func newCH() *consistentHash {
- return &consistentHash{
- rng: rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
- load: make(map[string]*int64),
- }
-}
-
-type lockedSource struct {
- lk sync.Mutex
- src rand.Source64
-}
-
-func (r *lockedSource) Int63() (n int64) {
- r.lk.Lock()
- n = r.src.Int63()
- r.lk.Unlock()
- return n
-}
-
-func (r *lockedSource) Uint64() (n uint64) {
- r.lk.Lock()
- n = r.src.Uint64()
- r.lk.Unlock()
- return n
-}
-
-func (r *lockedSource) Seed(seed int64) {
- r.lk.Lock()
- r.src.Seed(seed)
- r.lk.Unlock()
-}
-
-func (ch *consistentHash) add(newb string) {
- ch.Lock()
- defer ch.Unlock()
-
- // filter dupes, under lock. sorted, so binary search
- i := sort.SearchStrings(ch.nodes, newb)
- if i < len(ch.nodes) && ch.nodes[i] == newb {
- return
- }
- ch.nodes = append(ch.nodes, newb)
- // need to keep in sorted order so that hash index works across nodes
- sort.Sort(sort.StringSlice(ch.nodes))
-}
-
-func (ch *consistentHash) remove(ded string) {
- ch.Lock()
- i := sort.SearchStrings(ch.nodes, ded)
- if i < len(ch.nodes) && ch.nodes[i] == ded {
- ch.nodes = append(ch.nodes[:i], ch.nodes[i+1:]...)
- }
- ch.Unlock()
-}
-
-// return a copy
-func (ch *consistentHash) list() []string {
- ch.RLock()
- ret := make([]string, len(ch.nodes))
- copy(ret, ch.nodes)
- ch.RUnlock()
- return ret
-}
-
-func (ch *consistentHash) get(key string) (string, error) {
- // crc not unique enough & sha is too slow, it's 1 import
- sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
-
- ch.RLock()
- defer ch.RUnlock()
- i := int(jumpConsistentHash(sum64, int32(len(ch.nodes))))
- return ch.besti(key, i)
-}
-
-// A Fast, Minimal Memory, Consistent Hash Algorithm:
-// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
-func jumpConsistentHash(key uint64, num_buckets int32) int32 {
- var b, j int64 = -1, 0
- for j < int64(num_buckets) {
- b = j
- key = key*2862933555777941757 + 1
- j = (b + 1) * int64((1<<31)/(key>>33)+1)
- }
- return int32(b)
-}
-
-// tracks last 10 samples (very fast)
-const DECAY = 0.1
-
-func ewma(old, new int64) int64 {
- // TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
- return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
-}
-
-func (ch *consistentHash) setLoad(key string, load int64) {
- ch.loadMu.RLock()
- l, ok := ch.load[key]
- ch.loadMu.RUnlock()
- if ok {
- // this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of sample
- prev := atomic.LoadInt64(l)
- atomic.StoreInt64(l, ewma(prev, load))
- } else {
- ch.loadMu.Lock()
- if _, ok := ch.load[key]; !ok {
- ch.load[key] = &load
- }
- ch.loadMu.Unlock()
- }
-}
-
-var (
- ErrNoNodes = errors.New("no nodes available")
-)
-
-func loadKey(node, key string) string {
- return node + "\x00" + key
-}
-
-// XXX (reed): push down fails / load into ch
-func (ch *consistentHash) besti(key string, i int) (string, error) {
- ch.RLock()
- defer ch.RUnlock()
-
- if len(ch.nodes) < 1 {
- return "", ErrNoNodes
- }
-
- f := func(n string) string {
- var load time.Duration
- ch.loadMu.RLock()
- loadPtr := ch.load[loadKey(n, key)]
- ch.loadMu.RUnlock()
- if loadPtr != nil {
- load = time.Duration(atomic.LoadInt64(loadPtr))
- }
-
- const (
- lowerLat = 500 * time.Millisecond
- upperLat = 2 * time.Second
- )
-
- // TODO flesh out these values.
- // if we send < 50% of traffic off to other nodes when loaded
- // then as function scales nodes will get flooded, need to be careful.
- //
- // back off loaded node/function combos slightly to spread load
- // TODO do we need a kind of ref counter as well so as to send functions
- // to a different node while there's an outstanding call to another?
- if load < lowerLat {
- return n
- } else if load > upperLat {
- // really loaded
- if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
- return n
- }
- } else {
- // 10 < x < 40, as load approaches upperLat, x decreases [linearly]
- x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
- if ch.rng.Intn(100) < x {
- return n
- }
- }
-
- // return invalid node to try next node
- return ""
- }
-
- for ; ; i++ {
- // theoretically this could take infinite time, but practically improbable...
- node := f(ch.nodes[i])
- if node != "" {
- return node, nil
- } else if i == len(ch.nodes)-1 {
- i = -1 // reset i to 0
- }
- }
-
- panic("strange things are afoot at the circle k")
-}
-
-func translate(val, inFrom, inTo, outFrom, outTo int64) int {
- outRange := outTo - outFrom
- inRange := inTo - inFrom
- inVal := val - inFrom
- // we want the number to be lower as intensity increases
- return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
-}
diff --git a/fnlb/lb.go b/fnlb/lb.go
deleted file mode 100644
index 144b053aa..000000000
--- a/fnlb/lb.go
+++ /dev/null
@@ -1,311 +0,0 @@
-package main
-
-import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "io/ioutil"
- "net/http"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/Sirupsen/logrus"
-)
-
-// TODO the load balancers all need to have the same list of nodes. gossip?
-// also gossip would handle failure detection instead of elb style. or it can
-// be pluggable and then we can read from where bmc is storing them and use that
-// or some OSS alternative
-
-// TODO when node goes offline should try to redirect request instead of 5xxing
-
-// TODO we could add some kind of pre-warming call to the functions server where
-// the lb could send an image to it to download before the lb starts sending traffic
-// there, otherwise when load starts expanding a few functions are going to eat
-// the pull time
-
-// TODO config
-// TODO TLS
-
-func main() {
- // XXX (reed): normalize
- fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
-
- var conf config
- flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
- flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
- flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
- flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
- flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
- flag.Parse()
-
- conf.Nodes = strings.Split(*fnodes, ",")
-
- ch := newProxy(conf)
-
- err := serve(conf.Listen, ch)
- if err != nil {
- logrus.WithError(err).Error("server error")
- }
-}
-
-func serve(addr string, handler http.Handler) error {
- server := &http.Server{Addr: addr, Handler: handler}
-
- var wg sync.WaitGroup
- wg.Add(1)
- defer wg.Wait()
-
- ch := make(chan os.Signal, 1)
- signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
- go func() {
- defer wg.Done()
- for sig := range ch {
- logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
- server.Shutdown(context.Background()) // safe shutdown
- return
- }
- }()
- return server.ListenAndServe()
-}
-
-type config struct {
- Listen string `json:"port"`
- Nodes []string `json:"nodes"`
- HealthcheckInterval int `json:"healthcheck_interval"`
- HealthcheckEndpoint string `json:"healthcheck_endpoint"`
- HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
- HealthcheckTimeout int `json:"healthcheck_timeout"`
-}
-
-// XXX (reed): clean up mess
-var dashPage []byte
-
-func init() {
- jsb, err := ioutil.ReadFile("dash.js")
- if err != nil {
- logrus.WithError(err).Fatal("couldn't open dash.js file")
- }
-
- dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
-}
-
-func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- switch r.URL.Path {
- // XXX (reed): probably do these on a separate port to avoid conflicts
- case "/1/lb/nodes":
- switch r.Method {
- case "PUT":
- ch.addNode(w, r)
- return
- case "DELETE":
- ch.removeNode(w, r)
- return
- case "GET":
- ch.listNodes(w, r)
- return
- }
- case "/1/lb/stats":
- ch.statsGet(w, r)
- return
- case "/1/lb/dash":
- ch.dash(w, r)
- return
- }
-
- ch.proxy.ServeHTTP(w, r)
-}
-
-func (ch *chProxy) statsGet(w http.ResponseWriter, r *http.Request) {
- stats := ch.getStats()
-
- type st struct {
- Timestamp time.Time `json:"timestamp"`
- Throughput int `json:"tp"`
- Node string `json:"node"`
- Func string `json:"func"`
- Wait float64 `json:"wait"` // seconds
- }
- var sts []st
-
- // roll up and calculate throughput per second. idk why i hate myself
- aggs := make(map[string][]*stat)
- for _, s := range stats {
- key := s.node + "/" + s.fx
- if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
- sts = append(sts, st{
- Timestamp: t[0].timestamp,
- Throughput: len(t),
- Node: t[0].node,
- Func: t[0].fx,
- Wait: avgWait(t),
- })
-
- aggs[key] = append(aggs[key][:0], s)
- } else {
- aggs[key] = append(aggs[key], s)
- }
- }
-
- // leftovers
- for _, t := range aggs {
- sts = append(sts, st{
- Timestamp: t[0].timestamp,
- Throughput: len(t),
- Node: t[0].node,
- Func: t[0].fx,
- Wait: avgWait(t),
- })
- }
-
- json.NewEncoder(w).Encode(struct {
- Stats []st `json:"stats"`
- }{
- Stats: sts,
- })
-}
-
-func avgWait(stats []*stat) float64 {
- var sum time.Duration
- for _, s := range stats {
- sum += s.wait
- }
- return (sum / time.Duration(len(stats))).Seconds()
-}
-
-func (ch *chProxy) addNode(w http.ResponseWriter, r *http.Request) {
- var bod struct {
- Node string `json:"node"`
- }
- err := json.NewDecoder(r.Body).Decode(&bod)
- if err != nil {
- sendError(w, http.StatusBadRequest, err.Error())
- return
- }
-
- ch.ch.add(bod.Node)
- sendSuccess(w, "node added")
-}
-
-func (ch *chProxy) removeNode(w http.ResponseWriter, r *http.Request) {
- var bod struct {
- Node string `json:"node"`
- }
- err := json.NewDecoder(r.Body).Decode(&bod)
- if err != nil {
- sendError(w, http.StatusBadRequest, err.Error())
- return
- }
-
- ch.ch.remove(bod.Node)
- sendSuccess(w, "node deleted")
-}
-
-func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) {
- nodes := ch.ch.list()
- dead := ch.dead()
-
- out := make(map[string]string, len(nodes)+len(dead))
- for _, n := range nodes {
- if ch.isDead(n) {
- out[n] = "offline"
- } else {
- out[n] = "online"
- }
- }
-
- for _, n := range dead {
- out[n] = "offline"
- }
-
- sendValue(w, struct {
- Nodes map[string]string `json:"nodes"`
- }{
- Nodes: out,
- })
-}
-
-func (ch *chProxy) isDead(node string) bool {
- ch.RLock()
- val, ok := ch.ded[node]
- ch.RUnlock()
- return ok && val >= ch.hcUnhealthy
-}
-
-func (ch *chProxy) dead() []string {
- ch.RLock()
- defer ch.RUnlock()
- nodes := make([]string, 0, len(ch.ded))
- for n, val := range ch.ded {
- if val >= ch.hcUnhealthy {
- nodes = append(nodes, n)
- }
- }
- return nodes
-}
-
-var dashStr = `
-
-
-
-lb dash
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-`
-
-func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
- w.Write(dashPage)
-}
-
-func sendValue(w http.ResponseWriter, v interface{}) {
- err := json.NewEncoder(w).Encode(v)
-
- if err != nil {
- logrus.WithError(err).Error("error writing response response")
- }
-}
-
-func sendSuccess(w http.ResponseWriter, msg string) {
- err := json.NewEncoder(w).Encode(struct {
- Msg string `json:"msg"`
- }{
- Msg: msg,
- })
-
- if err != nil {
- logrus.WithError(err).Error("error writing response response")
- }
-}
-
-func sendError(w http.ResponseWriter, code int, msg string) {
- w.WriteHeader(code)
-
- err := json.NewEncoder(w).Encode(struct {
- Msg string `json:"msg"`
- }{
- Msg: msg,
- })
-
- if err != nil {
- logrus.WithError(err).Error("error writing response response")
- }
-}
diff --git a/fnlb/lb/allgrouper.go b/fnlb/lb/allgrouper.go
new file mode 100644
index 000000000..2f63d5d6c
--- /dev/null
+++ b/fnlb/lb/allgrouper.go
@@ -0,0 +1,243 @@
+package lb
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+)
+
+// NewAllGrouper returns a Grouper that will return the entire list of nodes
+// that are being maintained, regardless of key. An 'AllGrouper' health checks
+// servers at a specified interval, taking them in and out as they pass/fail,
+// and exposes endpoints for adding, removing and listing nodes.
+func NewAllGrouper(conf Config) Grouper {
+ a := &allGrouper{
+ ded: make(map[string]int64),
+
+ // XXX (reed): need to be reconfigurable at some point
+ hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
+ hcEndpoint: conf.HealthcheckEndpoint,
+ hcUnhealthy: int64(conf.HealthcheckUnhealthy),
+ hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
+
+ // for health checks
+ httpClient: &http.Client{Transport: conf.Transport},
+ }
+ for _, n := range conf.Nodes {
+ a.add(n)
+ }
+ go a.healthcheck()
+ return a
+}
+
+// TODO
+type allGrouper struct {
+ // protects nodes & ded
+ sync.RWMutex
+ nodes []string
+ ded map[string]int64 // [node] -> failedCount
+
+ httpClient *http.Client
+
+ hcInterval time.Duration
+ hcEndpoint string
+ hcUnhealthy int64
+ hcTimeout time.Duration
+}
+
+func (a *allGrouper) add(newb string) {
+ a.Lock()
+ a.addNoLock(newb)
+ a.Unlock()
+}
+
+func (a *allGrouper) addNoLock(newb string) {
+ // filter dupes, under lock. sorted, so binary search
+ i := sort.SearchStrings(a.nodes, newb)
+ if i < len(a.nodes) && a.nodes[i] == newb {
+ return
+ }
+ a.nodes = append(a.nodes, newb)
+ // need to keep in sorted order so that hash index works across nodes
+ sort.Sort(sort.StringSlice(a.nodes))
+}
+
+func (a *allGrouper) remove(ded string) {
+ a.Lock()
+ a.removeNoLock(ded)
+ a.Unlock()
+}
+
+func (a *allGrouper) removeNoLock(ded string) {
+ i := sort.SearchStrings(a.nodes, ded)
+ if i < len(a.nodes) && a.nodes[i] == ded {
+ a.nodes = append(a.nodes[:i], a.nodes[i+1:]...)
+ }
+}
+
+// return a copy
+func (a *allGrouper) List(string) ([]string, error) {
+ a.RLock()
+ ret := make([]string, len(a.nodes))
+ copy(ret, a.nodes)
+ a.RUnlock()
+ var err error
+ if len(ret) == 0 {
+ err = ErrNoNodes
+ }
+ return ret, err
+}
+
+func (a *allGrouper) healthcheck() {
+ for range time.Tick(a.hcInterval) {
+ nodes, _ := a.List("")
+ nodes = append(nodes, a.dead()...)
+ for _, n := range nodes {
+ go a.ping(n)
+ }
+ }
+}
+
+func (a *allGrouper) ping(node string) {
+ req, _ := http.NewRequest("GET", "http://"+node+a.hcEndpoint, nil)
+ ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+
+ resp, err := a.httpClient.Do(req)
+ if resp != nil && resp.Body != nil {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }
+
+ if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
+ logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
+ a.fail(node)
+ } else {
+ a.alive(node)
+ }
+}
+
+func (a *allGrouper) fail(node string) {
+ // shouldn't be a hot path so shouldn't be too contended on since health
+ // checks are infrequent
+ a.Lock()
+ a.ded[node]++
+ failed := a.ded[node]
+ if failed >= a.hcUnhealthy {
+ a.removeNoLock(node)
+ }
+ a.Unlock()
+}
+
+func (a *allGrouper) alive(node string) {
+ a.RLock()
+ _, ok := a.ded[node]
+ a.RUnlock()
+ if ok {
+ a.Lock()
+ delete(a.ded, node)
+ a.addNoLock(node)
+ a.Unlock()
+ }
+}
+
+func (a *allGrouper) Wrap(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ // XXX (reed): probably do these on a separate port to avoid conflicts
+ case "/1/lb/nodes":
+ switch r.Method {
+ case "PUT":
+ a.addNode(w, r)
+ return
+ case "DELETE":
+ a.removeNode(w, r)
+ return
+ case "GET":
+ a.listNodes(w, r)
+ return
+ }
+ }
+
+ next.ServeHTTP(w, r)
+ })
+}
+
+func (a *allGrouper) addNode(w http.ResponseWriter, r *http.Request) {
+ var bod struct {
+ Node string `json:"node"`
+ }
+ err := json.NewDecoder(r.Body).Decode(&bod)
+ if err != nil {
+ sendError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ a.add(bod.Node)
+ sendSuccess(w, "node added")
+}
+
+func (a *allGrouper) removeNode(w http.ResponseWriter, r *http.Request) {
+ var bod struct {
+ Node string `json:"node"`
+ }
+ err := json.NewDecoder(r.Body).Decode(&bod)
+ if err != nil {
+ sendError(w, http.StatusBadRequest, err.Error())
+ return
+ }
+
+ a.remove(bod.Node)
+ sendSuccess(w, "node deleted")
+}
+
+func (a *allGrouper) listNodes(w http.ResponseWriter, r *http.Request) {
+ nodes, _ := a.List("")
+ dead := a.dead()
+
+ out := make(map[string]string, len(nodes)+len(dead))
+ for _, n := range nodes {
+ if a.isDead(n) {
+ out[n] = "offline"
+ } else {
+ out[n] = "online"
+ }
+ }
+
+ for _, n := range dead {
+ out[n] = "offline"
+ }
+
+ sendValue(w, struct {
+ Nodes map[string]string `json:"nodes"`
+ }{
+ Nodes: out,
+ })
+}
+
+func (a *allGrouper) isDead(node string) bool {
+ a.RLock()
+ val, ok := a.ded[node]
+ a.RUnlock()
+ return ok && val >= a.hcUnhealthy
+}
+
+func (a *allGrouper) dead() []string {
+ a.RLock()
+ defer a.RUnlock()
+ nodes := make([]string, 0, len(a.ded))
+ for n, val := range a.ded {
+ if val >= a.hcUnhealthy {
+ nodes = append(nodes, n)
+ }
+ }
+ return nodes
+}
diff --git a/fnlb/lb/ch.go b/fnlb/lb/ch.go
new file mode 100644
index 000000000..d3503ce59
--- /dev/null
+++ b/fnlb/lb/ch.go
@@ -0,0 +1,299 @@
+package lb
+
+import (
+ "encoding/json"
+ "math/rand"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/dchest/siphash"
+)
+
+func NewConsistentRouter(conf Config) Router {
+ return &chRouter{
+ rng: rand.New(&lockedSource{src: rand.NewSource(time.Now().Unix()).(rand.Source64)}),
+ load: make(map[string]*int64),
+ }
+}
+
+type chRouter struct {
+ // XXX (reed): right now this only supports one client basically ;) use some real stat backend
+ statsMu sync.Mutex
+ stats []*stat
+
+ loadMu sync.RWMutex
+ load map[string]*int64
+ rng *rand.Rand
+}
+
+type stat struct {
+ timestamp time.Time
+ latency time.Duration
+ node string
+ code int
+ fx string
+ wait time.Duration
+}
+
+func (ch *chRouter) addStat(s *stat) {
+ ch.statsMu.Lock()
+ // delete last 1 minute of data if nobody is watching
+ for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
+ ch.stats = ch.stats[:i]
+ }
+ ch.stats = append(ch.stats, s)
+ ch.statsMu.Unlock()
+}
+
+func (ch *chRouter) getStats() []*stat {
+ ch.statsMu.Lock()
+ stats := ch.stats
+ ch.stats = ch.stats[:0]
+ ch.statsMu.Unlock()
+
+ return stats
+}
+
+type lockedSource struct {
+ lk sync.Mutex
+ src rand.Source64
+}
+
+func (r *lockedSource) Int63() (n int64) {
+ r.lk.Lock()
+ n = r.src.Int63()
+ r.lk.Unlock()
+ return n
+}
+
+func (r *lockedSource) Uint64() (n uint64) {
+ r.lk.Lock()
+ n = r.src.Uint64()
+ r.lk.Unlock()
+ return n
+}
+
+func (r *lockedSource) Seed(seed int64) {
+ r.lk.Lock()
+ r.src.Seed(seed)
+ r.lk.Unlock()
+}
+
+// Route in this form relies on the nodes being in sorted order so
+// that the output will be consistent (yes, slightly unfortunate).
+func (ch *chRouter) Route(nodes []string, key string) (string, error) {
+ // crc not unique enough & sha is too slow, it's 1 import
+ sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
+
+ i := int(jumpConsistentHash(sum64, int32(len(nodes))))
+ return ch.besti(key, i, nodes)
+}
+
+func (ch *chRouter) InterceptResponse(req *http.Request, resp *http.Response) {
+ load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
+ // XXX (reed): we should prob clear this from user response?
+ // resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
+
+ // XXX (reed): need to validate these prob
+ ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
+
+ ch.addStat(&stat{
+ timestamp: time.Now(),
+ //latency: latency, // XXX (reed): plumb
+ node: req.URL.Host,
+ code: resp.StatusCode,
+ fx: req.URL.Path,
+ wait: load,
+ })
+}
+
+// A Fast, Minimal Memory, Consistent Hash Algorithm:
+// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
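+// jumpConsistentHash maps the hashed key onto one of num_buckets buckets: each
+// loop iteration pseudo-randomly decides how far the key "jumps", which spreads
+// keys evenly and moves only ~1/n of them when a bucket is added.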
+func jumpConsistentHash(key uint64, num_buckets int32) int32 {
+ var b, j int64 = -1, 0
+ for j < int64(num_buckets) {
+ b = j
+ key = key*2862933555777941757 + 1
+ j = (b + 1) * int64((1<<31)/(key>>33)+1)
+ }
+ return int32(b)
+}
+
+// tracks last 10 samples (very fast)
+const DECAY = 0.1
+
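+// ewma folds a new sample into the running value as DECAY*new + (1-DECAY)*old,
+// so history decays geometrically and the effective window is roughly 1/DECAY
+// (~10) samples.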
+func ewma(old, new int64) int64 {
+ // TODO could 'warm' it up and drop first few samples since we'll have docker pulls / hot starts
+ return int64((float64(new) * DECAY) + (float64(old) * (1 - DECAY)))
+}
+
+func (ch *chRouter) setLoad(key string, load int64) {
+ ch.loadMu.RLock()
+ l, ok := ch.load[key]
+ ch.loadMu.RUnlock()
+ if ok {
+ // this is a lossy ewma w/ or w/o CAS but if things are moving fast we have plenty of samples
+ prev := atomic.LoadInt64(l)
+ atomic.StoreInt64(l, ewma(prev, load))
+ } else {
+ ch.loadMu.Lock()
+ if _, ok := ch.load[key]; !ok {
+ ch.load[key] = &load
+ }
+ ch.loadMu.Unlock()
+ }
+}
+
+func loadKey(node, key string) string {
+ return node + "\x00" + key
+}
+
+func (ch *chRouter) besti(key string, i int, nodes []string) (string, error) {
+ if len(nodes) < 1 {
+ // supposed to be caught in grouper, but double check
+ return "", ErrNoNodes
+ }
+
+ // XXX (reed): trash the closure
+ f := func(n string) string {
+ var load time.Duration
+ ch.loadMu.RLock()
+ loadPtr := ch.load[loadKey(n, key)]
+ ch.loadMu.RUnlock()
+ if loadPtr != nil {
+ load = time.Duration(atomic.LoadInt64(loadPtr))
+ }
+
+ const (
+ // TODO we should probably use deltas rather than fixed wait times. for 'cold'
+ // functions these could always trigger. i.e. if wait time increased 5x over last
+ // 100 data points, point the cannon elsewhere (we'd have to track 2 numbers but meh)
+ lowerLat = 500 * time.Millisecond
+ upperLat = 2 * time.Second
+ )
+
+ // TODO flesh out these values.
+ // if we send < 50% of traffic off to other nodes when loaded
+ // then as function scales nodes will get flooded, need to be careful.
+ //
+ // back off loaded node/function combos slightly to spread load
+ if load < lowerLat {
+ return n
+ } else if load > upperLat {
+ // really loaded
+ if ch.rng.Intn(100) < 10 { // XXX (reed): 10% could be problematic, should sliding scale prob with log(x) ?
+ return n
+ }
+ } else {
+ // 10 < x < 40, as load approaches upperLat, x decreases [linearly]
+ x := translate(int64(load), int64(lowerLat), int64(upperLat), 10, 40)
+ if ch.rng.Intn(100) < x {
+ return n
+ }
+ }
+
+ // return invalid node to try next node
+ return ""
+ }
+
+ for ; ; i++ {
+ // theoretically this could take infinite time, but practically improbable...
+ node := f(nodes[i])
+ if node != "" {
+ return node, nil
+ } else if i == len(nodes)-1 {
+ i = -1 // reset i to 0
+ }
+ }
+
+ // TODO we need a way to add a node for a given key from down here if a node is overloaded.
+
+ panic("strange things are afoot at the circle k")
+}
+
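+// translate linearly maps val in [inFrom, inTo] to a result that falls from outTo
+// down to outFrom; besti uses it so the chance of keeping a loaded node drops from
+// ~40% to ~10% as its wait time climbs from lowerLat to upperLat.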
+func translate(val, inFrom, inTo, outFrom, outTo int64) int {
+ outRange := outTo - outFrom
+ inRange := inTo - inFrom
+ inVal := val - inFrom
+ // we want the number to be lower as intensity increases
+ return int(float64(outTo) - (float64(inVal)/float64(inRange))*float64(outRange))
+}
+
+func (ch *chRouter) Wrap(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ // XXX (reed): probably do these on a separate port to avoid conflicts
+ case "/1/lb/stats":
+ ch.statsGet(w, r)
+ return
+ case "/1/lb/dash":
+ ch.dash(w, r)
+ return
+ }
+
+ next.ServeHTTP(w, r)
+ })
+}
+
+func (ch *chRouter) statsGet(w http.ResponseWriter, r *http.Request) {
+ stats := ch.getStats()
+
+ type st struct {
+ Timestamp time.Time `json:"timestamp"`
+ Throughput int `json:"tp"`
+ Node string `json:"node"`
+ Func string `json:"func"`
+ Wait float64 `json:"wait"` // seconds
+ }
+ var sts []st
+
+ // roll up and calculate throughput per second. idk why i hate myself
+ aggs := make(map[string][]*stat)
+ for _, s := range stats {
+ key := s.node + "/" + s.fx
+ if t := aggs[key]; len(t) > 0 && t[0].timestamp.Before(s.timestamp.Add(-1*time.Second)) {
+ sts = append(sts, st{
+ Timestamp: t[0].timestamp,
+ Throughput: len(t),
+ Node: t[0].node,
+ Func: t[0].fx,
+ Wait: avgWait(t),
+ })
+
+ aggs[key] = append(aggs[key][:0], s)
+ } else {
+ aggs[key] = append(aggs[key], s)
+ }
+ }
+
+ // leftovers
+ for _, t := range aggs {
+ sts = append(sts, st{
+ Timestamp: t[0].timestamp,
+ Throughput: len(t),
+ Node: t[0].node,
+ Func: t[0].fx,
+ Wait: avgWait(t),
+ })
+ }
+
+ json.NewEncoder(w).Encode(struct {
+ Stats []st `json:"stats"`
+ }{
+ Stats: sts,
+ })
+}
+
+func avgWait(stats []*stat) float64 {
+ var sum time.Duration
+ for _, s := range stats {
+ sum += s.wait
+ }
+ return (sum / time.Duration(len(stats))).Seconds()
+}
+
+func (ch *chRouter) dash(w http.ResponseWriter, r *http.Request) {
+ w.Write([]byte(dashPage))
+}
diff --git a/fnlb/dash.js b/fnlb/lb/dash.go
similarity index 84%
rename from fnlb/dash.js
rename to fnlb/lb/dash.go
index b40707285..3e8397484 100644
--- a/fnlb/dash.js
+++ b/fnlb/lb/dash.go
@@ -1,4 +1,15 @@
+package lb
+const dashPage = `
+
+
+
+lb dash
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+`
diff --git a/fnlb/lb/proxy.go b/fnlb/lb/proxy.go
new file mode 100644
index 000000000..8a281a3d3
--- /dev/null
+++ b/fnlb/lb/proxy.go
@@ -0,0 +1,147 @@
+package lb
+
+import (
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/http/httputil"
+ "sync"
+
+ "github.com/Sirupsen/logrus"
+)
+
+// TODO the load balancers all need to have the same list of nodes. gossip?
+// also gossip would handle failure detection instead of elb style. or it can
+// be pluggable and then we can read from where bmc is storing them and use that
+// or some OSS alternative
+
+// TODO when node goes offline should try to redirect request instead of 5xxing
+
+// TODO we could add some kind of pre-warming call to the functions server where
+// the lb could send an image to it to download before the lb starts sending traffic
+// there, otherwise when load starts expanding a few functions are going to eat
+// the pull time
+
+// TODO config
+// TODO TLS
+
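+// Config carries the load balancer settings; the Healthcheck* values are whole
+// seconds (see the corresponding flags in fnlb/main.go).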
+type Config struct {
+ Listen string `json:"port"`
+ Nodes []string `json:"nodes"`
+ HealthcheckInterval int `json:"healthcheck_interval"`
+ HealthcheckEndpoint string `json:"healthcheck_endpoint"`
+ HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
+ HealthcheckTimeout int `json:"healthcheck_timeout"`
+
+ Transport *http.Transport
+}
+
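+// Grouper supplies the candidate nodes for a given key. allGrouper (allgrouper.go)
+// is the provided implementation: it health checks nodes on an interval and serves
+// add/remove/list under /1/lb/nodes via its Wrap middleware.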
+type Grouper interface {
+ // List returns a set of hosts that may be used to route a request
+ // for a given key.
+ List(key string) ([]string, error)
+
+ // Wrap allows adding middleware to the provided http.Handler.
+ Wrap(http.Handler) http.Handler
+}
+
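+// Router picks one node from a Grouper's list and learns from proxied responses.
+// chRouter (ch.go) is the provided implementation: consistent hashing on the key
+// plus a load-aware back-off that shifts traffic away from slow node/function pairs.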
+type Router interface {
+ // TODO we could probably expose this just as some kind of http.RoundTripper
+ // but I can't think of anything elegant so here this is.
+
+ // Route will pick a node from the given set of nodes.
+ Route(nodes []string, key string) (string, error)
+
+ // InterceptResponse allows a Router to extract information from proxied
+ // requests so that it might do a better job next time. InterceptResponse
+ // should not modify the Response, as it has already been received, nor the
+ // Request, which has already been sent.
+ InterceptResponse(req *http.Request, resp *http.Response)
+
+ // Wrap allows adding middleware to the provided http.Handler.
+ Wrap(http.Handler) http.Handler
+}
+
+// KeyFunc maps a request to a shard key; it may return an error
+// if there are issues locating the shard key.
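+// For example, fnlb/main.go keys requests on the URL path:
+//   func(r *http.Request) (string, error) { return r.URL.Path, nil }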
+type KeyFunc func(req *http.Request) (string, error)
+
+type proxy struct {
+ keyFunc KeyFunc
+ grouper Grouper
+ router Router
+
+ transport http.RoundTripper
+
+ // embed for lazy ServeHTTP mostly
+ *httputil.ReverseProxy
+}
+
+// NewProxy will marry the given parameters into an able proxy.
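+// Each request flows keyFunc -> Grouper.List -> Router.Route to pick a target node,
+// is proxied there, and the response is handed to Router.InterceptResponse.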
+func NewProxy(keyFunc KeyFunc, g Grouper, r Router, conf Config) http.Handler {
+ p := new(proxy)
+ *p = proxy{
+ keyFunc: keyFunc,
+ grouper: g,
+ router: r,
+ transport: conf.Transport,
+ ReverseProxy: &httputil.ReverseProxy{
+ Director: func(*http.Request) { /* in RoundTrip so we can error out */ },
+ Transport: p,
+ BufferPool: newBufferPool(),
+ },
+ }
+
+ return p
+}
+
+type bufferPool struct {
+ bufs *sync.Pool
+}
+
+func newBufferPool() httputil.BufferPool {
+ return &bufferPool{
+ bufs: &sync.Pool{
+ // 32KB is what the proxy would've used without recycling them
+ New: func() interface{} { return make([]byte, 32*1024) },
+ },
+ }
+}
+
+func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
+func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
+
+func (p *proxy) RoundTrip(req *http.Request) (*http.Response, error) {
+ target, err := p.route(req)
+ if err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
+ if req.Body != nil {
+ io.Copy(ioutil.Discard, req.Body)
+ req.Body.Close()
+ }
+ // XXX (reed): if we let the proxy code write the response it will be body-less. ok?
+ return nil, ErrNoNodes
+ }
+
+ req.URL.Scheme = "http" // XXX (reed): h2 support
+ req.URL.Host = target
+
+ resp, err := p.transport.RoundTrip(req)
+ if err == nil {
+ p.router.InterceptResponse(req, resp)
+ }
+ return resp, err
+}
+
+func (p *proxy) route(req *http.Request) (string, error) {
+ // TODO errors from this func likely could return 401 or so instead of 503 always
+ key, err := p.keyFunc(req)
+ if err != nil {
+ return "", err
+ }
+ list, err := p.grouper.List(key)
+ if err != nil {
+ return "", err
+ }
+ return p.router.Route(list, key)
+}
diff --git a/fnlb/lb/util.go b/fnlb/lb/util.go
new file mode 100644
index 000000000..9c339609d
--- /dev/null
+++ b/fnlb/lb/util.go
@@ -0,0 +1,47 @@
+package lb
+
+import (
+ "encoding/json"
+ "errors"
+ "net/http"
+
+ "github.com/Sirupsen/logrus"
+)
+
+var (
+ ErrNoNodes = errors.New("no nodes available")
+)
+
+func sendValue(w http.ResponseWriter, v interface{}) {
+ err := json.NewEncoder(w).Encode(v)
+
+ if err != nil {
+ logrus.WithError(err).Error("error writing response")
+ }
+}
+
+func sendSuccess(w http.ResponseWriter, msg string) {
+ err := json.NewEncoder(w).Encode(struct {
+ Msg string `json:"msg"`
+ }{
+ Msg: msg,
+ })
+
+ if err != nil {
+ logrus.WithError(err).Error("error writing response")
+ }
+}
+
+func sendError(w http.ResponseWriter, code int, msg string) {
+ w.WriteHeader(code)
+
+ err := json.NewEncoder(w).Encode(struct {
+ Msg string `json:"msg"`
+ }{
+ Msg: msg,
+ })
+
+ if err != nil {
+ logrus.WithError(err).Error("error writing response")
+ }
+}
diff --git a/fnlb/lb_test.go b/fnlb/lb_test.go
deleted file mode 100644
index 9b3feef0f..000000000
--- a/fnlb/lb_test.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package main
-
-import "testing"
-
-func TestCHAdd(t *testing.T) {
- var ch consistentHash
- nodes := []string{"1", "2", "3"}
- for _, n := range nodes {
- ch.add(n)
- }
-
- if len(ch.nodes) != 3 {
- t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
- }
-
- // test dupes don't get added
- for _, n := range nodes {
- ch.add(n)
- }
-
- if len(ch.nodes) != 3 {
- t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
- }
-}
-
-func TestCHRemove(t *testing.T) {
- var ch consistentHash
- nodes := []string{"1", "2", "3"}
- for _, n := range nodes {
- ch.add(n)
- }
-
- if len(ch.nodes) != 3 {
- t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
- }
-
- ch.remove("4")
-
- if len(ch.nodes) != 3 {
- t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
- }
-
- ch.remove("3")
-
- if len(ch.nodes) != 2 {
- t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
- }
-
- ch.remove("3")
-
- if len(ch.nodes) != 2 {
- t.Fatal("nodes list should be len of 2, got:", len(ch.nodes))
- }
-}
-
-func TestCHGet(t *testing.T) {
- var ch consistentHash
- nodes := []string{"1", "2", "3"}
- for _, n := range nodes {
- ch.add(n)
- }
-
- if len(ch.nodes) != 3 {
- t.Fatal("nodes list should be len of 3, got:", len(ch.nodes))
- }
-
- keys := []string{"a", "b", "c"}
- for _, k := range keys {
- _, err := ch.get(k)
- if err != nil {
- t.Fatal("CHGet returned an error: ", err)
- }
- // testing this doesn't panic basically? could test distro but meh
- }
-}
diff --git a/fnlb/main.go b/fnlb/main.go
new file mode 100644
index 000000000..86670d9cc
--- /dev/null
+++ b/fnlb/main.go
@@ -0,0 +1,81 @@
+package main
+
+import (
+ "context"
+ "crypto/tls"
+ "flag"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+ "gitlab-odx.oracle.com/odx/functions/fnlb/lb"
+)
+
+func main() {
+ // XXX (reed): normalize
+ fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")
+
+ var conf lb.Config
+ flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
+ flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
+ flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
+ flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
+ flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
+ flag.Parse()
+
+ conf.Nodes = strings.Split(*fnodes, ",")
+
+ conf.Transport = &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ Dial: (&net.Dialer{
+ Timeout: 10 * time.Second,
+ KeepAlive: 120 * time.Second,
+ }).Dial,
+ MaxIdleConnsPerHost: 512,
+ TLSHandshakeTimeout: 10 * time.Second,
+ TLSClientConfig: &tls.Config{
+ ClientSessionCache: tls.NewLRUClientSessionCache(4096),
+ },
+ }
+
+ g := lb.NewAllGrouper(conf)
+ r := lb.NewConsistentRouter(conf)
+ k := func(r *http.Request) (string, error) {
+ return r.URL.Path, nil
+ }
+
+ h := lb.NewProxy(k, g, r, conf)
+ h = g.Wrap(h) // add/del/list endpoints
+ h = r.Wrap(h) // stats / dash endpoint
+
+ err := serve(conf.Listen, h)
+ if err != nil {
+ logrus.WithError(err).Error("server error")
+ }
+}
+
+func serve(addr string, handler http.Handler) error {
+ server := &http.Server{Addr: addr, Handler: handler}
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ defer wg.Wait()
+
+ ch := make(chan os.Signal, 1)
+ signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
+ go func() {
+ defer wg.Done()
+ for sig := range ch {
+ logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
+ server.Shutdown(context.Background()) // safe shutdown
+ return
+ }
+ }()
+ return server.ListenAndServe()
+}
diff --git a/fnlb/proxy.go b/fnlb/proxy.go
deleted file mode 100644
index 47e88237a..000000000
--- a/fnlb/proxy.go
+++ /dev/null
@@ -1,223 +0,0 @@
-package main
-
-import (
- "context"
- "crypto/tls"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/http/httputil"
- "sync"
- "time"
-
- "github.com/Sirupsen/logrus"
-)
-
-type chProxy struct {
- ch *consistentHash
-
- sync.RWMutex
- // TODO map[string][]time.Time
- ded map[string]int64
-
- hcInterval time.Duration
- hcEndpoint string
- hcUnhealthy int64
- hcTimeout time.Duration
-
- // XXX (reed): right now this only supports one client basically ;) use some real stat backend
- statsMu sync.Mutex
- stats []*stat
-
- proxy *httputil.ReverseProxy
- httpClient *http.Client
- transport http.RoundTripper
-}
-
-type stat struct {
- timestamp time.Time
- latency time.Duration
- node string
- code int
- fx string
- wait time.Duration
-}
-
-func (ch *chProxy) addStat(s *stat) {
- ch.statsMu.Lock()
- // delete last 1 minute of data if nobody is watching
- for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
- ch.stats = ch.stats[:i]
- }
- ch.stats = append(ch.stats, s)
- ch.statsMu.Unlock()
-}
-
-func (ch *chProxy) getStats() []*stat {
- ch.statsMu.Lock()
- stats := ch.stats
- ch.stats = ch.stats[:0]
- ch.statsMu.Unlock()
-
- return stats
-}
-
-func newProxy(conf config) *chProxy {
- tranny := &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- Dial: (&net.Dialer{
- Timeout: 10 * time.Second,
- KeepAlive: 120 * time.Second,
- }).Dial,
- MaxIdleConnsPerHost: 512,
- TLSHandshakeTimeout: 10 * time.Second,
- TLSClientConfig: &tls.Config{
- ClientSessionCache: tls.NewLRUClientSessionCache(4096),
- },
- }
-
- ch := &chProxy{
- ded: make(map[string]int64),
-
- // XXX (reed): need to be reconfigurable at some point
- hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
- hcEndpoint: conf.HealthcheckEndpoint,
- hcUnhealthy: int64(conf.HealthcheckUnhealthy),
- hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
- httpClient: &http.Client{Transport: tranny},
- transport: tranny,
-
- ch: newCH(),
- }
-
- director := func(req *http.Request) {
- target, err := ch.ch.get(req.URL.Path)
- if err != nil {
- logrus.WithError(err).WithFields(logrus.Fields{"url": req.URL.Path}).Error("getting index failed")
- target = "error"
- }
-
- req.URL.Scheme = "http" // XXX (reed): h2 support
- req.URL.Host = target
- }
-
- ch.proxy = &httputil.ReverseProxy{
- Director: director,
- Transport: ch,
- BufferPool: newBufferPool(),
- }
-
- for _, n := range conf.Nodes {
- ch.ch.add(n)
- }
- go ch.healthcheck()
- return ch
-}
-
-type bufferPool struct {
- bufs *sync.Pool
-}
-
-func newBufferPool() httputil.BufferPool {
- return &bufferPool{
- bufs: &sync.Pool{
- // 32KB is what the proxy would've used without recycling them
- New: func() interface{} { return make([]byte, 32*1024) },
- },
- }
-}
-
-func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
-func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
-
-func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
- if req != nil && req.URL.Host == "error" {
- if req.Body != nil {
- io.Copy(ioutil.Discard, req.Body)
- req.Body.Close()
- }
- // XXX (reed): if we let the proxy code write the response it will be body-less. ok?
- return nil, ErrNoNodes
- }
-
- then := time.Now()
- resp, err := ch.transport.RoundTrip(req)
- if err == nil {
- ch.intercept(req, resp, time.Since(then))
- }
- return resp, err
-}
-
-func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
- load, _ := time.ParseDuration(resp.Header.Get("XXX-FXLB-WAIT"))
- // XXX (reed): we should prob clear this from user response?
- // resp.Header.Del("XXX-FXLB-WAIT") // don't show this to user
-
- // XXX (reed): need to validate these prob
- ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
-
- ch.addStat(&stat{
- timestamp: time.Now(),
- latency: latency,
- node: req.URL.Host,
- code: resp.StatusCode,
- fx: req.URL.Path,
- wait: load,
- })
-}
-
-func (ch *chProxy) healthcheck() {
- for range time.Tick(ch.hcInterval) {
- nodes := ch.ch.list()
- nodes = append(nodes, ch.dead()...)
- for _, n := range nodes {
- go ch.ping(n)
- }
- }
-}
-
-func (ch *chProxy) ping(node string) {
- req, _ := http.NewRequest("GET", "http://"+node+ch.hcEndpoint, nil)
- ctx, cancel := context.WithTimeout(context.Background(), ch.hcTimeout)
- defer cancel()
- req = req.WithContext(ctx)
-
- resp, err := ch.httpClient.Do(req)
- if resp != nil && resp.Body != nil {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }
-
- if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
- logrus.WithFields(logrus.Fields{"node": node}).Error("health check failed")
- ch.fail(node)
- } else {
- ch.alive(node)
- }
-}
-
-func (ch *chProxy) fail(node string) {
- // shouldn't be a hot path so shouldn't be too contended on since health
- // checks are infrequent
- ch.Lock()
- ch.ded[node]++
- failed := ch.ded[node]
- ch.Unlock()
-
- if failed >= ch.hcUnhealthy {
- ch.ch.remove(node) // TODO under lock?
- }
-}
-
-func (ch *chProxy) alive(node string) {
- ch.RLock()
- _, ok := ch.ded[node]
- ch.RUnlock()
- if ok {
- ch.Lock()
- delete(ch.ded, node)
- ch.Unlock()
- ch.ch.add(node) // TODO under lock?
- }
-}
diff --git a/images/dind/Dockerfile b/images/dind/Dockerfile
index b5e4db4f1..ca07e2b5e 100644
--- a/images/dind/Dockerfile
+++ b/images/dind/Dockerfile
@@ -1,11 +1,13 @@
-FROM docker:1.13.1-dind
+FROM docker:edge-dind
-RUN apk update && apk upgrade && apk add --no-cache ca-certificates
+RUN apk add --no-cache ca-certificates
-COPY entrypoint.sh /usr/local/bin/
-COPY dind.sh /usr/local/bin/
+# cleanup warning: https://github.com/docker-library/docker/issues/55
+RUN addgroup -g 2999 docker
-ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
+COPY preentry.sh /usr/local/bin/
-# USAGE: Add a CMD to your own Dockerfile to use this (NOT an ENTRYPOINT, so that this is called)
+ENTRYPOINT ["preentry.sh"]
+
+# USAGE: Add a CMD to your own Dockerfile to use this (NOT an ENTRYPOINT), eg:
# CMD ["./runner"]
diff --git a/images/dind/README.md b/images/dind/README.md
index 227bacbb5..a264cd8e1 100644
--- a/images/dind/README.md
+++ b/images/dind/README.md
@@ -1,2 +1,4 @@
-This is the base image for all Titan's docker-in-docker images.
+This is the base image for all docker-in-docker images.
+The difference between this and the official `docker` images is that this one will choose the best
+filesystem driver automatically. The official ones use `vfs` (bad) by default unless you pass in a flag.
diff --git a/images/dind/build.sh b/images/dind/build.sh
index d15958d29..bbbe4ce4f 100755
--- a/images/dind/build.sh
+++ b/images/dind/build.sh
@@ -1,3 +1,3 @@
set -ex
-docker build -t treeder/dind:latest .
+docker build --build-arg HTTP_PROXY -t funcy/dind:latest .
diff --git a/images/dind/dind.sh b/images/dind/dind.sh
deleted file mode 100755
index db3a15b9f..000000000
--- a/images/dind/dind.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/sh
-
-set -ex
-# modified from: https://github.com/docker-library/docker/blob/866c3fbd87e8eeed524fdf19ba2d63288ad49cd2/1.11/dind/dockerd-entrypoint.sh
-# this will run either overlay or aufs as the docker fs driver, if the OS has both, overlay is preferred.
-# rewrite overlay to use overlay2 (docker 1.12, linux >=4.x required), see https://docs.docker.com/engine/userguide/storagedriver/selectadriver/#overlay-vs-overlay2
-
-fsdriver=$(grep -Eh -w -m1 "overlay|aufs" /proc/filesystems | cut -f2)
-
-if [ $fsdriver == "overlay" ]; then
- fsdriver="overlay2"
-fi
-
-cmd="dockerd \
- --host=unix:///var/run/docker.sock \
- --host=tcp://0.0.0.0:2375 \
- --storage-driver=$fsdriver"
-
-# nanny and restart on crashes
-until eval $cmd; do
- echo "Docker crashed with exit code $?. Respawning.." >&2
- # if we just restart it won't work, so start it (it wedges up) and
- # then kill the wedgie and restart it again and ta da... yea, seriously
- pidfile=/var/run/docker/libcontainerd/docker-containerd.pid
- kill -9 $(cat $pidfile)
- rm $pidfile
- sleep 1
-done
diff --git a/images/dind/entrypoint.sh b/images/dind/entrypoint.sh
deleted file mode 100755
index b05bbe475..000000000
--- a/images/dind/entrypoint.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-/usr/local/bin/dind.sh &
-
-# wait for daemon to initialize
-sleep 3
-
-exec "$@"
diff --git a/images/dind/preentry.sh b/images/dind/preentry.sh
new file mode 100755
index 000000000..64d60d19e
--- /dev/null
+++ b/images/dind/preentry.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+set -e
+
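+# Pick the best storage driver the kernel offers: prefer overlay (rewritten to
+# overlay2, which needs docker >= 1.12 and linux >= 4.x) over aufs, rather than
+# falling back to vfs.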
+fsdriver=$(grep -Eh -w -m1 "overlay|aufs" /proc/filesystems | cut -f2)
+if [ $fsdriver == "overlay" ]; then
+ fsdriver="overlay2"
+fi
+
+dockerd-entrypoint.sh --storage-driver=$fsdriver &
+
+# give docker a few seconds
+sleep 3
+
+exec "$@"
diff --git a/images/dind/release.sh b/images/dind/release.sh
index 8812e7f0c..14ceaed0a 100755
--- a/images/dind/release.sh
+++ b/images/dind/release.sh
@@ -6,7 +6,7 @@ docker run --rm -v "$PWD":/app treeder/bump patch
version=`cat VERSION`
echo "version $version"
-docker tag treeder/dind:latest treeder/dind:$version
+docker tag funcy/dind:latest funcy/dind:$version
-docker push treeder/dind:latest
-docker push treeder/dind:$version
+docker push funcy/dind:latest
+docker push funcy/dind:$version
diff --git a/release.sh b/release.sh
index 04b6a5e4e..9a87d0f0f 100755
--- a/release.sh
+++ b/release.sh
@@ -1,7 +1,7 @@
#!/bin/bash
set -ex
-user="treeder"
+user="funcy"
service="functions"
tag="latest"
@@ -34,11 +34,10 @@ git tag -f -a "$version" -m "version $version"
git push
git push origin $version
-# TODO: Where to push these?
# Finally tag and push docker images
-# docker tag $user/$service:$tag $user/$service:$version
-# docker push $user/$service:$version
-# docker push $user/$service:$tag
+docker tag $user/$service:$tag $user/$service:$version
+docker push $user/$service:$version
+docker push $user/$service:$tag
cd fn
./release.sh $version