fn-serverless/api/agent/runner_client.go
Tolga Ceylan e67d0e5f3f fn: Call extensions/overriding and more customization friendly docker driver (#1065)
In pure-runner and LB agent, service providers might want to set specific driver options.

For example, to add cpu-shares to functions, the LB can add the information as extensions
to the Call and pass it via gRPC to the runners. The runners then pick these extensions
off the gRPC call and pass them to the driver. Using a custom driver implementation,
pure-runners can process these extensions to modify docker.CreateContainerOptions.

To achieve this, LB agents can now be configured using a call overrider.

Pure-runners can be configured using a custom docker driver.

RunnerCall and Call interfaces both expose call extensions.

An example demonstrating this is implemented in test/fn-system-tests/system_test.go,
which registers a call overrider for the LB agent as well as a simple custom docker
driver. In this example, the LB agent adds a key-value pair to the extensions, and the
runners add this key-value pair as an environment variable to the container. A rough
sketch of the overrider half follows below.
2018-06-18 14:42:28 -07:00
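
As a sketch of the overrider half of that example: the function below receives the
call model and its current extensions and returns the extensions that travel to the
runner inside the gRPC TryCall message (the Extensions field in TryExec below). The
extension key, value, and function name are hypothetical, and the overrider's exact
type should be checked against the agent package; the authoritative example is the
one in system_test.go.

    package main

    import (
        "github.com/fnproject/fn/api/models"
    )

    // extEnvKey is a hypothetical extension key. The runner-side custom docker
    // driver would read it back from the call extensions and inject it into
    // the container environment.
    const extEnvKey = "FN_EXT_EXAMPLE"

    // lbCallOverrider is a sketch of an LB-agent call overrider: it takes the
    // call model and the current extensions, and returns the (possibly
    // modified) extensions to forward to the runner.
    func lbCallOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {
        if exts == nil {
            exts = make(map[string]string)
        }
        exts[extEnvKey] = "example-value"
        return exts, nil
    }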


package agent

import (
    "context"
    "encoding/hex"
    "encoding/json"
    "errors"
    "io"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"

    pb "github.com/fnproject/fn/api/agent/grpc"
    "github.com/fnproject/fn/api/common"
    "github.com/fnproject/fn/api/models"
    pool "github.com/fnproject/fn/api/runnerpool"
    "github.com/fnproject/fn/grpcutil"
    "github.com/sirupsen/logrus"
)

var (
    ErrorRunnerClosed    = errors.New("Runner is closed")
    ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
)

const (
    // max buffer size for grpc data messages, 10K
    MaxDataChunk = 10 * 1024
)

type gRPCRunner struct {
    shutWg  *common.WaitGroup
    address string
    conn    *grpc.ClientConn
    client  pb.RunnerProtocolClient
}

func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool.Runner, error) {
    conn, client, err := runnerConnection(addr, runnerCertCN, pki)
    if err != nil {
        return nil, err
    }

    return &gRPCRunner{
        shutWg:  common.NewWaitGroup(),
        address: addr,
        conn:    conn,
        client:  client,
    }, nil
}

// implements Runner
func (r *gRPCRunner) Close(context.Context) error {
    r.shutWg.CloseGroup()
    return r.conn.Close()
}

func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
    ctx := context.Background()

    var creds credentials.TransportCredentials
    if pki != nil {
        var err error
        creds, err = grpcutil.CreateCredentials(pki.Cert, pki.Key, pki.Ca, runnerCertCN)
        if err != nil {
            logrus.WithError(err).Error("Unable to create credentials to connect to runner node")
            return nil, nil, err
        }
    }

    // we want to set a very short timeout to fail-fast if something goes wrong
    conn, err := grpcutil.DialWithBackoff(ctx, address, creds, 100*time.Millisecond, grpc.DefaultBackoffConfig)
    if err != nil {
        logrus.WithError(err).Error("Unable to connect to runner node")
    }

    protocolClient := pb.NewRunnerProtocolClient(conn)
    logrus.WithField("runner_addr", address).Info("Connected to runner")

    return conn, protocolClient, nil
}

// implements Runner
func (r *gRPCRunner) Address() string {
    return r.address
}

func isTooBusy(err error) bool {
    // A formal API error returned from pure-runner
    if models.GetAPIErrorCode(err) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
        return true
    }
    if err != nil {
        // engagement/recv errors could also be a 503.
        st := status.Convert(err)
        if int(st.Code()) == models.GetAPIErrorCode(models.ErrCallTimeoutServerBusy) {
            return true
        }
    }
    return false
}

// implements Runner
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
    log := common.Logger(ctx).WithField("runner_addr", r.address)

    log.Debug("Attempting to place call")
    if !r.shutWg.AddSession(1) {
        // try another runner if this one is closed.
        return false, ErrorRunnerClosed
    }
    defer r.shutWg.DoneSession()

    // extract the call's model data to pass on to the pure runner
    modelJSON, err := json.Marshal(call.Model())
    if err != nil {
        log.WithError(err).Error("Failed to encode model as JSON")
        // If we can't encode the model, no runner will ever be able to run this. Give up.
        return true, err
    }

    rid := common.RequestIDFromContext(ctx)
    if rid != "" {
        // Create a new gRPC metadata where we store the request ID
        mp := metadata.Pairs(common.RequestIDContextKey, rid)
        ctx = metadata.NewOutgoingContext(ctx, mp)
    }

    runnerConnection, err := r.client.Engage(ctx)
    if err != nil {
        log.WithError(err).Error("Unable to create client to runner node")
        // Try on next runner
        return false, err
    }

    err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{
        ModelsCallJson: string(modelJSON),
        SlotHashId:     hex.EncodeToString([]byte(call.SlotHashId())),
        Extensions:     call.Extensions(),
    }}})
    if err != nil {
        log.WithError(err).Error("Failed to send message to runner node")
        // Try on next runner
        return false, err
    }

    // After this point the TryCall was sent: we assume "COMMITTED" unless the
    // pure runner sends an explicit NACK.
    recvDone := make(chan error, 1)

    go receiveFromRunner(ctx, runnerConnection, call, recvDone)
    go sendToRunner(ctx, runnerConnection, call)

    select {
    case <-ctx.Done():
        log.Infof("Engagement Context ended ctxErr=%v", ctx.Err())
        return true, ctx.Err()
    case recvErr := <-recvDone:
        if isTooBusy(recvErr) {
            // Try on next runner
            return false, models.ErrCallTimeoutServerBusy
        }
        return true, recvErr
    }
}

func sendToRunner(ctx context.Context, protocolClient pb.RunnerProtocol_EngageClient, call pool.RunnerCall) {
    bodyReader := call.RequestBody()
    writeBuffer := make([]byte, MaxDataChunk)
    log := common.Logger(ctx)

    // IMPORTANT: the IO Read below can fail in multi-goroutine cases (especially in
    // the retry case, if the receiveFromRunner goroutine receives a NACK while
    // sendToRunner is already blocked on a read) or when the http body must be read
    // multiple times (retries). Normally http.Request.Body can only be read once.
    // However, runner_client users should install an http.Request.GetBody() function
    // and cache the body content in the request. See lb_agent setRequestGetBody(),
    // which handles this. With GetBody installed, the 'Read' below is effectively a
    // non-blocking operation, since GetBody() hands out a new instance of
    // io.ReadCloser that allows repeated reads of the http body.
    for {
        // WARNING: blocking read.
        n, err := bodyReader.Read(writeBuffer)
        if err != nil && err != io.EOF {
            log.WithError(err).Error("Failed to receive data from http client body")
        }

        // any IO error or n == 0 is an EOF for pure-runner
        isEOF := err != nil || n == 0
        data := writeBuffer[:n]

        log.Debugf("Sending %d bytes of data isEOF=%v to runner", n, isEOF)
        sendErr := protocolClient.Send(&pb.ClientMsg{
            Body: &pb.ClientMsg_Data{
                Data: &pb.DataFrame{
                    Data: data,
                    Eof:  isEOF,
                },
            },
        })
        if sendErr != nil {
            // It's often normal to receive an EOF here, as we optimistically start
            // sending the body until a NACK from the runner. Ignore EOF and rely on
            // the recv side to catch a premature EOF.
            if sendErr != io.EOF {
                log.WithError(sendErr).Errorf("Failed to send data frame size=%d isEOF=%v", n, isEOF)
            }
            return
        }
        if isEOF {
            return
        }
    }
}

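// Aside (illustrative sketch): the GetBody caching referenced in sendToRunner
// works roughly like the hypothetical helper below; the real implementation is
// lb_agent's setRequestGetBody(). It would additionally need the "bytes",
// "io/ioutil" and "net/http" imports.
//
//    func setRequestGetBody(req *http.Request) error {
//        buf, err := ioutil.ReadAll(req.Body)
//        if err != nil {
//            return err
//        }
//        req.Body = ioutil.NopCloser(bytes.NewReader(buf))
//        // Hand out a fresh reader per call so the body can be re-read
//        // across retries.
//        req.GetBody = func() (io.ReadCloser, error) {
//            return ioutil.NopCloser(bytes.NewReader(buf)), nil
//        }
//        return nil
//    }
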
func parseError(msg *pb.CallFinished) error {
    if msg.GetSuccess() {
        return nil
    }
    eCode := msg.GetErrorCode()
    eStr := msg.GetErrorStr()
    if eStr == "" {
        eStr = "Unknown Error From Pure Runner"
    }
    return models.NewAPIError(int(eCode), errors.New(eStr))
}

// tryQueueError queues an error to the done channel without blocking; only
// the first error queued is kept.
func tryQueueError(err error, done chan error) {
    select {
    case done <- err:
    default:
    }
}

func receiveFromRunner(ctx context.Context, protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
    w := c.ResponseWriter()
    defer close(done)
    log := common.Logger(ctx)
    isPartialWrite := false

DataLoop:
    for {
        msg, err := protocolClient.Recv()
        if err != nil {
            log.WithError(err).Info("Receive error from runner")
            tryQueueError(err, done)
            return
        }

        switch body := msg.Body.(type) {

        // Process the HTTP header/status message. This may not arrive, depending
        // on the pure runner's behavior (e.g. a timeout with no IO received from
        // the function).
        case *pb.RunnerMsg_ResultStart:
            switch meta := body.ResultStart.Meta.(type) {
            case *pb.CallResultStart_Http:
                log.Debugf("Received meta http result from runner Status=%v", meta.Http.StatusCode)
                for _, header := range meta.Http.Headers {
                    w.Header().Set(header.Key, header.Value)
                }
                if meta.Http.StatusCode > 0 {
                    w.WriteHeader(int(meta.Http.StatusCode))
                }
            default:
                log.Errorf("Unhandled meta type in start message: %v", meta)
            }

        // May arrive if the function has output. We ignore EOF.
        case *pb.RunnerMsg_Data:
            log.Debugf("Received data from runner len=%d isEOF=%v", len(body.Data.Data), body.Data.Eof)
            if !isPartialWrite {
                // WARNING: blocking write
                n, err := w.Write(body.Data.Data)
                if n != len(body.Data.Data) {
                    isPartialWrite = true
                    log.WithError(err).Infof("Failed to write full response (%d of %d) to client", n, len(body.Data.Data))
                    if err == nil {
                        err = io.ErrShortWrite
                    }
                    tryQueueError(err, done)
                }
            }

        // A Finished message is required to finalize processing.
        case *pb.RunnerMsg_Finished:
            log.Infof("Call finished Success=%v %v", body.Finished.Success, body.Finished.Details)
            if !body.Finished.Success {
                err := parseError(body.Finished)
                tryQueueError(err, done)
            }
            break DataLoop

        default:
            log.Errorf("Ignoring unknown message type %T from runner, possible client/server mismatch", body)
        }
    }

    // There should be an EOF following the last packet
    for {
        msg, err := protocolClient.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.WithError(err).Info("Call Waiting EOF received error")
            tryQueueError(err, done)
            break
        }

        switch body := msg.Body.(type) {
        default:
            log.Infof("Call Waiting EOF ignoring message %T", body)
        }
        tryQueueError(ErrorPureRunnerNoEOF, done)
    }
}

var _ pool.Runner = &gRPCRunner{}
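
For context, a minimal sketch of how a caller might drive TryExec across a pool of
runners. The placement loop and the placeCall helper are illustrative assumptions,
not the actual runnerpool placer; only the TryExec contract shown (placed=false
means it is safe to try the next runner) comes from the code above.

    package main

    import (
        "context"

        "github.com/fnproject/fn/api/models"
        pool "github.com/fnproject/fn/api/runnerpool"
    )

    // placeCall tries each runner in order. TryExec returning (false, err)
    // means the call was not committed to that runner (busy, closed, or a
    // connect error), so the next runner may be tried; (true, err) means the
    // call was placed and err is its final outcome.
    func placeCall(ctx context.Context, runners []pool.Runner, call pool.RunnerCall) error {
        for _, r := range runners {
            placed, err := r.TryExec(ctx, call)
            if placed {
                return err
            }
        }
        return models.ErrCallTimeoutServerBusy
    }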