fn-serverless/api/agent/runner_client.go
Commit 77086ecc24 by Tolga Ceylan, 2018-05-17 15:02:15 -07:00: fn: lb-agent & runner gRPC updates (#1005)

Breaking changes:
*) Removed unused ACK/NACK definitions
*) Extended Finished messages with error code/str


package agent
import (
"context"
"encoding/json"
"errors"
"io"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
pb "github.com/fnproject/fn/api/agent/grpc"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/grpcutil"
"github.com/sirupsen/logrus"
)
var (
ErrorRunnerClosed = errors.New("Runner is closed")
ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)
const (
// MaxDataChunk is the maximum buffer size for gRPC data messages (10 KiB)
MaxDataChunk = 10 * 1024
)
type gRPCRunner struct {
shutWg *common.WaitGroup
address string
conn *grpc.ClientConn
client pb.RunnerProtocolClient
}
func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool.Runner, error) {
conn, client, err := runnerConnection(addr, runnerCertCN, pki)
if err != nil {
return nil, err
}
return &gRPCRunner{
shutWg: common.NewWaitGroup(),
address: addr,
conn: conn,
client: client,
}, nil
}
func (r *gRPCRunner) Close(context.Context) error {
r.shutWg.CloseGroup()
return r.conn.Close()
}
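// Illustrative usage, not part of the original file: wiring up a secure runner client
// and shutting it down. The address, common name and PKI values are placeholders; the
// PKIData field names (Cert, Key, Ca) are taken from the CreateCredentials call below,
// but whether they hold file paths or PEM blobs depends on grpcutil.CreateCredentials.
//
//	r, err := SecureGRPCRunnerFactory("runner-1:9190", "runner.example.com", &pool.PKIData{
//		Cert: "client-cert",
//		Key:  "client-key",
//		Ca:   "ca-cert",
//	})
//	if err != nil {
//		return err // runner is unusable, e.g. bad credentials
//	}
//	defer r.Close(context.Background())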
func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
ctx := context.Background()
var creds credentials.TransportCredentials
if pki != nil {
var err error
creds, err = grpcutil.CreateCredentials(pki.Cert, pki.Key, pki.Ca, runnerCertCN)
if err != nil {
logrus.WithError(err).Error("Unable to create credentials to connect to runner node")
return nil, nil, err
}
}
// we want to set a very short timeout to fail-fast if something goes wrong
conn, err := grpcutil.DialWithBackoff(ctx, address, creds, 100*time.Millisecond, grpc.DefaultBackoffConfig)
if err != nil {
logrus.WithError(err).Error("Unable to connect to runner node")
// do not hand back a nil connection; let the factory surface the dial error
return nil, nil, err
}
protocolClient := pb.NewRunnerProtocolClient(conn)
logrus.WithField("runner_addr", address).Info("Connected to runner")
return conn, protocolClient, nil
}
func (r *gRPCRunner) Address() string {
return r.address
}
func isRetriable(err error) bool {
// A formal API error returned from pure-runner
if models.GetAPIErrorCode(err) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
return true
}
if err != nil {
// engagement/recv errors could also be a 503.
st := status.Convert(err)
if int(st.Code()) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
return true
}
}
return false
}
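// Illustrative note, not part of the original file: isRetriable treats the 503 busy
// signal (models.ErrCallTimeoutServerBusy) as retriable whether it arrives as a models
// APIError or as a raw gRPC status carrying the same code. For example, assuming the
// google.golang.org/grpc/codes package were imported (it is not here), a pure-runner
// error built as
//
//	status.Error(codes.Code(models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy)), "server busy")
//
// would be reported as retriable, so TryExec below returns committed=false and the
// placer is free to try the call on another runner.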
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
if !r.shutWg.AddSession(1) {
// try another runner if this one is closed.
return false, ErrorRunnerClosed
}
defer r.shutWg.DoneSession()
// extract the call's model data to pass on to the pure runner
modelJSON, err := json.Marshal(call.Model())
if err != nil {
logrus.WithError(err).Error("Failed to encode model as JSON")
// If we can't encode the model, no runner will ever be able to run this. Give up.
return true, err
}
runnerConnection, err := r.client.Engage(ctx)
if err != nil {
logrus.WithError(err).Error("Unable to create client to runner node")
// Try on next runner
return false, err
}
// After this point, we assume the call is "COMMITTED" unless the pure runner
// sends an explicit NACK
err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ModelsCallJson: string(modelJSON)}}})
if err != nil {
logrus.WithError(err).Error("Failed to send message to runner node")
return true, err
}
recvDone := make(chan error, 1)
go receiveFromRunner(runnerConnection, call, recvDone)
go sendToRunner(runnerConnection, call)
select {
case <-ctx.Done():
logrus.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
return true, ctx.Err()
case recvErr := <-recvDone:
return !isRetriable(recvErr), recvErr
}
}
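// Illustrative sketch, not part of the original file: how a caller might consume the
// (committed, error) contract of TryExec above. The function and its signature are
// hypothetical; the real placement logic lives in the runnerpool package. It relies only
// on the pool.Runner methods implemented in this file (TryExec and Address).
func placeCallSketch(ctx context.Context, runners []pool.Runner, call pool.RunnerCall) error {
	var lastErr error
	for _, r := range runners {
		committed, err := r.TryExec(ctx, call)
		if committed {
			// Committed: the call either ran or failed in a non-retriable way,
			// so it must not be re-placed on another runner.
			return err
		}
		// Not committed (runner closed, or a retriable 503-style error):
		// remember the error and try the next runner.
		logrus.WithError(err).WithField("runner_addr", r.Address()).Debug("Call not committed, trying next runner")
		lastErr = err
	}
	return lastErr
}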
func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
bodyReader := call.RequestBody()
writeBuffer := make([]byte, MaxDataChunk)
// IMPORTANT: the IO Read below can fail in several multi-goroutine scenarios: in the retry
// case, especially if the receiveFromRunner goroutine receives a NACK while sendToRunner is
// already blocked on a read, or when the http body has to be read more than once (retries).
// Normally http.Request.Body can only be read once. Callers of runner_client should therefore
// install an http.Request.GetBody() function and cache the body content in the request;
// see lb_agent's setRequestGetBody(), which handles this. With GetBody installed,
// the 'Read' below is effectively a non-blocking operation, since GetBody() hands out
// a fresh io.ReadCloser that allows the http body to be read repeatedly.
for {
// WARNING: blocking read.
n, err := bodyReader.Read(writeBuffer)
if err != nil && err != io.EOF {
logrus.WithError(err).Error("Failed to receive data from http client body")
}
// any IO error or n == 0 is an EOF for pure-runner
isEOF := err != nil || n == 0
data := writeBuffer[:n]
logrus.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
sendErr := protocolClient.Send(&pb.ClientMsg{
Body: &pb.ClientMsg_Data{
Data: &pb.DataFrame{
Data: data,
Eof: isEOF,
},
},
})
if sendErr != nil {
logrus.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
return
}
if isEOF {
return
}
}
}
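// Illustrative sketch, not part of the original file: the IMPORTANT comment in
// sendToRunner refers to lb_agent's setRequestGetBody(). A caller could make the
// request body re-readable along these lines. The helper name is hypothetical and
// the sketch assumes "bytes", "io/ioutil" and "net/http" imports, which this file
// does not use.
//
//	func installGetBodySketch(req *http.Request) error {
//		if req.Body == nil || req.GetBody != nil {
//			return nil
//		}
//		buf, err := ioutil.ReadAll(req.Body)
//		if err != nil {
//			return err
//		}
//		req.Body.Close()
//		// Each GetBody() call hands out a fresh ReadCloser over the cached bytes,
//		// so retries can re-send the body and the Read in sendToRunner never blocks
//		// on an already-consumed stream.
//		req.GetBody = func() (io.ReadCloser, error) {
//			return ioutil.NopCloser(bytes.NewReader(buf)), nil
//		}
//		req.Body, _ = req.GetBody()
//		return nil
//	}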
func parseError(msg *pb.CallFinished) error {
if msg.GetSuccess() {
return nil
}
eCode := msg.GetErrorCode()
eStr := msg.GetErrorStr()
if eStr == "" {
eStr = "Unknown Error From Pure Runner"
}
return models.NewAPIError(int(eCode), errors.New(eStr))
}
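// Illustrative note, not part of the original file: with the error code/str extension
// to the Finished message noted in the commit description at the top of this page, a
// failure such as
//
//	parseError(&pb.CallFinished{Success: false, ErrorCode: 503, ErrorStr: "server busy"})
//
// surfaces as a models.APIError with code 503. The literal field names Success,
// ErrorCode and ErrorStr are assumptions inferred from the getters used above.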
func tryQueueError(err error, done chan error) {
select {
case done <- err:
default:
}
}
func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
w := c.ResponseWriter()
defer close(done)
isPartialWrite := false
DataLoop:
for {
msg, err := protocolClient.Recv()
if err != nil {
logrus.WithError(err).Info("Receive error from runner")
tryQueueError(err, done)
return
}
switch body := msg.Body.(type) {
// Process the HTTP header/status message. This may not arrive, depending on
// the pure runner's behavior (e.g. a timeout where no IO was received from the function).
case *pb.RunnerMsg_ResultStart:
switch meta := body.ResultStart.Meta.(type) {
case *pb.CallResultStart_Http:
logrus.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
for _, header := range meta.Http.Headers {
w.Header().Set(header.Key, header.Value)
}
if meta.Http.StatusCode > 0 {
w.WriteHeader(int(meta.Http.StatusCode))
}
default:
logrus.Errorf("Unhandled meta type in start message: %v", meta)
}
// Data frames may arrive if the function has output. The EOF flag on the frame is ignored here.
case *pb.RunnerMsg_Data:
logrus.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
if !isPartialWrite {
// WARNING: blocking write
n, err := w.Write(body.Data.Data)
if n != len(body.Data.Data) {
isPartialWrite = true
logrus.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
if err == nil {
err = io.ErrShortWrite
}
tryQueueError(err, done)
}
}
// A Finished message is required to finalize the processing.
case *pb.RunnerMsg_Finished:
logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
if !body.Finished.Success {
err := parseError(body.Finished)
tryQueueError(err, done)
}
break DataLoop
default:
logrus.Error("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
}
}
// There should be an EOF following the last packet
for {
msg, err := protocolClient.Recv()
if err == io.EOF {
break
}
if err != nil {
logrus.WithError(err).Infof("Call Waiting EOF received error")
tryQueueError(err, done)
break
}
switch body := msg.Body.(type) {
default:
logrus.Infof("Call Waiting EOF ignoring message %T", body)
}
tryQueueError(ErrorPureRunnerNoEOF, done)
}
}