From d23f4e7b39df60121153cf6607915ca494a277dc Mon Sep 17 00:00:00 2001 From: Dario Domizioli Date: Tue, 17 Apr 2018 11:46:17 +0100 Subject: [PATCH] Add a way to tweak node capacities in system tests to enable more tests (#869) * Add a way to tweak node capacities in system tests to enable more tests * Add test for saturated system --- test/fn-system-tests/exec_test.go | 49 ++++++++++++++++++++-- test/fn-system-tests/system_test.go | 65 +++++++++++++++++++++++------ 2 files changed, 99 insertions(+), 15 deletions(-) diff --git a/test/fn-system-tests/exec_test.go b/test/fn-system-tests/exec_test.go index c038caad9..b8d967a6f 100644 --- a/test/fn-system-tests/exec_test.go +++ b/test/fn-system-tests/exec_test.go @@ -9,8 +9,9 @@ import ( "strings" "testing" + apimodels "github.com/fnproject/fn/api/models" apiutils "github.com/fnproject/fn/test/fn-api-tests" - "github.com/fnproject/fn_go/models" + sdkmodels "github.com/fnproject/fn_go/models" ) func LB() (string, error) { @@ -25,7 +26,7 @@ func LB() (string, error) { func TestCanExecuteFunction(t *testing.T) { s := apiutils.SetupHarness() - s.GivenAppExists(t, &models.App{Name: s.AppName}) + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) defer s.Cleanup() rt := s.BasicRoute() @@ -56,9 +57,12 @@ func TestCanExecuteFunction(t *testing.T) { } func TestBasicConcurrentExecution(t *testing.T) { + SystemTweaker().ChangeNodeCapacities(512) + defer SystemTweaker().RestoreInitialNodeCapacities() + s := apiutils.SetupHarness() - s.GivenAppExists(t, &models.App{Name: s.AppName}) + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) defer s.Cleanup() rt := s.BasicRoute() @@ -103,3 +107,42 @@ func TestBasicConcurrentExecution(t *testing.T) { } } + +func TestSaturatedSystem(t *testing.T) { + // Set the capacity to 0 so we always look out of capacity. + SystemTweaker().ChangeNodeCapacities(0) + defer SystemTweaker().RestoreInitialNodeCapacities() + + s := apiutils.SetupHarness() + + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) + defer s.Cleanup() + + rt := s.BasicRoute() + rt.Type = "sync" + + s.GivenRouteExists(t, s.AppName, rt) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) + + content := &bytes.Buffer{} + output := &bytes.Buffer{} + _, err = apiutils.CallFN(u.String(), content, output, "POST", []string{}) + if err != nil { + if err != apimodels.ErrCallTimeoutServerBusy { + t.Errorf("Got unexpected error: %v", err) + } + } + expectedOutput := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n" + if !strings.Contains(expectedOutput, output.String()) { + t.Errorf("Assertion error.\n\tExpected: %v\n\tActual: %v", expectedOutput, output.String()) + } +} diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index b041ae3db..1102e3266 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -52,19 +52,20 @@ func SetUpSystem() error { } logrus.Info("Created LB node") - pr0, err := SetUpPureRunnerNode(ctx, 0) + pr0, nc0, err := SetUpPureRunnerNode(ctx, 0) if err != nil { return err } - pr1, err := SetUpPureRunnerNode(ctx, 1) + pr1, nc1, err := SetUpPureRunnerNode(ctx, 1) if err != nil { return err } - pr2, err := SetUpPureRunnerNode(ctx, 2) + pr2, nc2, err := SetUpPureRunnerNode(ctx, 2) if err != nil { return err } logrus.Info("Created Pure Runner nodes") + internalSystemTweaker.nodeCaps = []*testCapacityGate{nc0, nc1, nc2} go func() { api.Start(ctx) }() logrus.Info("Started API node") @@ -158,22 +159,31 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { type testCapacityGate struct { runnerNumber int committedCapacityUnits uint64 + maxCapacityUnits uint64 mtx sync.Mutex } const ( - FixedTestCapacityUnitsPerRunner = 512 + InitialTestCapacityUnitsPerRunner = 1024 ) +func NewTestCapacityGate(nodeNum int, capacity uint64) *testCapacityGate { + return &testCapacityGate{ + runnerNumber: nodeNum, + maxCapacityUnits: capacity, + committedCapacityUnits: 0, + } +} + func (tcg *testCapacityGate) CheckAndReserveCapacity(units uint64) error { tcg.mtx.Lock() defer tcg.mtx.Unlock() - if tcg.committedCapacityUnits+units <= FixedTestCapacityUnitsPerRunner { + if tcg.committedCapacityUnits+units <= tcg.maxCapacityUnits { logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is committing capacity") tcg.committedCapacityUnits = tcg.committedCapacityUnits + units return nil } - logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is out of capacity") + logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Debug("Runner is out of capacity") return models.ErrCallTimeoutServerBusy } @@ -188,7 +198,36 @@ func (tcg *testCapacityGate) ReleaseCapacity(units uint64) { panic("Fatal error in test capacity calculation, getting to sub-zero capacity") } -func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) { +func (tcg *testCapacityGate) ChangeMaxCapacity(newCapacity uint64) { + tcg.mtx.Lock() + defer tcg.mtx.Unlock() + logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("oldCapacity", tcg.maxCapacityUnits).WithField("newCapacity", newCapacity).Info("Runner is changing max capacity") + tcg.maxCapacityUnits = newCapacity +} + +type systemTweaker struct { + nodeCaps []*testCapacityGate +} + +var internalSystemTweaker systemTweaker + +func SystemTweaker() *systemTweaker { + return &internalSystemTweaker +} + +func (twk *systemTweaker) ChangeNodeCapacities(newCapacity uint64) { + for _, nc := range twk.nodeCaps { + nc.ChangeMaxCapacity(newCapacity) + } +} + +func (twk *systemTweaker) RestoreInitialNodeCapacities() { + for _, nc := range twk.nodeCaps { + nc.ChangeMaxCapacity(InitialTestCapacityUnitsPerRunner) + } +} + +func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, *testCapacityGate, error) { nodeType := server.ServerTypePureRunner opts := make([]server.ServerOption, 0) opts = append(opts, server.WithWebPort(8082+nodeNum)) @@ -203,17 +242,18 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro ds, err := hybrid.NewNopDataStore() if err != nil { - return nil, err + return nil, nil, err } grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) cancelCtx, cancel := context.WithCancel(ctx) - prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", &testCapacityGate{runnerNumber: nodeNum}) + capacityGate := NewTestCapacityGate(nodeNum, InitialTestCapacityUnitsPerRunner) + prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", capacityGate) if err != nil { - return nil, err + return nil, nil, err } opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx)) - return server.New(ctx, opts...), nil + return server.New(ctx, opts...), capacityGate, nil } func pwd() string { @@ -269,7 +309,8 @@ func whoAmI() net.IP { } func TestCanInstantiateSystem(t *testing.T) { - + SystemTweaker().ChangeNodeCapacities(128) + defer SystemTweaker().RestoreInitialNodeCapacities() } func TestMain(m *testing.M) {