mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge pull request #177 from fnproject/fnlb-version
Adding per-node version check
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -24,3 +24,4 @@ data/
|
||||
.vscode/
|
||||
fn/func.yaml
|
||||
tmp/
|
||||
fnlb/fnlb
|
||||
|
||||
@@ -16,7 +16,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/lib/pq"
|
||||
@@ -38,10 +40,11 @@ func NewAllGrouper(conf Config) (Grouper, error) {
|
||||
db: db,
|
||||
|
||||
// XXX (reed): need to be reconfigurable at some point
|
||||
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
|
||||
hcEndpoint: conf.HealthcheckEndpoint,
|
||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
||||
hcInterval: time.Duration(conf.HealthcheckInterval) * time.Second,
|
||||
hcEndpoint: conf.HealthcheckEndpoint,
|
||||
hcUnhealthy: int64(conf.HealthcheckUnhealthy),
|
||||
hcTimeout: time.Duration(conf.HealthcheckTimeout) * time.Second,
|
||||
minAPIVersion: conf.MinAPIVersion,
|
||||
|
||||
// for health checks
|
||||
httpClient: &http.Client{Transport: conf.Transport},
|
||||
@@ -79,10 +82,11 @@ type allGrouper struct {
|
||||
|
||||
httpClient *http.Client
|
||||
|
||||
hcInterval time.Duration
|
||||
hcEndpoint string
|
||||
hcUnhealthy int64
|
||||
hcTimeout time.Duration
|
||||
hcInterval time.Duration
|
||||
hcEndpoint string
|
||||
hcUnhealthy int64
|
||||
hcTimeout time.Duration
|
||||
minAPIVersion *semver.Version
|
||||
}
|
||||
|
||||
// TODO put this somewhere better
|
||||
@@ -217,6 +221,10 @@ func (a *allGrouper) add(newb string) error {
|
||||
if newb == "" {
|
||||
return nil // we can't really do a lot of validation since hosts could be an ip or domain but we have health checks
|
||||
}
|
||||
err := a.checkAPIVersion(newb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return a.db.Add(newb)
|
||||
}
|
||||
|
||||
@@ -276,20 +284,56 @@ func (a *allGrouper) healthcheck() {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *allGrouper) ping(node string) {
|
||||
req, _ := http.NewRequest("GET", "http://"+node+a.hcEndpoint, nil)
|
||||
type fnVersion struct {
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
var v fnVersion
|
||||
|
||||
func (a *allGrouper) getVersion(urlString string) (string, error) {
|
||||
req, _ := http.NewRequest(http.MethodGet, urlString, nil)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), a.hcTimeout)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := a.httpClient.Do(req)
|
||||
if resp != nil && resp.Body != nil {
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(v)
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return v.Version, nil
|
||||
}
|
||||
|
||||
func (a *allGrouper) checkAPIVersion(node string) error {
|
||||
versionURL := "http://" + node + "/version"
|
||||
|
||||
version, err := a.getVersion(versionURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("health check failed")
|
||||
nodeVer := semver.New(version)
|
||||
|
||||
if a.minAPIVersion.Compare(*nodeVer) == -1 {
|
||||
return fmt.Errorf("incompatible API version: %v", a.minAPIVersion)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *allGrouper) ping(node string) {
|
||||
err := a.checkAPIVersion(node)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"node": node}).Error("Unable to check API version")
|
||||
a.fail(node)
|
||||
} else {
|
||||
a.alive(node)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
)
|
||||
|
||||
// TODO the load balancers all need to have the same list of nodes. gossip?
|
||||
@@ -26,13 +27,14 @@ import (
|
||||
// TODO TLS
|
||||
|
||||
type Config struct {
|
||||
DBurl string `json:"db_url"`
|
||||
Listen string `json:"port"`
|
||||
Nodes []string `json:"nodes"`
|
||||
HealthcheckInterval int `json:"healthcheck_interval"`
|
||||
HealthcheckEndpoint string `json:"healthcheck_endpoint"`
|
||||
HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
|
||||
HealthcheckTimeout int `json:"healthcheck_timeout"`
|
||||
DBurl string `json:"db_url"`
|
||||
Listen string `json:"port"`
|
||||
Nodes []string `json:"nodes"`
|
||||
HealthcheckInterval int `json:"healthcheck_interval"`
|
||||
HealthcheckEndpoint string `json:"healthcheck_endpoint"`
|
||||
HealthcheckUnhealthy int `json:"healthcheck_unhealthy"`
|
||||
HealthcheckTimeout int `json:"healthcheck_timeout"`
|
||||
MinAPIVersion *semver.Version `json:"min_api_version"`
|
||||
|
||||
Transport *http.Transport
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/fnproject/fn/fnlb/lb"
|
||||
)
|
||||
|
||||
@@ -21,6 +22,7 @@ const VERSION = "0.0.15"
|
||||
func main() {
|
||||
// XXX (reed): normalize
|
||||
fnodes := flag.String("nodes", "", "comma separated list of functions nodes")
|
||||
minAPIVersion := flag.String("min-api-version", "0.0.1", "minimal node API to accept")
|
||||
|
||||
var conf lb.Config
|
||||
flag.StringVar(&conf.DBurl, "db", "sqlite3://:memory:", "backend to store nodes, default to in memory")
|
||||
@@ -29,8 +31,11 @@ func main() {
|
||||
flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
|
||||
flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
|
||||
flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
conf.MinAPIVersion = semver.New(*minAPIVersion)
|
||||
|
||||
if len(*fnodes) > 0 {
|
||||
// starting w/o nodes is fine too
|
||||
conf.Nodes = strings.Split(*fnodes, ",")
|
||||
|
||||
Reference in New Issue
Block a user