From 362e910d9d2b14ff261e8da19ce99e8cb42f1f87 Mon Sep 17 00:00:00 2001
From: Dario Domizioli
Date: Fri, 16 Mar 2018 11:50:44 +0000
Subject: [PATCH] Make dataplane system test behave deterministically (#849)

Make dataplane system test deterministic by injecting capacity constraints
---
 api/agent/pure_runner.go            | 52 ++++++++++++++++++++---------
 api/server/server.go                |  2 +-
 test/fn-system-tests/system_test.go | 37 +++++++++++++++++++-
 3 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go
index 3e778c966..4da91ebb5 100644
--- a/api/agent/pure_runner.go
+++ b/api/agent/pure_runner.go
@@ -194,6 +194,17 @@ func (ch *callHandle) cancel(ctx context.Context, err error) {
 	}
 }
 
+type CapacityGate interface {
+	// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
+	// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
+	// of capacity have been committed.
+	CheckAndReserveCapacity(units uint64) error
+
+	// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
+	// zero; implementations are free to panic in that case.
+	ReleaseCapacity(units uint64)
+}
+
 type pureRunnerCapacityManager struct {
 	totalCapacityUnits     uint64
 	committedCapacityUnits uint64
@@ -202,24 +213,24 @@ type pureRunnerCapacityManager struct {
 
 type capacityDeallocator func()
 
-func newPureRunnerCapacityManager(units uint64) pureRunnerCapacityManager {
-	return pureRunnerCapacityManager{
+func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager {
+	return &pureRunnerCapacityManager{
 		totalCapacityUnits:     units,
 		committedCapacityUnits: 0,
 	}
 }
 
-func (prcm *pureRunnerCapacityManager) checkAndReserveCapacity(units uint64) error {
+func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error {
 	prcm.mtx.Lock()
 	defer prcm.mtx.Unlock()
-	if prcm.committedCapacityUnits+units < prcm.totalCapacityUnits {
+	if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units {
 		prcm.committedCapacityUnits = prcm.committedCapacityUnits + units
 		return nil
 	}
 	return models.ErrCallTimeoutServerBusy
 }
 
-func (prcm *pureRunnerCapacityManager) releaseCapacity(units uint64) {
+func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) {
 	prcm.mtx.Lock()
 	defer prcm.mtx.Unlock()
 	if units <= prcm.committedCapacityUnits {
@@ -236,7 +247,7 @@ type pureRunner struct {
 	listen     string
 	a          Agent
 	inflight   int32
-	capacity   pureRunnerCapacityManager
+	capacity   CapacityGate
 }
 
 func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
@@ -368,7 +379,7 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
 	}
 
 	// Capacity check first
-	err = pr.capacity.checkAndReserveCapacity(c.Memory)
+	err = pr.capacity.CheckAndReserveCapacity(c.Memory)
 	if err != nil {
 		return func() {}, err
 	}
@@ -379,13 +390,13 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
 	inR, inW := io.Pipe()
 	agent_call, err := pr.a.GetCall(FromModelAndInput(&c, inR), WithWriter(w))
 	if err != nil {
-		return func() { pr.capacity.releaseCapacity(c.Memory) }, err
+		return func() { pr.capacity.ReleaseCapacity(c.Memory) }, err
 	}
 	state.c = agent_call.(*call)
 	state.input = inW
 	state.allocatedTime = strfmt.DateTime(time.Now())
 
-	return func() { pr.capacity.releaseCapacity(c.Memory) }, nil
+	return func() { pr.capacity.ReleaseCapacity(c.Memory) }, nil
 }
 
 // Handles a client engagement
@@ -535,7 +546,15 @@ func (pr *pureRunner) Start() error {
 	return err
 }
 
-func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
+func UnsecuredPureRunner(cancel context.CancelFunc, addr string, a Agent) (*pureRunner, error) {
+	return NewPureRunner(cancel, addr, a, "", "", "", nil)
+}
+
+func DefaultPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
+	return NewPureRunner(cancel, addr, a, cert, key, ca, nil)
+}
+
+func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string, gate CapacityGate) (*pureRunner, error) {
 	var pr *pureRunner
 	var err error
 	if cert != "" && key != "" && ca != "" {
@@ -544,13 +563,13 @@ func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string,
 			logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
 			return nil, err
 		}
-		pr, err = createPureRunner(addr, a, c)
+		pr, err = createPureRunner(addr, a, c, gate)
 		if err != nil {
 			return nil, err
 		}
 	} else {
 		logrus.Warn("Running pure runner in insecure mode!")
-		pr, err = createPureRunner(addr, a, nil)
+		pr, err = createPureRunner(addr, a, nil, gate)
 		if err != nil {
 			return nil, err
 		}
@@ -592,19 +611,22 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials
 	}), nil
 }
 
-func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
+func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) {
 	var srv *grpc.Server
 	if creds != nil {
 		srv = grpc.NewServer(grpc.Creds(creds))
 	} else {
 		srv = grpc.NewServer()
 	}
-	memUnits := getAvailableMemoryUnits()
+	if gate == nil {
+		memUnits := getAvailableMemoryUnits()
+		gate = newPureRunnerCapacityManager(memUnits)
+	}
 	pr := &pureRunner{
 		gRPCServer: srv,
 		listen:     addr,
 		a:          a,
-		capacity:   newPureRunnerCapacityManager(memUnits),
+		capacity:   gate,
 	}
 
 	runner.RegisterRunnerProtocolServer(srv, pr)
diff --git a/api/server/server.go b/api/server/server.go
index 435d8f87f..8e05ca260 100644
--- a/api/server/server.go
+++ b/api/server/server.go
@@ -409,7 +409,7 @@ func WithAgentFromEnv() ServerOption {
 		grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort)
 		delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
 		cancelCtx, cancel := context.WithCancel(ctx)
-		prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority)
+		prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority)
 		if err != nil {
 			return err
 		}
diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go
index 6b03170dd..f8c4b1611 100644
--- a/test/fn-system-tests/system_test.go
+++ b/test/fn-system-tests/system_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/fnproject/fn/api/agent"
 	"github.com/fnproject/fn/api/agent/hybrid"
 	agent_grpc "github.com/fnproject/fn/api/agent/nodepool/grpc"
+	"github.com/fnproject/fn/api/models"
 	"github.com/fnproject/fn/api/server"
 
 	"github.com/sirupsen/logrus"
@@ -17,6 +18,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 )
@@ -154,6 +156,39 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
 	return server.New(ctx, opts...), nil
 }
 
+type testCapacityGate struct {
+	runnerNumber           int
+	committedCapacityUnits uint64
+	mtx                    sync.Mutex
+}
+
+const (
+	FixedTestCapacityUnitsPerRunner = 512
+)
+
+func (tcg *testCapacityGate) CheckAndReserveCapacity(units uint64) error {
+	tcg.mtx.Lock()
+	defer tcg.mtx.Unlock()
+	if tcg.committedCapacityUnits+units <= FixedTestCapacityUnitsPerRunner {
+		logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is committing capacity")
+		tcg.committedCapacityUnits = tcg.committedCapacityUnits + units
+		return nil
+	}
+	logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is out of capacity")
+	return models.ErrCallTimeoutServerBusy
+}
+
+func (tcg *testCapacityGate) ReleaseCapacity(units uint64) {
+	tcg.mtx.Lock()
+	defer tcg.mtx.Unlock()
+	if units <= tcg.committedCapacityUnits {
+		logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is releasing capacity")
+		tcg.committedCapacityUnits = tcg.committedCapacityUnits - units
+		return
+	}
+	panic("Fatal error in test capacity calculation, getting to sub-zero capacity")
+}
+
 func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) {
 	nodeType := server.ServerTypePureRunner
 	opts := make([]server.ServerOption, 0)
@@ -174,7 +209,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
 	grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
 	delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
 	cancelCtx, cancel := context.WithCancel(ctx)
-	prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "")
+	prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "", &testCapacityGate{runnerNumber: nodeNum})
 	if err != nil {
 		return nil, err
 	}
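
The new trailing parameter of NewPureRunner makes the capacity policy injectable: passing nil keeps the old behaviour (a pureRunnerCapacityManager sized from getAvailableMemoryUnits(), as in createPureRunner above), while any other CapacityGate replaces it, which is how the system test pins each runner to 512 units. A custom gate outside the test suite only has to honour the all-or-nothing contract documented on the interface. The following is a minimal sketch, not code from this change; the fixedGate name and the 1024-unit limit are invented for the example.

package example

import (
	"sync"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/api/models"
)

// fixedGate caps committed capacity at a fixed number of units.
type fixedGate struct {
	limit     uint64
	committed uint64
	mtx       sync.Mutex
}

// Compile-time check that fixedGate satisfies agent.CapacityGate.
var _ agent.CapacityGate = (*fixedGate)(nil)

// CheckAndReserveCapacity either commits all requested units or commits
// nothing and reports the runner as busy, as the interface requires.
func (g *fixedGate) CheckAndReserveCapacity(units uint64) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	// Underflow-safe form of "committed+units <= limit", mirroring the
	// corrected check in pureRunnerCapacityManager.
	if g.limit-g.committed >= units {
		g.committed += units
		return nil
	}
	return models.ErrCallTimeoutServerBusy
}

// ReleaseCapacity returns units to the pool. Releasing more than was
// committed is a caller bug, and the interface contract permits a panic.
func (g *fixedGate) ReleaseCapacity(units uint64) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	if units > g.committed {
		panic("fixedGate: released more capacity than was committed")
	}
	g.committed -= units
}

Such a gate would be injected the same way the test does it, e.g. agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "", &fixedGate{limit: 1024}) for an insecure runner, while DefaultPureRunner preserves the memory-based default for production callers.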