mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
With this PR, runner client translates too busy errors from gRPC session and runner itself into Fn error type. Placers now ignore this error message to reduce unnecessary logging.
302 lines
8.5 KiB
Go
302 lines
8.5 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/status"
|
|
|
|
pb "github.com/fnproject/fn/api/agent/grpc"
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/models"
|
|
pool "github.com/fnproject/fn/api/runnerpool"
|
|
"github.com/fnproject/fn/grpcutil"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// ErrorRunnerClosed is returned by TryExec when the runner no longer accepts
// new sessions because it has been closed.
var ErrorRunnerClosed = errors.New("Runner is closed")

// ErrorPureRunnerNoEOF is queued by receiveFromRunner when the runner keeps
// sending messages after its finished message instead of ending the stream.
var ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
|
|
|
|
// MaxDataChunk is the maximum buffer size, in bytes, for gRPC data messages
// (10K). sendToRunner reads the request body in chunks of at most this size.
const MaxDataChunk = 10 << 10
|
|
|
|
type gRPCRunner struct {
|
|
shutWg *common.WaitGroup
|
|
address string
|
|
conn *grpc.ClientConn
|
|
client pb.RunnerProtocolClient
|
|
}
|
|
|
|
func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool.Runner, error) {
|
|
conn, client, err := runnerConnection(addr, runnerCertCN, pki)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &gRPCRunner{
|
|
shutWg: common.NewWaitGroup(),
|
|
address: addr,
|
|
conn: conn,
|
|
client: client,
|
|
}, nil
|
|
}
|
|
|
|
func (r *gRPCRunner) Close(context.Context) error {
|
|
r.shutWg.CloseGroup()
|
|
return r.conn.Close()
|
|
}
|
|
|
|
func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
|
|
ctx := context.Background()
|
|
|
|
var creds credentials.TransportCredentials
|
|
if pki != nil {
|
|
var err error
|
|
creds, err = grpcutil.CreateCredentials(pki.Cert, pki.Key, pki.Ca, runnerCertCN)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Unable to create credentials to connect to runner node")
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// we want to set a very short timeout to fail-fast if something goes wrong
|
|
conn, err := grpcutil.DialWithBackoff(ctx, address, creds, 100*time.Millisecond, grpc.DefaultBackoffConfig)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Unable to connect to runner node")
|
|
}
|
|
|
|
protocolClient := pb.NewRunnerProtocolClient(conn)
|
|
logrus.WithField("runner_addr", address).Info("Connected to runner")
|
|
|
|
return conn, protocolClient, nil
|
|
}
|
|
|
|
func (r *gRPCRunner) Address() string {
|
|
return r.address
|
|
}
|
|
|
|
func isTooBusy(err error) bool {
|
|
// A formal API error returned from pure-runner
|
|
if models.GetAPIErrorCode(err) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
|
|
return true
|
|
}
|
|
if err != nil {
|
|
// engagement/recv errors could also be a 503.
|
|
st := status.Convert(err)
|
|
if int(st.Code()) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
|
|
logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
|
|
if !r.shutWg.AddSession(1) {
|
|
// try another runner if this one is closed.
|
|
return false, ErrorRunnerClosed
|
|
}
|
|
defer r.shutWg.DoneSession()
|
|
|
|
// extract the call's model data to pass on to the pure runner
|
|
modelJSON, err := json.Marshal(call.Model())
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Failed to encode model as JSON")
|
|
// If we can't encode the model, no runner will ever be able to run this. Give up.
|
|
return true, err
|
|
}
|
|
|
|
runnerConnection, err := r.client.Engage(ctx)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Unable to create client to runner node")
|
|
// Try on next runner
|
|
return false, err
|
|
}
|
|
|
|
// After this point, we assume "COMMITTED" unless pure runner
|
|
// send explicit NACK
|
|
err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{
|
|
ModelsCallJson: string(modelJSON),
|
|
SlotHashId: hex.EncodeToString([]byte(call.SlotHashId())),
|
|
}}})
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Failed to send message to runner node")
|
|
return true, err
|
|
}
|
|
|
|
recvDone := make(chan error, 1)
|
|
|
|
go receiveFromRunner(runnerConnection, call, recvDone)
|
|
go sendToRunner(runnerConnection, call)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
logrus.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
|
|
return true, ctx.Err()
|
|
case recvErr := <-recvDone:
|
|
if isTooBusy(recvErr) {
|
|
// Try on next runner
|
|
return false, models.ErrCallTimeoutServerBusy
|
|
}
|
|
return true, recvErr
|
|
}
|
|
}
|
|
|
|
func sendToRunner(protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
|
|
bodyReader := call.RequestBody()
|
|
writeBuffer := make([]byte, MaxDataChunk)
|
|
|
|
// IMPORTANT: IO Read below can fail in multiple go-routine cases (in retry
|
|
// case especially if receiveFromRunner go-routine receives a NACK while sendToRunner is
|
|
// already blocked on a read) or in the case of reading the http body multiple times (retries.)
|
|
// Normally http.Request.Body can be read once. However runner_client users should implement/add
|
|
// http.Request.GetBody() function and cache the body content in the request.
|
|
// See lb_agent setRequestGetBody() which handles this. With GetBody installed,
|
|
// the 'Read' below is an actually non-blocking operation since GetBody() should hand out
|
|
// a new instance of io.ReadCloser() that allows repetitive reads on the http body.
|
|
for {
|
|
// WARNING: blocking read.
|
|
n, err := bodyReader.Read(writeBuffer)
|
|
if err != nil && err != io.EOF {
|
|
logrus.WithError(err).Error("Failed to receive data from http client body")
|
|
}
|
|
|
|
// any IO error or n == 0 is an EOF for pure-runner
|
|
isEOF := err != nil || n == 0
|
|
data := writeBuffer[:n]
|
|
|
|
logrus.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
|
|
sendErr := protocolClient.Send(&pb.ClientMsg{
|
|
Body: &pb.ClientMsg_Data{
|
|
Data: &pb.DataFrame{
|
|
Data: data,
|
|
Eof: isEOF,
|
|
},
|
|
},
|
|
})
|
|
if sendErr != nil {
|
|
// It's often normal to receive an EOF here as we optimistically start sending body until a NACK
|
|
// from the runner. Let's ignore EOF and rely on recv side to catch premature EOF.
|
|
if sendErr != io.EOF {
|
|
logrus.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
|
|
}
|
|
return
|
|
}
|
|
if isEOF {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseError(msg *pb.CallFinished) error {
|
|
if msg.GetSuccess() {
|
|
return nil
|
|
}
|
|
eCode := msg.GetErrorCode()
|
|
eStr := msg.GetErrorStr()
|
|
if eStr == "" {
|
|
eStr = "Unknown Error From Pure Runner"
|
|
}
|
|
return models.NewAPIError(int(eCode), errors.New(eStr))
|
|
}
|
|
|
|
// tryQueueError attempts a non-blocking send of err on done. If the send
// cannot proceed immediately (for example the buffer is already full), err
// is silently dropped, so only the first queued error is kept.
func tryQueueError(err error, done chan error) {
	select {
	default:
		// No room or no receiver right now: discard err.
	case done <- err:
		// Queued.
	}
}
|
|
|
|
func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
|
|
w := c.ResponseWriter()
|
|
defer close(done)
|
|
|
|
isPartialWrite := false
|
|
|
|
DataLoop:
|
|
for {
|
|
msg, err := protocolClient.Recv()
|
|
if err != nil {
|
|
logrus.WithError(err).Info("Receive error from runner")
|
|
tryQueueError(err, done)
|
|
return
|
|
}
|
|
|
|
switch body := msg.Body.(type) {
|
|
|
|
// Process HTTP header/status message. This may not arrive depending on
|
|
// pure runners behavior. (Eg. timeout & no IO received from function)
|
|
case *pb.RunnerMsg_ResultStart:
|
|
switch meta := body.ResultStart.Meta.(type) {
|
|
case *pb.CallResultStart_Http:
|
|
logrus.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
|
|
for _, header := range meta.Http.Headers {
|
|
w.Header().Set(header.Key, header.Value)
|
|
}
|
|
if meta.Http.StatusCode > 0 {
|
|
w.WriteHeader(int(meta.Http.StatusCode))
|
|
}
|
|
default:
|
|
logrus.Errorf("Unhandled meta type in start message: %v", meta)
|
|
}
|
|
|
|
// May arrive if function has output. We ignore EOF.
|
|
case *pb.RunnerMsg_Data:
|
|
logrus.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
|
|
if !isPartialWrite {
|
|
// WARNING: blocking write
|
|
n, err := w.Write(body.Data.Data)
|
|
if n != len(body.Data.Data) {
|
|
isPartialWrite = true
|
|
logrus.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
|
|
if err == nil {
|
|
err = io.ErrShortWrite
|
|
}
|
|
tryQueueError(err, done)
|
|
}
|
|
}
|
|
|
|
// Finish messages required for finish/finalize the processing.
|
|
case *pb.RunnerMsg_Finished:
|
|
logrus.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
|
|
if !body.Finished.Success {
|
|
err := parseError(body.Finished)
|
|
tryQueueError(err, done)
|
|
}
|
|
break DataLoop
|
|
|
|
default:
|
|
logrus.Error("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
|
|
}
|
|
}
|
|
|
|
// There should be an EOF following the last packet
|
|
for {
|
|
msg, err := protocolClient.Recv()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
logrus.WithError(err).Infof("Call Waiting EOF received error")
|
|
tryQueueError(err, done)
|
|
break
|
|
}
|
|
|
|
switch body := msg.Body.(type) {
|
|
default:
|
|
logrus.Infof("Call Waiting EOF ignoring message %T", body)
|
|
}
|
|
tryQueueError(ErrorPureRunnerNoEOF, done)
|
|
}
|
|
}
|