fn: pure runner concurrency fixes (#928)

* fn: experimental pure_runner rewrite

* fn: minor refactor

* fn: added comments and moved pipe initialization to NewCallHandle

* fn: EOF gRPC and EOF DataFrame handling
Author: Tolga Ceylan
Date: 2018-04-10 15:50:32 -07:00
Committed by: GitHub
Parent: bd53ee9a2d
Commit: 95d7379a81
2 changed files with 429 additions and 321 deletions
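The central change in this commit is the concurrency model around the bidirectional gRPC stream: instead of several goroutines calling Send/Recv on the engagement directly and guarding shared call state with a mutex, the new callHandle owns exactly one receiver goroutine and one sender goroutine, and everything else communicates with them over channels. The following is a minimal, illustrative sketch of that pattern only, not code from this commit; the stream interface and message type are simplified stand-ins for the generated RunnerProtocol_EngageServer and ClientMsg/RunnerMsg types.

package sketch

// stream stands in for a bidirectional gRPC stream such as
// runner.RunnerProtocol_EngageServer: Recv and Send are each safe from
// only one goroutine at a time, although one reader and one writer may
// run concurrently.
type stream interface {
	Recv() (msg string, err error)
	Send(msg string) error
}

// pump gives the stream a single owner for each direction: one receiver
// goroutine feeding inQ and one sender goroutine draining outQ. Other
// goroutines only ever touch the channels, never the stream itself.
func pump(s stream, done <-chan struct{}) (inQ <-chan string, outQ chan<- string) {
	in := make(chan string)
	out := make(chan string)

	go func() { // receiver: the only caller of Recv
		defer close(in)
		for {
			m, err := s.Recv()
			if err != nil { // includes io.EOF when the peer closes the stream
				return
			}
			select {
			case in <- m:
			case <-done:
				return
			}
		}
	}()

	go func() { // sender: the only caller of Send
		for {
			select {
			case m := <-out:
				if err := s.Send(m); err != nil {
					return
				}
			case <-done:
				return
			}
		}
	}()

	return in, out
}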


@@ -1,6 +1,7 @@
package agent

import (
	"bytes"
	"context"
	"crypto/tls"
	"crypto/x509"
@@ -27,132 +28,192 @@ import (
	"google.golang.org/grpc/peer"
)
/*
	Pure Runner (implements Agent) proxies gRPC requests to the actual Agent instance. This is
	done using the http.ResponseWriter interfaces, through which Agent pushes the function I/O:
	1) Function output to the pure runner is received through the callHandle http.ResponseWriter interface.
	2) Function input from the pure runner to Agent is processed through the callHandle io.PipeWriter.
	3) LB-to-runner input is handled via the receiver (inQueue).
	4) Runner-to-LB output is handled via the sender (outQueue).

	The flow of events is as follows:
	1) LB sends a ClientMsg_Try to the runner.
	2) Runner allocates its resources and sends an ACK: RunnerMsg_Acknowledged.
	3) LB sends ClientMsg_Data messages, with EOF set on the last message.
	4) Upon receiving ClientMsg_Data, the runner calls agent.Submit().
	5) agent.Submit() starts reading data from the callHandle io.PipeReader, which reads
	   data from the LB via the gRPC receiver (inQueue).
	6) agent.Submit() starts sending data via the callHandle http.ResponseWriter interface,
	   which is pushed to the gRPC sender (outQueue) and on to the LB.
	7) agent.Submit() completes, which means the function I/O is now finished.
	8) Runner finalizes the gRPC session with a RunnerMsg_Finished to the LB.
*/
var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)

// callHandle represents the state of the call as handled by the pure runner, and additionally it implements the
// interface of http.ResponseWriter so that it can be used for streaming the output back.
type callHandle struct {
	engagement runner.RunnerProtocol_EngageServer
	ctx        context.Context
	c          *call // the agent's version of call

	// Timings, for metrics:
	receivedTime  strfmt.DateTime // When was the call received?
	allocatedTime strfmt.DateTime // When did we finish allocating capacity?

	// For implementing http.ResponseWriter:
	headers http.Header
	status  int

	headerOnce        sync.Once
	shutOnce          sync.Once
	pipeToFnCloseOnce sync.Once

	outQueue  chan *runner.RunnerMsg
	doneQueue chan struct{}
	errQueue  chan error
	inQueue   chan *runner.ClientMsg

	// Pipe to push data to the agent Function container
	pipeToFnW *io.PipeWriter
	pipeToFnR *io.PipeReader
}

func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle {

	// set up a pipe to push data to the agent Function container
	pipeR, pipeW := io.Pipe()

	state := &callHandle{
		engagement: engagement,
		ctx:        engagement.Context(),
		headers:    make(http.Header),
		status:     200,
		outQueue:   make(chan *runner.RunnerMsg),
		doneQueue:  make(chan struct{}),
		errQueue:   make(chan error, 1), // always allow one error (buffered)
		inQueue:    make(chan *runner.ClientMsg),
		pipeToFnW:  pipeW,
		pipeToFnR:  pipeR,
	}

	// spawn one receiver and one sender go-routine.
	// See: https://grpc.io/docs/reference/go/generated-code.html, which reads:
	// "Thread-safety: note that client-side RPC invocations and server-side RPC handlers
	// are thread-safe and are meant to be run on concurrent goroutines. But also note that
	// for individual streams, incoming and outgoing data is bi-directional but serial;
	// so e.g. individual streams do not support concurrent reads or concurrent writes
	// (but reads are safely concurrent with writes)."
	state.spawnReceiver()
	state.spawnSender()
	return state
}

// closePipeToFn closes the pipe that feeds data to the function in agent.
func (ch *callHandle) closePipeToFn() {
	ch.pipeToFnCloseOnce.Do(func() {
		ch.pipeToFnW.Close()
	})
}

// finalize initiates a graceful shutdown of the session. This is
// currently achieved by enqueueing a sentinel nil to the gRPC sender.
func (ch *callHandle) finalize() error {
	// final sentinel nil msg for graceful shutdown
	err := ch.enqueueMsg(nil)
	if err != nil {
		ch.shutdown(err)
	}
	return err
}

// shutdown initiates a shutdown and terminates the gRPC session with
// a given error.
func (ch *callHandle) shutdown(err error) {

	ch.closePipeToFn()

	ch.shutOnce.Do(func() {
		logrus.WithError(err).Debugf("Shutting down call handle")

		// try to queue an error message if it's not already queued.
		if err != nil {
			select {
			case ch.errQueue <- err:
			default:
			}
		}

		close(ch.doneQueue)
	})
}

// waitError waits until the session is completed and returns
// any queued error, if there is one.
func (ch *callHandle) waitError() error {
	select {
	case <-ch.ctx.Done():
	case <-ch.doneQueue:
	}

	var err error
	// get queued error if there's any
	select {
	case err = <-ch.errQueue:
	default:
		err = ch.ctx.Err()
	}

	if err != nil {
		logrus.WithError(err).Debugf("Wait Error")
	}
	return err
}

// enqueueMsg attempts to queue a message to the gRPC sender.
func (ch *callHandle) enqueueMsg(msg *runner.RunnerMsg) error {
	select {
	case ch.outQueue <- msg:
		return nil
	case <-ch.ctx.Done():
	case <-ch.doneQueue:
	}
	return io.EOF
}

// enqueueMsgStrict enqueues a message to the gRPC sender and, if
// that fails, initiates an error-case shutdown.
func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error {
	err := ch.enqueueMsg(msg)
	if err != nil {
		ch.shutdown(err)
	}
	return err
}

// enqueueCallResponse enqueues a Submit() response to the LB
// and initiates a graceful shutdown of the session.
func (ch *callHandle) enqueueCallResponse(err error) {

	defer ch.finalize()

	// Error response
	if err != nil {
		ch.enqueueMsgStrict(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
				Success: false,
				Details: fmt.Sprintf("%v", err),
			}}})
		return
	}

	// EOF and Success response
	ch.enqueueMsgStrict(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Data{
			Data: &runner.DataFrame{
				Eof: true,
			},
@@ -160,38 +221,236 @@ func (ch *callHandle) Close() error {
		},
	})

	ch.enqueueMsgStrict(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
			Success: true,
			Details: ch.c.Model().ID,
		}}})
}

// enqueueAck enqueues an ACK or NACK response to the LB for the ClientMsg_Try
// request. If NACK, then it also initiates a graceful shutdown of the
// session.
func (ch *callHandle) enqueueAck(err error) error {

	// NACK
	if err != nil {
		err = ch.enqueueMsgStrict(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
				Committed: false,
				Details:   fmt.Sprintf("%v", err),
			}}})
		if err != nil {
			return err
		}
		return ch.finalize()
	}

	// ACK
	return ch.enqueueMsgStrict(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
			Committed:             true,
			Details:               ch.c.Model().ID,
			SlotAllocationLatency: time.Time(ch.allocatedTime).Sub(time.Time(ch.receivedTime)).String(),
		}}})
}

// spawnPipeToFn pumps data to the Function via the callHandle io.PipeWriter
// (pipeToFnW), which is fed from the returned input channel.
func (ch *callHandle) spawnPipeToFn() chan *runner.DataFrame {

	input := make(chan *runner.DataFrame)
	go func() {
		defer ch.closePipeToFn()
		for {
			select {
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			case data := <-input:
				if data == nil {
					return
				}

				if len(data.Data) > 0 {
					_, err := io.CopyN(ch.pipeToFnW, bytes.NewReader(data.Data), int64(len(data.Data)))
					if err != nil {
						ch.shutdown(err)
						return
					}
				}

				if data.Eof {
					return
				}
			}
		}
	}()

	return input
}

// spawnReceiver starts a gRPC receiver, which
// feeds received LB messages into inQueue.
func (ch *callHandle) spawnReceiver() {

	go func() {
		defer close(ch.inQueue)
		for {
			msg, err := ch.engagement.Recv()
			if err != nil {
				// engagement is closed/cancelled by the client.
				if err == io.EOF {
					return
				}
				ch.shutdown(err)
				return
			}

			select {
			case ch.inQueue <- msg:
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			}
		}
	}()
}

// spawnSender starts a gRPC sender, which
// pumps messages from outQueue to the LB.
func (ch *callHandle) spawnSender() {
	go func() {
		for {
			select {
			case msg := <-ch.outQueue:
				if msg == nil {
					ch.shutdown(nil)
					return
				}
				err := ch.engagement.Send(msg)
				if err != nil {
					ch.shutdown(err)
					return
				}
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			}
		}
	}()
}

// Header implements http.ResponseWriter, which
// is used by Agent to push headers to the pure runner.
func (ch *callHandle) Header() http.Header {
	return ch.headers
}

// WriteHeader implements http.ResponseWriter, which
// is used by Agent to push the http status to the pure runner.
func (ch *callHandle) WriteHeader(status int) {
	ch.status = status
}

// prepHeaders is a utility function to compile http headers
// into a flat array.
func (ch *callHandle) prepHeaders() []*runner.HttpHeader {
	var headers []*runner.HttpHeader
	for h, vals := range ch.headers {
		for _, v := range vals {
			headers = append(headers, &runner.HttpHeader{
				Key:   h,
				Value: v,
			})
		}
	}
	return headers
}

// Write implements http.ResponseWriter, which
// is used by Agent to push http data to the pure runner. The
// received data is pushed to the LB via the gRPC sender queue.
// Write also sends the http headers/status to the LB.
func (ch *callHandle) Write(data []byte) (int, error) {
	var err error
	ch.headerOnce.Do(func() {
		// WARNING: we do fetch Status and Headers without
		// a lock below. This is a problem in agent in general, and needs
		// to be fixed in all accessing go-routines such as protocol/http.go,
		// protocol/json.go, agent.go, etc. In practice, however, one go-routine
		// accesses them (which also compiles and writes headers), but this
		// is fragile and needs to be fortified.
		err = ch.enqueueMsg(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_ResultStart{
				ResultStart: &runner.CallResultStart{
					Meta: &runner.CallResultStart_Http{
						Http: &runner.HttpRespMeta{
							Headers:    ch.prepHeaders(),
							StatusCode: int32(ch.status),
						},
					},
				},
			},
		})
	})
	if err != nil {
		return 0, err
	}

	err = ch.enqueueMsg(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Data{
			Data: &runner.DataFrame{
				Data: data,
				Eof:  false,
			},
		},
	})
	if err != nil {
		return 0, err
	}
	return len(data), nil
}

// getTryMsg fetches/waits for a TryCall message from
// the LB using inQueue (gRPC receiver).
func (ch *callHandle) getTryMsg() *runner.TryCall {
	var msg *runner.TryCall

	select {
	case <-ch.doneQueue:
	case <-ch.ctx.Done():
	case item := <-ch.inQueue:
		if item != nil {
			msg = item.GetTry()
		}
	}
	if msg == nil {
		ch.shutdown(ErrorExpectedTry)
	}
	return msg
}

// getDataMsg fetches/waits for a DataFrame message from
// the LB using inQueue (gRPC receiver).
func (ch *callHandle) getDataMsg() *runner.DataFrame {
	var msg *runner.DataFrame

	select {
	case <-ch.doneQueue:
	case <-ch.ctx.Done():
	case item := <-ch.inQueue:
		if item != nil {
			msg = item.GetData()
		}
	}
	if msg == nil {
		ch.shutdown(ErrorExpectedData)
	}
	return msg
}
type CapacityGate interface {
@@ -285,100 +544,16 @@ func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
	return errors.New("Enqueue cannot be called directly in a Pure Runner.")
}

func (pr *pureRunner) spawnSubmit(state *callHandle) {
	go func() {
		err := pr.a.Submit(state.c)
		state.enqueueCallResponse(err)
	}()
}

// handleTryCall allocates a resource/capacity reservation based on the TryCall
// message and creates callHandle.c from the agent call.
func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {

	state.receivedTime = strfmt.DateTime(time.Now())
	var c models.Call
	err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
@@ -392,23 +567,25 @@ func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, sta
		return func() {}, err
	}

	cleanup := func() {
		pr.capacity.ReleaseCapacity(c.Memory)
	}

	agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state))
	if err != nil {
		return cleanup, err
	}

	state.c = agent_call.(*call)
	state.allocatedTime = strfmt.DateTime(time.Now())

	return cleanup, nil
}

// Handles a client engagement
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
	grpc.EnableTracing = false

	// Keep lightweight tabs on what this runner is doing: for draindown tests
	atomic.AddInt32(&pr.inflight, 1)
	defer atomic.AddInt32(&pr.inflight, -1)
@@ -423,116 +600,47 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
		logrus.Debug("MD is ", md)
	}

	state := NewCallHandle(engagement)

	tryMsg := state.getTryMsg()
	if tryMsg == nil {
		return state.waitError()
	}

	dealloc, errTry := pr.handleTryCall(tryMsg, state)
	defer dealloc()

	// respond with handleTryCall response
	err := state.enqueueAck(errTry)
	if err != nil || errTry != nil {
		return state.waitError()
	}

	var dataFeed chan *runner.DataFrame
DataLoop:
	for {
		dataMsg := state.getDataMsg()
		if dataMsg == nil {
			break
		}

		if dataFeed == nil {
			pr.spawnSubmit(state)
			dataFeed = state.spawnPipeToFn()
		}

		select {
		case dataFeed <- dataMsg:
			if dataMsg.Eof {
				break DataLoop
			}
		case <-state.doneQueue:
			break DataLoop
		case <-state.ctx.Done():
			break DataLoop
		}
	}

	return state.waitError()
}

func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {


@@ -194,7 +194,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
	opts = append(opts, server.WithWebPort(8082+nodeNum))
	opts = append(opts, server.WithGRPCPort(9190+nodeNum))
	opts = append(opts, server.WithType(nodeType))
	opts = append(opts, server.WithLogLevel("debug"))
	opts = append(opts, server.WithLogDest(server.DefaultLogDest, "PURE-RUNNER"))
	opts = append(opts, server.WithDBURL(""))
	opts = append(opts, server.WithMQURL(""))
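For context, the engagement protocol described in the pure_runner.go comment above can be driven from the LB side roughly as follows. This is an illustrative sketch, not part of this commit: the import path for the generated runner package is a placeholder, and the client constructor (NewRunnerProtocolClient) and the GetAcknowledged/GetFinished getters are assumptions based on standard protoc-gen-go naming; the message and field names (ClientMsg_Try, TryCall.ModelsCallJson, DataFrame.Eof, CallAcknowledged.Committed, CallFinished.Success/Details) are taken from the diff above.

package sketch

import (
	"context"
	"errors"
	"io"

	"google.golang.org/grpc"

	runner "example.com/generated/runner" // placeholder: the generated runner protocol package
)

// drive sends one call through a pure runner: Try, then Data frames with
// EOF on the last one, then reads messages until RunnerMsg_Finished.
func drive(ctx context.Context, conn *grpc.ClientConn, callJSON string, body io.Reader) error {
	client := runner.NewRunnerProtocolClient(conn) // assumed generated constructor
	stream, err := client.Engage(ctx)
	if err != nil {
		return err
	}

	// 1) ClientMsg_Try carries the serialized models.Call.
	if err := stream.Send(&runner.ClientMsg{Body: &runner.ClientMsg_Try{
		Try: &runner.TryCall{ModelsCallJson: callJSON}}}); err != nil {
		return err
	}

	// 2) Wait for the ACK/NACK (RunnerMsg_Acknowledged).
	msg, err := stream.Recv()
	if err != nil {
		return err
	}
	if ack := msg.GetAcknowledged(); ack == nil || !ack.Committed {
		return errors.New("runner did not commit the call")
	}

	// 3) Stream the request body as DataFrames, with EOF set on the last frame.
	buf := make([]byte, 32*1024)
	for {
		n, rerr := body.Read(buf)
		eof := rerr == io.EOF
		if rerr != nil && !eof {
			return rerr
		}
		if err := stream.Send(&runner.ClientMsg{Body: &runner.ClientMsg_Data{
			Data: &runner.DataFrame{Data: buf[:n], Eof: eof}}}); err != nil {
			return err
		}
		if eof {
			break
		}
	}

	// 4) Drain ResultStart/Data frames until RunnerMsg_Finished arrives.
	for {
		msg, err := stream.Recv()
		if err != nil {
			return err
		}
		if fin := msg.GetFinished(); fin != nil {
			if !fin.Success {
				return errors.New(fin.Details)
			}
			return nil
		}
	}
}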