From ee398f0d7d5a2e57bfc7f3788a1e642516ac801f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Seif=20Lotfy=20=D8=B3=D9=8A=D9=81=20=D9=84=D8=B7=D9=81?= =?UTF-8?q?=D9=8A?= Date: Thu, 19 Jan 2017 03:11:39 +0100 Subject: [PATCH] Add initial load balancer (#487) * lb: library for creation of load balancer * lb: library for creation of load balancer * Add balance subcommand to fn * make fnlb its own command * Update Changelogg * Add Makefile for fnlb --- CHANGELOG.md | 3 +- glide.lock | 29 +++++---- glide.yaml | 5 +- lb/README.md | 12 ++++ lb/fnlb/Makefile | 3 + lb/fnlb/main.go | 34 ++++++++++ lb/lb.go | 52 +++++++++++++++ lb/roundtripper.go | 157 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 278 insertions(+), 17 deletions(-) create mode 100644 lb/README.md create mode 100644 lb/fnlb/Makefile create mode 100644 lb/fnlb/main.go create mode 100644 lb/lb.go create mode 100644 lb/roundtripper.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fd1c6372b..244ad1363 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,9 @@ - [#313](https://github.com/iron-io/functions/issues/313): fnctl: Add .NET core support?. - [#310](https://github.com/iron-io/functions/issues/310): fnctl: Add python support. - [#69](https://github.com/iron-io/functions/issues/69): Long(er) running containers for better performance aka Hot Containers. -- [#472](https://github.com/iron-io/functions/pull/472): Add global lru for routes with keys being the appname + path +- [#472](https://github.com/iron-io/functions/pull/472): Add global lru for routes with keys being the appname + path. - [#484](https://github.com/iron-io/functions/pull/484): Add triggers example for OpenStack project Picasso. +- [#487](https://github.com/iron-io/functions/pull/487): Add initial load balancer. ### Bugfixes diff --git a/glide.lock b/glide.lock index 1c0cd96ab..8c5b0762a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 0c88334cb5d4f6721c7440fa441a4c7a9f095f3994a9bbe3068c1bc36ba564ac -updated: 2016-12-13T23:57:51.142189878+01:00 +hash: 78692441d2595a5a303c64d4884fd4d04eebfd6a382ddd166176548d9da02645 +updated: 2017-01-18T21:41:13.698052314+01:00 imports: - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -8,7 +8,7 @@ imports: - name: github.com/asaskevich/govalidator version: 7b3beb6df3c42abd3509abfc3bcacc0fbfb7c877 - name: github.com/aws/aws-sdk-go - version: d5fd6983e17a1644149fb634fdcf69bd03cb8b21 + version: 90dec2183a5f5458ee79cbaf4b8e9ab910bc81a6 subpackages: - aws - aws/awserr @@ -18,24 +18,18 @@ imports: - aws/corehandlers - aws/credentials - aws/credentials/ec2rolecreds - - aws/credentials/endpointcreds - - aws/credentials/stscreds - aws/defaults - aws/ec2metadata - - aws/endpoints - aws/request - aws/session - aws/signer/v4 + - private/endpoints - private/protocol - private/protocol/json/jsonutil - private/protocol/jsonrpc - - private/protocol/query - - private/protocol/query/queryutil - private/protocol/rest - private/protocol/restjson - - private/protocol/xml/xmlutil - service/lambda - - service/sts - name: github.com/Azure/go-ansiterm version: fa152c58bc15761d0200cb75fe958b89a9d4888e subpackages: @@ -109,11 +103,11 @@ imports: - name: github.com/docker/go-units version: e30f1e79f3cd72542f2026ceec18d3bd67ab859c - name: github.com/docker/libtrust - version: aabc10ec26b754e797f9028f4589c5b7bd90dc20 + version: fa567046d9b14f6aa788882a950d69651d230b21 - name: github.com/fsnotify/fsnotify version: fd9ec7deca8bf46ecd2a795baaacf2b3a9be1197 - name: github.com/fsouza/go-dockerclient - version: 4611598e6e6615762544f0805acd59dfede5c9a2 + version: e085edda407c05214cc6e71e4881de47667e77ec - name: github.com/garyburd/redigo version: 0708def8b0cf3a05acdf44a7f28b864c2958921d subpackages: @@ -151,6 +145,11 @@ imports: version: 3b6d86cd965820f968760d5d419cb4add096bdd7 - name: github.com/go-openapi/validate version: 027696d4b54399770f1cdcc6c6daa56975f9e14e +- name: github.com/golang/groupcache + version: 72d04f9fcdec7d3821820cc4a6f150eae553639a + subpackages: + - consistenthash + - singleflight - name: github.com/golang/protobuf version: 2402d76f3d41f928c7902a765dfc872356dd3aad subpackages: @@ -162,9 +161,9 @@ imports: subpackages: - query - name: github.com/gorilla/context - version: 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 + version: 14f550f51af52180c2eefed15e5fd18d63c0a64a - name: github.com/gorilla/mux - version: 0a192a193177452756c362c20087ddafcf6829c4 + version: e444e69cbd2e2e3e0749a2f3c717cec491552bbf - name: github.com/hashicorp/go-cleanhttp version: ad28ea4487f05916463e2423a55166280e8254b5 - name: github.com/hashicorp/hcl @@ -203,6 +202,8 @@ imports: - drivers/mock - name: github.com/jmespath/go-jmespath version: bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d +- name: github.com/jmoiron/jsonq + version: e874b168d07ecc7808bc950a17998a8aa3141d82 - name: github.com/juju/errgo version: 08cceb5d0b5331634b9826762a8fd53b29b86ad8 subpackages: diff --git a/glide.yaml b/glide.yaml index 1511fba91..234a5db3a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -31,5 +31,6 @@ import: - package: github.com/ccirello/supervisor version: v0.5.1 - package: github.com/iron-io/runner -- package: github.com/iron-io/functions - version: 64628300dc9a81328c4ee6a81068f2c3a1e430f0 +- package: github.com/golang/groupcache + subpackages: + - consistenthash diff --git a/lb/README.md b/lb/README.md new file mode 100644 index 000000000..45f21c602 --- /dev/null +++ b/lb/README.md @@ -0,0 +1,12 @@ +# IronFunctions LoadBalancer + +## Loadbalancing several IronFunctions +You can run multiple IronFunctions instances and balance the load amongst them using `fnlb` as follows: + +```sh +fnlb --listen --nodes ,, +``` + +And redirect all traffic to the load balancer. + +**NOTE: For the load balancer to work all function nodes need to be sharing the same DB.** \ No newline at end of file diff --git a/lb/fnlb/Makefile b/lb/fnlb/Makefile new file mode 100644 index 000000000..b5d3017ae --- /dev/null +++ b/lb/fnlb/Makefile @@ -0,0 +1,3 @@ +all: + go build -o fnlb + ./fnlb --help \ No newline at end of file diff --git a/lb/fnlb/main.go b/lb/fnlb/main.go new file mode 100644 index 000000000..d95662dd5 --- /dev/null +++ b/lb/fnlb/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + "strings" + + "github.com/iron-io/functions/lb" +) + +var ( + fnodes string + flisten string +) + +func init() { + flag.StringVar(&fnodes, "nodes", "127.0.0.1:8080", "comma separated list of IronFunction nodes") + flag.StringVar(&flisten, "listen", "0.0.0.0:8081", "listening port for incoming connections") + flag.Parse() +} + +func main() { + nodes := strings.Split(fnodes, ",") + p := lb.ConsistentHashReverseProxy(context.Background(), nodes) + fmt.Println("forwarding calls to", nodes) + fmt.Println("listening to", flisten) + if err := http.ListenAndServe(flisten, p); err != nil { + fmt.Fprintln(os.Stderr, "could not start server. error:", err) + os.Exit(1) + } +} diff --git a/lb/lb.go b/lb/lb.go new file mode 100644 index 000000000..83798ab6c --- /dev/null +++ b/lb/lb.go @@ -0,0 +1,52 @@ +package lb + +import ( + "bytes" + "context" + "net/http" + "net/http/httputil" + "strconv" + "sync" + "sync/atomic" + + "github.com/golang/groupcache/consistenthash" +) + +// ConsistentHashReverseProxy returns a new ReverseProxy that routes +// URLs to the scheme, host, and base path provided in by a consistent hash +// algorithm. If the target's path is "/base" and the incoming request was for +// "/dir", the target request will be for /base/dir. +// ConsistentHashReverseProxy does not rewrite the Host header. +func ConsistentHashReverseProxy(ctx context.Context, nodes []string) *httputil.ReverseProxy { + ch := consistenthash.New(len(nodes), nil) + ch.Add(nodes...) + + bufPool := sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + } + + var i int64 + director := func(req *http.Request) { + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + buf.Reset() + buf.WriteString(req.URL.Path) + buf.WriteString("??") + b := strconv.AppendInt(buf.Bytes(), atomic.AddInt64(&i, 1), 10) + + target := ch.Get(string(b)) + req.URL.Scheme = "http" + req.URL.Host = target + if _, ok := req.Header["User-Agent"]; !ok { + // explicitly disable User-Agent so it's not set to default value + req.Header.Set("User-Agent", "") + } + } + + return &httputil.ReverseProxy{ + Director: director, + Transport: NewRoundTripper(ctx, nodes), + } +} diff --git a/lb/roundtripper.go b/lb/roundtripper.go new file mode 100644 index 000000000..5b8270e6e --- /dev/null +++ b/lb/roundtripper.go @@ -0,0 +1,157 @@ +package lb + +import ( + "context" + "errors" + "net" + "net/http" + "sync" + "time" + + "github.com/golang/groupcache/singleflight" +) + +// ErrNoFallbackNodeFound happens when the fallback routine does not manage to +// find a TCP reachable node in alternative to the chosen one. +var ErrNoFallbackNodeFound = errors.New("no fallback node found - whole cluster seems offline") + +// FallbackRoundTripper implements http.RoundTripper in a way that when an +// outgoing request does not manage to succeed with its original target host, +// it fallsback to a list of alternative hosts. Internally it keeps a list of +// dead hosts, and pings them until they are back online, diverting traffic +// back to them. This is meant to be used by ConsistentHashReverseProxy(). +type FallbackRoundTripper struct { + nodes []string + sf singleflight.Group + + mu sync.Mutex + // a list of offline servers that must be rechecked to see when they + // get back online. If a server is in this list, it must have a fallback + // available to which requests are sent. + fallback map[string]string +} + +// NewRoundTripper creates a new FallbackRoundTripper and triggers the internal +// host TCP health checks. +func NewRoundTripper(ctx context.Context, nodes []string) *FallbackRoundTripper { + frt := &FallbackRoundTripper{ + nodes: nodes, + fallback: make(map[string]string), + } + go frt.checkHealth(ctx) + return frt +} + +func (f *FallbackRoundTripper) checkHealth(ctx context.Context) { + tick := time.NewTicker(1 * time.Second) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + f.mu.Lock() + if len(f.fallback) == 0 { + f.mu.Unlock() + continue + } + fallback := make(map[string]string) + for k, v := range f.fallback { + fallback[k] = v + } + f.mu.Unlock() + + updatedlist := make(map[string]string) + for host, target := range fallback { + if !f.ping(host) { + updatedlist[host] = target + } + } + + f.mu.Lock() + f.fallback = make(map[string]string) + for k, v := range updatedlist { + f.fallback[k] = v + } + f.mu.Unlock() + } + } +} + +func (f *FallbackRoundTripper) ping(host string) bool { + conn, err := net.Dial("tcp", host) + if err != nil { + return false + } + conn.Close() + return true +} + +func (f *FallbackRoundTripper) fallbackHost(targetHost, failedFallback string) string { + detected, err := f.sf.Do(targetHost, func() (interface{}, error) { + for _, node := range f.nodes { + if node != targetHost && node != failedFallback && f.ping(node) { + f.mu.Lock() + f.fallback[targetHost] = node + f.mu.Unlock() + return node, nil + } + } + return "", ErrNoFallbackNodeFound + }) + + if err != nil { + return "" + } + return detected.(string) + +} + +// RoundTrip implements http.RoundTrip. It tried to fullfil an *http.Request to +// its original target host, falling back to a list of nodes in case of failure. +// After the first failure, it consistently delivers traffic to the fallback +// host, until original host is back online. If no fallback node is available, +// it fails with ErrNoFallbackNodeFound. In case of cascaded failure, that is, +// the fallback node is also offline, it will look for another online host. +func (f *FallbackRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + targetHost := req.URL.Host + + f.mu.Lock() + fallback, ok := f.fallback[targetHost] + f.mu.Unlock() + if ok { + req.URL.Host = fallback + resp, err := f.callNode(req) + if err == nil { + return resp, err + } + fallback := f.fallbackHost(targetHost, fallback) + if fallback == "" { + return nil, ErrNoFallbackNodeFound + } + req.URL.Host = fallback + return f.callNode(req) + } + + resp, err := f.callNode(req) + if err == nil { + return resp, err + } + + fallback = f.fallbackHost(targetHost, "") + if fallback == "" { + return nil, ErrNoFallbackNodeFound + } + req.URL.Host = fallback + return f.callNode(req) +} + +func (f *FallbackRoundTripper) callNode(req *http.Request) (*http.Response, error) { + requestURI := req.RequestURI + req.RequestURI = "" + resp, err := http.DefaultClient.Do(req) + if err == nil { + resp.Request.RequestURI = requestURI + } + return resp, err +}