From 4ccde8897e603db8eabc2115d29534021a89b0e7 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Thu, 17 May 2018 12:09:03 -0700
Subject: [PATCH] fn: lb and pure-runner with non-blocking agent (#989)

* fn: lb and pure-runner with non-blocking agent

*) Removed pure-runner capacity tracking code. This did not play well
with the internal agent resource tracker.

*) Removed the ACK from the LB and runner gRPC communication. Now, upon
TryCall, pure-runner quickly proceeds to call Submit. This is good since
at this stage pure-runner already has all relevant data to initiate the
call.

*) Unless pure-runner emits a NACK, LB immediately streams the http body
to runners.

*) For retriable requests, added a CachedReader for the http.Request Body.

*) Idempotency/retry is similar to the previous code. After the initial
engagement succeeds and a TryCall has been attempted, we cannot retry
that call unless we receive a NACK.

*) ch and naive placers now wrap each TryExec in a cancellable context
to clean up gRPC contexts quicker.

* fn: err for simpler one-time read GetBody approach

This allows for a more flexible approach since we let users define
GetBody() to allow repeated reads of the http body. In the default LB
case, the LB executes a one-time io.ReadAll and sets up GetBody, which
is detected by RunnerCall.RequestBody().

* fn: additional check for non-nil req.body

* fn: attempt to override IO errors with ctx for TryExec

* fn: system-tests log dest

* fn: LB: EOF send handling

* fn: logging for partial IO

* fn: use buffer pool for IO storage in lb agent

* fn: pure runner should use chunks for data msgs

* fn: required config validations and pass APIErrors

* fn: additional tests and gRPC proto simplification

*) removed ACK/NACK messages, as the Finished message type works fine
for this purpose.
*) return resp in api tests to check the status code
*) empty body json test in api tests for lb & pure-runner

* fn: buffer adjustments

*) setRequestBody result handling correction
*) switch to bytes.Reader for read-only safety
*) io.EOF can be returned for non-nil Body in request.
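As a note on the GetBody approach above, here is a minimal, self-contained sketch (illustrative only; bufferRequestBody is a hypothetical stand-in for lb_agent's setRequestBody and its buffer pool): the body is read once, then GetBody hands out a fresh read-only cursor for every retry.

    // Minimal sketch, assuming a plain net/http request; bufferRequestBody is a
    // hypothetical stand-in for what lb_agent.setRequestBody does with its buffer pool.
    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
        "strings"
    )

    func bufferRequestBody(r *http.Request) error {
        if r.Body == nil || r.GetBody != nil {
            return nil // nothing to buffer, or the caller already provided GetBody
        }
        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            return err
        }
        r.Body = ioutil.NopCloser(bytes.NewReader(data))
        // bytes.Reader is read-only, so every GetBody call returns an
        // independent cursor over the same buffered bytes.
        r.GetBody = func() (io.ReadCloser, error) {
            return ioutil.NopCloser(bytes.NewReader(data)), nil
        }
        return nil
    }

    func main() {
        req, _ := http.NewRequest("POST", "http://example.invalid", strings.NewReader("hello"))
        req.GetBody = nil // pretend the caller did not supply GetBody
        if err := bufferRequestBody(req); err != nil {
            panic(err)
        }
        for attempt := 1; attempt <= 2; attempt++ {
            rdr, _ := req.GetBody()
            b, _ := ioutil.ReadAll(rdr)
            fmt.Printf("attempt %d read %q\n", attempt, b)
        }
    }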
* fn: clarify detection of 503 / Server Too Busy --- api/agent/call.go | 6 + api/agent/config.go | 7 +- api/agent/lb_agent.go | 72 ++++++++- api/agent/pure_runner.go | 238 +++++++++++----------------- api/agent/runner_client.go | 203 ++++++++++++++++-------- api/runnerpool/ch_placer.go | 5 +- api/runnerpool/naive_placer.go | 5 +- system_test.sh | 6 + test/fn-api-tests/exec_test.go | 10 +- test/fn-api-tests/formats_test.go | 4 +- test/fn-api-tests/utils.go | 4 +- test/fn-system-tests/exec_test.go | 179 +++++++++++++++++++-- test/fn-system-tests/system_test.go | 138 +++++----------- 13 files changed, 541 insertions(+), 336 deletions(-) diff --git a/api/agent/call.go b/api/agent/call.go index dbecab6bf..8c53a1f80 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -319,6 +319,12 @@ func (c *call) LbDeadline() time.Time { } func (c *call) RequestBody() io.ReadCloser { + if c.req.Body != nil && c.req.GetBody != nil { + rdr, err := c.req.GetBody() + if err == nil { + return rdr + } + } return c.req.Body } diff --git a/api/agent/config.go b/api/agent/config.go index 1a4728b82..fb0a7f6e1 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -55,6 +55,11 @@ const ( EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER" MaxDisabledMsecs = time.Duration(math.MaxInt64) + + // defaults + + DefaultHotPoll = 200 * time.Millisecond + DefaultNBIOHotPoll = 20 * time.Millisecond ) func NewAgentConfig() (*AgentConfig, error) { @@ -71,7 +76,7 @@ func NewAgentConfig() (*AgentConfig, error) { err = setEnvMsecs(err, EnvFreezeIdle, &cfg.FreezeIdle, 50*time.Millisecond) err = setEnvMsecs(err, EnvEjectIdle, &cfg.EjectIdle, 1000*time.Millisecond) - err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond) + err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll) err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute) err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second) err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute) diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index c318ad102..14a4960d6 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -1,8 +1,11 @@ package agent import ( + "bytes" "context" "errors" + "io" + "io/ioutil" "sync/atomic" "time" @@ -36,12 +39,26 @@ type lbAgent struct { callEndCount int64 } +func NewLBAgentConfig() (*AgentConfig, error) { + cfg, err := NewAgentConfig() + if err != nil { + return cfg, err + } + if cfg.MaxRequestSize == 0 { + return cfg, errors.New("lb-agent requires MaxRequestSize limit") + } + if cfg.MaxResponseSize == 0 { + return cfg, errors.New("lb-agent requires MaxResponseSize limit") + } + return cfg, nil +} + // NewLBAgent creates an Agent that knows how to load-balance function calls // across a group of runner nodes. func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) { // TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig - cfg, err := NewAgentConfig() + cfg, err := NewLBAgentConfig() if err != nil { logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) } @@ -171,6 +188,17 @@ func (a *lbAgent) Submit(callI Call) error { statsDequeueAndStart(ctx) + // pre-read and buffer request body if already not done based + // on GetBody presence. 
+	buf, err := a.setRequestBody(ctx, call)
+	if buf != nil {
+		defer bufPool.Put(buf)
+	}
+	if err != nil {
+		logrus.WithError(err).Error("Failed to process call body")
+		return a.handleCallEnd(ctx, call, err, true)
+	}
+
 	// WARNING: isStarted (handleCallEnd) semantics
 	// need some consideration here. Similar to runner/agent
 	// we consider isCommitted true if call.Start() succeeds.
@@ -183,6 +211,48 @@ func (a *lbAgent) Submit(callI Call) error {
 	return a.handleCallEnd(ctx, call, err, true)
 }
 
+// setRequestBody sets the GetBody function on the given http.Request if it is
+// missing. GetBody allows reading from the request body without mutating the
+// state of the request.
+func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer, error) {
+
+	r := call.req
+	if r.Body == nil || r.GetBody != nil {
+		return nil, nil
+	}
+
+	buf := bufPool.Get().(*bytes.Buffer)
+	buf.Reset()
+
+	// WARNING: we need to handle IO in a separate go-routine below
+	// to be able to detect a ctx timeout. When we time out, we let
+	// the gin/http server unblock the go-routine below.
+	errApp := make(chan error, 1)
+	go func() {
+
+		_, err := buf.ReadFrom(r.Body)
+		if err != nil && err != io.EOF {
+			errApp <- err
+			return
+		}
+
+		r.Body = ioutil.NopCloser(bytes.NewReader(buf.Bytes()))
+
+		// GetBody does not mutate the state of the request body
+		r.GetBody = func() (io.ReadCloser, error) {
+			return ioutil.NopCloser(bytes.NewReader(buf.Bytes())), nil
+		}
+
+		close(errApp)
+	}()
+
+	select {
+	case err := <-errApp:
+		return buf, err
+	case <-ctx.Done():
+		return buf, ctx.Err()
+	}
+}
+
 func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
 	logrus.Error("Enqueue not implemented")
 	return errors.New("Enqueue not implemented")
diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go
index d7622797e..720e53eb6 100644
--- a/api/agent/pure_runner.go
+++ b/api/agent/pure_runner.go
@@ -197,61 +197,39 @@ func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error {
 	return err
 }
 
+func convertError(err error) string {
+	code := models.GetAPIErrorCode(err)
+	return fmt.Sprintf("%d:%s", code, err.Error())
+}
+
 // enqueueCallResponse enqueues a Submit() response to the LB
 // and initiates a graceful shutdown of the session.
 func (ch *callHandle) enqueueCallResponse(err error) {
-	defer ch.finalize()
+	var details string
 
-	// Error response
 	if err != nil {
-		ch.enqueueMsgStrict(&runner.RunnerMsg{
-			Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-				Success: false,
-				Details: fmt.Sprintf("%v", err),
-			}}})
+		details = convertError(err)
+	} else if ch.c != nil {
+		details = ch.c.Model().ID
+	}
+
+	logrus.Debugf("Sending Call Finish details=%v", details)
+
+	errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
+		Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
+			Success: err == nil,
+			Details: details,
+		}}})
+
+	if errTmp != nil {
+		logrus.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v", details)
 		return
 	}
 
-	// EOF and Success response
-	ch.enqueueMsgStrict(&runner.RunnerMsg{
-		Body: &runner.RunnerMsg_Data{
-			Data: &runner.DataFrame{
-				Eof: true,
-			},
-		},
-	})
-
-	ch.enqueueMsgStrict(&runner.RunnerMsg{
-		Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-			Success: true,
-			Details: ch.c.Model().ID,
-		}}})
-}
-
-// enqueueAck enqueues an ACK or NACK response to the LB for a ClientMsg_Try
-// request. If NACK, then it also initiates a graceful shutdown of the
-// session.
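The convertError helper added above encodes an API status code into the Finished message's Details string as "code:message", which the LB side decodes again with parseError. A standalone sketch of that round-trip (hypothetical names, no fn imports):

    // Minimal sketch of the "code:message" details encoding, assuming a plain
    // error type in place of the fn models.APIError machinery.
    package main

    import (
        "errors"
        "fmt"
        "strconv"
        "strings"
    )

    type apiError struct {
        code int
        msg  string
    }

    func (e *apiError) Error() string { return e.msg }

    // encodeError flattens an error into "code:message"; code 0 means
    // "no API status code attached".
    func encodeError(code int, err error) string {
        return fmt.Sprintf("%d:%s", code, err.Error())
    }

    // decodeError reverses encodeError; malformed input degrades gracefully
    // to a plain error carrying the whole details string.
    func decodeError(details string) error {
        tokens := strings.SplitN(details, ":", 2)
        if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" {
            return errors.New(details)
        }
        code, err := strconv.Atoi(tokens[0])
        if err != nil {
            return errors.New(details)
        }
        if code != 0 {
            return &apiError{code: code, msg: tokens[1]}
        }
        return errors.New(tokens[1])
    }

    func main() {
        details := encodeError(503, errors.New("Timed out - server too busy"))
        fmt.Printf("wire form: %q decoded: %v\n", details, decodeError(details))
    }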
-func (ch *callHandle) enqueueAck(err error) error { - // NACK - if err != nil { - err = ch.enqueueMsgStrict(&runner.RunnerMsg{ - Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{ - Committed: false, - Details: fmt.Sprintf("%v", err), - }}}) - if err != nil { - return err - } - return ch.finalize() + errTmp = ch.finalize() + if errTmp != nil { + logrus.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v", details) } - - // ACK - return ch.enqueueMsgStrict(&runner.RunnerMsg{ - Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{ - Committed: true, - Details: ch.c.Model().ID, - SlotAllocationLatency: time.Time(ch.allocatedTime).Sub(time.Time(ch.receivedTime)).String(), - }}}) } // spawnPipeToFn pumps data to Function via callHandle io.PipeWriter (pipeToFnW) @@ -400,23 +378,38 @@ func (ch *callHandle) Write(data []byte) (int, error) { return 0, err } - // we cannot retain 'data' - cpData := make([]byte, len(data)) - copy(cpData, data) + total := 0 + // split up data into gRPC chunks + for { + chunkSize := len(data) + if chunkSize > MaxDataChunk { + chunkSize = MaxDataChunk + } + if chunkSize == 0 { + break + } - err = ch.enqueueMsg(&runner.RunnerMsg{ - Body: &runner.RunnerMsg_Data{ - Data: &runner.DataFrame{ - Data: cpData, - Eof: false, + // we cannot retain 'data' + cpData := make([]byte, chunkSize) + copy(cpData, data[0:chunkSize]) + data = data[chunkSize:] + + err = ch.enqueueMsg(&runner.RunnerMsg{ + Body: &runner.RunnerMsg_Data{ + Data: &runner.DataFrame{ + Data: cpData, + Eof: false, + }, }, - }, - }) + }) - if err != nil { - return 0, err + if err != nil { + return total, err + } + total += chunkSize } - return len(data), nil + + return total, nil } // getTryMsg fetches/waits for a TryCall message from @@ -457,52 +450,12 @@ func (ch *callHandle) getDataMsg() *runner.DataFrame { return msg } +// TODO: decomission/remove this once dependencies are cleaned up type CapacityGate interface { - // CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is - // guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units - // of capacity have been committed. CheckAndReserveCapacity(units uint64) error - - // ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under - // zero; implementations are free to panic in that case. 
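The new callHandle.Write above splits function output into MaxDataChunk-sized gRPC frames, copying each slice because gRPC may retain it. The same loop in isolation (illustrative sketch; maxChunk shrunk for demonstration):

    // Illustrative sketch of chunked writes, with a callback standing in for
    // the gRPC send. maxChunk is tiny here; the patch uses 10KB frames.
    package main

    import "fmt"

    const maxChunk = 4

    func writeChunks(data []byte, send func([]byte) error) (int, error) {
        total := 0
        for len(data) > 0 {
            n := len(data)
            if n > maxChunk {
                n = maxChunk
            }
            // the callee may retain the frame, so hand it a copy
            cp := make([]byte, n)
            copy(cp, data[:n])
            if err := send(cp); err != nil {
                return total, err
            }
            total += n
            data = data[n:]
        }
        return total, nil
    }

    func main() {
        sent, err := writeChunks([]byte("hello gRPC"), func(b []byte) error {
            fmt.Printf("frame %q\n", b)
            return nil
        })
        fmt.Println("total:", sent, "err:", err)
    }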
ReleaseCapacity(units uint64) } -type pureRunnerCapacityManager struct { - totalCapacityUnits uint64 - committedCapacityUnits uint64 - mtx sync.Mutex -} - -type capacityDeallocator func() - -func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager { - return &pureRunnerCapacityManager{ - totalCapacityUnits: units, - committedCapacityUnits: 0, - } -} - -func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error { - prcm.mtx.Lock() - defer prcm.mtx.Unlock() - if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units { - prcm.committedCapacityUnits = prcm.committedCapacityUnits + units - return nil - } - return models.ErrCallTimeoutServerBusy -} - -func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) { - prcm.mtx.Lock() - defer prcm.mtx.Unlock() - if units <= prcm.committedCapacityUnits { - prcm.committedCapacityUnits = prcm.committedCapacityUnits - units - return - } - panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity") -} - // pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it // and provides the gRPC server that implements the LB <-> Runner protocol. type pureRunner struct { @@ -510,7 +463,6 @@ type pureRunner struct { listen string a Agent inflight int32 - capacity CapacityGate } func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) { @@ -559,35 +511,27 @@ func (pr *pureRunner) spawnSubmit(state *callHandle) { }() } -// handleTryCall based on the TryCall message, allocates a resource/capacity reservation -// and creates callHandle.c with agent.call. -func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) { +// handleTryCall based on the TryCall message, tries to place the call on NBIO Agent +func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error { state.receivedTime = strfmt.DateTime(time.Now()) var c models.Call err := json.Unmarshal([]byte(tc.ModelsCallJson), &c) if err != nil { - return func() {}, err - } - - // Capacity check first - err = pr.capacity.CheckAndReserveCapacity(c.Memory) - if err != nil { - return func() {}, err - } - - cleanup := func() { - pr.capacity.ReleaseCapacity(c.Memory) + state.enqueueCallResponse(err) + return err } agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state)) if err != nil { - return cleanup, err + state.enqueueCallResponse(err) + return err } state.c = agent_call.(*call) state.allocatedTime = strfmt.DateTime(time.Now()) + pr.spawnSubmit(state) - return cleanup, nil + return nil } // Handles a client engagement @@ -615,15 +559,13 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro return state.waitError() } - dealloc, errTry := pr.handleTryCall(tryMsg, state) - defer dealloc() - // respond with handleTryCall response - err := state.enqueueAck(errTry) - if err != nil || errTry != nil { + errTry := pr.handleTryCall(tryMsg, state) + if errTry != nil { return state.waitError() } - var dataFeed chan *runner.DataFrame + dataFeed := state.spawnPipeToFn() + DataLoop: for { dataMsg := state.getDataMsg() @@ -631,11 +573,6 @@ DataLoop: break } - if dataFeed == nil { - pr.spawnSubmit(state) - dataFeed = state.spawnPipeToFn() - } - select { case dataFeed <- dataMsg: if dataMsg.Eof { @@ -678,8 +615,29 @@ func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, ce return NewPureRunner(cancel, addr, da, cert, key, 
ca, nil) } -func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error) { - a := createAgent(da) +func ValidatePureRunnerConfig() AgentOption { + return func(a *agent) error { + + if a.cfg.MaxResponseSize == 0 { + return errors.New("pure runner requires MaxResponseSize limits") + } + if a.cfg.MaxRequestSize == 0 { + return errors.New("pure runner requires MaxRequestSize limits") + } + + // pure runner requires a non-blocking resource tracker + if !a.cfg.EnableNBResourceTracker { + return errors.New("pure runner requires EnableNBResourceTracker true") + } + + return nil + } +} + +func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, unused CapacityGate) (Agent, error) { + // TODO: gate unused, decommission/remove it after cleaning up dependencies to it. + + a := createAgent(da, ValidatePureRunnerConfig()) var pr *pureRunner var err error if cert != "" && key != "" && ca != "" { @@ -688,13 +646,13 @@ func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert s logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!") return nil, err } - pr, err = createPureRunner(addr, a, c, gate) + pr, err = createPureRunner(addr, a, c) if err != nil { return nil, err } } else { logrus.Warn("Running pure runner in insecure mode!") - pr, err = createPureRunner(addr, a, nil, gate) + pr, err = createPureRunner(addr, a, nil) if err != nil { return nil, err } @@ -736,32 +694,20 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials }), nil } -func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) { +func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) { var srv *grpc.Server if creds != nil { srv = grpc.NewServer(grpc.Creds(creds)) } else { srv = grpc.NewServer() } - if gate == nil { - memUnits := getAvailableMemoryUnits() - gate = newPureRunnerCapacityManager(memUnits) - } + pr := &pureRunner{ gRPCServer: srv, listen: addr, a: a, - capacity: gate, } runner.RegisterRunnerProtocolServer(srv, pr) return pr, nil } - -const megabyte uint64 = 1024 * 1024 - -func getAvailableMemoryUnits() uint64 { - // To reuse code - but it's a bit of a hack. TODO: refactor the OS-specific get memory funcs out of that. 
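ValidatePureRunnerConfig above follows the functional-option pattern: an option that validates rather than mutates. A simplified, self-contained sketch of the idea (stand-in types, not the fn agent's real ones):

    // Sketch of a validating functional option, with simplified config/agent
    // types standing in for the fn agent's.
    package main

    import (
        "errors"
        "fmt"
    )

    type config struct {
        MaxRequestSize          uint64
        MaxResponseSize         uint64
        EnableNBResourceTracker bool
    }

    type agent struct{ cfg config }

    type option func(*agent) error

    // validatePureRunner rejects configurations a pure runner cannot run with.
    func validatePureRunner() option {
        return func(a *agent) error {
            if a.cfg.MaxRequestSize == 0 || a.cfg.MaxResponseSize == 0 {
                return errors.New("pure runner requires request/response size limits")
            }
            if !a.cfg.EnableNBResourceTracker {
                return errors.New("pure runner requires a non-blocking resource tracker")
            }
            return nil
        }
    }

    func newAgent(cfg config, opts ...option) (*agent, error) {
        a := &agent{cfg: cfg}
        for _, o := range opts {
            if err := o(a); err != nil {
                return nil, err
            }
        }
        return a, nil
    }

    func main() {
        _, err := newAgent(config{}, validatePureRunner())
        fmt.Println("expect error:", err)
    }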
- throwawayRT := NewResourceTracker(nil).(*resourceTracker) - return throwawayRT.ramAsyncTotal / megabyte -} diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index 5a0ac8a25..46342892d 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "io" + "strconv" + "strings" "time" "google.golang.org/grpc" @@ -12,13 +14,20 @@ import ( pb "github.com/fnproject/fn/api/agent/grpc" "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" pool "github.com/fnproject/fn/api/runnerpool" "github.com/fnproject/fn/grpcutil" "github.com/sirupsen/logrus" ) var ( - ErrorRunnerClosed = errors.New("Runner is closed") + ErrorRunnerClosed = errors.New("Runner is closed") + ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response") +) + +const ( + // max buffer size for grpc data messages, 10K + MaxDataChunk = 10 * 1024 ) type gRPCRunner struct { @@ -76,10 +85,15 @@ func (r *gRPCRunner) Address() string { return r.address } +func isRetriable(err error) bool { + return models.GetAPIErrorCode(err) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) +} + func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) { logrus.WithField("runner_addr", r.address).Debug("Attempting to place call") if !r.shutWg.AddSession(1) { - return true, ErrorRunnerClosed + // try another runner if this one is closed. + return false, ErrorRunnerClosed } defer r.shutWg.DoneSession() @@ -90,6 +104,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e // If we can't encode the model, no runner will ever be able to run this. Give up. return true, err } + runnerConnection, err := r.client.Engage(ctx) if err != nil { logrus.WithError(err).Error("Unable to create client to runner node") @@ -97,113 +112,171 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e return false, err } + // After this point, we assume "COMMITTED" unless pure runner + // send explicit NACK err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ModelsCallJson: string(modelJSON)}}}) if err != nil { logrus.WithError(err).Error("Failed to send message to runner node") - return false, err - } - msg, err := runnerConnection.Recv() - if err != nil { - logrus.WithError(err).Error("Failed to receive first message from runner node") - return false, err + return true, err } - switch body := msg.Body.(type) { - case *pb.RunnerMsg_Acknowledged: - if !body.Acknowledged.Committed { - logrus.Debugf("Runner didn't commit invocation request: %v", body.Acknowledged.Details) - return false, nil - // Try the next runner - } - logrus.Debug("Runner committed invocation request, sending data frames") - done := make(chan error) - go receiveFromRunner(runnerConnection, call, done) - sendToRunner(call, runnerConnection) - return true, <-done + recvDone := make(chan error, 1) - default: - logrus.Errorf("Unhandled message type received from runner: %v\n", msg) - return true, nil + go receiveFromRunner(runnerConnection, call, recvDone) + go sendToRunner(runnerConnection, call) + + select { + case <-ctx.Done(): + logrus.Infof("Engagement Context ended ctxErr=%v", ctx.Err()) + return true, ctx.Err() + case recvErr := <-recvDone: + return !isRetriable(recvErr), recvErr } - } -func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error { +func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) { 
 	bodyReader := call.RequestBody()
 
-	writeBufferSize := 10 * 1024 // 10KB
-	writeBuffer := make([]byte, writeBufferSize)
-	for {
-		n, err := bodyReader.Read(writeBuffer)
-		logrus.Debugf("Wrote %v bytes to the runner", n)
+	writeBuffer := make([]byte, MaxDataChunk)
 
-		if err == io.EOF {
-			err = protocolClient.Send(&pb.ClientMsg{
-				Body: &pb.ClientMsg_Data{
-					Data: &pb.DataFrame{
-						Data: writeBuffer,
-						Eof:  true,
-					},
-				},
-			})
-			if err != nil {
-				logrus.WithError(err).Error("Failed to send data frame with EOF to runner")
-			}
-			break
+	// IMPORTANT: the IO Read below can fail in multi-go-routine cases (especially
+	// in the retry case, if the receiveFromRunner go-routine receives a NACK while
+	// sendToRunner is already blocked on a read) or when the http body has to be
+	// read multiple times (retries). Normally, http.Request.Body can be read only
+	// once. runner_client users should therefore implement/add an
+	// http.Request.GetBody() function and cache the body content in the request.
+	// See lb_agent setRequestBody(), which handles this. With GetBody installed,
+	// the 'Read' below is effectively a non-blocking operation, since GetBody()
+	// hands out a new io.ReadCloser instance that allows repeated reads of the
+	// http body.
+	for {
+		// WARNING: blocking read.
+		n, err := bodyReader.Read(writeBuffer)
+		if err != nil && err != io.EOF {
+			logrus.WithError(err).Error("Failed to receive data from http client body")
 		}
-		err = protocolClient.Send(&pb.ClientMsg{
+
+		// any IO error or n == 0 is an EOF for pure-runner
+		isEOF := err != nil || n == 0
+		data := writeBuffer[:n]
+
+		logrus.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
+		sendErr := protocolClient.Send(&pb.ClientMsg{
 			Body: &pb.ClientMsg_Data{
 				Data: &pb.DataFrame{
-					Data: writeBuffer,
-					Eof:  false,
+					Data: data,
+					Eof:  isEOF,
 				},
 			},
 		})
-		if err != nil {
-			logrus.WithError(err).Error("Failed to send data frame")
-			return err
+		if sendErr != nil {
+			logrus.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
+			return
+		}
+		if isEOF {
+			return
 		}
 	}
-	return nil
+}
+
+func parseError(details string) error {
+	tokens := strings.SplitN(details, ":", 2)
+	if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" {
+		return errors.New(details)
+	}
+	code, err := strconv.ParseInt(tokens[0], 10, 64)
+	if err != nil {
+		return errors.New(details)
+	}
+	if code != 0 {
+		return models.NewAPIError(int(code), errors.New(tokens[1]))
+	}
+	return errors.New(tokens[1])
+}
+
+func tryQueueError(err error, done chan error) {
+	select {
+	case done <- err:
+	default:
+	}
 }
 
 func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
 	w := c.ResponseWriter()
+	defer close(done)
 
+	isPartialWrite := false
+
+DataLoop:
 	for {
 		msg, err := protocolClient.Recv()
 		if err != nil {
-			logrus.WithError(err).Error("Failed to receive message from runner")
-			done <- err
+			logrus.WithError(err).Info("Receive error from runner")
+			tryQueueError(err, done)
 			return
 		}
 
 		switch body := msg.Body.(type) {
+
+		// Process the HTTP header/status message. This may not arrive, depending
+		// on the pure runner's behavior (e.g. a timeout with no IO received from
+		// the function).
 		case *pb.RunnerMsg_ResultStart:
 			switch meta := body.ResultStart.Meta.(type) {
 			case *pb.CallResultStart_Http:
+				logrus.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
 				for _, header := range meta.Http.Headers {
 					w.Header().Set(header.Key, header.Value)
 				}
+				if meta.Http.StatusCode > 0 {
+					w.WriteHeader(int(meta.Http.StatusCode))
+				}
 			default:
 				logrus.Errorf("Unhandled meta type in start message: %v", meta)
 			}
+
+		// This may arrive if the function has output. We ignore the EOF flag.
 		case *pb.RunnerMsg_Data:
-			w.Write(body.Data.Data)
+			logrus.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
+			if !isPartialWrite {
+				// WARNING: blocking write
+				n, err := w.Write(body.Data.Data)
+				if n != len(body.Data.Data) {
+					isPartialWrite = true
+					logrus.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
+					if err == nil {
+						err = io.ErrShortWrite
+					}
+					tryQueueError(err, done)
+				}
+			}
+
+		// A Finished message is required to finalize processing.
 		case *pb.RunnerMsg_Finished:
-			if body.Finished.Success {
-				logrus.Infof("Call finished successfully: %v", body.Finished.Details)
-			} else {
-				logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details)
+			logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
+			if !body.Finished.Success {
+				err := parseError(body.Finished.GetDetails())
+				tryQueueError(err, done)
 			}
-			// There should be an EOF following the last packet
-			if _, err := protocolClient.Recv(); err != io.EOF {
-				logrus.WithError(err).Error("Did not receive expected EOF from runner stream")
-				done <- err
-			}
-			close(done)
-			return
+			break DataLoop
+
 		default:
-			logrus.Errorf("Unhandled message type from runner: %v", body)
+			logrus.Errorf("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
 		}
 	}
+
+	// There should be an EOF following the last packet
+	for {
+		msg, err := protocolClient.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			logrus.WithError(err).Info("Error while waiting for EOF from runner")
+			tryQueueError(err, done)
+			break
+		}
+
+		switch body := msg.Body.(type) {
+		default:
+			logrus.Infof("Call Waiting EOF ignoring message %T", body)
+		}
+		tryQueueError(ErrorPureRunnerNoEOF, done)
+	}
 }
diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go
index e368dcb97..ce5b14bb1 100644
--- a/api/runnerpool/ch_placer.go
+++ b/api/runnerpool/ch_placer.go
@@ -47,7 +47,10 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
 
 		r := runners[i]
 
-		placed, err := r.TryExec(ctx, call)
+		tryCtx, tryCancel := context.WithCancel(ctx)
+		placed, err := r.TryExec(tryCtx, call)
+		tryCancel()
+
 		if err != nil {
 			logrus.WithError(err).Error("Failed during call placement")
 		}
diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go
index 9730f833e..2ab9912ba 100644
--- a/api/runnerpool/naive_placer.go
+++ b/api/runnerpool/naive_placer.go
@@ -49,7 +49,10 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner
 			i := atomic.AddUint64(&sp.rrIndex, uint64(1))
 			r := runners[int(i)%len(runners)]
 
-			placed, err := r.TryExec(ctx, call)
+			tryCtx, tryCancel := context.WithCancel(ctx)
+			placed, err := r.TryExec(tryCtx, call)
+			tryCancel()
+
 			if err != nil {
 				logrus.WithError(err).Error("Failed during call placement")
 			}
diff --git a/system_test.sh b/system_test.sh
index 305ea78f1..08370f9c7 100755
--- a/system_test.sh
+++ 
b/system_test.sh @@ -39,6 +39,12 @@ esac # avoid port conflicts with api_test.sh which are run in parallel export FN_API_URL="http://localhost:8085" export FN_DS_DB_PING_MAX_RETRIES=60 + +# pure runner and LB agent required settings below +export FN_MAX_REQUEST_SIZE=6291456 +export FN_MAX_RESPONSE_SIZE=6291456 +export FN_ENABLE_NB_RESOURCE_TRACKER=1 + cd test/fn-system-tests && FN_DB_URL=${FN_DB_URL} FN_API_URL=${FN_API_URL} go test -v -parallel ${2:-1} ./...; cd ../../ remove_system_containers diff --git a/test/fn-api-tests/exec_test.go b/test/fn-api-tests/exec_test.go index 84cd1fc63..9198b077e 100644 --- a/test/fn-api-tests/exec_test.go +++ b/test/fn-api-tests/exec_test.go @@ -43,14 +43,14 @@ func CallAsync(t *testing.T, u url.URL, content io.Reader) string { func CallSync(t *testing.T, u url.URL, content io.Reader) string { output := &bytes.Buffer{} - hdrs, err := CallFN(u.String(), content, output, "POST", []string{}) + resp, err := CallFN(u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } - callId := hdrs.Get("FN_CALL_ID") + callId := resp.Header.Get("FN_CALL_ID") if callId == "" { - t.Errorf("Assertion error.\n\tExpected call id header in response, got: %v", hdrs) + t.Errorf("Assertion error.\n\tExpected call id header in response, got: %v", resp.Header) } t.Logf("Sync execution call ID: %v", callId) @@ -221,13 +221,13 @@ func TestCanCauseTimeout(t *testing.T) { }{Seconds: 11}) output := &bytes.Buffer{} - headers, _ := CallFN(u.String(), content, output, "POST", []string{}) + resp, _ := CallFN(u.String(), content, output, "POST", []string{}) if !strings.Contains(output.String(), "Timed out") { t.Errorf("Must fail because of timeout, but got error message: %v", output.String()) } cfg := &call.GetAppsAppCallsCallParams{ - Call: headers.Get("FN_CALL_ID"), + Call: resp.Header.Get("FN_CALL_ID"), App: s.AppName, Context: s.Context, } diff --git a/test/fn-api-tests/formats_test.go b/test/fn-api-tests/formats_test.go index b07fb797d..6f461b3f5 100644 --- a/test/fn-api-tests/formats_test.go +++ b/test/fn-api-tests/formats_test.go @@ -41,7 +41,7 @@ func TestFnJSONFormats(t *testing.T) { }) content := bytes.NewBuffer(b) output := &bytes.Buffer{} - headers, err := CallFN(u.String(), content, output, "POST", []string{}) + resp, err := CallFN(u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -56,7 +56,7 @@ func TestFnJSONFormats(t *testing.T) { expectedHeaderNames := []string{"Content-Type", "Content-Length"} expectedHeaderValues := []string{"application/json; charset=utf-8", strconv.Itoa(output.Len())} for i, name := range expectedHeaderNames { - actual := headers.Get(name) + actual := resp.Header.Get(name) expected := expectedHeaderValues[i] if !strings.Contains(expected, actual) { t.Errorf("HTTP header assertion error for %v."+ diff --git a/test/fn-api-tests/utils.go b/test/fn-api-tests/utils.go index 0f1b6f58d..27e2bf23b 100644 --- a/test/fn-api-tests/utils.go +++ b/test/fn-api-tests/utils.go @@ -212,7 +212,7 @@ func EnvAsHeader(req *http.Request, selectedEnv []string) { } } -func CallFN(u string, content io.Reader, output io.Writer, method string, env []string) (http.Header, error) { +func CallFN(u string, content io.Reader, output io.Writer, method string, env []string) (*http.Response, error) { if method == "" { if content == nil { method = "GET" @@ -239,7 +239,7 @@ func CallFN(u string, content io.Reader, output io.Writer, method string, env [] io.Copy(output, resp.Body) 
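With CallFN now returning the full *http.Response, tests can assert on both headers and status codes. A hypothetical usage sketch with a stub standing in for the real helper in test/fn-api-tests/utils.go:

    // Sketch of the new CallFN contract; the stub and the FN_CALL_ID value are
    // illustrative so the example is self-contained.
    package main

    import (
        "bytes"
        "fmt"
        "io"
        "net/http"
        "net/http/httptest"
    )

    // callFN mirrors the updated helper: it POSTs content, copies the body
    // into output, and returns the whole response for further assertions.
    func callFN(u string, content io.Reader, output io.Writer) (*http.Response, error) {
        resp, err := http.Post(u, "application/json", content)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        io.Copy(output, resp.Body)
        return resp, nil
    }

    func main() {
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("FN_CALL_ID", "dummy-call-id")
            io.Copy(w, r.Body)
        }))
        defer srv.Close()

        output := &bytes.Buffer{}
        resp, err := callFN(srv.URL, bytes.NewBufferString(`{"echoContent":"HelloWorld"}`), output)
        if err != nil {
            panic(err)
        }
        // Both the call id header and the status code are now visible to tests.
        fmt.Println(resp.Header.Get("FN_CALL_ID"), resp.StatusCode, output.String())
    }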
- return resp.Header, nil + return resp, nil } func init() { diff --git a/test/fn-system-tests/exec_test.go b/test/fn-system-tests/exec_test.go index 1796d22f8..ef7358d81 100644 --- a/test/fn-system-tests/exec_test.go +++ b/test/fn-system-tests/exec_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "net/url" "path" "strings" @@ -55,6 +56,7 @@ func TestCanExecuteFunction(t *testing.T) { rt := s.BasicRoute() rt.Image = "fnproject/fn-test-utils" rt.Format = "json" + rt.Memory = 64 rt.Type = "sync" s.GivenRouteExists(t, s.AppName, rt) @@ -73,7 +75,7 @@ func TestCanExecuteFunction(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - _, err = apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) if err != nil { t.Errorf("Got unexpected error: %v", err) } @@ -82,11 +84,147 @@ func TestCanExecuteFunction(t *testing.T) { if err != nil || echo != "HelloWorld" { t.Fatalf("getEchoContent/HelloWorld check failed on %v", output) } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } +} + +func TestCanExecuteBigOutput(t *testing.T) { + s := apiutils.SetupHarness() + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) + defer s.Cleanup() + + rt := s.BasicRoute() + rt.Image = "fnproject/fn-test-utils" + rt.Format = "json" + rt.Memory = 64 + rt.Type = "sync" + + s.GivenRouteExists(t, s.AppName, rt) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) + + // Approx 5.3MB output + body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true, "trailerRepeat": 410000}` + content := bytes.NewBuffer([]byte(body)) + output := &bytes.Buffer{} + + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + if err != nil { + t.Errorf("Got unexpected error: %v", err) + } + + t.Logf("getEchoContent/HelloWorld size %d", len(output.Bytes())) + + echo, err := getEchoContent(output.Bytes()) + if err != nil || echo != "HelloWorld" { + t.Fatalf("getEchoContent/HelloWorld check failed on %v", output) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } +} + +func TestCanExecuteTooBigOutput(t *testing.T) { + s := apiutils.SetupHarness() + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) + defer s.Cleanup() + + rt := s.BasicRoute() + rt.Image = "fnproject/fn-test-utils" + rt.Format = "json" + rt.Memory = 64 + rt.Type = "sync" + + s.GivenRouteExists(t, s.AppName, rt) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) + + // > 6MB output + body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true, "trailerRepeat": 600000}` + content := bytes.NewBuffer([]byte(body)) + output := &bytes.Buffer{} + + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + if err != nil { + t.Errorf("Got unexpected error: %v", err) + } + + exp := "{\"error\":{\"message\":\"function response too large\"}}\n" + actual := output.String() + + if !strings.Contains(exp, actual) || len(exp) != len(actual) { + t.Errorf("Assertion error.\n\tExpected: %v\n\tActual: %v", exp, output.String()) + } + + if resp.StatusCode != http.StatusBadGateway { + 
t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } +} + +func TestCanExecuteEmptyOutput(t *testing.T) { + s := apiutils.SetupHarness() + s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) + defer s.Cleanup() + + rt := s.BasicRoute() + rt.Image = "fnproject/fn-test-utils" + rt.Format = "json" + rt.Memory = 64 + rt.Type = "sync" + + s.GivenRouteExists(t, s.AppName, rt) + + lb, err := LB() + if err != nil { + t.Fatalf("Got unexpected error: %v", err) + } + u := url.URL{ + Scheme: "http", + Host: lb, + } + u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) + + // empty body output + body := `{"sleepTime": 0, "isDebug": true, "isEmptyBody": true}` + content := bytes.NewBuffer([]byte(body)) + output := &bytes.Buffer{} + + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) + if err != nil { + t.Errorf("Got unexpected error: %v", err) + } + + actual := output.String() + + if 0 != len(actual) { + t.Errorf("Assertion error.\n\tExpected empty\n\tActual: %v", output.String()) + } + + if resp.StatusCode != http.StatusOK { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) + } } func TestBasicConcurrentExecution(t *testing.T) { - SystemTweaker().ChangeNodeCapacities(512) - defer SystemTweaker().RestoreInitialNodeCapacities() s := apiutils.SetupHarness() @@ -96,6 +234,7 @@ func TestBasicConcurrentExecution(t *testing.T) { rt := s.BasicRoute() rt.Image = "fnproject/fn-test-utils" rt.Format = "json" + rt.Memory = 32 rt.Type = "sync" s.GivenRouteExists(t, s.AppName, rt) @@ -117,7 +256,7 @@ func TestBasicConcurrentExecution(t *testing.T) { body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - _, err = apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) if err != nil { results <- fmt.Errorf("Got unexpected error: %v", err) return @@ -128,6 +267,10 @@ func TestBasicConcurrentExecution(t *testing.T) { results <- fmt.Errorf("Assertion error.\n\tActual: %v", output.String()) return } + if resp.StatusCode != http.StatusOK { + results <- fmt.Errorf("StatusCode check failed on %v", resp.StatusCode) + return + } results <- nil }() @@ -142,18 +285,19 @@ func TestBasicConcurrentExecution(t *testing.T) { } func TestSaturatedSystem(t *testing.T) { - // Set the capacity to 0 so we always look out of capacity. 
- SystemTweaker().ChangeNodeCapacities(0) - defer SystemTweaker().RestoreInitialNodeCapacities() s := apiutils.SetupHarness() s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) defer s.Cleanup() + timeout := int32(5) + rt := s.BasicRoute() rt.Image = "fnproject/fn-test-utils" rt.Format = "json" + rt.Timeout = &timeout + rt.Memory = 300 rt.Type = "sync" s.GivenRouteExists(t, s.AppName, rt) @@ -172,15 +316,28 @@ func TestSaturatedSystem(t *testing.T) { content := bytes.NewBuffer([]byte(body)) output := &bytes.Buffer{} - _, err = apiutils.CallFN(u.String(), content, output, "POST", []string{}) + resp, err := apiutils.CallFN(u.String(), content, output, "POST", []string{}) if err != nil { if err != apimodels.ErrCallTimeoutServerBusy { t.Errorf("Got unexpected error: %v", err) } } - expectedOutput := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n" + + // LB may respond either with: + // timeout: a timeout during a call to a runner + // too busy: a timeout during LB retry loop + exp1 := "{\"error\":{\"message\":\"Timed out - server too busy\"}}\n" + exp2 := "{\"error\":{\"message\":\"Timed out\"}}\n" + actual := output.String() - if !strings.Contains(expectedOutput, actual) || len(expectedOutput) != len(actual) { - t.Errorf("Assertion error.\n\tExpected: %v\n\tActual: %v", expectedOutput, output.String()) + + if strings.Contains(exp1, actual) && len(exp1) == len(actual) { + } else if strings.Contains(exp2, actual) && len(exp2) == len(actual) { + } else { + t.Errorf("Assertion error.\n\tExpected: %v or %v\n\tActual: %v", exp1, exp2, output.String()) + } + + if resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusGatewayTimeout { + t.Fatalf("StatusCode check failed on %v", resp.StatusCode) } } diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 1102e3266..939137278 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -7,7 +7,6 @@ import ( "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/agent/hybrid" - "github.com/fnproject/fn/api/models" pool "github.com/fnproject/fn/api/runnerpool" "github.com/fnproject/fn/api/server" @@ -18,7 +17,6 @@ import ( "os" "strconv" "strings" - "sync" "testing" "time" ) @@ -37,35 +35,42 @@ func NewSystemTestNodePool() (pool.RunnerPool, error) { return agent.DefaultStaticRunnerPool(runners), nil } -func SetUpSystem() error { +type state struct { + memory string +} + +func SetUpSystem() (*state, error) { ctx := context.Background() + state := &state{} api, err := SetUpAPINode(ctx) if err != nil { - return err + return state, err } logrus.Info("Created API node") lb, err := SetUpLBNode(ctx) if err != nil { - return err + return state, err } logrus.Info("Created LB node") - pr0, nc0, err := SetUpPureRunnerNode(ctx, 0) + state.memory = os.Getenv(agent.EnvMaxTotalMemory) + os.Setenv(agent.EnvMaxTotalMemory, strconv.FormatUint(256*1024*1024, 10)) + + pr0, err := SetUpPureRunnerNode(ctx, 0) if err != nil { - return err + return state, err } - pr1, nc1, err := SetUpPureRunnerNode(ctx, 1) + pr1, err := SetUpPureRunnerNode(ctx, 1) if err != nil { - return err + return state, err } - pr2, nc2, err := SetUpPureRunnerNode(ctx, 2) + pr2, err := SetUpPureRunnerNode(ctx, 2) if err != nil { - return err + return state, err } logrus.Info("Created Pure Runner nodes") - internalSystemTweaker.nodeCaps = []*testCapacityGate{nc0, nc1, nc2} go func() { api.Start(ctx) }() logrus.Info("Started API node") @@ -77,10 +82,10 @@ func SetUpSystem() error { 
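SetUpSystem now pins the runners' total memory via an environment variable and CleanUpSystem restores the old value. That save/override/restore pattern in isolation (sketch; the literal name FN_MAX_TOTAL_MEMORY for agent.EnvMaxTotalMemory is an assumption):

    // Sketch of saving, overriding, and restoring an environment variable
    // around test setup. The variable name is assumed for illustration.
    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    const envMaxTotalMemory = "FN_MAX_TOTAL_MEMORY" // assumed literal

    // override sets the variable and returns a restore func for cleanup.
    // os.LookupEnv distinguishes "unset" from "set to empty".
    func override(value string) (restore func()) {
        old, had := os.LookupEnv(envMaxTotalMemory)
        os.Setenv(envMaxTotalMemory, value)
        return func() {
            if had {
                os.Setenv(envMaxTotalMemory, old)
            } else {
                os.Unsetenv(envMaxTotalMemory)
            }
        }
    }

    func main() {
        restore := override(strconv.FormatUint(256*1024*1024, 10)) // 256MB
        defer restore()
        fmt.Println(envMaxTotalMemory, "=", os.Getenv(envMaxTotalMemory))
    }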
logrus.Info("Started Pure Runner nodes") // Wait for init - not great time.Sleep(5 * time.Second) - return nil + return state, nil } -func CleanUpSystem() error { +func CleanUpSystem(st *state) error { _, err := http.Get("http://127.0.0.1:8081/shutdown") if err != nil { return err @@ -103,6 +108,13 @@ func CleanUpSystem() error { } // Wait for shutdown - not great time.Sleep(5 * time.Second) + + if st.memory != "" { + os.Setenv(agent.EnvMaxTotalMemory, st.memory) + } else { + os.Unsetenv(agent.EnvMaxTotalMemory) + } + return nil } @@ -116,7 +128,7 @@ func SetUpAPINode(ctx context.Context) (*server.Server, error) { opts = append(opts, server.WithWebPort(8085)) opts = append(opts, server.WithType(nodeType)) opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel))) - opts = append(opts, server.WithLogDest(server.DefaultLogDest, "API")) + opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "API")) opts = append(opts, server.WithDBURL(getEnv(server.EnvDBURL, defaultDB))) opts = append(opts, server.WithMQURL(getEnv(server.EnvMQURL, defaultMQ))) opts = append(opts, server.WithLogURL("")) @@ -131,7 +143,7 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { opts = append(opts, server.WithWebPort(8081)) opts = append(opts, server.WithType(nodeType)) opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel))) - opts = append(opts, server.WithLogDest(server.DefaultLogDest, "LB")) + opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "LB")) opts = append(opts, server.WithDBURL("")) opts = append(opts, server.WithMQURL("")) opts = append(opts, server.WithLogURL("")) @@ -156,85 +168,14 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { return server.New(ctx, opts...), nil } -type testCapacityGate struct { - runnerNumber int - committedCapacityUnits uint64 - maxCapacityUnits uint64 - mtx sync.Mutex -} - -const ( - InitialTestCapacityUnitsPerRunner = 1024 -) - -func NewTestCapacityGate(nodeNum int, capacity uint64) *testCapacityGate { - return &testCapacityGate{ - runnerNumber: nodeNum, - maxCapacityUnits: capacity, - committedCapacityUnits: 0, - } -} - -func (tcg *testCapacityGate) CheckAndReserveCapacity(units uint64) error { - tcg.mtx.Lock() - defer tcg.mtx.Unlock() - if tcg.committedCapacityUnits+units <= tcg.maxCapacityUnits { - logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is committing capacity") - tcg.committedCapacityUnits = tcg.committedCapacityUnits + units - return nil - } - logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("currentlyCommitted", tcg.committedCapacityUnits).Debug("Runner is out of capacity") - return models.ErrCallTimeoutServerBusy -} - -func (tcg *testCapacityGate) ReleaseCapacity(units uint64) { - tcg.mtx.Lock() - defer tcg.mtx.Unlock() - if units <= tcg.committedCapacityUnits { - logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("units", units).WithField("currentlyCommitted", tcg.committedCapacityUnits).Info("Runner is releasing capacity") - tcg.committedCapacityUnits = tcg.committedCapacityUnits - units - return - } - panic("Fatal error in test capacity calculation, getting to sub-zero capacity") -} - -func (tcg *testCapacityGate) ChangeMaxCapacity(newCapacity uint64) { - tcg.mtx.Lock() - defer tcg.mtx.Unlock() - logrus.WithField("nodeNumber", tcg.runnerNumber).WithField("oldCapacity", 
tcg.maxCapacityUnits).WithField("newCapacity", newCapacity).Info("Runner is changing max capacity") - tcg.maxCapacityUnits = newCapacity -} - -type systemTweaker struct { - nodeCaps []*testCapacityGate -} - -var internalSystemTweaker systemTweaker - -func SystemTweaker() *systemTweaker { - return &internalSystemTweaker -} - -func (twk *systemTweaker) ChangeNodeCapacities(newCapacity uint64) { - for _, nc := range twk.nodeCaps { - nc.ChangeMaxCapacity(newCapacity) - } -} - -func (twk *systemTweaker) RestoreInitialNodeCapacities() { - for _, nc := range twk.nodeCaps { - nc.ChangeMaxCapacity(InitialTestCapacityUnitsPerRunner) - } -} - -func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, *testCapacityGate, error) { +func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, error) { nodeType := server.ServerTypePureRunner opts := make([]server.ServerOption, 0) opts = append(opts, server.WithWebPort(8082+nodeNum)) opts = append(opts, server.WithGRPCPort(9190+nodeNum)) opts = append(opts, server.WithType(nodeType)) opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel))) - opts = append(opts, server.WithLogDest(server.DefaultLogDest, "PURE-RUNNER")) + opts = append(opts, server.WithLogDest(getEnv(server.EnvLogDest, server.DefaultLogDest), "PURE-RUNNER")) opts = append(opts, server.WithDBURL("")) opts = append(opts, server.WithMQURL("")) opts = append(opts, server.WithLogURL("")) @@ -242,18 +183,18 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, *tes ds, err := hybrid.NewNopDataStore() if err != nil { - return nil, nil, err + return nil, err } grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) cancelCtx, cancel := context.WithCancel(ctx) - capacityGate := NewTestCapacityGate(nodeNum, InitialTestCapacityUnitsPerRunner) - prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", capacityGate) + + prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", nil) if err != nil { - return nil, nil, err + return nil, err } opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx)) - return server.New(ctx, opts...), capacityGate, nil + return server.New(ctx, opts...), nil } func pwd() string { @@ -308,20 +249,15 @@ func whoAmI() net.IP { return nil } -func TestCanInstantiateSystem(t *testing.T) { - SystemTweaker().ChangeNodeCapacities(128) - defer SystemTweaker().RestoreInitialNodeCapacities() -} - func TestMain(m *testing.M) { - err := SetUpSystem() + state, err := SetUpSystem() if err != nil { logrus.WithError(err).Fatal("Could not initialize system") os.Exit(1) } // call flag.Parse() here if TestMain uses flags result := m.Run() - err = CleanUpSystem() + err = CleanUpSystem(state) if err != nil { logrus.WithError(err).Warn("Could not clean up system") }
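Tying the pieces together: the placers now wrap each TryExec in its own cancellable context so per-attempt gRPC state is released promptly, and only busy-style errors leave the call eligible for another runner. A condensed, hypothetical sketch of that placement loop (stand-in types, not the fn runnerpool interfaces):

    // Condensed sketch of the placement retry loop.
    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // stands in for models.ErrCallTimeoutServerBusy
    var errServerBusy = errors.New("Timed out - server too busy")

    type runner interface {
        // TryExec returns placed=false only when the call may be retried
        // elsewhere (e.g. the runner NACKed because it was too busy).
        TryExec(ctx context.Context) (placed bool, err error)
    }

    type busyRunner struct{}

    func (busyRunner) TryExec(ctx context.Context) (bool, error) { return false, errServerBusy }

    type okRunner struct{}

    func (okRunner) TryExec(ctx context.Context) (bool, error) { return true, nil }

    func placeCall(ctx context.Context, runners []runner) error {
        for _, r := range runners {
            // a per-attempt context keeps gRPC resources from outliving the try
            tryCtx, tryCancel := context.WithCancel(ctx)
            placed, err := r.TryExec(tryCtx)
            tryCancel()
            if placed {
                return err
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            default: // not placed and deadline not hit: try the next runner
            }
        }
        return errServerBusy
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        if err := placeCall(ctx, []runner{busyRunner{}, okRunner{}}); err != nil {
            fmt.Println("placement failed:", err)
        } else {
            fmt.Println("call placed on the second runner")
        }
    }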