Add a way to tweak node capacities in system tests to enable more tests (#869)

* Add a way to tweak node capacities in system tests to enable more tests
* Add test for saturated system
This commit is contained in:
Dario Domizioli
2018-04-17 11:46:17 +01:00
committed by GitHub
parent eb6630f22c
commit d23f4e7b39
2 changed files with 99 additions and 15 deletions

View File

@@ -9,8 +9,9 @@ import (
"strings"
"testing"
apimodels "github.com/fnproject/fn/api/models"
apiutils "github.com/fnproject/fn/test/fn-api-tests"
"github.com/fnproject/fn_go/models"
sdkmodels "github.com/fnproject/fn_go/models"
)
func LB() (string, error) {
@@ -25,7 +26,7 @@ func LB() (string, error) {
func TestCanExecuteFunction(t *testing.T) {
s := apiutils.SetupHarness()
s.GivenAppExists(t, &models.App{Name: s.AppName})
s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName})
defer s.Cleanup()
rt := s.BasicRoute()
@@ -56,9 +57,12 @@ func TestCanExecuteFunction(t *testing.T) {
}
func TestBasicConcurrentExecution(t *testing.T) {
SystemTweaker().ChangeNodeCapacities(512)
defer SystemTweaker().RestoreInitialNodeCapacities()
s := apiutils.SetupHarness()
s.GivenAppExists(t, &models.App{Name: s.AppName})
s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName})
defer s.Cleanup()
rt := s.BasicRoute()
@@ -103,3 +107,42 @@ func TestBasicConcurrentExecution(t *testing.T) {
}
}
func TestSaturatedSystem(t *testing.T) {
// Set the capacity to 0 so we always look out of capacity.
SystemTweaker().ChangeNodeCapacities(0)
defer SystemTweaker().RestoreInitialNodeCapacities()
s := apiutils.SetupHarness()
s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName})
defer s.Cleanup()
rt := s.BasicRoute()
rt.Type = "sync"
s.GivenRouteExists(t, s.AppName, rt)
lb, err := LB()
if err != nil {
t.Fatalf("Got unexpected error: %v", err)
}
u := url.URL{
Scheme: "http",
Host: lb,
}
u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath)
content := &bytes.Buffer{}
output := &bytes.Buffer{}
_, err = apiutils.CallFN(u.String(), content, output, "POST", []string{})
if err != nil {
if err != apimodels.ErrCallTimeoutServerBusy {
t.Errorf("Got unexpected error: %v", err)
}
}
expectedOutput := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n"
if !strings.Contains(expectedOutput, output.String()) {
t.Errorf("Assertion error.\n\tExpected: %v\n\tActual: %v", expectedOutput, output.String())
}
}

View File

@@ -52,19 +52,20 @@ func SetUpSystem() error {
}
logrus.Info("Created LB node")
pr0, err := SetUpPureRunnerNode(ctx, 0)
pr0, nc0, err := SetUpPureRunnerNode(ctx, 0)
if err != nil {
return err
}
pr1, err := SetUpPureRunnerNode(ctx, 1)
pr1, nc1, err := SetUpPureRunnerNode(ctx, 1)
if err != nil {
return err
}
pr2, err := SetUpPureRunnerNode(ctx, 2)
pr2, nc2, err := SetUpPureRunnerNode(ctx, 2)
if err != nil {
return err
}
logrus.Info("Created Pure Runner nodes")
internalSystemTweaker.nodeCaps = []*testCapacityGate{nc0, nc1, nc2}
go func() { api.Start(ctx) }()
logrus.Info("Started API node")
@@ -158,22 +159,31 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
type testCapacityGate struct {
runnerNumber int
committedCapacityUnits uint64
maxCapacityUnits uint64
mtx sync.Mutex
}
const (
FixedTestCapacityUnitsPerRunner = 512
InitialTestCapacityUnitsPerRunner = 1024
)
func NewTestCapacityGate(nodeNum int, capacity uint64) *testCapacityGate {
return &testCapacityGate{
runnerNumber: nodeNum,
maxCapacityUnits: capacity,
committedCapacityUnits: 0,
}
}
func (tcg *testCapacityGate) CheckAndReserveCapacity(units uint64) error {
tcg.mtx.Lock()
defer tcg.mtx.Unlock()
if tcg.committedCapacityUnits+units <= FixedTestCapacityUnitsPerRunner {
if tcg.committedCapacityUnits+units <= tcg.maxCapacityUnits {
logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is committing capacity")
tcg.committedCapacityUnits = tcg.committedCapacityUnits + units
return nil
}
logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is out of capacity")
logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Debug("Runner is out of capacity")
return models.ErrCallTimeoutServerBusy
}
@@ -188,7 +198,36 @@ func (tcg *testCapacityGate) ReleaseCapacity(units uint64) {
panic("Fatal error in test capacity calculation, getting to sub-zero capacity")
}
func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) {
func (tcg *testCapacityGate) ChangeMaxCapacity(newCapacity uint64) {
tcg.mtx.Lock()
defer tcg.mtx.Unlock()
logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("oldCapacity", tcg.maxCapacityUnits).WithField("newCapacity", newCapacity).Info("Runner is changing max capacity")
tcg.maxCapacityUnits = newCapacity
}
type systemTweaker struct {
nodeCaps []*testCapacityGate
}
var internalSystemTweaker systemTweaker
func SystemTweaker() *systemTweaker {
return &internalSystemTweaker
}
func (twk *systemTweaker) ChangeNodeCapacities(newCapacity uint64) {
for _, nc := range twk.nodeCaps {
nc.ChangeMaxCapacity(newCapacity)
}
}
func (twk *systemTweaker) RestoreInitialNodeCapacities() {
for _, nc := range twk.nodeCaps {
nc.ChangeMaxCapacity(InitialTestCapacityUnitsPerRunner)
}
}
func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, *testCapacityGate, error) {
nodeType := server.ServerTypePureRunner
opts := make([]server.ServerOption, 0)
opts = append(opts, server.WithWebPort(8082+nodeNum))
@@ -203,17 +242,18 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
ds, err := hybrid.NewNopDataStore()
if err != nil {
return nil, err
return nil, nil, err
}
grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
cancelCtx, cancel := context.WithCancel(ctx)
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", &testCapacityGate{runnerNumber: nodeNum})
capacityGate := NewTestCapacityGate(nodeNum, InitialTestCapacityUnitsPerRunner)
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", capacityGate)
if err != nil {
return nil, err
return nil, nil, err
}
opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx))
return server.New(ctx, opts...), nil
return server.New(ctx, opts...), capacityGate, nil
}
func pwd() string {
@@ -269,7 +309,8 @@ func whoAmI() net.IP {
}
func TestCanInstantiateSystem(t *testing.T) {
SystemTweaker().ChangeNodeCapacities(128)
defer SystemTweaker().RestoreInitialNodeCapacities()
}
func TestMain(m *testing.M) {