From a787ccac3610ebe7544a5d453a91daa23b1db7df Mon Sep 17 00:00:00 2001 From: Matt Stephenson Date: Mon, 12 Mar 2018 12:50:55 -0700 Subject: [PATCH] Refactor controlplane into a go plugin (#833) * Refactor controlplane into a go plugin * Move vbox to controlplane package --- docs/operating/multitenant.md | 4 +- poolmanager/manager.go | 18 ++--- .../server/{cp => controlplane}/Readme.md | 0 .../server/{cp => controlplane}/Vagrantfile | 0 .../server/controlplane/controlplane.go | 20 +++++ .../{cp/cp.go => controlplane/plugin/noop.go} | 81 ++++++++++--------- .../server/{cp => controlplane}/vbox.go | 2 +- poolmanager/server/main.go | 40 ++++++--- 8 files changed, 104 insertions(+), 61 deletions(-) rename poolmanager/server/{cp => controlplane}/Readme.md (100%) rename poolmanager/server/{cp => controlplane}/Vagrantfile (100%) create mode 100644 poolmanager/server/controlplane/controlplane.go rename poolmanager/server/{cp/cp.go => controlplane/plugin/noop.go} (55%) rename poolmanager/server/{cp => controlplane}/vbox.go (99%) diff --git a/docs/operating/multitenant.md b/docs/operating/multitenant.md index 3a26c7987..163e3f1a2 100644 --- a/docs/operating/multitenant.md +++ b/docs/operating/multitenant.md @@ -2,7 +2,7 @@ ## Motivation -By running Fn in multitenant mode, you can define independent pools of compute resources available to functions in the platform. By associating a function with a particular _load balancing group_, its invocations are guaranteed to execute on the compute resources assigned to that specific group. The pluggable _node pool manager_ abstraction provides a mechanism to scale compute resources dynamically, based on capacity requirements advertised by the load-balancing layer. Together with load balancer groups, it allows you to implement independent capacity and scaling policies for different sets of users or tenants. +By running Fn in multitenant mode, you can define independent pools of compute resources available to functions in the platform. By associating a function with a particular _load balancing group_, its invocations are guaranteed to execute on the compute resources assigned to that specific group. The pluggable _node pool manager_ abstraction provides a mechanism to scale compute resources dynamically, based on capacity requirements advertised by the load-balancing layer. Together with load balancer groups, it allows you to implement independent capacity and scaling policies for different sets of users or tenants. ## Create certificates @@ -56,6 +56,7 @@ but the GRPC port is 9190. Grap the runner address and put in as value for the `FN_RUNNER_ADDRESSES` env variable. ```bash +go build -o noop.so poolmanager/server/controlplane/plugin/noop.go go build -o fnnpm poolmanager/server/main.go FN_LOG_LEVEL=DEBUG \ @@ -64,6 +65,7 @@ FN_NODE_CERT_KEY=key.pem \ FN_NODE_CERT_AUTHORITY=cert.pem \ FN_PORT=8083 \ FN_RUNNER_ADDRESSES=:9190 \ +CONTROL_PLANE_SO=noop.so \ ./fnnpm ``` diff --git a/poolmanager/manager.go b/poolmanager/manager.go index 5478a2093..84989c779 100644 --- a/poolmanager/manager.go +++ b/poolmanager/manager.go @@ -8,7 +8,7 @@ import ( "sync" model "github.com/fnproject/fn/poolmanager/grpc" - "github.com/fnproject/fn/poolmanager/server/cp" + "github.com/fnproject/fn/poolmanager/server/controlplane" "github.com/sirupsen/logrus" ) @@ -32,12 +32,12 @@ type Predictor interface { type capacityManager struct { ctx context.Context mx sync.RWMutex - cp cp.ControlPlane + cp controlplane.ControlPlane lbg map[string]LBGroup predictorFactory func() Predictor } -func NewCapacityManager(ctx context.Context, cp cp.ControlPlane, opts ...func(*capacityManager) error) (CapacityManager, error) { +func NewCapacityManager(ctx context.Context, cp controlplane.ControlPlane, opts ...func(*capacityManager) error) (CapacityManager, error) { cm := &capacityManager{ ctx: ctx, cp: cp, @@ -107,7 +107,7 @@ type lbGroup struct { // Attributes for managing runner pool membership run_mx sync.RWMutex - cp cp.ControlPlane + cp controlplane.ControlPlane current_capacity int64 // Of all active runners target_capacity int64 // All active runners plus any we've already asked for @@ -140,7 +140,7 @@ type runner struct { kill_after time.Time } -func newLBGroup(lbgid string, ctx context.Context, cp cp.ControlPlane, predictorFactory func() Predictor) LBGroup { +func newLBGroup(lbgid string, ctx context.Context, cp controlplane.ControlPlane, predictorFactory func() Predictor) LBGroup { lbg := &lbGroup{ ctx: ctx, id: lbgid, @@ -296,20 +296,20 @@ func (lbg *lbGroup) target(ts time.Time, target int64) { if desiredScale > lbg.target_capacity { // We still need additional capacity - wanted := math.Min(math.Ceil(float64(target-lbg.target_capacity)/cp.CAPACITY_PER_RUNNER), LARGEST_REQUEST_AT_ONCE) + wanted := math.Min(math.Ceil(float64(target-lbg.target_capacity)/controlplane.CapacityPerRunner), LARGEST_REQUEST_AT_ONCE) asked_for, err := lbg.cp.ProvisionRunners(lbg.Id(), int(wanted)) // Send the request; they'll show up later if err != nil { // Some kind of error during attempt to scale up logrus.WithError(err).Error("Error occured during attempt to scale up") return } - lbg.target_capacity += int64(asked_for) * cp.CAPACITY_PER_RUNNER + lbg.target_capacity += int64(asked_for) * controlplane.CapacityPerRunner } - } else if desiredScale <= lbg.current_capacity-cp.CAPACITY_PER_RUNNER { + } else if desiredScale <= lbg.current_capacity-controlplane.CapacityPerRunner { // Scale down. // We pick a node to turn off and move it to the draining pool. - for target <= lbg.current_capacity-cp.CAPACITY_PER_RUNNER && len(lbg.active_runners) > 0 { + for target <= lbg.current_capacity-controlplane.CapacityPerRunner && len(lbg.active_runners) > 0 { // Begin with the one we added last. runner := lbg.active_runners[len(lbg.active_runners)-1] logrus.Infof("Marking runner %v at %v for draindown", runner.id, runner.address) diff --git a/poolmanager/server/cp/Readme.md b/poolmanager/server/controlplane/Readme.md similarity index 100% rename from poolmanager/server/cp/Readme.md rename to poolmanager/server/controlplane/Readme.md diff --git a/poolmanager/server/cp/Vagrantfile b/poolmanager/server/controlplane/Vagrantfile similarity index 100% rename from poolmanager/server/cp/Vagrantfile rename to poolmanager/server/controlplane/Vagrantfile diff --git a/poolmanager/server/controlplane/controlplane.go b/poolmanager/server/controlplane/controlplane.go new file mode 100644 index 000000000..f3f190a10 --- /dev/null +++ b/poolmanager/server/controlplane/controlplane.go @@ -0,0 +1,20 @@ +/* + Interface between the Node Pool Manager and the Control Plane +*/ + +package controlplane + +const CapacityPerRunner = 4096 + +type Runner struct { + Id string + Address string + // Other: certs etc here as managed and installed by CP + Capacity int64 +} + +type ControlPlane interface { + GetLBGRunners(lgbId string) ([]*Runner, error) + ProvisionRunners(lgbId string, n int) (int, error) + RemoveRunner(lbgId string, id string) error +} diff --git a/poolmanager/server/cp/cp.go b/poolmanager/server/controlplane/plugin/noop.go similarity index 55% rename from poolmanager/server/cp/cp.go rename to poolmanager/server/controlplane/plugin/noop.go index b0c32fff9..92e47cbc7 100644 --- a/poolmanager/server/cp/cp.go +++ b/poolmanager/server/controlplane/plugin/noop.go @@ -1,56 +1,47 @@ -/* - Interface between the Node Pool Manager and the Control Plane -*/ - -package cp +/** + * Dummy implementation for the controlplane that just adds delays + */ +package main import ( "crypto/rand" "fmt" "log" + "os" + "strings" "sync" "time" + + "github.com/fnproject/fn/poolmanager/server/controlplane" ) -type Runner struct { - Id string - Address string - // Other: certs etc here as managed and installed by CP - Capacity int64 -} - -const CAPACITY_PER_RUNNER = 4096 - -type ControlPlane interface { - GetLBGRunners(lgbId string) ([]*Runner, error) - ProvisionRunners(lgbId string, n int) (int, error) - RemoveRunner(lbgId string, id string) error -} - -type controlPlane struct { - mx sync.RWMutex - - runners map[string][]*Runner +const ( + EnvFixedRunners = "FN_RUNNER_ADDRESSES" +) +type noopControlPlane struct { + mx sync.RWMutex + runners map[string][]*controlplane.Runner _fakeRunners []string } const REQUEST_DURATION = 5 * time.Second -func NewControlPlane(fakeRunners []string) ControlPlane { - cp := &controlPlane{ - runners: make(map[string][]*Runner), - - _fakeRunners: fakeRunners, +func init() { + ControlPlane = noopControlPlane{ + runners: make(map[string][]*controlplane.Runner), + _fakeRunners: strings.Split(getEnv(EnvFixedRunners), ","), } - return cp } -func (cp *controlPlane) GetLBGRunners(lbgId string) ([]*Runner, error) { +func main() { +} + +func (cp *noopControlPlane) GetLBGRunners(lbgId string) ([]*controlplane.Runner, error) { cp.mx.RLock() defer cp.mx.RUnlock() - runners := make([]*Runner, 0) + runners := make([]*controlplane.Runner, 0) if hosts, ok := cp.runners[lbgId]; ok { for _, host := range hosts { runners = append(runners, host) // In this CP implementation, a Runner is an immutable thing, so passing the pointer is fine @@ -60,7 +51,7 @@ func (cp *controlPlane) GetLBGRunners(lbgId string) ([]*Runner, error) { return runners, nil } -func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) { +func (cp *noopControlPlane) ProvisionRunners(lbgId string, n int) (int, error) { // Simulate some small amount of time for the CP to service this request go func() { time.Sleep(REQUEST_DURATION) @@ -69,7 +60,7 @@ func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) { runners, ok := cp.runners[lbgId] if !ok { - runners = make([]*Runner, 0) + runners = make([]*controlplane.Runner, 0) } for i := 0; i < n; i++ { runners = append(runners, cp.makeRunners(lbgId)...) @@ -81,9 +72,9 @@ func (cp *controlPlane) ProvisionRunners(lbgId string, n int) (int, error) { } // Make runner(s) -func (cp *controlPlane) makeRunners(lbg string) []*Runner { +func (cp *noopControlPlane) makeRunners(lbg string) []*controlplane.Runner { - var runners []*Runner + var runners []*controlplane.Runner for _, fakeRunner := range cp._fakeRunners { b := make([]byte, 16) @@ -94,10 +85,10 @@ func (cp *controlPlane) makeRunners(lbg string) []*Runner { uuid := fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) - runners = append(runners, &Runner{ + runners = append(runners, &controlplane.Runner{ Id: uuid, Address: fakeRunner, - Capacity: CAPACITY_PER_RUNNER, + Capacity: controlplane.CapacityPerRunner, }) } return runners @@ -106,12 +97,12 @@ func (cp *controlPlane) makeRunners(lbg string) []*Runner { // Ditch a runner from the pool. // We do this immediately - no point modelling a wait here // note: if this actually took time, we'd want to detect this properly so as to not confuse the NPM -func (cp *controlPlane) RemoveRunner(lbgId string, id string) error { +func (cp *noopControlPlane) RemoveRunner(lbgId string, id string) error { cp.mx.Lock() defer cp.mx.Unlock() if runners, ok := cp.runners[lbgId]; ok { - newRunners := make([]*Runner, 0) + newRunners := make([]*controlplane.Runner, 0) for _, host := range runners { if host.Id != id { newRunners = append(newRunners, host) @@ -121,3 +112,13 @@ func (cp *controlPlane) RemoveRunner(lbgId string, id string) error { } return nil } + +func getEnv(key string) string { + value, ok := os.LookupEnv(key) + if !ok { + log.Panicf("Missing config key: %v", key) + } + return value +} + +var ControlPlane noopControlPlane diff --git a/poolmanager/server/cp/vbox.go b/poolmanager/server/controlplane/vbox.go similarity index 99% rename from poolmanager/server/cp/vbox.go rename to poolmanager/server/controlplane/vbox.go index e93fece0b..2dcd8a10a 100644 --- a/poolmanager/server/cp/vbox.go +++ b/poolmanager/server/controlplane/vbox.go @@ -1,4 +1,4 @@ -package cp +package controlplane import ( "bytes" diff --git a/poolmanager/server/main.go b/poolmanager/server/main.go index 7dac3e847..06fc7a024 100644 --- a/poolmanager/server/main.go +++ b/poolmanager/server/main.go @@ -9,7 +9,7 @@ import ( "github.com/fnproject/fn/poolmanager" model "github.com/fnproject/fn/poolmanager/grpc" - "github.com/fnproject/fn/poolmanager/server/cp" + "github.com/fnproject/fn/poolmanager/server/controlplane" "crypto/tls" "crypto/x509" @@ -19,7 +19,7 @@ import ( "log" "os" "path/filepath" - "strings" + "plugin" "github.com/sirupsen/logrus" "google.golang.org/grpc/credentials" @@ -30,7 +30,7 @@ type npmService struct { capMan poolmanager.CapacityManager } -func newNPMService(ctx context.Context, cp cp.ControlPlane) *npmService { +func newNPMService(ctx context.Context, cp controlplane.ControlPlane) *npmService { cm, err := poolmanager.NewCapacityManager(ctx, cp) if err != nil { logrus.Panic("Cannot construct capacity manager") @@ -66,11 +66,11 @@ func (npm *npmService) GetLBGroup(ctx context.Context, gid *model.LBGroupId) (*m const ( // Certificates to communicate with other FN nodes - EnvCert = "FN_NODE_CERT" - EnvCertKey = "FN_NODE_CERT_KEY" - EnvCertAuth = "FN_NODE_CERT_AUTHORITY" - EnvPort = "FN_PORT" - EnvFixedRunners = "FN_RUNNER_ADDRESSES" + EnvCert = "FN_NODE_CERT" + EnvCertKey = "FN_NODE_CERT_KEY" + EnvCertAuth = "FN_NODE_CERT_AUTHORITY" + EnvPort = "FN_PORT" + ControlPlaneSO = "CONTROL_PLANE_SO" ) func getAndCheckFile(envVar string) (string, error) { @@ -116,6 +116,27 @@ func createGrpcCreds(cert string, key string, ca string) (grpc.ServerOption, err return grpc.Creds(creds), nil } +func newPluggableControlPlane() controlplane.ControlPlane { + pluginLocation := getEnv(ControlPlaneSO) + controlPlanePlugin, err := plugin.Open(pluginLocation) + + if err != nil { + panic(err) + } + + cpSymbol, err := controlPlanePlugin.Lookup("ControlPlane") + + if err != nil { + panic(err) + } + + cp := cpSymbol.(controlplane.ControlPlane) + + logrus.Infof("Started controlplane : %s", cp) + + return cp +} + func main() { level, err := logrus.ParseLevel(getEnv("FN_LOG_LEVEL")) if err != nil { @@ -146,8 +167,7 @@ func main() { logrus.Info("Starting Node Pool Manager gRPC service") - fakeRunners := strings.Split(getEnv(EnvFixedRunners), ",") - svc := newNPMService(context.Background(), cp.NewControlPlane(fakeRunners)) + svc := newNPMService(context.Background(), newPluggableControlPlane()) model.RegisterNodePoolScalerServer(gRPCServer, svc) model.RegisterRunnerManagerServer(gRPCServer, svc)