fn-serverless/api/agent/pure_runner.go
Andrea Rosa 72a2eb933f Returning Agent on exported func for pureRunner (#905)
pureRunner is an unexported struct that was being returned by a few exported
functions; with this change they return Agent, the interface implemented by
pureRunner, to avoid leaking an unexported type.
2018-03-30 09:15:55 -07:00


package agent
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
runner "github.com/fnproject/fn/api/agent/grpc"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/fnext"
"github.com/go-openapi/strfmt"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
// callHandle represents the state of a call as handled by the pure runner; it also
// implements http.ResponseWriter so it can be used to stream the output back to the client.
type callHandle struct {
engagement runner.RunnerProtocol_EngageServer
c *call // the agent's version of call
input io.WriteCloser
started bool
done chan error // to synchronize
// As the state can be set and checked by both goroutines handling this state, we need a mutex.
stateMutex sync.Mutex
// Timings, for metrics:
receivedTime strfmt.DateTime // When was the call received?
allocatedTime strfmt.DateTime // When did we finish allocating capacity?
// Last communication error on the stream (if any). This basically acts as a cancellation flag too.
streamError error
// For implementing http.ResponseWriter:
outHeaders http.Header
outStatus int
headerWritten bool
}
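// Header returns the response headers accumulated so far; together with WriteHeader
// and Write below it lets callHandle act as an http.ResponseWriter.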
func (ch *callHandle) Header() http.Header {
return ch.outHeaders
}
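// WriteHeader records the HTTP status code and immediately commits the headers to
// the stream, since nothing else can be sent before the result start frame.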
func (ch *callHandle) WriteHeader(status int) {
ch.outStatus = status
ch.commitHeaders()
}
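// commitHeaders sends the CallResultStart frame (status code plus headers) exactly
// once; subsequent calls are no-ops. Any stream error is recorded so later writes bail out.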
func (ch *callHandle) commitHeaders() error {
if ch.headerWritten {
return nil
}
ch.headerWritten = true
logrus.Debugf("Committing call result with status %d", ch.outStatus)
var outHeaders []*runner.HttpHeader
for h, vals := range ch.outHeaders {
for _, v := range vals {
outHeaders = append(outHeaders, &runner.HttpHeader{
Key: h,
Value: v,
})
}
}
// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
// the call: basically just blackhole the output and return the write error to cause Submit to fail properly.
ch.stateMutex.Lock()
defer ch.stateMutex.Unlock()
err := ch.streamError
if err != nil {
return fmt.Errorf("Bailing out because of communication error: %v", ch.streamError)
}
logrus.Debug("Sending call result start message")
err = ch.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_ResultStart{
ResultStart: &runner.CallResultStart{
Meta: &runner.CallResultStart_Http{
Http: &runner.HttpRespMeta{
Headers: outHeaders,
StatusCode: int32(ch.outStatus),
},
},
},
},
})
if err != nil {
logrus.WithError(err).Error("Error sending call result")
ch.streamError = err
return err
}
logrus.Debug("Sent call result message")
return nil
}
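// Write streams a chunk of the function's output to the client as a DataFrame,
// committing the headers first if that has not happened yet.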
func (ch *callHandle) Write(data []byte) (int, error) {
err := ch.commitHeaders()
if err != nil {
return 0, fmt.Errorf("Error sending data: %v", err)
}
// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
// the call: basically just blackhole the output and return the write error to cause Submit to fail properly.
ch.stateMutex.Lock()
defer ch.stateMutex.Unlock()
err = ch.streamError
if err != nil {
return 0, fmt.Errorf("Bailing out because of communication error: %v", ch.streamError)
}
logrus.Debugf("Sending call response data %d bytes long", len(data))
err = ch.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Data{
Data: &runner.DataFrame{
Data: data,
Eof: false,
},
},
})
if err != nil {
ch.streamError = err
return 0, fmt.Errorf("Error sending data: %v", err)
}
return len(data), nil
}
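// Close commits the headers if needed and sends a final DataFrame with Eof set,
// signalling the end of the call's output.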
func (ch *callHandle) Close() error {
err := ch.commitHeaders()
if err != nil {
return fmt.Errorf("Error sending close frame: %v", err)
}
// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
// the call: basically just blackhole the output and return the write error to cause the caller to fail properly.
ch.stateMutex.Lock()
defer ch.stateMutex.Unlock()
err = ch.streamError
if err != nil {
return fmt.Errorf("Bailing out because of communication error: %v", ch.streamError)
}
logrus.Debug("Sending call response data end")
err = ch.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Data{
Data: &runner.DataFrame{
Eof: true,
},
},
})
if err != nil {
return fmt.Errorf("Error sending close frame: %v", err)
}
return nil
}
// cancel implements the logic for cancelling the execution of a call based on what the state in the handle is.
func (ch *callHandle) cancel(ctx context.Context, err error) {
ch.stateMutex.Lock()
defer ch.stateMutex.Unlock()
// Do not double-cancel.
if ch.streamError != nil {
return
}
// First, record that there has been an error.
ch.streamError = err
// Caller may have died or disconnected. The behaviour here depends on the state of the call.
// If the call was placed and is running we need to handle it...
if ch.c != nil {
// If we've actually started the call we're in the middle of an execution with i/o going back and forth.
// This is hard to stop. Side effects can be occurring at any point. However, at least we should stop
// the i/o flow. Recording the stream error in the handle should have stopped the output, but we also
// want to stop any input being sent through, so we close the input stream and let the function
// probably crash out. If it doesn't crash out, well, it means the function doesn't handle i/o errors
// properly and it will hang there until the timeout, then it'll be killed properly by the timeout
// handling in Submit.
if ch.started {
ch.input.Close()
}
}
}
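// CapacityGate abstracts the admission control used by the pure runner: before a
// call is accepted its memory requirement is reserved, and it is released again
// once the call is over (or fails to be placed).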
type CapacityGate interface {
// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
// of capacity have been committed.
CheckAndReserveCapacity(units uint64) error
// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
// zero; implementations are free to panic in that case.
ReleaseCapacity(units uint64)
}
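// pureRunnerCapacityManager is the default CapacityGate: a simple in-memory counter
// of committed capacity units guarded by a mutex.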
type pureRunnerCapacityManager struct {
totalCapacityUnits uint64
committedCapacityUnits uint64
mtx sync.Mutex
}
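// capacityDeallocator releases whatever capacity was reserved for a call; it is
// always safe to invoke, even when no capacity was actually committed.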
type capacityDeallocator func()
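// newPureRunnerCapacityManager creates a capacity manager with the given total
// number of capacity units and nothing committed yet.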
func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager {
return &pureRunnerCapacityManager{
totalCapacityUnits: units,
committedCapacityUnits: 0,
}
}
func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units {
prcm.committedCapacityUnits = prcm.committedCapacityUnits + units
return nil
}
return models.ErrCallTimeoutServerBusy
}
func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) {
prcm.mtx.Lock()
defer prcm.mtx.Unlock()
if units <= prcm.committedCapacityUnits {
prcm.committedCapacityUnits = prcm.committedCapacityUnits - units
return
}
panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity")
}
// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
// and provides the gRPC server that implements the LB <-> Runner protocol.
type pureRunner struct {
gRPCServer *grpc.Server
listen string
a Agent
inflight int32
capacity CapacityGate
}
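// GetAppID, GetAppByID, GetCall and AddCallListener simply delegate to the wrapped
// Agent. Submit and Enqueue, by contrast, refuse direct calls: on a pure runner,
// execution is driven through the gRPC Engage stream instead.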
func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) {
return pr.a.GetAppID(ctx, appName)
}
func (pr *pureRunner) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
return pr.a.GetAppByID(ctx, appID)
}
func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
return pr.a.GetCall(opts...)
}
func (pr *pureRunner) Submit(Call) error {
return errors.New("Submit cannot be called directly in a Pure Runner.")
}
func (pr *pureRunner) Close() error {
// First stop accepting requests
pr.gRPCServer.GracefulStop()
// Then let the agent finish
err := pr.a.Close()
if err != nil {
return err
}
return nil
}
func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
pr.a.AddCallListener(cl)
}
func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
return errors.New("Enqueue cannot be called directly in a Pure Runner.")
}
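// ensureFunctionIsRunning submits the call to the wrapped agent exactly once, in its
// own goroutine. When Submit returns, a CallFinished message is sent to the client
// (success or failure) and the outcome is pushed onto the done channel.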
func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) {
// Only start it once!
state.stateMutex.Lock()
defer state.stateMutex.Unlock()
if !state.started {
state.started = true
go func() {
err := pr.a.Submit(state.c)
if err != nil {
// In this case the function has failed for a legitimate reason. We send a call failed message if we
// can. If there's a streaming error doing that then we are basically in the "double exception" case
// and who knows what's best to do. Submit has already finished so we don't need to cancel... but at
// least we should set streamError if it's not set.
state.stateMutex.Lock()
defer state.stateMutex.Unlock()
if state.streamError == nil {
err2 := state.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: false,
Details: fmt.Sprintf("%v", err),
}}})
if err2 != nil {
state.streamError = err2
}
}
state.done <- err
return
}
// First close the writer, then send the call finished message
err = state.Close()
if err != nil {
// If we fail to close the writer we need to communicate back that the function has failed; if there's
// a streaming error doing that then we are basically in the "double exception" case and who knows
// what's best to do. Submit has already finished so we don't need to cancel... but at least we should
// set streamError if it's not set.
state.stateMutex.Lock()
defer state.stateMutex.Unlock()
if state.streamError == nil {
err2 := state.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: false,
Details: fmt.Sprintf("%v", err),
}}})
if err2 != nil {
state.streamError = err2
}
}
state.done <- err
return
}
// At this point everything should have worked. Send a successful message... and if that runs afoul of a
// stream error, well, we're in a bit of trouble. Everything has finished, so there is nothing to cancel
// and we just give up, but at least we set streamError.
state.stateMutex.Lock()
defer state.stateMutex.Unlock()
if state.streamError == nil {
err2 := state.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: true,
Details: state.c.Model().ID,
}}})
if err2 != nil {
state.streamError = err2
state.done <- err2
return
}
}
state.done <- nil
}()
}
}
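// handleData feeds a DataFrame received from the client into the function's input
// pipe, making sure the call has been submitted first, and closes the pipe once Eof
// is seen.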
func (pr *pureRunner) handleData(ctx context.Context, data *runner.DataFrame, state *callHandle) error {
pr.ensureFunctionIsRunning(state)
// Only push the input if we're in a non-error situation
state.stateMutex.Lock()
defer state.stateMutex.Unlock()
if state.streamError == nil {
if len(data.Data) > 0 {
_, err := state.input.Write(data.Data)
if err != nil {
return err
}
}
if data.Eof {
state.input.Close()
}
}
return nil
}
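// handleTryCall parses the TryCall message, reserves capacity for the call's memory
// requirement and constructs the agent call with a pipe for its input and this
// handle as its output writer. The returned deallocator must always be invoked to
// release any reserved capacity.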
func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {
state.receivedTime = strfmt.DateTime(time.Now())
var c models.Call
err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
if err != nil {
return func() {}, err
}
// Capacity check first
err = pr.capacity.CheckAndReserveCapacity(c.Memory)
if err != nil {
return func() {}, err
}
// Proceed!
var w http.ResponseWriter = state
inR, inW := io.Pipe()
agentCall, err := pr.a.GetCall(FromModelAndInput(&c, inR), WithWriter(w))
if err != nil {
return func() { pr.capacity.ReleaseCapacity(c.Memory) }, err
}
state.c = agentCall.(*call)
state.input = inW
state.allocatedTime = strfmt.DateTime(time.Now())
return func() { pr.capacity.ReleaseCapacity(c.Memory) }, nil
}
// Engage handles a single client engagement: it implements the LB <-> Runner gRPC
// stream for one call, from the TryCall acknowledgement through the data frames to
// the final CallFinished message.
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
// Keep lightweight tabs on what this runner is doing: for draindown tests
atomic.AddInt32(&pr.inflight, 1)
defer atomic.AddInt32(&pr.inflight, -1)
pv, ok := peer.FromContext(engagement.Context())
logrus.Debug("Starting engagement")
if ok {
logrus.Debug("Peer is ", pv)
}
md, ok := metadata.FromIncomingContext(engagement.Context())
if ok {
logrus.Debug("MD is ", md)
}
var state = callHandle{
engagement: engagement,
c: nil,
input: nil,
started: false,
done: make(chan error),
streamError: nil,
outHeaders: make(http.Header),
outStatus: 200,
headerWritten: false,
}
grpc.EnableTracing = false
logrus.Debug("Entering engagement handler")
msg, err := engagement.Recv()
if err != nil {
// In this case the connection has dropped before we've even started.
return err
}
switch body := msg.Body.(type) {
case *runner.ClientMsg_Try:
dealloc, err := pr.handleTryCall(engagement.Context(), body.Try, &state)
defer dealloc()
// At the stage of TryCall, there is only one thread running and nothing has happened yet so there should
// not be a streamError. We can handle `err` by sending a message back. If we cause a stream error by sending
// the message, we are in a "double exception" case and we might as well cancel the call with the original
// error, so we can ignore the error from Send.
if err != nil {
_ = engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
Committed: false,
Details: fmt.Sprintf("%v", err),
}}})
state.cancel(engagement.Context(), err)
return err
}
// If we succeed in creating the call, but we get a stream error sending a message back, we must cancel
// the call because we've probably lost the connection.
err = engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
Committed: true,
Details: state.c.Model().ID,
SlotAllocationLatency: time.Time(state.allocatedTime).Sub(time.Time(state.receivedTime)).String(),
}}})
if err != nil {
state.cancel(engagement.Context(), err)
return err
}
// Then at this point we start handling the data that should be being pushed to us.
foundEof := false
for !foundEof {
msg, err := engagement.Recv()
if err != nil {
// In this case the connection has dropped or there's something bad happening. We know we can't even
// send a message back. Cancel the call, all bets are off.
state.cancel(engagement.Context(), err)
return err
}
switch body := msg.Body.(type) {
case *runner.ClientMsg_Data:
err := pr.handleData(engagement.Context(), body.Data, &state)
if err != nil {
// If this happens, then we couldn't write into the input. The state of the function is inconsistent
// and therefore we need to cancel. We also need to communicate back that the function has failed;
// that could also run afoul of a stream error, but at that point we don't care, just cancel the
// call with the original error.
_ = state.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: false,
Details: fmt.Sprintf("%v", err),
}}})
state.cancel(engagement.Context(), err)
return err
}
// Then break the loop if this was the last input data frame, i.e. eof is on
if body.Data.Eof {
foundEof = true
}
default:
err := errors.New("Protocol failure in communication with function runner")
// This is essentially a panic. Try to communicate back that the call has failed, and bail out; that
// could also run afoul of a stream error, but at that point we don't care, just cancel the call with
// the catastrophic error.
_ = state.engagement.Send(&runner.RunnerMsg{
Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
Success: false,
Details: fmt.Sprintf("%v", err),
}}})
state.cancel(engagement.Context(), err)
return err
}
}
// Synchronize to the function running goroutine finishing
select {
case <-state.done:
case <-engagement.Context().Done():
return engagement.Context().Err()
}
default:
// Protocol error. This should not happen.
return errors.New("Protocol failure in communication with function runner")
}
return nil
}
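// Status reports the number of calls currently in flight on this runner.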
func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {
return &runner.RunnerStatus{
Active: atomic.LoadInt32(&pr.inflight),
}, nil
}
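// Start listens on the configured address and serves the gRPC protocol; it blocks
// until the server stops.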
func (pr *pureRunner) Start() error {
logrus.Info("Pure Runner listening on ", pr.listen)
lis, err := net.Listen("tcp", pr.listen)
if err != nil {
return fmt.Errorf("Could not listen on %s: %s", pr.listen, err)
}
if err := pr.gRPCServer.Serve(lis); err != nil {
return fmt.Errorf("grpc serve error: %s", err)
}
return nil
}
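// UnsecuredPureRunner creates a pure runner with no TLS configured and the default
// capacity gate.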
func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error) {
return NewPureRunner(cancel, addr, da, "", "", "", nil)
}
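// DefaultPureRunner creates a pure runner with the given TLS material and the
// default capacity gate.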
func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (Agent, error) {
return NewPureRunner(cancel, addr, da, cert, key, ca, nil)
}
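// NewPureRunner creates an Agent whose work arrives over a gRPC server implementing
// the LB <-> Runner protocol rather than through direct Submit calls. If cert, key
// and ca are all provided, the server requires mutually authenticated TLS; otherwise
// it runs unsecured. A nil gate falls back to a memory-based capacity manager. The
// gRPC server is started on its own goroutine, and cancel is invoked if it fails.
//
// Illustrative usage only; the address and DataAccess value (da) are placeholders:
//
//	_, cancel := context.WithCancel(context.Background())
//	agent, err := NewPureRunner(cancel, ":9120", da, "", "", "", nil)
//	if err != nil {
//		logrus.WithError(err).Fatal("could not create pure runner")
//	}
//	defer agent.Close()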
func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error) {
a := createAgent(da, true)
var pr *pureRunner
var err error
if cert != "" && key != "" && ca != "" {
c, err := creds(cert, key, ca)
if err != nil {
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
return nil, err
}
pr, err = createPureRunner(addr, a, c, gate)
if err != nil {
return nil, err
}
} else {
logrus.Warn("Running pure runner in insecure mode!")
pr, err = createPureRunner(addr, a, nil, gate)
if err != nil {
return nil, err
}
}
go func() {
err := pr.Start()
if err != nil {
logrus.WithError(err).Error("Failed to start pure runner")
cancel()
}
}()
return pr, nil
}
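// creds builds the TLS transport credentials for the gRPC server from the server
// certificate/key pair and the CA used to verify client certificates; client
// certificates are required and verified.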
func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
// Load the certificates from disk
certificate, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, fmt.Errorf("Could not load server key pair: %s", err)
}
// Create a certificate pool from the certificate authority
certPool := x509.NewCertPool()
authority, err := ioutil.ReadFile(ca)
if err != nil {
return nil, fmt.Errorf("Could not read ca certificate: %s", err)
}
if ok := certPool.AppendCertsFromPEM(authority); !ok {
return nil, errors.New("Failed to append client certs")
}
return credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
ClientCAs: certPool,
}), nil
}
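// createPureRunner wires up the gRPC server (with or without TLS), the wrapped agent
// and the capacity gate, and registers the RunnerProtocol service.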
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) {
var srv *grpc.Server
if creds != nil {
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
}
if gate == nil {
memUnits := getAvailableMemoryUnits()
gate = newPureRunnerCapacityManager(memUnits)
}
pr := &pureRunner{
gRPCServer: srv,
listen: addr,
a: a,
capacity: gate,
}
runner.RegisterRunnerProtocolServer(srv, pr)
return pr, nil
}
const megabyte uint64 = 1024 * 1024
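// getAvailableMemoryUnits reports the memory available for functions, in megabytes,
// which is the unit the default capacity gate reserves against.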
func getAvailableMemoryUnits() uint64 {
// Reuse the resource tracker to read the available memory. This is a bit of a hack;
// TODO: refactor the OS-specific memory lookups out of resourceTracker.
throwawayRT := NewResourceTracker(nil).(*resourceTracker)
return throwawayRT.ramAsyncTotal / megabyte
}