From 95d7379a813c1c12687d8e16e8e0f64881d3e860 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Tue, 10 Apr 2018 15:50:32 -0700
Subject: [PATCH] fn: pure runner concurrency fixes (#928)

* fn: experimental pure_runner rewrite
* fn: minor refactor
* fn: added comments and moved pipe initialization to NewCallHandle
* fn: EOF gRPC and EOF DataFrame handling
---
 api/agent/pure_runner.go            | 748 ++++++++++++++++------------
 test/fn-system-tests/system_test.go |   2 +-
 2 files changed, 429 insertions(+), 321 deletions(-)

diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go
index e37a4c4a6..887839ec9 100644
--- a/api/agent/pure_runner.go
+++ b/api/agent/pure_runner.go
@@ -1,6 +1,7 @@
 package agent
 
 import (
+	"bytes"
 	"context"
 	"crypto/tls"
 	"crypto/x509"
@@ -27,132 +28,192 @@ import (
 	"google.golang.org/grpc/peer"
 )
 
+/*
+	Pure Runner (implements Agent) proxies gRPC requests to the actual Agent instance. This is
+	done using http.ResponseWriter interfaces, through which the Agent pushes the function I/O:
+	1) Function output to the pure runner is received through the callHandle http.ResponseWriter interface.
+	2) Function input from the pure runner to the Agent is pushed through the callHandle io.PipeWriter.
+	3) LB-to-runner input is handled via the receiver (inQueue).
+	4) Runner-to-LB output is handled via the sender (outQueue).
+
+	The flow of events is as follows:
+
+	1) LB sends a ClientMsg_Try to the runner.
+	2) Runner allocates its resources and sends an ACK: RunnerMsg_Acknowledged.
+	3) LB sends ClientMsg_Data messages, with Eof set on the last message.
+	4) Upon receiving the first ClientMsg_Data, the runner calls agent.Submit().
+	5) agent.Submit starts reading data from the callHandle io.PipeReader, which is
+	   fed with data from the LB via the gRPC receiver (inQueue).
+	6) agent.Submit starts sending data via the callHandle http.ResponseWriter interface,
+	   which is pushed through the gRPC sender (outQueue) to the LB.
+	7) agent.Submit() completes, which means the function I/O is now finished.
+	8) Runner finalizes the gRPC session by sending RunnerMsg_Finished to the LB.
+
+*/
+
+var (
+	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
+	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
+)
+
 // callHandle represents the state of the call as handled by the pure runner, and additionally it implements the
 // interface of http.ResponseWriter so that it can be used for streaming the output back.
 type callHandle struct {
 	engagement runner.RunnerProtocol_EngageServer
+	ctx        context.Context
 	c          *call // the agent's version of call
-	input      io.WriteCloser
-	started    bool
-	done       chan error // to synchronize
-	// As the state can be set and checked by both goroutines handling this state, we need a mutex.
-	stateMutex sync.Mutex
+
+	// Timings, for metrics:
 	receivedTime  strfmt.DateTime // When was the call received?
 	allocatedTime strfmt.DateTime // When did we finish allocating capacity?
-	// Last communication error on the stream (if any). This basically acts as a cancellation flag too.
-	streamError error
+
 	// For implementing http.ResponseWriter:
-	outHeaders    http.Header
-	outStatus     int
-	headerWritten bool
+	headers http.Header
+	status  int
+
+	headerOnce        sync.Once
+	shutOnce          sync.Once
+	pipeToFnCloseOnce sync.Once
+
+	outQueue  chan *runner.RunnerMsg
+	doneQueue chan struct{}
+	errQueue  chan error
+	inQueue   chan *runner.ClientMsg
+
+	// Pipe to push data to the agent Function container
+	pipeToFnW *io.PipeWriter
+	pipeToFnR *io.PipeReader
 }
 
-func (ch *callHandle) Header() http.Header {
-	return ch.outHeaders
-}
+func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle {
 
-func (ch *callHandle) WriteHeader(status int) {
-	ch.outStatus = status
-	ch.commitHeaders()
-}
+	// set up a pipe to push data to the agent Function container
+	pipeR, pipeW := io.Pipe()
 
-func (ch *callHandle) commitHeaders() error {
-	if ch.headerWritten {
-		return nil
+	state := &callHandle{
+		engagement: engagement,
+		ctx:        engagement.Context(),
+		headers:    make(http.Header),
+		status:     200,
+		outQueue:   make(chan *runner.RunnerMsg),
+		doneQueue:  make(chan struct{}),
+		errQueue:   make(chan error, 1), // always allow one error (buffered)
+		inQueue:    make(chan *runner.ClientMsg),
+		pipeToFnW:  pipeW,
+		pipeToFnR:  pipeR,
 	}
-	ch.headerWritten = true
-	logrus.Debugf("Committing call result with status %d", ch.outStatus)
-	var outHeaders []*runner.HttpHeader
 
+	// spawn one receiver and one sender goroutine.
+	// See: https://grpc.io/docs/reference/go/generated-code.html, which reads:
+	// "Thread-safety: note that client-side RPC invocations and server-side RPC handlers
+	// are thread-safe and are meant to be run on concurrent goroutines. But also note that
+	// for individual streams, incoming and outgoing data is bi-directional but serial;
+	// so e.g. individual streams do not support concurrent reads or concurrent writes
+	// (but reads are safely concurrent with writes)."
+	state.spawnReceiver()
+	state.spawnSender()
+	return state
+}
 
-	for h, vals := range ch.outHeaders {
-		for _, v := range vals {
-			outHeaders = append(outHeaders, &runner.HttpHeader{
-				Key:   h,
-				Value: v,
-			})
+// closePipeToFn closes the pipe that feeds data to the function in the agent.
+func (ch *callHandle) closePipeToFn() {
+	ch.pipeToFnCloseOnce.Do(func() {
+		ch.pipeToFnW.Close()
+	})
+}
+
+// finalize initiates a graceful shutdown of the session. This is
+// currently achieved by enqueueing a sentinel nil to the gRPC sender.
+func (ch *callHandle) finalize() error {
+	// final sentinel nil msg for graceful shutdown
+	err := ch.enqueueMsg(nil)
+	if err != nil {
+		ch.shutdown(err)
+	}
+	return err
+}
+
+// shutdown initiates a shutdown and terminates the gRPC session with
+// a given error.
+func (ch *callHandle) shutdown(err error) {
+
+	ch.closePipeToFn()
+
+	ch.shutOnce.Do(func() {
+		logrus.WithError(err).Debugf("Shutting down call handle")
+
+		// try to queue an error message if it's not already queued.
+		if err != nil {
+			select {
+			case ch.errQueue <- err:
+			default:
+			}
 		}
-	}
-	// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
-	// the call: basically just blackhole the output and return the write error to cause Submit to fail properly.
- ch.stateMutex.Lock() - defer ch.stateMutex.Unlock() - err := ch.streamError - if err != nil { - return fmt.Errorf("Bailing out because of communication error: %v", ch.streamError) - } - - logrus.Debug("Sending call result start message") - err = ch.engagement.Send(&runner.RunnerMsg{ - Body: &runner.RunnerMsg_ResultStart{ - ResultStart: &runner.CallResultStart{ - Meta: &runner.CallResultStart_Http{ - Http: &runner.HttpRespMeta{ - Headers: outHeaders, - StatusCode: int32(ch.outStatus), - }, - }, - }, - }, + close(ch.doneQueue) }) - if err != nil { - logrus.WithError(err).Error("Error sending call result") - ch.streamError = err - return err - } - logrus.Debug("Sent call result message") - return nil } -func (ch *callHandle) Write(data []byte) (int, error) { - err := ch.commitHeaders() - if err != nil { - return 0, fmt.Errorf("Error sending data: %v", err) +// waitError waits until the session is completed and results +// any queued error if there is any. +func (ch *callHandle) waitError() error { + select { + case <-ch.ctx.Done(): + case <-ch.doneQueue: } - // Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel - // the call: basically just blackhole the output and return the write error to cause Submit to fail properly. - ch.stateMutex.Lock() - defer ch.stateMutex.Unlock() - err = ch.streamError - if err != nil { - return 0, fmt.Errorf("Bailing out because of communication error: %v", ch.streamError) + var err error + // get queued error if there's any + select { + case err = <-ch.errQueue: + default: + err = ch.ctx.Err() } - logrus.Debugf("Sending call response data %d bytes long", len(data)) - err = ch.engagement.Send(&runner.RunnerMsg{ - Body: &runner.RunnerMsg_Data{ - Data: &runner.DataFrame{ - Data: data, - Eof: false, - }, - }, - }) if err != nil { - ch.streamError = err - return 0, fmt.Errorf("Error sending data: %v", err) + logrus.WithError(err).Debugf("Wait Error") } - return len(data), nil + return err } -func (ch *callHandle) Close() error { - err := ch.commitHeaders() +// enqueueMsg attempts to queue a message to the gRPC sender +func (ch *callHandle) enqueueMsg(msg *runner.RunnerMsg) error { + select { + case ch.outQueue <- msg: + return nil + case <-ch.ctx.Done(): + case <-ch.doneQueue: + } + return io.EOF +} + +// enqueueMsgStricy enqueues a message to the gRPC sender and if +// that fails then initiates an error case shutdown. +func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error { + err := ch.enqueueMsg(msg) if err != nil { - return fmt.Errorf("Error sending close frame: %v", err) + ch.shutdown(err) + } + return err +} + +// enqueueCallResponse enqueues a Submit() response to the LB +// and initiates a graceful shutdown of the session. +func (ch *callHandle) enqueueCallResponse(err error) { + defer ch.finalize() + + // Error response + if err != nil { + ch.enqueueMsgStrict(&runner.RunnerMsg{ + Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{ + Success: false, + Details: fmt.Sprintf("%v", err), + }}}) + return } - // Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel - // the call: basically just blackhole the output and return the write error to cause the caller to fail properly. 
-	ch.stateMutex.Lock()
-	defer ch.stateMutex.Unlock()
-	err = ch.streamError
-	if err != nil {
-		return fmt.Errorf("Bailing out because of communication error: %v", ch.streamError)
-	}
-	logrus.Debug("Sending call response data end")
-	err = ch.engagement.Send(&runner.RunnerMsg{
+	// EOF and Success response
+	ch.enqueueMsgStrict(&runner.RunnerMsg{
 		Body: &runner.RunnerMsg_Data{
 			Data: &runner.DataFrame{
 				Eof: true,
@@ -160,38 +221,236 @@ func (ch *callHandle) Close() error {
 		},
 	})
 
-	if err != nil {
-		return fmt.Errorf("Error sending close frame: %v", err)
-	}
-	return nil
+	ch.enqueueMsgStrict(&runner.RunnerMsg{
+		Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
+			Success: true,
+			Details: ch.c.Model().ID,
+		}}})
 }
 
-// cancel implements the logic for cancelling the execution of a call based on what the state in the handle is.
-func (ch *callHandle) cancel(ctx context.Context, err error) {
-	ch.stateMutex.Lock()
-	defer ch.stateMutex.Unlock()
-
-	// Do not double-cancel.
-	if ch.streamError != nil {
-		return
+// enqueueAck enqueues an ACK or NACK response to the LB for a ClientMsg_Try
+// request. If it is a NACK, it also initiates a graceful shutdown of the
+// session.
+func (ch *callHandle) enqueueAck(err error) error {
+	// NACK
+	if err != nil {
+		err = ch.enqueueMsgStrict(&runner.RunnerMsg{
+			Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
+				Committed: false,
+				Details:   fmt.Sprintf("%v", err),
+			}}})
+		if err != nil {
+			return err
+		}
+		return ch.finalize()
 	}
 
-	// First, record that there has been an error.
-	ch.streamError = err
-	// Caller may have died or disconnected. The behaviour here depends on the state of the call.
-	// If the call was placed and is running we need to handle it...
-	if ch.c != nil {
-		// If we've actually started the call we're in the middle of an execution with i/o going back and forth.
-		// This is hard to stop. Side effects can be occurring at any point. However, at least we should stop
-		// the i/o flow. Recording the stream error in the handle should have stopped the output, but we also
-		// want to stop any input being sent through, so we close the input stream and let the function
-		// probably crash out. If it doesn't crash out, well, it means the function doesn't handle i/o errors
-		// properly and it will hang there until the timeout, then it'll be killed properly by the timeout
-		// handling in Submit.
-		if ch.started {
-			ch.input.Close()
+	// ACK
+	return ch.enqueueMsgStrict(&runner.RunnerMsg{
+		Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
+			Committed:             true,
+			Details:               ch.c.Model().ID,
+			SlotAllocationLatency: time.Time(ch.allocatedTime).Sub(time.Time(ch.receivedTime)).String(),
+		}}})
+}
+
+// spawnPipeToFn pumps data to the function via the callHandle io.PipeWriter (pipeToFnW),
+// which is fed from the returned input channel.
+func (ch *callHandle) spawnPipeToFn() chan *runner.DataFrame {
+
+	input := make(chan *runner.DataFrame)
+	go func() {
+		defer ch.closePipeToFn()
+		for {
+			select {
+			case <-ch.doneQueue:
+				return
+			case <-ch.ctx.Done():
+				return
+			case data := <-input:
+				if data == nil {
+					return
+				}
+
+				if len(data.Data) > 0 {
+					_, err := io.CopyN(ch.pipeToFnW, bytes.NewReader(data.Data), int64(len(data.Data)))
+					if err != nil {
+						ch.shutdown(err)
+						return
+					}
+				}
+				if data.Eof {
+					return
+				}
+			}
+		}
+	}()
+
+	return input
+}
+
+// spawnReceiver starts a gRPC receiver, which
+// feeds received LB messages into inQueue
+func (ch *callHandle) spawnReceiver() {
+
+	go func() {
+		defer close(ch.inQueue)
+		for {
+			msg, err := ch.engagement.Recv()
+			if err != nil {
+				// engagement is closed/cancelled by the client.
+				if err == io.EOF {
+					return
+				}
+				ch.shutdown(err)
+				return
+			}
+
+			select {
+			case ch.inQueue <- msg:
+			case <-ch.doneQueue:
+				return
+			case <-ch.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+// spawnSender starts a gRPC sender, which
+// pumps messages from outQueue to the LB.
+func (ch *callHandle) spawnSender() {
+	go func() {
+		for {
+			select {
+			case msg := <-ch.outQueue:
+				if msg == nil {
+					ch.shutdown(nil)
+					return
+				}
+				err := ch.engagement.Send(msg)
+				if err != nil {
+					ch.shutdown(err)
+					return
+				}
+			case <-ch.doneQueue:
+				return
+			case <-ch.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+// Header implements http.ResponseWriter, which
+// is used by the Agent to push headers to the pure runner
+func (ch *callHandle) Header() http.Header {
+	return ch.headers
+}
+
+// WriteHeader implements http.ResponseWriter, which
+// is used by the Agent to push the http status to the pure runner
+func (ch *callHandle) WriteHeader(status int) {
+	ch.status = status
+}
+
+// prepHeaders is a utility function to compile http headers
+// into a flat array.
+func (ch *callHandle) prepHeaders() []*runner.HttpHeader {
+	var headers []*runner.HttpHeader
+	for h, vals := range ch.headers {
+		for _, v := range vals {
+			headers = append(headers, &runner.HttpHeader{
+				Key:   h,
+				Value: v,
+			})
+		}
+	}
+	return headers
+}
+
+// Write implements http.ResponseWriter, which
+// is used by the Agent to push http data to the pure runner. The
+// received data is pushed to the LB via the gRPC sender queue.
+// Write also sends the http headers/status to the LB.
+func (ch *callHandle) Write(data []byte) (int, error) {
+	var err error
+	ch.headerOnce.Do(func() {
+		// WARNING: we fetch Status and Headers without
+		// a lock below. This is a problem in agent in general, and needs
+		// to be fixed in all accessing go-routines such as protocol/http.go,
+		// protocol/json.go, agent.go, etc. In practice, however, only one
+		// go-routine accesses them (which also compiles and writes headers), but this
+		// is fragile and needs to be fortified.
+		err = ch.enqueueMsg(&runner.RunnerMsg{
+			Body: &runner.RunnerMsg_ResultStart{
+				ResultStart: &runner.CallResultStart{
+					Meta: &runner.CallResultStart_Http{
+						Http: &runner.HttpRespMeta{
+							Headers:    ch.prepHeaders(),
+							StatusCode: int32(ch.status),
+						},
+					},
+				},
+			},
+		})
+	})
+
+	if err != nil {
+		return 0, err
+	}
+
+	err = ch.enqueueMsg(&runner.RunnerMsg{
+		Body: &runner.RunnerMsg_Data{
+			Data: &runner.DataFrame{
+				Data: data,
+				Eof:  false,
+			},
+		},
+	})
+
+	if err != nil {
+		return 0, err
+	}
+	return len(data), nil
+}
+
+// getTryMsg fetches/waits for a TryCall message from
+// the LB using inQueue (gRPC receiver)
+func (ch *callHandle) getTryMsg() *runner.TryCall {
+	var msg *runner.TryCall
+
+	select {
+	case <-ch.doneQueue:
+	case <-ch.ctx.Done():
+	case item := <-ch.inQueue:
+		if item != nil {
+			msg = item.GetTry()
+		}
+	}
+	if msg == nil {
+		ch.shutdown(ErrorExpectedTry)
+	}
+	return msg
+}
+
+// getDataMsg fetches/waits for a DataFrame message from
+// the LB using inQueue (gRPC receiver)
+func (ch *callHandle) getDataMsg() *runner.DataFrame {
+	var msg *runner.DataFrame
+
+	select {
+	case <-ch.doneQueue:
+	case <-ch.ctx.Done():
+	case item := <-ch.inQueue:
+		if item != nil {
+			msg = item.GetData()
+		}
+	}
+	if msg == nil {
+		ch.shutdown(ErrorExpectedData)
+	}
+	return msg
 }
 
 type CapacityGate interface {
@@ -285,100 +544,16 @@ func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
 	return errors.New("Enqueue cannot be called directly in a Pure Runner.")
 }
 
-func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) {
-	// Only start it once!
-	state.stateMutex.Lock()
-	defer state.stateMutex.Unlock()
-	if !state.started {
-		state.started = true
-		go func() {
-			err := pr.a.Submit(state.c)
-			if err != nil {
-				// In this case the function has failed for a legitimate reason. We send a call failed message if we
-				// can. If there's a streaming error doing that then we are basically in the "double exception" case
-				// and who knows what's best to do. Submit has already finished so we don't need to cancel... but at
-				// least we should set streamError if it's not set.
-				state.stateMutex.Lock()
-				defer state.stateMutex.Unlock()
-				if state.streamError == nil {
-					err2 := state.engagement.Send(&runner.RunnerMsg{
-						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-							Success: false,
-							Details: fmt.Sprintf("%v", err),
-						}}})
-					if err2 != nil {
-						state.streamError = err2
-					}
-				}
-				state.done <- err
-				return
-			}
-			// First close the writer, then send the call finished message
-			err = state.Close()
-			if err != nil {
-				// If we fail to close the writer we need to communicate back that the function has failed; if there's
-				// a streaming error doing that then we are basically in the "double exception" case and who knows
-				// what's best to do. Submit has already finished so we don't need to cancel... but at least we should
-				// set streamError if it's not set.
-				state.stateMutex.Lock()
-				defer state.stateMutex.Unlock()
-				if state.streamError == nil {
-					err2 := state.engagement.Send(&runner.RunnerMsg{
-						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-							Success: false,
-							Details: fmt.Sprintf("%v", err),
-						}}})
-					if err2 != nil {
-						state.streamError = err2
-					}
-				}
-				state.done <- err
-				return
-			}
-			// At this point everything should have worked. Send a successful message... and if that runs afoul of a
-			// stream error, well, we're in a bit of trouble. Everything has finished, so there is nothing to cancel
-			// and we just give up, but at least we set streamError.
-			state.stateMutex.Lock()
-			defer state.stateMutex.Unlock()
-			if state.streamError == nil {
-				err2 := state.engagement.Send(&runner.RunnerMsg{
-					Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-						Success: true,
-						Details: state.c.Model().ID,
-					}}})
-				if err2 != nil {
-					state.streamError = err2
-					state.done <- err2
-					return
-				}
-			}
-
-			state.done <- nil
-		}()
-	}
+func (pr *pureRunner) spawnSubmit(state *callHandle) {
+	go func() {
+		err := pr.a.Submit(state.c)
+		state.enqueueCallResponse(err)
+	}()
 }
 
-func (pr *pureRunner) handleData(ctx context.Context, data *runner.DataFrame, state *callHandle) error {
-	pr.ensureFunctionIsRunning(state)
-
-	// Only push the input if we're in a non-error situation
-	state.stateMutex.Lock()
-	defer state.stateMutex.Unlock()
-	if state.streamError == nil {
-		if len(data.Data) > 0 {
-			_, err := state.input.Write(data.Data)
-			if err != nil {
-				return err
-			}
-		}
-		if data.Eof {
-			state.input.Close()
-		}
-	}
-	return nil
-}
-
-func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {
+// handleTryCall allocates a resource/capacity reservation based on the TryCall message
+// and sets callHandle.c to the agent's call.
+func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {
 	state.receivedTime = strfmt.DateTime(time.Now())
 	var c models.Call
 	err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
@@ -392,23 +567,25 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
 		return func() {}, err
 	}
 
-	// Proceed!
-	var w http.ResponseWriter
-	w = state
-	inR, inW := io.Pipe()
-	agent_call, err := pr.a.GetCall(FromModelAndInput(&c, inR), WithWriter(w))
-	if err != nil {
-		return func() { pr.capacity.ReleaseCapacity(c.Memory) }, err
+	cleanup := func() {
+		pr.capacity.ReleaseCapacity(c.Memory)
 	}
+
+	agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state))
+	if err != nil {
+		return cleanup, err
+	}
+
 	state.c = agent_call.(*call)
-	state.input = inW
 	state.allocatedTime = strfmt.DateTime(time.Now())
 
-	return func() { pr.capacity.ReleaseCapacity(c.Memory) }, nil
+	return cleanup, nil
 }
 
 // Handles a client engagement
 func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
+
+	grpc.EnableTracing = false
+
 	// Keep lightweight tabs on what this runner is doing: for draindown tests
 	atomic.AddInt32(&pr.inflight, 1)
 	defer atomic.AddInt32(&pr.inflight, -1)
@@ -423,116 +600,47 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
 		logrus.Debug("MD is ", md)
 	}
 
-	var state = callHandle{
-		engagement:    engagement,
-		c:             nil,
-		input:         nil,
-		started:       false,
-		done:          make(chan error),
-		streamError:   nil,
-		outHeaders:    make(http.Header),
-		outStatus:     200,
-		headerWritten: false,
+	state := NewCallHandle(engagement)
+
+	tryMsg := state.getTryMsg()
+	if tryMsg == nil {
+		return state.waitError()
 	}
-	grpc.EnableTracing = false
-	logrus.Debug("Entering engagement handler")
-
-	msg, err := engagement.Recv()
-	if err != nil {
-		// In this case the connection has dropped before we've even started.
-		return err
+	dealloc, errTry := pr.handleTryCall(tryMsg, state)
+	defer dealloc()
+	// respond to the LB with the handleTryCall result
+	err := state.enqueueAck(errTry)
+	if err != nil || errTry != nil {
+		return state.waitError()
 	}
-	switch body := msg.Body.(type) {
-	case *runner.ClientMsg_Try:
-		dealloc, err := pr.handleTryCall(engagement.Context(), body.Try, &state)
-		defer dealloc()
-		// At the stage of TryCall, there is only one thread running and nothing has happened yet so there should
-		// not be a streamError. We can handle `err` by sending a message back. If we cause a stream error by sending
-		// the message, we are in a "double exception" case and we might as well cancel the call with the original
-		// error, so we can ignore the error from Send.
-		if err != nil {
-			_ = engagement.Send(&runner.RunnerMsg{
-				Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
-					Committed: false,
-					Details:   fmt.Sprintf("%v", err),
-				}}})
-			state.cancel(engagement.Context(), err)
-			return err
+
+	var dataFeed chan *runner.DataFrame
+DataLoop:
+	for {
+		dataMsg := state.getDataMsg()
+		if dataMsg == nil {
+			break
 		}
-		// If we succeed in creating the call, but we get a stream error sending a message back, we must cancel
-		// the call because we've probably lost the connection.
-		err = engagement.Send(&runner.RunnerMsg{
-			Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
-				Committed:             true,
-				Details:               state.c.Model().ID,
-				SlotAllocationLatency: time.Time(state.allocatedTime).Sub(time.Time(state.receivedTime)).String(),
-			}}})
-		if err != nil {
-			state.cancel(engagement.Context(), err)
-			return err
+
+		if dataFeed == nil {
+			pr.spawnSubmit(state)
+			dataFeed = state.spawnPipeToFn()
 		}
-		// Then at this point we start handling the data that should be being pushed to us.
-		foundEof := false
-		for !foundEof {
-			msg, err := engagement.Recv()
-			if err != nil {
-				// In this case the connection has dropped or there's something bad happening. We know we can't even
-				// send a message back. Cancel the call, all bets are off.
-				state.cancel(engagement.Context(), err)
-				return err
-			}
-
-			switch body := msg.Body.(type) {
-			case *runner.ClientMsg_Data:
-				err := pr.handleData(engagement.Context(), body.Data, &state)
-				if err != nil {
-					// If this happens, then we couldn't write into the input. The state of the function is inconsistent
-					// and therefore we need to cancel. We also need to communicate back that the function has failed;
-					// that could also run afoul of a stream error, but at that point we don't care, just cancel the
-					// call with the original error.
-					_ = state.engagement.Send(&runner.RunnerMsg{
-						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-							Success: false,
-							Details: fmt.Sprintf("%v", err),
-						}}})
-					state.cancel(engagement.Context(), err)
-					return err
-				}
-				// Then break the loop if this was the last input data frame, i.e. eof is on
-				if body.Data.Eof {
-					foundEof = true
-				}
-			default:
-				err := errors.New("Protocol failure in communication with function runner")
-				// This is essentially a panic. Try to communicate back that the call has failed, and bail out; that
-				// could also run afoul of a stream error, but at that point we don't care, just cancel the call with
-				// the catastrophic error.
-				_ = state.engagement.Send(&runner.RunnerMsg{
-					Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
-						Success: false,
-						Details: fmt.Sprintf("%v", err),
-					}}})
-				state.cancel(engagement.Context(), err)
-				return err
-			}
-		}
-
-		// Synchronize to the function running goroutine finishing
 		select {
-		case <-state.done:
-		case <-engagement.Context().Done():
-			return engagement.Context().Err()
+		case dataFeed <- dataMsg:
+			if dataMsg.Eof {
+				break DataLoop
+			}
+		case <-state.doneQueue:
+			break DataLoop
+		case <-state.ctx.Done():
+			break DataLoop
 		}
-
-	default:
-		// Protocol error. This should not happen.
-		return errors.New("Protocol failure in communication with function runner")
 	}
-	return nil
+
+	return state.waitError()
 }
 
 func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {
diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go
index efae61be0..a0737f833 100644
--- a/test/fn-system-tests/system_test.go
+++ b/test/fn-system-tests/system_test.go
@@ -194,7 +194,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
 	opts = append(opts, server.WithWebPort(8082+nodeNum))
 	opts = append(opts, server.WithGRPCPort(9190+nodeNum))
 	opts = append(opts, server.WithType(nodeType))
-	opts = append(opts, server.WithLogLevel(server.DefaultLogLevel))
+	opts = append(opts, server.WithLogLevel("debug"))
 	opts = append(opts, server.WithLogDest(server.DefaultLogDest, "PURE-RUNNER"))
 	opts = append(opts, server.WithDBURL(""))
 	opts = append(opts, server.WithMQURL(""))
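
For reference, the LB side of the Engage protocol implemented above proceeds Try -> Acknowledged -> Data (Eof on the last frame) -> ResultStart/Data -> Finished. The sketch below walks one call through the stream from the load balancer's perspective. It is a minimal illustration and not part of this patch: the generated stub and getter names (NewRunnerProtocolClient, Engage, GetAcknowledged, GetFinished) are assumed to follow standard grpc-go/protobuf conventions, and the import path is hypothetical; only ClientMsg, TryCall, DataFrame, and RunnerMsg appear in the patch itself.

package main

import (
	"context"
	"fmt"
	"log"

	runner "github.com/fnproject/fn/api/agent/grpc" // hypothetical import path for the generated stubs
	"google.golang.org/grpc"
)

// runCall drives a single call through the bidirectional Engage stream.
func runCall(ctx context.Context, conn *grpc.ClientConn, modelsCallJSON string, body []byte) error {
	client := runner.NewRunnerProtocolClient(conn) // assumed generated constructor
	eng, err := client.Engage(ctx)
	if err != nil {
		return err
	}

	// Step 1: send ClientMsg_Try; the runner replies with an ACK or NACK (step 2).
	if err := eng.Send(&runner.ClientMsg{Body: &runner.ClientMsg_Try{
		Try: &runner.TryCall{ModelsCallJson: modelsCallJSON},
	}}); err != nil {
		return err
	}
	msg, err := eng.Recv()
	if err != nil {
		return err
	}
	ack := msg.GetAcknowledged() // assumed protobuf oneof getter
	if ack == nil || !ack.Committed {
		return fmt.Errorf("runner NACKed the call")
	}

	// Step 3: send the input as DataFrames, with Eof set on the last one.
	if err := eng.Send(&runner.ClientMsg{Body: &runner.ClientMsg_Data{
		Data: &runner.DataFrame{Data: body, Eof: true},
	}}); err != nil {
		return err
	}

	// Steps 6-8: drain ResultStart/Data frames until RunnerMsg_Finished arrives.
	for {
		msg, err := eng.Recv()
		if err != nil {
			return err
		}
		if fin := msg.GetFinished(); fin != nil { // assumed oneof getter
			log.Printf("call finished: success=%v details=%s", fin.Success, fin.Details)
			return nil
		}
		// Otherwise msg carries ResultStart (headers/status) or Data (response body bytes).
	}
}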