Mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
This structure should let us keep the consistent hash code and just apply consistent hashing to a subset of nodes. Then, to satisfy the Oracle service stuff in functions-service, we can implement a different "Grouper" that does VM allocation and whatever other magic we need to manage nodes and spit out sets of nodes based on tenant id / func. For the suga... see main.go and proxy.go; the rest is basically renaming / moving stuff (not easy-to-follow changes, nature of the beast).

The only issues I can think of: down in the consistent-hash code (or Router) we will need a back channel to tell the Grouper to add a node (i.e. when all nodes for that shard are currently loaded), which isn't great, and the Grouper also has no way of knowing that a node in a given set may no longer be in use. Still thinking about how to couple those two. I basically don't want to have to copy that consistent hash code wholesale, but after munging with it I'm almost at the point where it may be worth just copying it and hacking it up in functions-service for what we need. We'll also eventually need different key funcs for groupers and routers (the grouper wants tenant id, the router needs tenant id + route).

Anyway, open to any ideas; I haven't come up with anything great. Feedback on the interface would be appreciated. After this we can plumb the datastore stuff into the allGrouper pretty easily.
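To make the interface feedback easier, here is a rough sketch of the shape I'm picturing for the Grouper / Router split. The method names and signatures below are illustrative only, not the actual lb package API; the only parts taken from main.go / proxy.go are the key-func shape and the Wrap chaining.

package lb

import "net/http"

// KeyFunc extracts the routing key from a request. main.go currently keys on
// r.URL.Path; eventually the grouper would key on tenant id and the router on
// tenant id + route.
type KeyFunc func(*http.Request) (string, error)

// Grouper produces the set of candidate nodes for a key and can expose its own
// management endpoints (add/del/list) by wrapping the handler chain.
// Illustrative only: the real interface may differ.
type Grouper interface {
	List(key string) ([]string, error)
	Wrap(next http.Handler) http.Handler
}

// Router picks one node out of the grouper's set (consistent hashing in the
// default implementation) and can expose stats/dash endpoints the same way.
// Illustrative only: the real interface may differ.
type Router interface {
	Route(nodes []string, key string) (string, error)
	Wrap(next http.Handler) http.Handler
}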
82 lines · 2.0 KiB · Go
package main

import (
	"context"
	"crypto/tls"
	"flag"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"

	"gitlab-odx.oracle.com/odx/functions/fnlb/lb"
)

func main() {
	// XXX (reed): normalize
	fnodes := flag.String("nodes", "", "comma separated list of IronFunction nodes")

	var conf lb.Config
	flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
	flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
	flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
	flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
	flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
	flag.Parse()

	conf.Nodes = strings.Split(*fnodes, ",")

	// shared transport used to proxy requests to the backend nodes
	conf.Transport = &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		Dial: (&net.Dialer{
			Timeout:   10 * time.Second,
			KeepAlive: 120 * time.Second,
		}).Dial,
		MaxIdleConnsPerHost: 512,
		TLSHandshakeTimeout: 10 * time.Second,
		TLSClientConfig: &tls.Config{
			ClientSessionCache: tls.NewLRUClientSessionCache(4096),
		},
	}

	// the grouper tracks the full node set; the consistent-hash router picks a
	// node per request, keyed (for now) on the request path
	g := lb.NewAllGrouper(conf)
	r := lb.NewConsistentRouter(conf)
	k := func(r *http.Request) (string, error) {
		return r.URL.Path, nil
	}

	h := lb.NewProxy(k, g, r, conf)
	h = g.Wrap(h) // add/del/list endpoints
	h = r.Wrap(h) // stats / dash endpoint

	err := serve(conf.Listen, h)
	if err != nil {
		logrus.WithError(err).Error("server error")
	}
}

// serve runs the HTTP server and shuts it down cleanly on SIGQUIT / SIGINT.
func serve(addr string, handler http.Handler) error {
	server := &http.Server{Addr: addr, Handler: handler}

	var wg sync.WaitGroup
	wg.Add(1)
	defer wg.Wait()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
	go func() {
		defer wg.Done()
		for sig := range ch {
			logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
			server.Shutdown(context.Background()) // safe shutdown
			return
		}
	}()

	return server.ListenAndServe()
}
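For reference, a typical invocation using the flags defined above might look like the following; the fnlb binary name and the node addresses are placeholders, not part of this change.

fnlb -nodes 10.0.0.1:8080,10.0.0.2:8080,10.0.0.3:8080 -listen :8081 -hc-interval 3 -hc-path /version -hc-unhealthy 2 -hc-timeout 5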