fn: lb and pure-runner with non-blocking agent (#989)

* fn: lb and pure-runner with non-blocking agent

*) Removed the pure-runner capacity tracking code. It did
not play well with the internal agent resource tracker.
*) Removed the ACK from the LB <-> runner gRPC communication.
Upon TryCall, the pure-runner now proceeds directly to
Submit. This is reasonable since at that stage the
pure-runner already has all the data needed to initiate
the call (sketched below).
*) Unless the pure-runner emits a NACK, the LB immediately
streams the HTTP body to the runner.
*) Added a CachedReader for the http.Request Body so that
retriable requests can re-read it.
*) Idempotency/retry semantics are similar to the previous
code: after an initially successful Engage, once a TryCall
has been attempted we cannot retry that call unless we
receive a NACK.
*) The ch and naive placers now wrap each TryExec in a
cancellable context so gRPC contexts are cleaned up more
quickly.
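
A condensed LB-side sketch of the resulting no-ACK flow, using the
identifiers from the runner_client.go hunk further down. This is an
illustration of the flow only, not the exact committed code:

// tryExecSketch: no ACK round-trip; after sending TryCall we stream the
// body and treat the call as committed unless the runner's error maps to
// 503 / server-too-busy.
func tryExecSketch(ctx context.Context, client pb.RunnerProtocolClient, call pool.RunnerCall, modelJSON []byte) (bool, error) {
	conn, err := client.Engage(ctx)
	if err != nil {
		return false, err // never reached the runner; safe to try another one
	}
	err = conn.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{
		Try: &pb.TryCall{ModelsCallJson: string(modelJSON)},
	}})
	if err != nil {
		return false, err
	}
	recvDone := make(chan error, 1)
	go receiveFromRunner(conn, call, recvDone) // writes the response, queues any error
	go sendToRunner(conn, call)                // streams the http body as DataFrame chunks
	select {
	case <-ctx.Done():
		return true, ctx.Err()
	case err := <-recvDone:
		return !isRetriable(err), err // only "too busy" results move on to the next runner
	}
}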

* fn: err for simpler one-time read GetBody approach

This allows for a more flexible approach, since users can
define GetBody() to enable repeated reads of the HTTP body.
In the default LB case, the LB performs a one-time io.ReadAll
and sets GetBody, which is detected by RunnerCall.RequestBody().
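
A minimal, self-contained sketch of this approach (the committed version
is lbAgent.setRequestBody in the diff below, which additionally uses a
buffer pool and a ctx-aware read; ensureGetBody is a hypothetical name
used only for illustration):

import (
	"bytes"
	"io"
	"io/ioutil"
	"net/http"
)

// ensureGetBody buffers the request body once and installs GetBody so
// the body can be re-read for retries without mutating request state.
func ensureGetBody(r *http.Request) error {
	if r.Body == nil || r.GetBody != nil {
		return nil // nothing to buffer, or the caller already provided GetBody
	}
	buf, err := ioutil.ReadAll(r.Body)
	if err != nil {
		return err
	}
	r.Body = ioutil.NopCloser(bytes.NewReader(buf))
	r.GetBody = func() (io.ReadCloser, error) {
		// hand out a fresh reader over the same cached bytes on every call
		return ioutil.NopCloser(bytes.NewReader(buf)), nil
	}
	return nil
}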

* fn: additional check for non-nil req.body

* fn: attempt to override IO errors with ctx for TryExec

* fn: system-tests log dest

* fn: LB: EOF send handling

* fn: logging for partial IO

* fn: use buffer pool for IO storage in lb agent
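
The pool itself is not part of this diff; a typical declaration for the
bufPool referenced in the lb_agent hunks below would be a sync.Pool of
*bytes.Buffer (an assumption, shown only for context):

import (
	"bytes"
	"sync"
)

// assumed declaration; the actual bufPool definition lives outside this diff
var bufPool = &sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// usage pattern matching the Submit/setRequestBody hunks below:
//   buf := bufPool.Get().(*bytes.Buffer)
//   buf.Reset()
//   defer bufPool.Put(buf)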

* fn: pure runner should use chunks for data msgs

* fn: required config validations and pass APIErrors
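
The API error status code travels back to the LB inside the Finished
message's Details field using a "code:message" encoding. A self-contained
sketch of that round trip (the committed versions are convertError and
parseError in the diff below, which map the code back to a models.APIError):

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// encodeError mirrors convertError: prefix the message with the API status code.
func encodeError(code int, err error) string {
	return fmt.Sprintf("%d:%s", code, err.Error())
}

// decodeError mirrors parseError: recover the code, or fall back to a plain error.
func decodeError(details string) (int, error) {
	tokens := strings.SplitN(details, ":", 2)
	if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" {
		return 0, errors.New(details)
	}
	code, err := strconv.Atoi(tokens[0])
	if err != nil {
		return 0, errors.New(details)
	}
	return code, errors.New(tokens[1])
}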

* fn: additional tests and gRPC proto simplification

*) removed the ACK/NACK messages, since the Finished message
type works fine for this purpose.
*) return the response in API tests so the status code can be checked
*) added an empty-body JSON test to the API tests for LB & pure-runner

* fn: buffer adjustments

*) corrected setRequestBody result handling
*) switched to bytes.Reader for read-only safety
*) io.EOF can be returned even when the request Body is
non-nil (see the sketch below).
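
The last bullet refers to the standard io.Reader contract: a Read may
return n > 0 together with io.EOF, so the data must be consumed before
the error is acted on. A minimal illustration (readAll is a hypothetical
helper, not part of this change):

import (
	"bytes"
	"io"
)

// readAll drains r, tolerating readers whose final Read returns data
// together with io.EOF (which the io.Reader contract allows).
func readAll(r io.Reader) ([]byte, error) {
	var out bytes.Buffer
	buf := make([]byte, 32*1024)
	for {
		n, err := r.Read(buf)
		out.Write(buf[:n]) // consume the bytes before acting on err
		if err == io.EOF {
			return out.Bytes(), nil
		}
		if err != nil {
			return out.Bytes(), err
		}
	}
}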

* fn: clarify detection of 503 / Server Too Busy
Authored by Tolga Ceylan on 2018-05-17 12:09:03 -07:00, committed by GitHub
parent 1083623045
commit 4ccde8897e
13 changed files with 541 additions and 336 deletions


@@ -319,6 +319,12 @@ func (c *call) LbDeadline() time.Time {
}
func (c *call) RequestBody() io.ReadCloser {
if c.req.Body != nil && c.req.GetBody != nil {
rdr, err := c.req.GetBody()
if err == nil {
return rdr
}
}
return c.req.Body
}


@@ -55,6 +55,11 @@ const (
EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
MaxDisabledMsecs = time.Duration(math.MaxInt64)
// defaults
DefaultHotPoll = 200 * time.Millisecond
DefaultNBIOHotPoll = 20 * time.Millisecond
)
func NewAgentConfig() (*AgentConfig, error) {
@@ -71,7 +76,7 @@ func NewAgentConfig() (*AgentConfig, error) {
err = setEnvMsecs(err, EnvFreezeIdle, &cfg.FreezeIdle, 50*time.Millisecond)
err = setEnvMsecs(err, EnvEjectIdle, &cfg.EjectIdle, 1000*time.Millisecond)
err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond)
err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute)


@@ -1,8 +1,11 @@
package agent
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"sync/atomic"
"time"
@@ -36,12 +39,26 @@ type lbAgent struct {
callEndCount int64
}
func NewLBAgentConfig() (*AgentConfig, error) {
cfg, err := NewAgentConfig()
if err != nil {
return cfg, err
}
if cfg.MaxRequestSize == 0 {
return cfg, errors.New("lb-agent requires MaxRequestSize limit")
}
if cfg.MaxResponseSize == 0 {
return cfg, errors.New("lb-agent requires MaxResponseSize limit")
}
return cfg, nil
}
// NewLBAgent creates an Agent that knows how to load-balance function calls
// across a group of runner nodes.
func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) {
// TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig
cfg, err := NewAgentConfig()
cfg, err := NewLBAgentConfig()
if err != nil {
logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg)
}
@@ -171,6 +188,17 @@ func (a *lbAgent) Submit(callI Call) error {
statsDequeueAndStart(ctx)
// pre-read and buffer request body if already not done based
// on GetBody presence.
buf, err := a.setRequestBody(ctx, call)
if buf != nil {
defer bufPool.Put(buf)
}
if err != nil {
logrus.WithError(err).Error("Failed to process call body")
return a.handleCallEnd(ctx, call, err, true)
}
// WARNING: isStarted (handleCallEnd) semantics
// need some consideration here. Similar to runner/agent
// we consider isCommitted true if call.Start() succeeds.
@@ -183,6 +211,48 @@ func (a *lbAgent) Submit(callI Call) error {
return a.handleCallEnd(ctx, call, err, true)
}
// setRequestBody sets the GetBody function on the given http.Request if it is missing. GetBody allows
// reading from the request body without mutating the state of the request.
func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer, error) {
r := call.req
if r.Body == nil || r.GetBody != nil {
return nil, nil
}
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
// WARNING: we need to handle IO in a separate go-routine below
// to be able to detect a ctx timeout. When we timeout, we
// let gin/http-server to unblock the go-routine below.
errApp := make(chan error, 1)
go func() {
_, err := buf.ReadFrom(r.Body)
if err != nil && err != io.EOF {
errApp <- err
return
}
r.Body = ioutil.NopCloser(bytes.NewReader(buf.Bytes()))
// GetBody does not mutate the state of the request body
r.GetBody = func() (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(buf.Bytes())), nil
}
close(errApp)
}()
select {
case err := <-errApp:
return buf, err
case <-ctx.Done():
return buf, ctx.Err()
}
}
func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
logrus.Error("Enqueue not implemented")
return errors.New("Enqueue not implemented")


@@ -197,61 +197,39 @@ func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error {
return err
}
func convertError(err error) string {
code := models.GetAPIErrorCode(err)
return fmt.Sprintf("%d:%s", code, err.Error())
}
// enqueueCallResponse enqueues a Submit() response to the LB
// and initiates a graceful shutdown of the session.
func (ch *callHandle) enqueueCallResponse(err error) {
defer ch.finalize()
var details string
// Error response
if err != nil {
ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: false,
Details: fmt.Sprintf("%v", err),
}}})
details = convertError(err)
} else if ch.c != nil {
details = ch.c.Model().ID
}
logrus.Debugf("Sending Call Finish details=%v", details)
errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: err == nil,
Details: details,
}}})
if errTmp != nil {
logrus.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v", details)
return
}
// EOF and Success response
ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Data{
Data: &runner.DataFrame{
Eof: true,
},
},
})
ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: true,
Details: ch.c.Model().ID,
}}})
}
// enqueueAck enqueues a ACK or NACK response to the LB for ClientMsg_Try
// request. If NACK, then it also initiates a graceful shutdown of the
// session.
func (ch *callHandle) enqueueAck(err error) error {
// NACK
if err != nil {
err = ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
Committed: false,
Details: fmt.Sprintf("%v", err),
}}})
if err != nil {
return err
}
return ch.finalize()
errTmp = ch.finalize()
if errTmp != nil {
logrus.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v", details)
}
// ACK
return ch.enqueueMsgStrict(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
Committed: true,
Details: ch.c.Model().ID,
SlotAllocationLatency: time.Time(ch.allocatedTime).Sub(time.Time(ch.receivedTime)).String(),
}}})
}
// spawnPipeToFn pumps data to Function via callHandle io.PipeWriter (pipeToFnW)
@@ -400,23 +378,38 @@ func (ch *callHandle) Write(data []byte) (int, error) {
return 0, err
}
// we cannot retain 'data'
cpData := make([]byte, len(data))
copy(cpData, data)
total := 0
// split up data into gRPC chunks
for {
chunkSize := len(data)
if chunkSize > MaxDataChunk {
chunkSize = MaxDataChunk
}
if chunkSize == 0 {
break
}
err = ch.enqueueMsg(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Data{
Data: &runner.DataFrame{
Data: cpData,
Eof: false,
// we cannot retain 'data'
cpData := make([]byte, chunkSize)
copy(cpData, data[0:chunkSize])
data = data[chunkSize:]
err = ch.enqueueMsg(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Data{
Data: &runner.DataFrame{
Data: cpData,
Eof: false,
},
},
},
})
})
if err != nil {
return 0, err
if err != nil {
return total, err
}
total += chunkSize
}
return len(data), nil
return total, nil
}
// getTryMsg fetches/waits for a TryCall message from
@@ -457,52 +450,12 @@ func (ch *callHandle) getDataMsg() *runner.DataFrame {
return msg
}
// TODO: decomission/remove this once dependencies are cleaned up
type CapacityGate interface {
// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
// of capacity have been committed.
CheckAndReserveCapacity(units uint64) error
// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
// zero; implementations are free to panic in that case.
ReleaseCapacity(units uint64)
}
type pureRunnerCapacityManager struct {
totalCapacityUnits uint64
committedCapacityUnits uint64
mtx sync.Mutex
}
type capacityDeallocator func()
func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager {
return &pureRunnerCapacityManager{
totalCapacityUnits: units,
committedCapacityUnits: 0,
}
}
func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units {
prcm.committedCapacityUnits = prcm.committedCapacityUnits + units
return nil
}
return models.ErrCallTimeoutServerBusy
}
func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if units <= prcm.committedCapacityUnits {
prcm.committedCapacityUnits = prcm.committedCapacityUnits - units
return
}
panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity")
}
// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
// and provides the gRPC server that implements the LB <-> Runner protocol.
type pureRunner struct {
@@ -510,7 +463,6 @@ type pureRunner struct {
listen string
a Agent
inflight int32
capacity CapacityGate
}
func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) {
@@ -559,35 +511,27 @@ func (pr *pureRunner) spawnSubmit(state *callHandle) {
}()
}
// handleTryCall based on the TryCall message, allocates a resource/capacity reservation
// and creates callHandle.c with agent.call.
func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {
// handleTryCall based on the TryCall message, tries to place the call on NBIO Agent
func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error {
state.receivedTime = strfmt.DateTime(time.Now())
var c models.Call
err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
if err != nil {
return func() {}, err
}
// Capacity check first
err = pr.capacity.CheckAndReserveCapacity(c.Memory)
if err != nil {
return func() {}, err
}
cleanup := func() {
pr.capacity.ReleaseCapacity(c.Memory)
state.enqueueCallResponse(err)
return err
}
agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state))
if err != nil {
return cleanup, err
state.enqueueCallResponse(err)
return err
}
state.c = agent_call.(*call)
state.allocatedTime = strfmt.DateTime(time.Now())
pr.spawnSubmit(state)
return cleanup, nil
return nil
}
// Handles a client engagement
@@ -615,15 +559,13 @@ func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) erro
return state.waitError()
}
dealloc, errTry := pr.handleTryCall(tryMsg, state)
defer dealloc()
// respond with handleTryCall response
err := state.enqueueAck(errTry)
if err != nil || errTry != nil {
errTry := pr.handleTryCall(tryMsg, state)
if errTry != nil {
return state.waitError()
}
var dataFeed chan *runner.DataFrame
dataFeed := state.spawnPipeToFn()
DataLoop:
for {
dataMsg := state.getDataMsg()
@@ -631,11 +573,6 @@ DataLoop:
break
}
if dataFeed == nil {
pr.spawnSubmit(state)
dataFeed = state.spawnPipeToFn()
}
select {
case dataFeed <- dataMsg:
if dataMsg.Eof {
@@ -678,8 +615,29 @@ func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, ce
return NewPureRunner(cancel, addr, da, cert, key, ca, nil)
}
func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error) {
a := createAgent(da)
func ValidatePureRunnerConfig() AgentOption {
return func(a *agent) error {
if a.cfg.MaxResponseSize == 0 {
return errors.New("pure runner requires MaxResponseSize limits")
}
if a.cfg.MaxRequestSize == 0 {
return errors.New("pure runner requires MaxRequestSize limits")
}
// pure runner requires a non-blocking resource tracker
if !a.cfg.EnableNBResourceTracker {
return errors.New("pure runner requires EnableNBResourceTracker true")
}
return nil
}
}
func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, unused CapacityGate) (Agent, error) {
// TODO: gate unused, decommission/remove it after cleaning up dependencies to it.
a := createAgent(da, ValidatePureRunnerConfig())
var pr *pureRunner
var err error
if cert != "" && key != "" && ca != "" {
@@ -688,13 +646,13 @@ func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert s
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
return nil, err
}
pr, err = createPureRunner(addr, a, c, gate)
pr, err = createPureRunner(addr, a, c)
if err != nil {
return nil, err
}
} else {
logrus.Warn("Running pure runner in insecure mode!")
pr, err = createPureRunner(addr, a, nil, gate)
pr, err = createPureRunner(addr, a, nil)
if err != nil {
return nil, err
}
@@ -736,32 +694,20 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials
}), nil
}
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) {
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
var srv *grpc.Server
if creds != nil {
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
}
if gate == nil {
memUnits := getAvailableMemoryUnits()
gate = newPureRunnerCapacityManager(memUnits)
}
pr := &pureRunner{
gRPCServer: srv,
listen: addr,
a: a,
capacity: gate,
}
runner.RegisterRunnerProtocolServer(srv, pr)
return pr, nil
}
const megabyte uint64 = 1024 * 1024
func getAvailableMemoryUnits() uint64 {
// To reuse code - but it's a bit of a hack. TODO: refactor the OS-specific get memory funcs out of that.
throwawayRT := NewResourceTracker(nil).(*resourceTracker)
return throwawayRT.ramAsyncTotal / megabyte
}


@@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"io"
"strconv"
"strings"
"time"
"google.golang.org/grpc"
@@ -12,13 +14,20 @@ import (
pb "github.com/fnproject/fn/api/agent/grpc"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/grpcutil"
"github.com/sirupsen/logrus"
)
var (
ErrorRunnerClosed = errors.New("Runner is closed")
ErrorRunnerClosed = errors.New("Runner is closed")
ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)
const (
// max buffer size for grpc data messages, 10K
MaxDataChunk = 10 * 1024
)
type gRPCRunner struct {
@@ -76,10 +85,15 @@ func (r *gRPCRunner) Address() string {
return r.address
}
func isRetriable(err error) bool {
return models.GetAPIErrorCode(err) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy)
}
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
if !r.shutWg.AddSession(1) {
return true, ErrorRunnerClosed
// try another runner if this one is closed.
return false, ErrorRunnerClosed
}
defer r.shutWg.DoneSession()
@@ -90,6 +104,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
// If we can't encode the model, no runner will ever be able to run this. Give up.
return true, err
}
runnerConnection, err := r.client.Engage(ctx)
if err != nil {
logrus.WithError(err).Error("Unable to create client to runner node")
@@ -97,113 +112,171 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
return false, err
}
// After this point, we assume "COMMITTED" unless pure runner
// send explicit NACK
err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ModelsCallJson: string(modelJSON)}}})
if err != nil {
logrus.WithError(err).Error("Failed to send message to runner node")
return false, err
}
msg, err := runnerConnection.Recv()
if err != nil {
logrus.WithError(err).Error("Failed to receive first message from runner node")
return false, err
return true, err
}
switch body := msg.Body.(type) {
case *pb.RunnerMsg_Acknowledged:
if !body.Acknowledged.Committed {
logrus.Debugf("Runner didn't commit invocation request: %v", body.Acknowledged.Details)
return false, nil
// Try the next runner
}
logrus.Debug("Runner committed invocation request, sending data frames")
done := make(chan error)
go receiveFromRunner(runnerConnection, call, done)
sendToRunner(call, runnerConnection)
return true, <-done
recvDone := make(chan error, 1)
default:
logrus.Errorf("Unhandled message type received from runner: %v\n", msg)
return true, nil
go receiveFromRunner(runnerConnection, call, recvDone)
go sendToRunner(runnerConnection, call)
select {
case <-ctx.Done():
logrus.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
return true, ctx.Err()
case recvErr := <-recvDone:
return !isRetriable(recvErr), recvErr
}
}
func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error {
func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
bodyReader := call.RequestBody()
writeBufferSize := 10 * 1024 // 10KB
writeBuffer := make([]byte, writeBufferSize)
for {
n, err := bodyReader.Read(writeBuffer)
logrus.Debugf("Wrote %v bytes to the runner", n)
writeBuffer := make([]byte, MaxDataChunk)
if err == io.EOF {
err = protocolClient.Send(&pb.ClientMsg{
Body: &pb.ClientMsg_Data{
Data: &pb.DataFrame{
Data: writeBuffer,
Eof: true,
},
},
})
if err != nil {
logrus.WithError(err).Error("Failed to send data frame with EOF to runner")
}
break
// IMPORTANT: IO Read below can fail in multiple go-routine cases (in retry
// case especially if receiveFromRunner go-routine receives a NACK while sendToRunner is
// already blocked on a read) or in the case of reading the http body multiple times (retries.)
// Normally http.Request.Body can be read once. However runner_client users should implement/add
// http.Request.GetBody() function and cache the body content in the request.
// See lb_agent setRequestBody() which handles this. With GetBody installed,
// the 'Read' below is effectively a non-blocking operation, since GetBody() should hand out
// a new instance of io.ReadCloser() that allows repetitive reads on the http body.
for {
// WARNING: blocking read.
n, err := bodyReader.Read(writeBuffer)
if err != nil && err != io.EOF {
logrus.WithError(err).Error("Failed to receive data from http client body")
}
err = protocolClient.Send(&pb.ClientMsg{
// any IO error or n == 0 is an EOF for pure-runner
isEOF := err != nil || n == 0
data := writeBuffer[:n]
logrus.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
sendErr := protocolClient.Send(&pb.ClientMsg{
Body: &pb.ClientMsg_Data{
Data: &pb.DataFrame{
Data: writeBuffer,
Eof: false,
Data: data,
Eof: isEOF,
},
},
})
if err != nil {
logrus.WithError(err).Error("Failed to send data frame")
return err
if sendErr != nil {
logrus.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
return
}
if isEOF {
return
}
}
return nil
}
func parseError(details string) error {
tokens := strings.SplitN(details, ":", 2)
if len(tokens) != 2 || tokens[0] == "" || tokens[1] == "" {
return errors.New(details)
}
code, err := strconv.ParseInt(tokens[0], 10, 64)
if err != nil {
return errors.New(details)
}
if code != 0 {
return models.NewAPIError(int(code), errors.New(tokens[1]))
}
return errors.New(tokens[1])
}
func tryQueueError(err error, done chan error) {
select {
case done <- err:
default:
}
}
func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
w := c.ResponseWriter()
defer close(done)
isPartialWrite := false
DataLoop:
for {
msg, err := protocolClient.Recv()
if err != nil {
logrus.WithError(err).Error("Failed to receive message from runner")
done <- err
logrus.WithError(err).Info("Receive error from runner")
tryQueueError(err, done)
return
}
switch body := msg.Body.(type) {
// Process HTTP header/status message. This may not arrive depending on
// pure runners behavior. (Eg. timeout & no IO received from function)
case *pb.RunnerMsg_ResultStart:
switch meta := body.ResultStart.Meta.(type) {
case *pb.CallResultStart_Http:
logrus.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
for _, header := range meta.Http.Headers {
w.Header().Set(header.Key, header.Value)
}
if meta.Http.StatusCode > 0 {
w.WriteHeader(int(meta.Http.StatusCode))
}
default:
logrus.Errorf("Unhandled meta type in start message: %v", meta)
}
// May arrive if function has output. We ignore EOF.
case *pb.RunnerMsg_Data:
w.Write(body.Data.Data)
logrus.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
if !isPartialWrite {
// WARNING: blocking write
n, err := w.Write(body.Data.Data)
if n != len(body.Data.Data) {
isPartialWrite = true
logrus.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
if err == nil {
err = io.ErrShortWrite
}
tryQueueError(err, done)
}
}
// Finish messages required for finish/finalize the processing.
case *pb.RunnerMsg_Finished:
if body.Finished.Success {
logrus.Infof("Call finished successfully: %v", body.Finished.Details)
} else {
logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details)
logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
if !body.Finished.Success {
err := parseError(body.Finished.GetDetails())
tryQueueError(err, done)
}
// There should be an EOF following the last packet
if _, err := protocolClient.Recv(); err != io.EOF {
logrus.WithError(err).Error("Did not receive expected EOF from runner stream")
done <- err
}
close(done)
return
break DataLoop
default:
logrus.Errorf("Unhandled message type from runner: %v", body)
logrus.Error("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
}
}
// There should be an EOF following the last packet
for {
msg, err := protocolClient.Recv()
if err == io.EOF {
break
}
if err != nil {
logrus.WithError(err).Infof("Call Waiting EOF received error")
tryQueueError(err, done)
break
}
switch body := msg.Body.(type) {
default:
logrus.Infof("Call Waiting EOF ignoring message %T", body)
}
tryQueueError(ErrorPureRunnerNoEOF, done)
}
}