fn-serverless/fnlb/lb/allgrouper.go
Reed Allman e637f9736e back the lb with a db for scale
now we can run multiple lbs in the same 'cluster' and they will all point to
the same nodes. the lbs are not guaranteed to have the same set of functions
nodes to route to at any given point in time, since each lb performs its own
health checks independently, but they are all backed by the same list from the
db, so at least they health check the same nodes. in cases where there are
more than a few lbs we can rethink this strategy; mostly we need to back the
lbs with a db so that they persist nodes and remain fault tolerant in that
sense. the strategy of independent health checks is useful to reduce thrashing
the db during network partitions between lb and fn pairs. it would be nice to
have gossip health checking to reduce network traffic, but this works too, and
we'll need to seed any gossip protocol with a list from a db anyway.
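
to make the "same routing everywhere" idea concrete, here's a rough sketch (illustrative only, not fnlb code; `pickNode` and the fnv-mod hash are made up stand-ins for whatever hashing the lb actually does) of why each lb keeping its healthy list sorted matters: given the same healthy set, every lb computes the same node for a given key, so a request lands on the same functions node no matter which lb it entered through.

```go
// illustrative sketch only; not part of fnlb. pickNode is a hypothetical
// stand-in for the real routing, and a plain fnv hash mod len() is a
// simplification of whatever hashing the lb actually uses.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

func pickNode(healthy []string, key string) string {
	sort.Strings(healthy) // every lb sorts the same set into the same order
	h := fnv.New32a()
	h.Write([]byte(key))
	return healthy[h.Sum32()%uint32(len(healthy))] // so the same key maps to the same node
}

func main() {
	// two lbs read the same set from the db, possibly in different orders
	lb1 := []string{"10.0.0.2:8080", "10.0.0.1:8080", "10.0.0.3:8080"}
	lb2 := []string{"10.0.0.3:8080", "10.0.0.2:8080", "10.0.0.1:8080"}
	fmt.Println(pickNode(lb1, "/myapp/hello") == pickNode(lb2, "/myapp/hello")) // true
}
```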

db_url is the same format as what functions takes. i don't have env vars set
up for fnlb right now (low hanging fruit); the flag is `-db`, and it defaults
to in-memory sqlite3, so nodes will be forgotten between reboots. used the
sqlx stuff; decided not to put the lb tables in the datastore package since it
was easy enough to add here, get the sugar, and avoid bloating the datastore
interface. the tables won't collide, so the lb can even share the same
pg/mysql the fn servers run in prod; db load from the lb is low (1 call every
1s per lb).
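
for reference, wiring this up from Go might look roughly like the following. this is a sketch only: the import path, db url and values are illustrative; only the Config field names come from the code below, and the health check endpoint is a guess.

```go
// sketch only: import path, db url and values are illustrative, not the real fnlb main.
package main

import (
	"log"
	"net/http"

	lb "fn-serverless/fnlb/lb" // substitute the repo's actual import path
)

func main() {
	grouper, err := lb.NewAllGrouper(lb.Config{
		// same url format the fn servers take; leaving this as the default
		// in-memory sqlite3 means nodes are forgotten between reboots
		DBurl:                "postgres://fn:fn@db.internal:5432/fnlb?sslmode=disable",
		HealthcheckInterval:  1,          // seconds between db polls / pings
		HealthcheckEndpoint:  "/version", // a guess; use whatever the fn nodes expose
		HealthcheckUnhealthy: 3,          // failed pings before a node leaves the healthy list
		HealthcheckTimeout:   5,          // seconds per ping
		Transport:            &http.Transport{},
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = grouper // hand the Grouper to the lb's router / http server
}
```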

i need to add some tests; touch testing worked as expected.
2017-07-07 07:45:17 -07:00

package lb

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/lib/pq"
	"github.com/mattn/go-sqlite3"
)

// NewAllGrouper returns a Grouper that will return the entire list of nodes
// that are being maintained, regardless of key. An 'AllGrouper' will health
// check servers at a specified interval, taking them in and out as they
// pass/fail and exposes endpoints for adding, removing and listing nodes.
func NewAllGrouper(conf Config) (Grouper, error) {
	db, err := db(conf.DBurl)
	if err != nil {
		return nil, err
	}

	a := &allGrouper{
		ded: make(map[string]int64),
		db:  db,

		// XXX (reed): need to be reconfigurable at some point
		hcInterval:  time.Duration(conf.HealthcheckInterval) * time.Second,
		hcEndpoint:  conf.HealthcheckEndpoint,
		hcUnhealthy: int64(conf.HealthcheckUnhealthy),
		hcTimeout:   time.Duration(conf.HealthcheckTimeout) * time.Second,

		// for health checks
		httpClient: &http.Client{Transport: conf.Transport},
	}

	for _, n := range conf.Nodes {
		err := a.add(n)
		if err != nil {
			// XXX (reed): could prob ignore these but meh
			logrus.WithError(err).WithFields(logrus.Fields{"node": n}).Error("error adding node")
		}
	}

	go a.healthcheck()
	return a, nil
}

// allGrouper will return all healthy nodes it is tracking from List.
// nodes may be added / removed through the HTTP api. each allGrouper will
// poll its database for the full list of nodes, and then run its own
// health checks on those nodes to maintain a list of healthy nodes.
// the list of healthy nodes will be maintained in sorted order so that,
// without any network partitions, all lbs may consistently hash with the
// same backing list, such that H(k) -> v for any k->v pair (vs attempting
// to maintain a list among nodes in the db, which could have thrashing
// due to network connectivity between any pair).
type allGrouper struct {
	// protects allNodes, healthy & ded
	sync.RWMutex

	// TODO rename nodes to 'allNodes' or something so everything breaks and then stitch
	// allNodes is a cache of db.List, we can probably trash it..
	// healthy is the sorted subset of allNodes currently passing health checks
	allNodes, healthy []string
	// ded tracks consecutive failed health checks per node; at or over
	// hcUnhealthy a node is removed from healthy
	ded map[string]int64 // [node] -> failedCount

	db DBStore

	httpClient *http.Client

	hcInterval  time.Duration
	hcEndpoint  string
	hcUnhealthy int64
	hcTimeout   time.Duration
}

// TODO put this somewhere better
type DBStore interface {
	Add(string) error
	Delete(string) error
	List() ([]string, error)
}

// implements DBStore
type sqlStore struct {
	db *sqlx.DB

	// TODO we should prepare all of the statements, rebind them
	// and store them all here.
}

// db will open the db specified by uri, create any tables necessary
// and return a DBStore safe for concurrent usage.
func db(uri string) (DBStore, error) {
	url, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}

	driver := url.Scheme

	// driver must be one of these for sqlx to work, double check:
	switch driver {
	case "postgres", "pgx", "mysql", "sqlite3", "oci8", "ora", "goracle":
	default:
		return nil, errors.New("invalid db driver, refer to the code")
	}

	if driver == "sqlite3" {
		// make all the dirs so we can make the file..
		dir := filepath.Dir(url.Path)
		err := os.MkdirAll(dir, 0755)
		if err != nil {
			return nil, err
		}
	}

	uri = url.String()
	if driver != "postgres" {
		// postgres seems to need this as a prefix in lib/pq, everyone else wants it stripped of scheme
		uri = strings.TrimPrefix(url.String(), url.Scheme+"://")
	}

	sqldb, err := sql.Open(driver, uri)
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db")
		return nil, err
	}

	db := sqlx.NewDb(sqldb, driver)

	// force a connection and test that it worked
	err = db.Ping()
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db")
		return nil, err
	}

	maxIdleConns := 30 // c.MaxIdleConnections
	db.SetMaxIdleConns(maxIdleConns)
	logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed")

	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS lb_nodes (
	address text NOT NULL PRIMARY KEY
);`)
	if err != nil {
		return nil, err
	}

	return &sqlStore{db: db}, nil
}

func (s *sqlStore) Add(node string) error {
	query := s.db.Rebind("INSERT INTO lb_nodes (address) VALUES (?);")
	_, err := s.db.Exec(query, node)
	if err != nil {
		// if it already exists, just filter that error out
		switch err := err.(type) {
		case *mysql.MySQLError:
			if err.Number == 1062 {
				return nil
			}
		case *pq.Error:
			if err.Code == "23505" {
				return nil
			}
		case sqlite3.Error:
			if err.ExtendedCode == sqlite3.ErrConstraintUnique || err.ExtendedCode == sqlite3.ErrConstraintPrimaryKey {
				return nil
			}
		}
	}
	return err
}

func (s *sqlStore) Delete(node string) error {
	query := s.db.Rebind(`DELETE FROM lb_nodes WHERE address=?`)
	_, err := s.db.Exec(query, node)
	// TODO we can filter if it didn't exist, too...
	return err
}

func (s *sqlStore) List() ([]string, error) {
	query := s.db.Rebind(`SELECT DISTINCT address FROM lb_nodes`)
	rows, err := s.db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var nodes []string
	for rows.Next() {
		var node string
		err := rows.Scan(&node)
		if err == nil {
			nodes = append(nodes, node)
		}
	}

	err = rows.Err()
	if err == sql.ErrNoRows {
		err = nil // don't care...
	}
	return nodes, err
}

func (a *allGrouper) add(newb string) error {
	if newb == "" {
		// we can't really do a lot of validation since hosts could be an ip or domain but we have health checks
		return nil
	}
	return a.db.Add(newb)
}

func (a *allGrouper) remove(ded string) error {
	return a.db.Delete(ded)
}

// call with a.Lock held
func (a *allGrouper) addHealthy(newb string) {
	// filter dupes, under lock. sorted, so binary search
	i := sort.SearchStrings(a.healthy, newb)
	if i < len(a.healthy) && a.healthy[i] == newb {
		return
	}
	a.healthy = append(a.healthy, newb)
	// need to keep in sorted order so that hash index works across nodes
	sort.Sort(sort.StringSlice(a.healthy))
}

// call with a.Lock held
func (a *allGrouper) removeHealthy(ded string) {
	i := sort.SearchStrings(a.healthy, ded)
	if i < len(a.healthy) && a.healthy[i] == ded {
		a.healthy = append(a.healthy[:i], a.healthy[i+1:]...)
	}
}

// return a copy
func (a *allGrouper) List(string) ([]string, error) {
	a.RLock()
	ret := make([]string, len(a.healthy))
	copy(ret, a.healthy)
	a.RUnlock()

	var err error
	if len(ret) == 0 {
		err = ErrNoNodes
	}
	return ret, err
}

func (a *allGrouper) healthcheck() {
	for range time.Tick(a.hcInterval) {
		// health check the entire list of nodes [from db]
		list, err := a.db.List()
		if err != nil {
			logrus.WithError(err).Error("error checking db for nodes")
			continue
		}

		a.Lock()
		a.allNodes = list
		a.Unlock()

		for _, n := range list {
			go a.ping(n)
		}
	}
}

func (a *allGrouper) ping(node string) {
	req, err := http.NewRequest("GET", "http://"+node+a.hcEndpoint, nil)
	if err != nil {
		// a node we can't even build a request for counts as a failed check
		logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
		a.fail(node)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
	defer cancel()
	req = req.WithContext(ctx)

	resp, err := a.httpClient.Do(req)
	if resp != nil && resp.Body != nil {
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}

	if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
		logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
		a.fail(node)
	} else {
		a.alive(node)
	}
}

func (a *allGrouper) fail(node string) {
	// shouldn't be a hot path so shouldn't be too contended on since health
	// checks are infrequent
	a.Lock()
	a.ded[node]++
	failed := a.ded[node]
	if failed >= a.hcUnhealthy {
		a.removeHealthy(node)
	}
	a.Unlock()
}

func (a *allGrouper) alive(node string) {
	// TODO alive is gonna get called a lot, should maybe start w/ every node in ded
	// so we can RLock (but lock contention should be low since these are ~quick) --
	// "a lot" being every 1s per node, so not too crazy really, but 1k nodes @ ms each...
	a.Lock()
	delete(a.ded, node)
	a.addHealthy(node)
	a.Unlock()
}

func (a *allGrouper) Wrap(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		// XXX (reed): probably do these on a separate port to avoid conflicts
		case "/1/lb/nodes":
			switch r.Method {
			case "PUT":
				a.addNode(w, r)
			case "DELETE":
				a.removeNode(w, r)
			case "GET":
				a.listNodes(w, r)
			}
			return
		}

		next.ServeHTTP(w, r)
	})
}

func (a *allGrouper) addNode(w http.ResponseWriter, r *http.Request) {
	var bod struct {
		Node string `json:"node"`
	}
	err := json.NewDecoder(r.Body).Decode(&bod)
	if err != nil {
		sendError(w, http.StatusBadRequest, err.Error())
		return
	}

	err = a.add(bod.Node)
	if err != nil {
		sendError(w, http.StatusInternalServerError, err.Error()) // TODO filter ?
		return
	}
	sendSuccess(w, "node added")
}

func (a *allGrouper) removeNode(w http.ResponseWriter, r *http.Request) {
	var bod struct {
		Node string `json:"node"`
	}
	err := json.NewDecoder(r.Body).Decode(&bod)
	if err != nil {
		sendError(w, http.StatusBadRequest, err.Error())
		return
	}

	err = a.remove(bod.Node)
	if err != nil {
		sendError(w, http.StatusInternalServerError, err.Error()) // TODO filter ?
		return
	}
	sendSuccess(w, "node deleted")
}

func (a *allGrouper) listNodes(w http.ResponseWriter, r *http.Request) {
	a.RLock()
	nodes := make([]string, len(a.allNodes))
	copy(nodes, a.allNodes)
	a.RUnlock()

	// TODO this isn't correct until at least one health check has hit all nodes (on start up).
	// seems like not a huge deal, but here's a note anyway (every node will simply 'appear' healthy
	// from this api even if we aren't routing to it [until first health check]).
	out := make(map[string]string, len(nodes))
	for _, n := range nodes {
		if a.isDead(n) {
			out[n] = "offline"
		} else {
			out[n] = "online"
		}
	}

	sendValue(w, struct {
		Nodes map[string]string `json:"nodes"`
	}{
		Nodes: out,
	})
}

func (a *allGrouper) isDead(node string) bool {
	a.RLock()
	val, ok := a.ded[node]
	a.RUnlock()
	return ok && val >= a.hcUnhealthy
}