Make dataplane system test behave deterministically (#849)

Make dataplane system test deterministic by injecting capacity constraints
This commit is contained in:
Dario Domizioli
2018-03-16 11:50:44 +00:00
committed by GitHub
parent 9ed3dd86ea
commit 362e910d9d
3 changed files with 74 additions and 17 deletions

View File

@@ -194,6 +194,17 @@ func (ch *callHandle) cancel(ctx context.Context, err error) {
}
}
type CapacityGate interface {
// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
// of capacity have been committed.
CheckAndReserveCapacity(units uint64) error
// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
// zero; implementations are free to panic in that case.
ReleaseCapacity(units uint64)
}
type pureRunnerCapacityManager struct {
totalCapacityUnits uint64
committedCapacityUnits uint64
@@ -202,24 +213,24 @@ type pureRunnerCapacityManager struct {
type capacityDeallocator func()
func newPureRunnerCapacityManager(units uint64) pureRunnerCapacityManager {
return pureRunnerCapacityManager{
func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager {
return &pureRunnerCapacityManager{
totalCapacityUnits: units,
committedCapacityUnits: 0,
}
}
func (prcm *pureRunnerCapacityManager) checkAndReserveCapacity(units uint64) error {
func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if prcm.committedCapacityUnits+units < prcm.totalCapacityUnits {
if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units {
prcm.committedCapacityUnits = prcm.committedCapacityUnits + units
return nil
}
return models.ErrCallTimeoutServerBusy
}
func (prcm *pureRunnerCapacityManager) releaseCapacity(units uint64) {
func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if units <= prcm.committedCapacityUnits {
@@ -236,7 +247,7 @@ type pureRunner struct {
listen string
a Agent
inflight int32
capacity pureRunnerCapacityManager
capacity CapacityGate
}
func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
@@ -368,7 +379,7 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
}
// Capacity check first
err = pr.capacity.checkAndReserveCapacity(c.Memory)
err = pr.capacity.CheckAndReserveCapacity(c.Memory)
if err != nil {
return func() {}, err
}
@@ -379,13 +390,13 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
inR, inW := io.Pipe()
agent_call, err := pr.a.GetCall(FromModelAndInput(&c, inR), WithWriter(w))
if err != nil {
return func() { pr.capacity.releaseCapacity(c.Memory) }, err
return func() { pr.capacity.ReleaseCapacity(c.Memory) }, err
}
state.c = agent_call.(*call)
state.input = inW
state.allocatedTime = strfmt.DateTime(time.Now())
return func() { pr.capacity.releaseCapacity(c.Memory) }, nil
return func() { pr.capacity.ReleaseCapacity(c.Memory) }, nil
}
// Handles a client engagement
@@ -535,7 +546,15 @@ func (pr *pureRunner) Start() error {
return err
}
func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
func UnsecuredPureRunner(cancel context.CancelFunc, addr string, a Agent) (*pureRunner, error) {
return NewPureRunner(cancel, addr, a, "", "", "", nil)
}
func DefaultPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
return NewPureRunner(cancel, addr, a, cert, key, ca, nil)
}
func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string, gate CapacityGate) (*pureRunner, error) {
var pr *pureRunner
var err error
if cert != "" && key != "" && ca != "" {
@@ -544,13 +563,13 @@ func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string,
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
return nil, err
}
pr, err = createPureRunner(addr, a, c)
pr, err = createPureRunner(addr, a, c, gate)
if err != nil {
return nil, err
}
} else {
logrus.Warn("Running pure runner in insecure mode!")
pr, err = createPureRunner(addr, a, nil)
pr, err = createPureRunner(addr, a, nil, gate)
if err != nil {
return nil, err
}
@@ -592,19 +611,22 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials
}), nil
}
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) {
var srv *grpc.Server
if creds != nil {
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
}
memUnits := getAvailableMemoryUnits()
if gate == nil {
memUnits := getAvailableMemoryUnits()
gate = newPureRunnerCapacityManager(memUnits)
}
pr := &pureRunner{
gRPCServer: srv,
listen: addr,
a: a,
capacity: newPureRunnerCapacityManager(memUnits),
capacity: gate,
}
runner.RegisterRunnerProtocolServer(srv, pr)

View File

@@ -409,7 +409,7 @@ func WithAgentFromEnv() ServerOption {
grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort)
delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
cancelCtx, cancel := context.WithCancel(ctx)
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority)
prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority)
if err != nil {
return err
}