Files
fn-serverless/fnlb/main.go
Reed Allman 398ecc388e move the lb stuff around in lego form
this structure should allow us to keep the consistent hash code and just use
consistent hashing on a subset of nodes, then in order to satisfy the oracle
service stuff in functions-service we can just implement a different "Grouper"
that does vm allocation and whatever other magic we need to manage nodes and
poop out sets of nodes based on tenant id / func.

for the suga... see main.go and proxy.go, the rest is basically renaming /
moving stuff (not easy to follow changes, nature of the beast).

the only 'issues' i can think of are that down in the ch stuff (or Router) we
will need a back channel to tell the 'Grouper' to add a node (i.e. when all nodes
for that shard are currently loaded), which isn't great, and also that the grouper
has no way of knowing that a node in the given set may no longer be in use.
still thinking about how to couple those two. basically don't want to have to
just copy that consistent hash code but after munging with stuff i'm almost at
'fuck it' level and maybe it's worth it to just copy and hack it up in
functions-service for what we need. we'll also need to have different key
funcs for groupers and routers eventually (grouper wants tenant id, router
needs tenant id + router). anyway, open to any ideas, i haven't come up with
anything great. feedback on interface would be great

after this can plumb the datastore stuff into the allGrouper pretty easily
2017-06-10 15:21:23 -07:00

82 lines
2.0 KiB
Go

package main
import (
"context"
"crypto/tls"
"flag"
"net"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"gitlab-odx.oracle.com/odx/functions/fnlb/lb"
)
// main parses the command line flags into an lb.Config, builds the grouper,
// router and proxy handler from that config, and serves the resulting handler
// on conf.Listen until serve returns.
func main() {
	// XXX (reed): normalize
	fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")

	var conf lb.Config
	flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
	flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
	flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
	flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
	flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
	flag.Parse()

	// strings.Split yields a single empty element for an empty input, and
	// stray commas produce empty elements as well. Skip those so that no
	// blank node address ends up in the configuration.
	for _, node := range strings.Split(*fnodes, ",") {
		if node = strings.TrimSpace(node); node != "" {
			conf.Nodes = append(conf.Nodes, node)
		}
	}

	conf.Transport = &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		// DialContext replaces the deprecated Dial field; it lets the
		// transport cancel dials as soon as they are no longer needed.
		DialContext: (&net.Dialer{
			Timeout:   10 * time.Second,
			KeepAlive: 120 * time.Second,
		}).DialContext,
		MaxIdleConnsPerHost: 512,
		TLSHandshakeTimeout: 10 * time.Second,
		TLSClientConfig: &tls.Config{
			ClientSessionCache: tls.NewLRUClientSessionCache(4096),
		},
	}

	g := lb.NewAllGrouper(conf)
	r := lb.NewConsistentRouter(conf)

	// k derives the key handed to the proxy from the request's URL path.
	k := func(req *http.Request) (string, error) {
		return req.URL.Path, nil
	}

	h := lb.NewProxy(k, g, r, conf)
	h = g.Wrap(h) // add/del/list endpoints
	h = r.Wrap(h) // stats / dash endpoint

	err := serve(conf.Listen, h)
	if err != nil {
		logrus.WithError(err).Error("server error")
	}
}
// serve runs an HTTP server for handler on addr and blocks until the server
// has stopped.
//
// On SIGQUIT or SIGINT the server is shut down gracefully via Shutdown and
// serve waits for that shutdown to finish before returning. In that case the
// result of Shutdown is returned (nil on a clean shutdown) rather than
// http.ErrServerClosed. Any other error from ListenAndServe, such as a
// failure to bind addr, is returned right away.
func serve(addr string, handler http.Handler) error {
	server := &http.Server{Addr: addr, Handler: handler}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGINT)
	// Release the signal registration once we are done serving.
	defer signal.Stop(sigs)

	// stopped is closed once ListenAndServe has returned, so that the signal
	// goroutine does not wait forever for a signal when the server failed on
	// its own (previously this made serve hang on a listen error).
	stopped := make(chan struct{})

	// shutdownErr is written by the goroutine below and only read after
	// wg.Wait, which orders the write before the read.
	var shutdownErr error

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		select {
		case sig := <-sigs:
			logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
			shutdownErr = server.Shutdown(context.Background()) // safe shutdown
		case <-stopped:
		}
	}()

	err := server.ListenAndServe()
	close(stopped)

	// ListenAndServe returns immediately once Shutdown is called; wait here
	// so that in-flight requests get to finish before we return.
	wg.Wait()

	if err == http.ErrServerClosed {
		return shutdownErr
	}
	return err
}