Refactor controlplane into a go plugin (#833)

* Refactor controlplane into a go plugin

* Move vbox to controlplane package
This commit is contained in:
Matt Stephenson
2018-03-12 12:50:55 -07:00
committed by GitHub
parent ce4551e129
commit a787ccac36
8 changed files with 104 additions and 61 deletions

View File

@@ -56,6 +56,7 @@ but the GRPC port is 9190.
Grab the runner address and put it in as the value for the `FN_RUNNER_ADDRESSES` env variable. Grab the runner address and put it in as the value for the `FN_RUNNER_ADDRESSES` env variable.
```bash ```bash
go build -o noop.so poolmanager/server/controlplane/plugin/noop.go
go build -o fnnpm poolmanager/server/main.go go build -o fnnpm poolmanager/server/main.go
FN_LOG_LEVEL=DEBUG \ FN_LOG_LEVEL=DEBUG \
@@ -64,6 +65,7 @@ FN_NODE_CERT_KEY=key.pem \
FN_NODE_CERT_AUTHORITY=cert.pem \ FN_NODE_CERT_AUTHORITY=cert.pem \
FN_PORT=8083 \ FN_PORT=8083 \
FN_RUNNER_ADDRESSES=<RUNNER_ADDRESS_HERE>:9190 \ FN_RUNNER_ADDRESSES=<RUNNER_ADDRESS_HERE>:9190 \
CONTROL_PLANE_SO=noop.so \
./fnnpm ./fnnpm
``` ```

View File

@@ -8,7 +8,7 @@ import (
"sync" "sync"
model "github.com/fnproject/fn/poolmanager/grpc" model "github.com/fnproject/fn/poolmanager/grpc"
"github.com/fnproject/fn/poolmanager/server/cp" "github.com/fnproject/fn/poolmanager/server/controlplane"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@@ -32,12 +32,12 @@ type Predictor interface {
type capacityManager struct { type capacityManager struct {
ctx context.Context ctx context.Context
mx sync.RWMutex mx sync.RWMutex
cp cp.ControlPlane cp controlplane.ControlPlane
lbg map[string]LBGroup lbg map[string]LBGroup
predictorFactory func() Predictor predictorFactory func() Predictor
} }
func NewCapacityManager(ctx context.Context, cp cp.ControlPlane, opts ...func(*capacityManager) error) (CapacityManager, error) { func NewCapacityManager(ctx context.Context, cp controlplane.ControlPlane, opts ...func(*capacityManager) error) (CapacityManager, error) {
cm := &capacityManager{ cm := &capacityManager{
ctx: ctx, ctx: ctx,
cp: cp, cp: cp,
@@ -107,7 +107,7 @@ type lbGroup struct {
// Attributes for managing runner pool membership // Attributes for managing runner pool membership
run_mx sync.RWMutex run_mx sync.RWMutex
cp cp.ControlPlane cp controlplane.ControlPlane
current_capacity int64 // Of all active runners current_capacity int64 // Of all active runners
target_capacity int64 // All active runners plus any we've already asked for target_capacity int64 // All active runners plus any we've already asked for
@@ -140,7 +140,7 @@ type runner struct {
kill_after time.Time kill_after time.Time
} }
func newLBGroup(lbgid string, ctx context.Context, cp cp.ControlPlane, predictorFactory func() Predictor) LBGroup { func newLBGroup(lbgid string, ctx context.Context, cp controlplane.ControlPlane, predictorFactory func() Predictor) LBGroup {
lbg := &lbGroup{ lbg := &lbGroup{
ctx: ctx, ctx: ctx,
id: lbgid, id: lbgid,
@@ -296,20 +296,20 @@ func (lbg *lbGroup) target(ts time.Time, target int64) {
if desiredScale > lbg.target_capacity { if desiredScale > lbg.target_capacity {
// We still need additional capacity // We still need additional capacity
wanted := math.Min(math.Ceil(float64(target-lbg.target_capacity)/cp.CAPACITY_PER_RUNNER), LARGEST_REQUEST_AT_ONCE) wanted := math.Min(math.Ceil(float64(target-lbg.target_capacity)/controlplane.CapacityPerRunner), LARGEST_REQUEST_AT_ONCE)
asked_for, err := lbg.cp.ProvisionRunners(lbg.Id(), int(wanted)) // Send the request; they'll show up later asked_for, err := lbg.cp.ProvisionRunners(lbg.Id(), int(wanted)) // Send the request; they'll show up later
if err != nil { if err != nil {
// Some kind of error during attempt to scale up // Some kind of error during attempt to scale up
logrus.WithError(err).Error("Error occured during attempt to scale up") logrus.WithError(err).Error("Error occured during attempt to scale up")
return return
} }
lbg.target_capacity += int64(asked_for) * cp.CAPACITY_PER_RUNNER lbg.target_capacity += int64(asked_for) * controlplane.CapacityPerRunner
} }
} else if desiredScale <= lbg.current_capacity-cp.CAPACITY_PER_RUNNER { } else if desiredScale <= lbg.current_capacity-controlplane.CapacityPerRunner {
// Scale down. // Scale down.
// We pick a node to turn off and move it to the draining pool. // We pick a node to turn off and move it to the draining pool.
for target <= lbg.current_capacity-cp.CAPACITY_PER_RUNNER && len(lbg.active_runners) > 0 { for target <= lbg.current_capacity-controlplane.CapacityPerRunner && len(lbg.active_runners) > 0 {
// Begin with the one we added last. // Begin with the one we added last.
runner := lbg.active_runners[len(lbg.active_runners)-1] runner := lbg.active_runners[len(lbg.active_runners)-1]
logrus.Infof("Marking runner %v at %v for draindown", runner.id, runner.address) logrus.Infof("Marking runner %v at %v for draindown", runner.id, runner.address)

View File

@@ -0,0 +1,20 @@
/*
Interface between the Node Pool Manager and the Control Plane
*/
package controlplane
// CapacityPerRunner is the capacity attributed to a single runner. The pool
// manager multiplies and divides by it when converting between a number of
// runners and an amount of capacity.
// NOTE(review): the unit of this value is not stated anywhere visible - confirm.
const CapacityPerRunner = 4096
// Runner describes one runner as reported by a ControlPlane implementation.
type Runner struct {
// Id identifies the runner; it is the value passed to ControlPlane.RemoveRunner.
Id string
// Address is the address at which the runner is reachable.
Address string
// Other: certs etc here as managed and installed by CP
// Capacity is the capacity this runner contributes to its group.
Capacity int64
}
// ControlPlane is the contract the Node Pool Manager uses to talk to whatever
// system actually owns the runners. Implementations are keyed by load-balancer
// group id (lbgId).
type ControlPlane interface {
	// GetLBGRunners returns the runners currently known for the group lbgId.
	GetLBGRunners(lbgId string) ([]*Runner, error)

	// ProvisionRunners asks for n more runners for the group lbgId. The
	// returned int is treated by the caller as the number of runners that
	// were requested; the runners themselves may only show up later.
	ProvisionRunners(lbgId string, n int) (int, error)

	// RemoveRunner removes the runner identified by id from the group lbgId.
	RemoveRunner(lbgId string, id string) error
}

View File

@@ -1,56 +1,47 @@
/* /**
Interface between the Node Pool Manager and the Control Plane * Dummy implementation for the controlplane that just adds delays
*/ */
package main
package cp
import ( import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"log" "log"
"os"
"strings"
"sync" "sync"
"time" "time"
"github.com/fnproject/fn/poolmanager/server/controlplane"
) )
type Runner struct { const (
Id string EnvFixedRunners = "FN_RUNNER_ADDRESSES"
Address string )
// Other: certs etc here as managed and installed by CP
Capacity int64
}
const CAPACITY_PER_RUNNER = 4096
type ControlPlane interface {
GetLBGRunners(lgbId string) ([]*Runner, error)
ProvisionRunners(lgbId string, n int) (int, error)
RemoveRunner(lbgId string, id string) error
}
type controlPlane struct {
mx sync.RWMutex
runners map[string][]*Runner
type noopControlPlane struct {
mx sync.RWMutex
runners map[string][]*controlplane.Runner
_fakeRunners []string _fakeRunners []string
} }
const REQUEST_DURATION = 5 * time.Second const REQUEST_DURATION = 5 * time.Second
func NewControlPlane(fakeRunners []string) ControlPlane { func init() {
cp := &controlPlane{ ControlPlane = noopControlPlane{
runners: make(map[string][]*Runner), runners: make(map[string][]*controlplane.Runner),
_fakeRunners: strings.Split(getEnv(EnvFixedRunners), ","),
_fakeRunners: fakeRunners,
} }
return cp
} }
func (cp *controlPlane) GetLBGRunners(lbgId string) ([]*Runner, error) { func main() {
}
func (cp *noopControlPlane) GetLBGRunners(lbgId string) ([]*controlplane.Runner, error) {
cp.mx.RLock() cp.mx.RLock()
defer cp.mx.RUnlock() defer cp.mx.RUnlock()
runners := make([]*Runner, 0) runners := make([]*controlplane.Runner, 0)
if hosts, ok := cp.runners[lbgId]; ok { if hosts, ok := cp.runners[lbgId]; ok {
for _, host := range hosts { for _, host := range hosts {
runners = append(runners, host) // In this CP implementation, a Runner is an immutable thing, so passing the pointer is fine runners = append(runners, host) // In this CP implementation, a Runner is an immutable thing, so passing the pointer is fine
@@ -60,7 +51,7 @@ func (cp *controlPlane) GetLBGRunners(lbgId string) ([]*Runner, error) {
return runners, nil return runners, nil
} }
func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) { func (cp *noopControlPlane) ProvisionRunners(lbgId string, n int) (int, error) {
// Simulate some small amount of time for the CP to service this request // Simulate some small amount of time for the CP to service this request
go func() { go func() {
time.Sleep(REQUEST_DURATION) time.Sleep(REQUEST_DURATION)
@@ -69,7 +60,7 @@ func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) {
runners, ok := cp.runners[lbgId] runners, ok := cp.runners[lbgId]
if !ok { if !ok {
runners = make([]*Runner, 0) runners = make([]*controlplane.Runner, 0)
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
runners = append(runners, cp.makeRunners(lbgId)...) runners = append(runners, cp.makeRunners(lbgId)...)
@@ -81,9 +72,9 @@ func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) {
} }
// Make runner(s) // Make runner(s)
func (cp *controlPlane) makeRunners(lbg string) []*Runner { func (cp *noopControlPlane) makeRunners(lbg string) []*controlplane.Runner {
var runners []*Runner var runners []*controlplane.Runner
for _, fakeRunner := range cp._fakeRunners { for _, fakeRunner := range cp._fakeRunners {
b := make([]byte, 16) b := make([]byte, 16)
@@ -94,10 +85,10 @@ func (cp *controlPlane) makeRunners(lbg string) []*Runner {
uuid := fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) uuid := fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
runners = append(runners, &Runner{ runners = append(runners, &controlplane.Runner{
Id: uuid, Id: uuid,
Address: fakeRunner, Address: fakeRunner,
Capacity: CAPACITY_PER_RUNNER, Capacity: controlplane.CapacityPerRunner,
}) })
} }
return runners return runners
@@ -106,12 +97,12 @@ func (cp *controlPlane) makeRunners(lbg string) []*Runner {
// Ditch a runner from the pool. // Ditch a runner from the pool.
// We do this immediately - no point modelling a wait here // We do this immediately - no point modelling a wait here
// note: if this actually took time, we'd want to detect this properly so as to not confuse the NPM // note: if this actually took time, we'd want to detect this properly so as to not confuse the NPM
func (cp *controlPlane) RemoveRunner(lbgId string, id string) error { func (cp *noopControlPlane) RemoveRunner(lbgId string, id string) error {
cp.mx.Lock() cp.mx.Lock()
defer cp.mx.Unlock() defer cp.mx.Unlock()
if runners, ok := cp.runners[lbgId]; ok { if runners, ok := cp.runners[lbgId]; ok {
newRunners := make([]*Runner, 0) newRunners := make([]*controlplane.Runner, 0)
for _, host := range runners { for _, host := range runners {
if host.Id != id { if host.Id != id {
newRunners = append(newRunners, host) newRunners = append(newRunners, host)
@@ -121,3 +112,13 @@ func (cp *controlPlane) RemoveRunner(lbgId string, id string) error {
} }
return nil return nil
} }
// getEnv returns the value of the environment variable named key.
// A variable that is set to the empty string is returned as "". If the
// variable is not set at all, the function logs and panics via log.Panicf.
func getEnv(key string) string {
	if v, found := os.LookupEnv(key); found {
		return v
	}
	log.Panicf("Missing config key: %v", key)
	return "" // not reached: log.Panicf panics
}
// ControlPlane is the package-level noopControlPlane instance that init
// populates. It is exported under this exact name because the pool manager
// resolves the plugin's implementation by looking up the symbol "ControlPlane".
var ControlPlane noopControlPlane

View File

@@ -1,4 +1,4 @@
package cp package controlplane
import ( import (
"bytes" "bytes"

View File

@@ -9,7 +9,7 @@ import (
"github.com/fnproject/fn/poolmanager" "github.com/fnproject/fn/poolmanager"
model "github.com/fnproject/fn/poolmanager/grpc" model "github.com/fnproject/fn/poolmanager/grpc"
"github.com/fnproject/fn/poolmanager/server/cp" "github.com/fnproject/fn/poolmanager/server/controlplane"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
@@ -19,7 +19,7 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "plugin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@@ -30,7 +30,7 @@ type npmService struct {
capMan poolmanager.CapacityManager capMan poolmanager.CapacityManager
} }
func newNPMService(ctx context.Context, cp cp.ControlPlane) *npmService { func newNPMService(ctx context.Context, cp controlplane.ControlPlane) *npmService {
cm, err := poolmanager.NewCapacityManager(ctx, cp) cm, err := poolmanager.NewCapacityManager(ctx, cp)
if err != nil { if err != nil {
logrus.Panic("Cannot construct capacity manager") logrus.Panic("Cannot construct capacity manager")
@@ -66,11 +66,11 @@ func (npm *npmService) GetLBGroup(ctx context.Context, gid *model.LBGroupId) (*m
const ( const (
// Certificates to communicate with other FN nodes // Certificates to communicate with other FN nodes
EnvCert = "FN_NODE_CERT" EnvCert = "FN_NODE_CERT"
EnvCertKey = "FN_NODE_CERT_KEY" EnvCertKey = "FN_NODE_CERT_KEY"
EnvCertAuth = "FN_NODE_CERT_AUTHORITY" EnvCertAuth = "FN_NODE_CERT_AUTHORITY"
EnvPort = "FN_PORT" EnvPort = "FN_PORT"
EnvFixedRunners = "FN_RUNNER_ADDRESSES" ControlPlaneSO = "CONTROL_PLANE_SO"
) )
func getAndCheckFile(envVar string) (string, error) { func getAndCheckFile(envVar string) (string, error) {
@@ -116,6 +116,27 @@ func createGrpcCreds(cert string, key string, ca string) (grpc.ServerOption, err
return grpc.Creds(creds), nil return grpc.Creds(creds), nil
} }
// newPluggableControlPlane loads the control plane implementation from a Go
// plugin. The path of the shared object is read from the environment variable
// named by ControlPlaneSO.
//
// The plugin must export a symbol called "ControlPlane" whose looked-up value
// implements controlplane.ControlPlane. plugin.Lookup yields a pointer to an
// exported variable, so for a variable the pointer type is what must satisfy
// the interface.
//
// Every failure (plugin cannot be opened, symbol missing, symbol of the wrong
// type) panics with a message naming the plugin path, because the service
// cannot run without a control plane.
func newPluggableControlPlane() controlplane.ControlPlane {
	pluginLocation := getEnv(ControlPlaneSO)

	controlPlanePlugin, err := plugin.Open(pluginLocation)
	if err != nil {
		logrus.WithError(err).Panicf("Cannot open controlplane plugin %q", pluginLocation)
	}

	cpSymbol, err := controlPlanePlugin.Lookup("ControlPlane")
	if err != nil {
		logrus.WithError(err).Panicf("Cannot find symbol ControlPlane in plugin %q", pluginLocation)
	}

	// Checked assertion: a plugin exporting the wrong type should fail with a
	// readable message rather than a bare interface-conversion runtime panic.
	cp, ok := cpSymbol.(controlplane.ControlPlane)
	if !ok {
		logrus.Panicf("Symbol ControlPlane in plugin %q has type %T, which does not implement controlplane.ControlPlane", pluginLocation, cpSymbol)
	}

	// %T rather than %s: the implementation is not required to be a Stringer,
	// and formatting its fields would read internal state without locking.
	logrus.Infof("Started controlplane : %T", cp)

	return cp
}
func main() { func main() {
level, err := logrus.ParseLevel(getEnv("FN_LOG_LEVEL")) level, err := logrus.ParseLevel(getEnv("FN_LOG_LEVEL"))
if err != nil { if err != nil {
@@ -146,8 +167,7 @@ func main() {
logrus.Info("Starting Node Pool Manager gRPC service") logrus.Info("Starting Node Pool Manager gRPC service")
fakeRunners := strings.Split(getEnv(EnvFixedRunners), ",") svc := newNPMService(context.Background(), newPluggableControlPlane())
svc := newNPMService(context.Background(), cp.NewControlPlane(fakeRunners))
model.RegisterNodePoolScalerServer(gRPCServer, svc) model.RegisterNodePoolScalerServer(gRPCServer, svc)
model.RegisterRunnerManagerServer(gRPCServer, svc) model.RegisterRunnerManagerServer(gRPCServer, svc)