mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
chop up da files
This commit is contained in:
166
lb/ch.go
Normal file
166
lb/ch.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/dchest/siphash"
|
||||
)
|
||||
|
||||
// consistentHash will maintain a list of strings which can be accessed by
|
||||
// keying them with a separate group of strings
|
||||
type consistentHash struct {
|
||||
// protects nodes
|
||||
sync.RWMutex
|
||||
nodes []string
|
||||
|
||||
loadMu sync.RWMutex
|
||||
load map[string]*int64
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
func newCH() *consistentHash {
|
||||
return &consistentHash{
|
||||
rng: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
load: make(map[string]*int64),
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *consistentHash) add(newb string) {
|
||||
ch.Lock()
|
||||
defer ch.Unlock()
|
||||
|
||||
// filter dupes, under lock. sorted, so binary search
|
||||
i := sort.SearchStrings(ch.nodes, newb)
|
||||
if i < len(ch.nodes) && ch.nodes[i] == newb {
|
||||
return
|
||||
}
|
||||
ch.nodes = append(ch.nodes, newb)
|
||||
// need to keep in sorted order so that hash index works across nodes
|
||||
sort.Sort(sort.StringSlice(ch.nodes))
|
||||
}
|
||||
|
||||
func (ch *consistentHash) remove(ded string) {
|
||||
ch.Lock()
|
||||
i := sort.SearchStrings(ch.nodes, ded)
|
||||
if i < len(ch.nodes) && ch.nodes[i] == ded {
|
||||
ch.nodes = append(ch.nodes[:i], ch.nodes[i+1:]...)
|
||||
}
|
||||
ch.Unlock()
|
||||
}
|
||||
|
||||
// return a copy
|
||||
func (ch *consistentHash) list() []string {
|
||||
ch.RLock()
|
||||
ret := make([]string, len(ch.nodes))
|
||||
copy(ret, ch.nodes)
|
||||
ch.RUnlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ch *consistentHash) get(key string) (string, error) {
|
||||
// crc not unique enough & sha is too slow, it's 1 import
|
||||
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
|
||||
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
i := int(jumpConsistentHash(sum64, int32(len(ch.nodes))))
|
||||
return ch.besti(key, i)
|
||||
}
|
||||
|
||||
// A Fast, Minimal Memory, Consistent Hash Algorithm:
|
||||
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
|
||||
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
|
||||
var b, j int64 = -1, 0
|
||||
for j < int64(num_buckets) {
|
||||
b = j
|
||||
key = key*2862933555777941757 + 1
|
||||
j = (b + 1) * int64((1<<31)/(key>>33)+1)
|
||||
}
|
||||
return int32(b)
|
||||
}
|
||||
|
||||
func (ch *consistentHash) setLoad(key string, load int64) {
|
||||
ch.loadMu.RLock()
|
||||
l, ok := ch.load[key]
|
||||
ch.loadMu.RUnlock()
|
||||
if ok {
|
||||
atomic.StoreInt64(l, load)
|
||||
} else {
|
||||
ch.loadMu.Lock()
|
||||
if _, ok := ch.load[key]; !ok {
|
||||
ch.load[key] = &load
|
||||
}
|
||||
ch.loadMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoNodes = errors.New("no nodes available")
|
||||
)
|
||||
|
||||
func loadKey(node, key string) string {
|
||||
return node + "\x00" + key
|
||||
}
|
||||
|
||||
// XXX (reed): push down fails / load into ch
|
||||
func (ch *consistentHash) besti(key string, i int) (string, error) {
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
|
||||
if len(ch.nodes) < 1 {
|
||||
return "", ErrNoNodes
|
||||
}
|
||||
|
||||
f := func(n string) string {
|
||||
var load int64
|
||||
ch.loadMu.RLock()
|
||||
loadPtr := ch.load[loadKey(n, key)]
|
||||
ch.loadMu.RUnlock()
|
||||
if loadPtr != nil {
|
||||
load = atomic.LoadInt64(loadPtr)
|
||||
}
|
||||
|
||||
// TODO flesh out these values. should be wait times.
|
||||
// if we send < 50% of traffic off to other nodes when loaded
|
||||
// then as function scales nodes will get flooded, need to be careful.
|
||||
//
|
||||
// back off loaded node/function combos slightly to spread load
|
||||
// TODO do we need a kind of ref counter as well so as to send functions
|
||||
// to a different node while there's an outstanding call to another?
|
||||
if load < 70 {
|
||||
return n
|
||||
} else if load > 90 {
|
||||
if ch.rng.Intn(100) < 60 {
|
||||
return n
|
||||
}
|
||||
} else if load > 70 {
|
||||
if ch.rng.Float64() < 80 {
|
||||
return n
|
||||
}
|
||||
}
|
||||
// otherwise loop until we find a sufficiently unloaded node or a lucky coin flip
|
||||
return ""
|
||||
}
|
||||
|
||||
for _, n := range ch.nodes[i:] {
|
||||
node := f(n)
|
||||
if node != "" {
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
// try the other half of the ring
|
||||
for _, n := range ch.nodes[:i] {
|
||||
node := f(n)
|
||||
if node != "" {
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", ErrNoNodes
|
||||
}
|
||||
414
lb/lb.go
414
lb/lb.go
@@ -1,27 +1,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/dchest/siphash"
|
||||
)
|
||||
|
||||
// TODO: consistent hashing is nice to get a cheap way to place nodes but it
|
||||
@@ -75,208 +63,16 @@ type config struct {
|
||||
HealthcheckTimeout int `json:"healthcheck_timeout"`
|
||||
}
|
||||
|
||||
type chProxy struct {
|
||||
ch *consistentHash
|
||||
// XXX (reed): clean up mess
|
||||
var dashPage []byte
|
||||
|
||||
sync.RWMutex
|
||||
// TODO map[string][]time.Time
|
||||
ded map[string]int64
|
||||
|
||||
hcInterval time.Duration
|
||||
hcEndpoint string
|
||||
hcUnhealthy int64
|
||||
hcTimeout time.Duration
|
||||
|
||||
// XXX (reed): right now this only supports one client basically ;) use some real stat backend
|
||||
statsMu sync.Mutex
|
||||
stats []*stat
|
||||
|
||||
proxy *httputil.ReverseProxy
|
||||
httpClient *http.Client
|
||||
transport http.RoundTripper
|
||||
}
|
||||
|
||||
// TODO should prob track per f(x) per node
|
||||
type stat struct {
|
||||
timestamp time.Time
|
||||
latency time.Duration
|
||||
node string
|
||||
code int
|
||||
}
|
||||
|
||||
func (ch *chProxy) addStat(s *stat) {
|
||||
ch.statsMu.Lock()
|
||||
// delete last 1 minute of data if nobody is watching
|
||||
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
|
||||
ch.stats = ch.stats[:i]
|
||||
}
|
||||
ch.stats = append(ch.stats, s)
|
||||
ch.statsMu.Unlock()
|
||||
}
|
||||
|
||||
func (ch *chProxy) getStats() []*stat {
|
||||
ch.statsMu.Lock()
|
||||
stats := ch.stats
|
||||
ch.stats = ch.stats[:0]
|
||||
ch.statsMu.Unlock()
|
||||
|
||||
// XXX (reed): down sample to per second
|
||||
return stats
|
||||
}
|
||||
|
||||
func newProxy(conf config) *chProxy {
|
||||
tranny := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 120 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: 512,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: &tls.Config{
|
||||
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
|
||||
},
|
||||
func init() {
|
||||
jsb, err := ioutil.ReadFile("dash.js")
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatal("couldn't open dash.js file")
|
||||
}
|
||||
|
||||
ch := &chProxy{
|
||||
ded: make(map[string]int64),
|
||||
|
||||
// XXX (reed): need to be reconfigurable at some point
|
||||
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
|
||||
hcEndpoint: conf.HealthcheckEndpoint,
|
||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
||||
httpClient: &http.Client{Transport: tranny},
|
||||
transport: tranny,
|
||||
|
||||
ch: newCH(),
|
||||
}
|
||||
|
||||
director := func(req *http.Request) {
|
||||
target, err := ch.ch.get(req.URL.Path)
|
||||
if err != nil {
|
||||
target = "error"
|
||||
}
|
||||
|
||||
req.URL.Scheme = "http" // XXX (reed): h2 support
|
||||
req.URL.Host = target
|
||||
}
|
||||
|
||||
ch.proxy = &httputil.ReverseProxy{
|
||||
Director: director,
|
||||
Transport: ch,
|
||||
BufferPool: newBufferPool(),
|
||||
}
|
||||
|
||||
for _, n := range conf.Nodes {
|
||||
// XXX (reed): need to health check these
|
||||
ch.ch.add(n)
|
||||
}
|
||||
go ch.healthcheck()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
if req.URL.Host == "error" {
|
||||
io.Copy(ioutil.Discard, req.Body)
|
||||
req.Body.Close()
|
||||
// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
|
||||
return nil, ErrNoNodes
|
||||
}
|
||||
|
||||
then := time.Now()
|
||||
resp, err := ch.transport.RoundTrip(req)
|
||||
if err == nil {
|
||||
ch.intercept(req, resp, time.Since(then))
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
|
||||
// XXX (reed): give f(x) nodes ability to send back wait time in response
|
||||
// XXX (reed): we should prob clear this from user response
|
||||
load, _ := strconv.Atoi(resp.Header.Get("XXX-FXLB-WAIT"))
|
||||
// XXX (reed): need to validate these prob
|
||||
ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
|
||||
|
||||
ch.addStat(&stat{
|
||||
timestamp: time.Now(),
|
||||
latency: latency,
|
||||
node: req.URL.Host,
|
||||
code: resp.StatusCode,
|
||||
// XXX (reed): function
|
||||
})
|
||||
}
|
||||
|
||||
type bufferPool struct {
|
||||
bufs *sync.Pool
|
||||
}
|
||||
|
||||
func newBufferPool() httputil.BufferPool {
|
||||
return &bufferPool{
|
||||
bufs: &sync.Pool{
|
||||
New: func() interface{} { return make([]byte, 32*1024) },
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
|
||||
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
|
||||
|
||||
func (ch *chProxy) healthcheck() {
|
||||
for range time.Tick(ch.hcInterval) {
|
||||
nodes := ch.ch.list()
|
||||
nodes = append(nodes, ch.dead()...)
|
||||
// XXX (reed): need to figure out elegant adding / removing better
|
||||
for _, n := range nodes {
|
||||
go ch.ping(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) ping(node string) {
|
||||
req, _ := http.NewRequest("GET", "http://"+node+ch.hcEndpoint, nil)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ch.hcTimeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := ch.httpClient.Do(req)
|
||||
if resp != nil && resp.Body != nil {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
logrus.WithFields(logrus.Fields{"node": node}).Error("health check failed")
|
||||
ch.fail(node)
|
||||
} else {
|
||||
ch.alive(node)
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) fail(node string) {
|
||||
// shouldn't be a hot path so shouldn't be too contended on since health
|
||||
// checks are infrequent
|
||||
ch.Lock()
|
||||
ch.ded[node]++
|
||||
failed := ch.ded[node]
|
||||
ch.Unlock()
|
||||
|
||||
if failed >= ch.hcUnhealthy {
|
||||
ch.ch.remove(node) // TODO under lock?
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) alive(node string) {
|
||||
ch.RLock()
|
||||
_, ok := ch.ded[node]
|
||||
ch.RUnlock()
|
||||
if ok {
|
||||
ch.Lock()
|
||||
delete(ch.ded, node)
|
||||
ch.Unlock()
|
||||
ch.ch.add(node) // TODO under lock?
|
||||
}
|
||||
dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
|
||||
}
|
||||
|
||||
func (ch *chProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -399,16 +195,23 @@ func (ch *chProxy) listNodes(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
}
|
||||
|
||||
// XXX (reed): clean up mess
|
||||
var dashPage []byte
|
||||
func (ch *chProxy) isDead(node string) bool {
|
||||
ch.RLock()
|
||||
val, ok := ch.ded[node]
|
||||
ch.RUnlock()
|
||||
return ok && val >= ch.hcUnhealthy
|
||||
}
|
||||
|
||||
func init() {
|
||||
jsb, err := ioutil.ReadFile("dash.js")
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatal("couldn't open dash.js file")
|
||||
func (ch *chProxy) dead() []string {
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
nodes := make([]string, 0, len(ch.ded))
|
||||
for n, val := range ch.ded {
|
||||
if val >= ch.hcUnhealthy {
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
}
|
||||
|
||||
dashPage = []byte(fmt.Sprintf(dashStr, string(jsb)))
|
||||
return nodes
|
||||
}
|
||||
|
||||
var dashStr = `<!DOCTYPE html>
|
||||
@@ -440,25 +243,6 @@ func (ch *chProxy) dash(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write(dashPage)
|
||||
}
|
||||
|
||||
func (ch *chProxy) isDead(node string) bool {
|
||||
ch.RLock()
|
||||
val, ok := ch.ded[node]
|
||||
ch.RUnlock()
|
||||
return ok && val >= ch.hcUnhealthy
|
||||
}
|
||||
|
||||
func (ch *chProxy) dead() []string {
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
nodes := make([]string, 0, len(ch.ded))
|
||||
for n, val := range ch.ded {
|
||||
if val >= ch.hcUnhealthy {
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func sendValue(w http.ResponseWriter, v interface{}) {
|
||||
err := json.NewEncoder(w).Encode(v)
|
||||
|
||||
@@ -492,157 +276,3 @@ func sendError(w http.ResponseWriter, code int, msg string) {
|
||||
logrus.WithError(err).Error("error writing response response")
|
||||
}
|
||||
}
|
||||
|
||||
// consistentHash will maintain a list of strings which can be accessed by
|
||||
// keying them with a separate group of strings
|
||||
type consistentHash struct {
|
||||
// protects nodes
|
||||
sync.RWMutex
|
||||
nodes []string
|
||||
|
||||
loadMu sync.RWMutex
|
||||
load map[string]*int64
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
func newCH() *consistentHash {
|
||||
return &consistentHash{
|
||||
rng: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
load: make(map[string]*int64),
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *consistentHash) add(newb string) {
|
||||
ch.Lock()
|
||||
defer ch.Unlock()
|
||||
|
||||
// filter dupes, under lock. sorted, so binary search
|
||||
i := sort.SearchStrings(ch.nodes, newb)
|
||||
if i < len(ch.nodes) && ch.nodes[i] == newb {
|
||||
return
|
||||
}
|
||||
ch.nodes = append(ch.nodes, newb)
|
||||
// need to keep in sorted order so that hash index works across nodes
|
||||
sort.Sort(sort.StringSlice(ch.nodes))
|
||||
}
|
||||
|
||||
func (ch *consistentHash) remove(ded string) {
|
||||
ch.Lock()
|
||||
i := sort.SearchStrings(ch.nodes, ded)
|
||||
if i < len(ch.nodes) && ch.nodes[i] == ded {
|
||||
ch.nodes = append(ch.nodes[:i], ch.nodes[i+1:]...)
|
||||
}
|
||||
ch.Unlock()
|
||||
}
|
||||
|
||||
// return a copy
|
||||
func (ch *consistentHash) list() []string {
|
||||
ch.RLock()
|
||||
ret := make([]string, len(ch.nodes))
|
||||
copy(ret, ch.nodes)
|
||||
ch.RUnlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (ch *consistentHash) get(key string) (string, error) {
|
||||
// crc not unique enough & sha is too slow, it's 1 import
|
||||
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
|
||||
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
i := int(jumpConsistentHash(sum64, int32(len(ch.nodes))))
|
||||
return ch.besti(key, i)
|
||||
}
|
||||
|
||||
// A Fast, Minimal Memory, Consistent Hash Algorithm:
|
||||
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
|
||||
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
|
||||
var b, j int64 = -1, 0
|
||||
for j < int64(num_buckets) {
|
||||
b = j
|
||||
key = key*2862933555777941757 + 1
|
||||
j = (b + 1) * int64((1<<31)/(key>>33)+1)
|
||||
}
|
||||
return int32(b)
|
||||
}
|
||||
|
||||
func (ch *consistentHash) setLoad(key string, load int64) {
|
||||
ch.loadMu.RLock()
|
||||
l, ok := ch.load[key]
|
||||
ch.loadMu.RUnlock()
|
||||
if ok {
|
||||
atomic.StoreInt64(l, load)
|
||||
} else {
|
||||
ch.loadMu.Lock()
|
||||
if _, ok := ch.load[key]; !ok {
|
||||
ch.load[key] = &load
|
||||
}
|
||||
ch.loadMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoNodes = errors.New("no nodes available")
|
||||
)
|
||||
|
||||
func loadKey(node, key string) string {
|
||||
return node + "\x00" + key
|
||||
}
|
||||
|
||||
// XXX (reed): push down fails / load into ch
|
||||
func (ch *consistentHash) besti(key string, i int) (string, error) {
|
||||
ch.RLock()
|
||||
defer ch.RUnlock()
|
||||
|
||||
if len(ch.nodes) < 1 {
|
||||
return "", ErrNoNodes
|
||||
}
|
||||
|
||||
f := func(n string) string {
|
||||
var load int64
|
||||
ch.loadMu.RLock()
|
||||
loadPtr := ch.load[loadKey(n, key)]
|
||||
ch.loadMu.RUnlock()
|
||||
if loadPtr != nil {
|
||||
load = atomic.LoadInt64(loadPtr)
|
||||
}
|
||||
|
||||
// TODO flesh out these values. should be wait times.
|
||||
// if we send < 50% of traffic off to other nodes when loaded
|
||||
// then as function scales nodes will get flooded, need to be careful.
|
||||
//
|
||||
// back off loaded node/function combos slightly to spread load
|
||||
// TODO do we need a kind of ref counter as well so as to send functions
|
||||
// to a different node while there's an outstanding call to another?
|
||||
if load < 70 {
|
||||
return n
|
||||
} else if load > 90 {
|
||||
if ch.rng.Intn(100) < 60 {
|
||||
return n
|
||||
}
|
||||
} else if load > 70 {
|
||||
if ch.rng.Float64() < 80 {
|
||||
return n
|
||||
}
|
||||
}
|
||||
// otherwise loop until we find a sufficiently unloaded node or a lucky coin flip
|
||||
return ""
|
||||
}
|
||||
|
||||
for _, n := range ch.nodes[i:] {
|
||||
node := f(n)
|
||||
if node != "" {
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
// try the other half of the ring
|
||||
for _, n := range ch.nodes[:i] {
|
||||
node := f(n)
|
||||
if node != "" {
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", ErrNoNodes
|
||||
}
|
||||
|
||||
220
lb/proxy.go
Normal file
220
lb/proxy.go
Normal file
@@ -0,0 +1,220 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
type chProxy struct {
|
||||
ch *consistentHash
|
||||
|
||||
sync.RWMutex
|
||||
// TODO map[string][]time.Time
|
||||
ded map[string]int64
|
||||
|
||||
hcInterval time.Duration
|
||||
hcEndpoint string
|
||||
hcUnhealthy int64
|
||||
hcTimeout time.Duration
|
||||
|
||||
// XXX (reed): right now this only supports one client basically ;) use some real stat backend
|
||||
statsMu sync.Mutex
|
||||
stats []*stat
|
||||
|
||||
proxy *httputil.ReverseProxy
|
||||
httpClient *http.Client
|
||||
transport http.RoundTripper
|
||||
}
|
||||
|
||||
// TODO should prob track per f(x) per node
|
||||
type stat struct {
|
||||
timestamp time.Time
|
||||
latency time.Duration
|
||||
node string
|
||||
code int
|
||||
}
|
||||
|
||||
func (ch *chProxy) addStat(s *stat) {
|
||||
ch.statsMu.Lock()
|
||||
// delete last 1 minute of data if nobody is watching
|
||||
for i := 0; i < len(ch.stats) && ch.stats[i].timestamp.Before(time.Now().Add(-1*time.Minute)); i++ {
|
||||
ch.stats = ch.stats[:i]
|
||||
}
|
||||
ch.stats = append(ch.stats, s)
|
||||
ch.statsMu.Unlock()
|
||||
}
|
||||
|
||||
func (ch *chProxy) getStats() []*stat {
|
||||
ch.statsMu.Lock()
|
||||
stats := ch.stats
|
||||
ch.stats = ch.stats[:0]
|
||||
ch.statsMu.Unlock()
|
||||
|
||||
// XXX (reed): down sample to per second
|
||||
return stats
|
||||
}
|
||||
|
||||
func newProxy(conf config) *chProxy {
|
||||
tranny := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 120 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: 512,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: &tls.Config{
|
||||
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
|
||||
},
|
||||
}
|
||||
|
||||
ch := &chProxy{
|
||||
ded: make(map[string]int64),
|
||||
|
||||
// XXX (reed): need to be reconfigurable at some point
|
||||
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
|
||||
hcEndpoint: conf.HealthcheckEndpoint,
|
||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
||||
httpClient: &http.Client{Transport: tranny},
|
||||
transport: tranny,
|
||||
|
||||
ch: newCH(),
|
||||
}
|
||||
|
||||
director := func(req *http.Request) {
|
||||
target, err := ch.ch.get(req.URL.Path)
|
||||
if err != nil {
|
||||
target = "error"
|
||||
}
|
||||
|
||||
req.URL.Scheme = "http" // XXX (reed): h2 support
|
||||
req.URL.Host = target
|
||||
}
|
||||
|
||||
ch.proxy = &httputil.ReverseProxy{
|
||||
Director: director,
|
||||
Transport: ch,
|
||||
BufferPool: newBufferPool(),
|
||||
}
|
||||
|
||||
for _, n := range conf.Nodes {
|
||||
// XXX (reed): need to health check these
|
||||
ch.ch.add(n)
|
||||
}
|
||||
go ch.healthcheck()
|
||||
return ch
|
||||
}
|
||||
|
||||
type bufferPool struct {
|
||||
bufs *sync.Pool
|
||||
}
|
||||
|
||||
func newBufferPool() httputil.BufferPool {
|
||||
return &bufferPool{
|
||||
bufs: &sync.Pool{
|
||||
New: func() interface{} { return make([]byte, 32*1024) },
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bufferPool) Get() []byte { return b.bufs.Get().([]byte) }
|
||||
func (b *bufferPool) Put(x []byte) { b.bufs.Put(x) }
|
||||
|
||||
func (ch *chProxy) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
if req.URL.Host == "error" {
|
||||
io.Copy(ioutil.Discard, req.Body)
|
||||
req.Body.Close()
|
||||
// XXX (reed): if we let the proxy code write the response it will be body-less. ok?
|
||||
return nil, ErrNoNodes
|
||||
}
|
||||
|
||||
then := time.Now()
|
||||
resp, err := ch.transport.RoundTrip(req)
|
||||
if err == nil {
|
||||
ch.intercept(req, resp, time.Since(then))
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (ch *chProxy) intercept(req *http.Request, resp *http.Response, latency time.Duration) {
|
||||
// XXX (reed): give f(x) nodes ability to send back wait time in response
|
||||
// XXX (reed): we should prob clear this from user response
|
||||
load, _ := strconv.Atoi(resp.Header.Get("XXX-FXLB-WAIT"))
|
||||
// XXX (reed): need to validate these prob
|
||||
ch.ch.setLoad(loadKey(req.URL.Host, req.URL.Path), int64(load))
|
||||
|
||||
ch.addStat(&stat{
|
||||
timestamp: time.Now(),
|
||||
latency: latency,
|
||||
node: req.URL.Host,
|
||||
code: resp.StatusCode,
|
||||
// XXX (reed): function
|
||||
})
|
||||
}
|
||||
|
||||
func (ch *chProxy) healthcheck() {
|
||||
for range time.Tick(ch.hcInterval) {
|
||||
nodes := ch.ch.list()
|
||||
nodes = append(nodes, ch.dead()...)
|
||||
// XXX (reed): need to figure out elegant adding / removing better
|
||||
for _, n := range nodes {
|
||||
go ch.ping(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) ping(node string) {
|
||||
req, _ := http.NewRequest("GET", "http://"+node+ch.hcEndpoint, nil)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ch.hcTimeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := ch.httpClient.Do(req)
|
||||
if resp != nil && resp.Body != nil {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
logrus.WithFields(logrus.Fields{"node": node}).Error("health check failed")
|
||||
ch.fail(node)
|
||||
} else {
|
||||
ch.alive(node)
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) fail(node string) {
|
||||
// shouldn't be a hot path so shouldn't be too contended on since health
|
||||
// checks are infrequent
|
||||
ch.Lock()
|
||||
ch.ded[node]++
|
||||
failed := ch.ded[node]
|
||||
ch.Unlock()
|
||||
|
||||
if failed >= ch.hcUnhealthy {
|
||||
ch.ch.remove(node) // TODO under lock?
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *chProxy) alive(node string) {
|
||||
ch.RLock()
|
||||
_, ok := ch.ded[node]
|
||||
ch.RUnlock()
|
||||
if ok {
|
||||
ch.Lock()
|
||||
delete(ch.ded, node)
|
||||
ch.Unlock()
|
||||
ch.ch.add(node) // TODO under lock?
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user