* Disable pure runner logging. There is a racy bug where the logger is written to while it is closing, but investigating it showed that pure runner does not need the logger at all: the syslog feature is not an in-process fn concern, and the logs from attach are not used for anything further in pure runner. Logging is therefore disabled at the docker level, which also saves sending the bytes back over the wire and could be a small performance win. With this, agents can be configured not to log debug output or to store no logs at all; not many guards have been added for 'full' agent mode while it awaits its demise, but the default configuration and 'full' agent behavior are unchanged. Plumbing in a no-op logger was much smoother and lower-risk than threading nil through for stdout/stderr (which invites nil-panic issues for the same effect). Relying on type casting is not perfect, and checking an interface has the same drawback (loss of interface adherence for any decorator), so this seems acceptable; defaulting to no logger was similarly painful and ended up here. Open to ideas.
* Replace usage of the old null reader/writer implementation.
* Make Read return io.EOF for io.Copy usage.
package agent

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	runner "github.com/fnproject/fn/api/agent/grpc"
	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/id"
	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/fnext"
	"github.com/fnproject/fn/grpcutil"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"
)

/*
	Pure Runner (implements Agent) proxies gRPC requests to the actual Agent instance. This is
	done using http.ResponseWriter interfaces, through which Agent pushes the function I/O:
	1) Function output to pure runner is received through the callHandle http.ResponseWriter interface.
	2) Function input from pure runner to Agent is pushed through the callHandle io.PipeWriter.
	3) LB to runner input is handled via the receiver goroutine (inQueue).
	4) Runner to LB output is handled via the sender goroutine (outQueue).

	The flow of events is as follows:

	LB:

	1) LB sends ClientMsg_TryCall to the runner.
	2) LB sends ClientMsg_DataFrame messages, with EOF set on the last message.
	3) LB receives RunnerMsg_CallResultStart for the http status and headers.
	4) LB receives RunnerMsg_DataFrame messages for the http body, with EOF set on the last message.
	5) LB receives RunnerMsg_CallFinished as the final message.

	LB can be interrupted with RunnerMsg_CallFinished at any time. If this is a NACK, the presence of a 503
	means LB can retry the call.

	Runner:

	1) Runner, upon receiving ClientMsg_TryCall, calls agent.Submit().
	2) Runner allocates its resources, but can send a NACK (RunnerMsg_Finished) if it cannot service the call in time.
	3) agent.Submit() starts reading data from the callHandle io.PipeReader, which reads
	data from LB via the gRPC receiver (inQueue). The http reader detects headers/data
	and sends RunnerMsg_CallResultStart and/or RunnerMsg_DataFrame messages to LB.
	4) agent.Submit() completes, which means the Function I/O is now complete. Runner sends RunnerMsg_Finished.
*/
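// The sketch below is purely illustrative and is not used by the runner: it spells out, with the
// same message types the callHandle emits further down in this file, the runner->LB sequence
// described in steps 3-5 of the LB flow above for a minimal 200/"hello" response. The header
// value chosen here is an arbitrary example.
func exampleResultSequence() []*runner.RunnerMsg {
	return []*runner.RunnerMsg{
		// 3) status and headers arrive first in a RunnerMsg_ResultStart
		{Body: &runner.RunnerMsg_ResultStart{
			ResultStart: &runner.CallResultStart{
				Meta: &runner.CallResultStart_Http{
					Http: &runner.HttpRespMeta{
						StatusCode: 200,
						Headers:    []*runner.HttpHeader{{Key: "Content-Type", Value: "text/plain"}},
					},
				},
			},
		}},
		// 4) the body follows as one or more data frames, EOF set on the last one
		{Body: &runner.RunnerMsg_Data{
			Data: &runner.DataFrame{Data: []byte("hello"), Eof: true},
		}},
		// 5) the final message reports whether the call succeeded
		{Body: &runner.RunnerMsg_Finished{
			Finished: &runner.CallFinished{Success: true},
		}},
	}
}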
var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
// callHandle represents the state of the call as handled by the pure runner. It also implements
// http.ResponseWriter so that it can be used to stream the function output back.
type callHandle struct {
	engagement runner.RunnerProtocol_EngageServer
	ctx        context.Context
	c          *call // the agent's version of call

	// For implementing http.ResponseWriter:
	headers http.Header
	status  int

	headerOnce        sync.Once
	shutOnce          sync.Once
	pipeToFnCloseOnce sync.Once

	outQueue  chan *runner.RunnerMsg
	doneQueue chan struct{}
	errQueue  chan error
	inQueue   chan *runner.ClientMsg

	// Pipe to push data to the agent Function container
	pipeToFnW *io.PipeWriter
	pipeToFnR *io.PipeReader
}
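// NewCallHandle creates a callHandle bound to the given gRPC engagement and spawns its
// receiver and sender goroutines.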
func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle {

	// set up a pipe to push data to agent Function container
	pipeR, pipeW := io.Pipe()

	state := &callHandle{
		engagement: engagement,
		ctx:        engagement.Context(),
		headers:    make(http.Header),
		status:     200,
		outQueue:   make(chan *runner.RunnerMsg),
		doneQueue:  make(chan struct{}),
		errQueue:   make(chan error, 1), // always allow one error (buffered)
		inQueue:    make(chan *runner.ClientMsg),
		pipeToFnW:  pipeW,
		pipeToFnR:  pipeR,
	}

	// spawn one receiver and one sender go-routine.
	// See: https://grpc.io/docs/reference/go/generated-code.html, which reads:
	// "Thread-safety: note that client-side RPC invocations and server-side RPC handlers
	// are thread-safe and are meant to be run on concurrent goroutines. But also note that
	// for individual streams, incoming and outgoing data is bi-directional but serial;
	// so e.g. individual streams do not support concurrent reads or concurrent writes
	// (but reads are safely concurrent with writes)."
	state.spawnReceiver()
	state.spawnSender()
	return state
}

// closePipeToFn closes the pipe that feeds data to the function in agent.
func (ch *callHandle) closePipeToFn() {
	ch.pipeToFnCloseOnce.Do(func() {
		ch.pipeToFnW.Close()
	})
}

// finalize initiates a graceful shutdown of the session. This is
// currently achieved by a sentinel nil enqueue to gRPC sender.
func (ch *callHandle) finalize() error {
	// final sentinel nil msg for graceful shutdown
	err := ch.enqueueMsg(nil)
	if err != nil {
		ch.shutdown(err)
	}
	return err
}

// shutdown initiates a shutdown and terminates the gRPC session with
// a given error.
func (ch *callHandle) shutdown(err error) {

	ch.closePipeToFn()

	ch.shutOnce.Do(func() {
		common.Logger(ch.ctx).WithError(err).Debugf("Shutting down call handle")

		// try to queue an error message if it's not already queued.
		if err != nil {
			select {
			case ch.errQueue <- err:
			default:
			}
		}

		close(ch.doneQueue)
	})
}
// waitError waits until the session is completed and returns
// any queued error if there is one.
func (ch *callHandle) waitError() error {
	select {
	case <-ch.ctx.Done():
	case <-ch.doneQueue:
	}

	var err error
	// get queued error if there's any
	select {
	case err = <-ch.errQueue:
	default:
		err = ch.ctx.Err()
	}

	if err != nil {
		logrus.WithError(err).Debugf("Wait Error")
	}
	return err
}
// enqueueMsg attempts to queue a message to the gRPC sender
func (ch *callHandle) enqueueMsg(msg *runner.RunnerMsg) error {
	select {
	case ch.outQueue <- msg:
		return nil
	case <-ch.ctx.Done():
	case <-ch.doneQueue:
	}
	return io.EOF
}
// enqueueMsgStrict enqueues a message to the gRPC sender and, if
// that fails, initiates an error-case shutdown.
func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error {
	err := ch.enqueueMsg(msg)
	if err != nil {
		ch.shutdown(err)
	}
	return err
}
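// enqueueDetached sends the early acknowledgement for a detached call: a ResultStart message
// carrying 202 Accepted, or the mapped API error code when err is non-nil.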
func (ch *callHandle) enqueueDetached(err error) {
	statusCode := http.StatusAccepted
	if err != nil {
		if models.IsAPIError(err) {
			statusCode = models.GetAPIErrorCode(err)
		} else {
			statusCode = http.StatusInternalServerError
		}
	}

	err = ch.enqueueMsg(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_ResultStart{
			ResultStart: &runner.CallResultStart{
				Meta: &runner.CallResultStart_Http{
					Http: &runner.HttpRespMeta{
						Headers:    ch.prepHeaders(),
						StatusCode: int32(statusCode)}}}}})
}

// enqueueCallResponse enqueues a Submit() response to the LB
// and initiates a graceful shutdown of the session.
func (ch *callHandle) enqueueCallResponse(err error) {

	var createdAt string
	var startedAt string
	var completedAt string
	var details string
	var errCode int
	var errStr string

	log := common.Logger(ch.ctx)

	if err != nil {
		errCode = models.GetAPIErrorCode(err)
		errStr = err.Error()
	}

	if ch.c != nil {
		mcall := ch.c.Model()

		// These timestamps are related. To avoid confusion
		// and for robustness, nested if stmts below.
		if !time.Time(mcall.CreatedAt).IsZero() {
			createdAt = mcall.CreatedAt.String()

			if !time.Time(mcall.StartedAt).IsZero() {
				startedAt = mcall.StartedAt.String()

				if !time.Time(mcall.CompletedAt).IsZero() {
					completedAt = mcall.CompletedAt.String()
				} else {
					// IMPORTANT: We punch this in ourselves.
					// This is because call.End() is executed asynchronously.
					completedAt = common.DateTime(time.Now()).String()
				}
			}
		}

		details = mcall.ID
	}

	log.Debugf("Sending Call Finish details=%v", details)

	errTmp := ch.enqueueMsgStrict(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
			Success:     err == nil,
			Details:     details,
			ErrorCode:   int32(errCode),
			ErrorStr:    errStr,
			CreatedAt:   createdAt,
			StartedAt:   startedAt,
			CompletedAt: completedAt,
		}}})

	if errTmp != nil {
		log.WithError(errTmp).Infof("enqueueCallResponse Send Error details=%v err=%v:%v", details, errCode, errStr)
		return
	}

	errTmp = ch.finalize()
	if errTmp != nil {
		log.WithError(errTmp).Infof("enqueueCallResponse Finalize Error details=%v err=%v:%v", details, errCode, errStr)
	}
}

// spawnPipeToFn pumps data to Function via callHandle io.PipeWriter (pipeToFnW)
// which is fed using input channel.
func (ch *callHandle) spawnPipeToFn() chan *runner.DataFrame {

	input := make(chan *runner.DataFrame)
	go func() {
		defer ch.closePipeToFn()
		for {
			select {
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			case data := <-input:
				if data == nil {
					return
				}

				if len(data.Data) > 0 {
					_, err := io.CopyN(ch.pipeToFnW, bytes.NewReader(data.Data), int64(len(data.Data)))
					if err != nil {
						ch.shutdown(err)
						return
					}
				}
				if data.Eof {
					return
				}
			}
		}
	}()

	return input
}
// spawnReceiver starts a gRPC receiver, which
// feeds received LB messages into inQueue
func (ch *callHandle) spawnReceiver() {

	go func() {
		defer close(ch.inQueue)
		for {
			msg, err := ch.engagement.Recv()
			if err != nil {
				// engagement is closed/cancelled by the client.
				if err == io.EOF {
					return
				}
				ch.shutdown(err)
				return
			}

			select {
			case ch.inQueue <- msg:
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			}
		}
	}()
}
// spawnSender starts a gRPC sender, which
// pumps messages from outQueue to the LB.
func (ch *callHandle) spawnSender() {
	go func() {
		for {
			select {
			case msg := <-ch.outQueue:
				if msg == nil {
					ch.shutdown(nil)
					return
				}
				err := ch.engagement.Send(msg)
				if err != nil {
					ch.shutdown(err)
					return
				}
			case <-ch.doneQueue:
				return
			case <-ch.ctx.Done():
				return
			}
		}
	}()
}

// Header implements http.ResponseWriter, which
// is used by Agent to push headers to pure runner
func (ch *callHandle) Header() http.Header {
	return ch.headers
}

// WriteHeader implements http.ResponseWriter, which
// is used by Agent to push http status to pure runner
func (ch *callHandle) WriteHeader(status int) {
	ch.status = status
}

// prepHeaders is a utility function to compile http headers
// into a flat array.
func (ch *callHandle) prepHeaders() []*runner.HttpHeader {
	var headers []*runner.HttpHeader
	for h, vals := range ch.headers {
		for _, v := range vals {
			headers = append(headers, &runner.HttpHeader{
				Key:   h,
				Value: v,
			})
		}
	}
	return headers
}
// Write implements http.ResponseWriter, which
// is used by Agent to push http data to pure runner. The
// received data is pushed to LB via the gRPC sender queue.
// Write also sends the http headers/status to the LB.
func (ch *callHandle) Write(data []byte) (int, error) {
	if ch.c.Model().Type == models.TypeDetached {
		// If it is a detached call, we just /dev/null the data coming back from the container.
		return len(data), nil
	}
	var err error
	ch.headerOnce.Do(func() {
		// WARNING: we do fetch Status and Headers without
		// a lock below. This is a problem in agent in general, and needs
		// to be fixed in all accessing go-routines such as protocol/http.go,
		// protocol/json.go, agent.go, etc. In practice however, one go routine
		// accesses them (which also compiles and writes headers), but this
		// is fragile and needs to be fortified.
		err = ch.enqueueMsg(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_ResultStart{
				ResultStart: &runner.CallResultStart{
					Meta: &runner.CallResultStart_Http{
						Http: &runner.HttpRespMeta{
							Headers:    ch.prepHeaders(),
							StatusCode: int32(ch.status),
						},
					},
				},
			},
		})
	})

	if err != nil {
		return 0, err
	}
	total := 0
	// split up data into gRPC chunks
	for {
		chunkSize := len(data)
		if chunkSize > MaxDataChunk {
			chunkSize = MaxDataChunk
		}
		if chunkSize == 0 {
			break
		}

		// we cannot retain 'data'
		cpData := make([]byte, chunkSize)
		copy(cpData, data[0:chunkSize])
		data = data[chunkSize:]

		err = ch.enqueueMsg(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_Data{
				Data: &runner.DataFrame{
					Data: cpData,
					Eof:  false,
				},
			},
		})

		if err != nil {
			return total, err
		}
		total += chunkSize
	}

	return total, nil
}
// getTryMsg fetches/waits for a TryCall message from
// the LB using inQueue (gRPC receiver)
func (ch *callHandle) getTryMsg() *runner.TryCall {
	var msg *runner.TryCall

	select {
	case <-ch.doneQueue:
	case <-ch.ctx.Done():
		// if ctx timed out while waiting, then this is a 503 (retriable)
		err := status.Errorf(codes.Code(models.ErrCallTimeoutServerBusy.Code()), models.ErrCallTimeoutServerBusy.Error())
		ch.shutdown(err)
		return nil
	case item := <-ch.inQueue:
		if item != nil {
			msg = item.GetTry()
		}
	}
	if msg == nil {
		ch.shutdown(ErrorExpectedTry)
	}
	return msg
}

// getDataMsg fetches/waits for a DataFrame message from
// the LB using inQueue (gRPC receiver)
func (ch *callHandle) getDataMsg() *runner.DataFrame {
	var msg *runner.DataFrame

	select {
	case <-ch.doneQueue:
	case <-ch.ctx.Done():
	case item := <-ch.inQueue:
		if item != nil {
			msg = item.GetData()
		}
	}
	if msg == nil {
		ch.shutdown(ErrorExpectedData)
	}
	return msg
}
const (
	// Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to
	// ensure we control the idle timeout & timeout, as well as how long the cache should stay valid.
	// A cache duration of idleTimeout + 500 msecs lets us reuse the cache for about 1.5 secs,
	// and during this time, since we allow no queries to go through, the hot container times out.
	//
	// For now, status tests a single case: a new hot container is spawned when the cache has expired
	// and a query is allowed to run.
	// TODO: we might want to mix this up and perhaps allow that hot container to handle
	// more than one query to test both 'new hot container' and 'old hot container' cases.
	StatusCallTimeout       = int32(5)
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second

	// Total context timeout (scheduler + execution). We need to allocate plenty of time here.
	// 60 seconds should be enough to provoke disk I/O errors, docker timeouts, etc.
	StatusCtxTimeout = time.Duration(60 * time.Second)
)
// statusTracker maintains cache data/state/locks for Status Call invocations.
type statusTracker struct {
	inflight         int32
	requestsReceived uint64
	requestsHandled  uint64
	imageName        string

	// lock protects expiry/cache/wait fields below. RunnerStatus ptr itself
	// stored every time status image is executed. Cache fetches use a shallow
	// copy of RunnerStatus to ensure consistency. Shallow copy is sufficient
	// since we set/save contents of RunnerStatus once.
	lock   sync.Mutex
	expiry time.Time
	cache  *runner.RunnerStatus
	wait   chan struct{}
}

// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
// and provides the gRPC server that implements the LB <-> Runner protocol.
type pureRunner struct {
	gRPCServer     *grpc.Server
	creds          credentials.TransportCredentials
	a              Agent
	status         statusTracker
	callHandleMap  map[string]*callHandle
	callHandleLock sync.Mutex
	enableDetach   bool
}

// implements Agent
func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
	return pr.a.GetCall(opts...)
}

// implements Agent
func (pr *pureRunner) Submit(Call) error {
	return errors.New("Submit cannot be called directly in a Pure Runner.")
}

// implements Agent
func (pr *pureRunner) Close() error {
	// First stop accepting requests
	pr.gRPCServer.GracefulStop()
	// Then let the agent finish
	err := pr.a.Close()
	if err != nil {
		return err
	}
	return nil
}

// implements Agent
func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
	pr.a.AddCallListener(cl)
}
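// saveCallHandle registers the call handle in callHandleMap, keyed by call ID, so that
// call listeners (see BeforeCall) can look it up.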
func (pr *pureRunner) saveCallHandle(ch *callHandle) {
	pr.callHandleLock.Lock()
	pr.callHandleMap[ch.c.Model().ID] = ch
	pr.callHandleLock.Unlock()
}
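// removeCallHandle drops the call handle for the given call ID from callHandleMap.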
func (pr *pureRunner) removeCallHandle(cID string) {
	pr.callHandleLock.Lock()
	delete(pr.callHandleMap, cID)
	pr.callHandleLock.Unlock()
}
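// spawnSubmit submits the call to the agent asynchronously and enqueues the call
// response once agent.Submit() returns.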
func (pr *pureRunner) spawnSubmit(state *callHandle) {
	go func() {
		err := pr.a.Submit(state.c)
		state.enqueueCallResponse(err)
	}()
}
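// spawnDetachSubmit is the detached-call variant of spawnSubmit: it registers the call
// handle first so that BeforeCall can send the early acknowledgement, and removes it
// once agent.Submit() returns.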
func (pr *pureRunner) spawnDetachSubmit(state *callHandle) {
	go func() {
		pr.saveCallHandle(state)
		err := pr.a.Submit(state.c)
		pr.removeCallHandle(state.c.Model().ID)
		state.enqueueCallResponse(err)
	}()
}
// handleTryCall, based on the TryCall message, tries to place the call on the NBIO Agent.
func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error {

	start := time.Now()

	var c models.Call
	err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
	if err != nil {
		state.enqueueCallResponse(err)
		return err
	}

	// The status image is reserved for internal Status checks.
	// We need to make sure normal function calls cannot call it.
	if pr.status.imageName != "" && c.Image == pr.status.imageName {
		err = models.ErrFnsInvalidImage
		state.enqueueCallResponse(err)
		return err
	}

	// IMPORTANT: We clear/initialize these dates, as the created/started/completed dates from
	// the unmarshalled Model from the LB-agent represent unrelated time-line events.
	// From this point, CreatedAt/StartedAt/CompletedAt are based on our local clock.
	c.CreatedAt = common.DateTime(start)
	c.StartedAt = common.DateTime(time.Time{})
	c.CompletedAt = common.DateTime(time.Time{})

	agentCall, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR),
		WithLogger(common.NoopReadWriteCloser{}),
		WithWriter(state),
		WithContext(state.ctx),
		WithExtensions(tc.GetExtensions()),
	)
	if err != nil {
		state.enqueueCallResponse(err)
		return err
	}

	state.c = agentCall.(*call)
	if tc.SlotHashId != "" {
		hashID, err := hex.DecodeString(tc.SlotHashId)
		if err != nil {
			state.enqueueCallResponse(err)
			return err
		}
		state.c.slotHashId = string(hashID[:])
	}

	if state.c.Type == models.TypeDetached {
		if !pr.enableDetach {
			err = models.ErrDetachUnsupported
			state.enqueueCallResponse(err)
			return err
		}
		pr.spawnDetachSubmit(state)
		return nil
	}
	pr.spawnSubmit(state)
	return nil
}
// implements RunnerProtocolServer
// Handles a client engagement
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
	grpc.EnableTracing = false
	ctx := engagement.Context()
	log := common.Logger(ctx)
	// Keep lightweight tabs on what this runner is doing: for draindown tests
	atomic.AddInt32(&pr.status.inflight, 1)
	atomic.AddUint64(&pr.status.requestsReceived, 1)

	pv, ok := peer.FromContext(ctx)
	log.Debug("Starting engagement")
	if ok {
		log.Debug("Peer is ", pv)
	}
	md, ok := metadata.FromIncomingContext(ctx)
	if ok {
		log.Debug("MD is ", md)
	}
	state := NewCallHandle(engagement)

	tryMsg := state.getTryMsg()
	if tryMsg != nil {
		errTry := pr.handleTryCall(tryMsg, state)
		if errTry == nil {
			dataFeed := state.spawnPipeToFn()

		DataLoop:
			for {
				dataMsg := state.getDataMsg()
				if dataMsg == nil {
					break
				}

				select {
				case dataFeed <- dataMsg:
					if dataMsg.Eof {
						break DataLoop
					}
				case <-state.doneQueue:
					break DataLoop
				case <-state.ctx.Done():
					break DataLoop
				}
			}
		}
	}

	err := state.waitError()

	// if we didn't respond with TooBusy, then this means the request
	// was processed.
	if err != models.ErrCallTimeoutServerBusy {
		atomic.AddUint64(&pr.status.requestsHandled, 1)
	}

	atomic.AddInt32(&pr.status.inflight, -1)
	return err
}
// runStatusCall runs a status call using the status image with baked-in parameters.
func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus {

	// IMPORTANT: We have to use our own context with a preset timeout in order
	// to ignore client timeouts. The original 'ctx' here carries client-side deadlines.
	// Since these deadlines can vary, in order to make sure Status runs predictably we
	// use the hardcoded StatusCtxTimeout instead.
	// TODO: It would be good to copy the original ctx key/value pairs into our new
	// context for tracking, etc. But the Go context today does not seem to allow this.
	execCtx, execCtxCancel := context.WithTimeout(context.Background(), StatusCtxTimeout)
	defer execCtxCancel()

	result := &runner.RunnerStatus{}
	log := common.Logger(ctx)
	start := time.Now()

	// construct call
	var c models.Call

	// Most of these arguments are baked in. We might want to make this
	// more configurable.
	c.ID = id.New().String()
	c.Image = pr.status.imageName
	c.Type = "sync"
	c.TmpFsSize = 0
	c.Memory = 0
	c.CPUs = models.MilliCPUs(0)
	c.URL = "/"
	c.Method = "GET"
	c.CreatedAt = common.DateTime(start)
	c.Config = make(models.Config)
	c.Payload = "{}"
	c.Timeout = StatusCallTimeout
	c.IdleTimeout = StatusCallIdleTimeout

	// TODO: reliably shutdown this container after executing one request.

	log.Debugf("Running status call with id=%v image=%v", c.ID, c.Image)

	recorder := httptest.NewRecorder()
	player := ioutil.NopCloser(strings.NewReader(c.Payload))

	agentCall, err := pr.a.GetCall(FromModelAndInput(&c, player),
		WithLogger(common.NoopReadWriteCloser{}),
		WithWriter(recorder),
		WithContext(execCtx),
	)

	if err == nil {
		mcall := agentCall.(*call)
		err = pr.a.Submit(mcall)
	}

	resp := recorder.Result()

	if err != nil {
		result.ErrorCode = int32(models.GetAPIErrorCode(err))
		result.ErrorStr = err.Error()
		result.Failed = true
	} else if resp.StatusCode >= http.StatusBadRequest {
		result.ErrorCode = int32(resp.StatusCode)
		result.Failed = true
	}

	// These timestamps are related. To avoid confusion
	// and for robustness, nested if stmts below.
	if !time.Time(c.CreatedAt).IsZero() {
		result.CreatedAt = c.CreatedAt.String()

		if !time.Time(c.StartedAt).IsZero() {
			result.StartedAt = c.StartedAt.String()

			if !time.Time(c.CompletedAt).IsZero() {
				result.CompletedAt = c.CompletedAt.String()
			} else {
				// IMPORTANT: We punch this in ourselves.
				// This is because call.End() is executed asynchronously.
				result.CompletedAt = common.DateTime(time.Now()).String()
			}
		}
	}

	// Status images should not output excessive data since we echo the
	// data back to the caller.
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()

	// Clamp the log output to 256 bytes if the output is too large for logging.
	dLen := len(body)
	if dLen > 256 {
		dLen = 256
	}
	log.Debugf("Status call with id=%v result=%+v body[0:%v]=%v", c.ID, result, dLen, string(body[:dLen]))

	result.Details = string(body)
	result.Id = c.ID
	return result
}
// handleStatusCall handles status call concurrency and caching.
func (pr *pureRunner) handleStatusCall(ctx context.Context) (*runner.RunnerStatus, error) {
	var myChan chan struct{}

	isWaiter := false
	isCached := false
	now := time.Now()

	pr.status.lock.Lock()

	if now.Before(pr.status.expiry) {
		// cache is still valid.
		isCached = true
	} else if pr.status.wait != nil {
		// A wait channel is already installed, we must wait.
		isWaiter = true
		myChan = pr.status.wait
	} else {
		// Wait channel is not present, we install a new one.
		myChan = make(chan struct{})
		pr.status.wait = myChan
	}

	pr.status.lock.Unlock()

	// We either need to wait and/or serve the request from cache
	if isWaiter || isCached {
		if isWaiter {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-myChan:
			}
		}

		var cacheObj runner.RunnerStatus

		// A shallow copy is sufficient here, as we do not modify nested data in
		// RunnerStatus in any way.
		pr.status.lock.Lock()

		cacheObj = *pr.status.cache

		pr.status.lock.Unlock()

		cacheObj.Cached = true
		cacheObj.Active = atomic.LoadInt32(&pr.status.inflight)
		cacheObj.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
		cacheObj.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
		return &cacheObj, nil
	}

	cachePtr := pr.runStatusCall(ctx)
	cachePtr.Active = atomic.LoadInt32(&pr.status.inflight)
	cachePtr.RequestsReceived = atomic.LoadUint64(&pr.status.requestsReceived)
	cachePtr.RequestsHandled = atomic.LoadUint64(&pr.status.requestsHandled)
	now = time.Now()

	// A pointer store of 'cachePtr' is sufficient here, as isWaiter/isCached above perform a shallow
	// copy of 'cache'.
	pr.status.lock.Lock()

	pr.status.cache = cachePtr
	pr.status.expiry = now.Add(StatusCallCacheDuration)
	pr.status.wait = nil

	pr.status.lock.Unlock()

	// signal waiters
	close(myChan)
	return cachePtr, nil
}
// implements RunnerProtocolServer
func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {
	// Status using image name is disabled. We return inflight request count only
	if pr.status.imageName == "" {
		return &runner.RunnerStatus{
			Active:           atomic.LoadInt32(&pr.status.inflight),
			RequestsReceived: atomic.LoadUint64(&pr.status.requestsReceived),
			RequestsHandled:  atomic.LoadUint64(&pr.status.requestsHandled),
		}, nil
	}
	return pr.handleStatusCall(ctx)
}
// BeforeCall is called before a function is executed.
func (pr *pureRunner) BeforeCall(ctx context.Context, call *models.Call) error {
	if call.Type != models.TypeDetached {
		return nil
	}
	var err error
	// This is a detached call: acknowledge it by sending the ResultStart message back early.
	pr.callHandleLock.Lock()
	ch := pr.callHandleMap[call.ID]
	pr.callHandleLock.Unlock()
	if ch == nil {
		err = models.ErrCallHandlerNotFound
		return err
	}
	ch.enqueueDetached(err)
	return nil
}
// AfterCall is called after a function is executed.
func (pr *pureRunner) AfterCall(ctx context.Context, call *models.Call) error {
	return nil
}
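// DefaultPureRunner creates a pure runner backed by a default agent built from the given
// CallHandler and listening on addr; TLS is enabled only when tlsCfg is non-nil.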
func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error) {

	agent := New(da)

	// WARNING: SSL creds are optional.
	if tlsCfg == nil {
		return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent))
	}
	return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent), PureRunnerWithSSL(tlsCfg))
}
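// PureRunnerOption is a functional option applied to a pureRunner by NewPureRunner.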
type PureRunnerOption func(*pureRunner) error
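// PureRunnerWithSSL configures TLS transport credentials for the pure runner gRPC server.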
func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption {
	return func(pr *pureRunner) error {
		pr.creds = credentials.NewTLS(tlsCfg)
		return nil
	}
}
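// PureRunnerWithAgent sets the Agent that the pure runner delegates execution to.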
func PureRunnerWithAgent(a Agent) PureRunnerOption {
	return func(pr *pureRunner) error {
		if pr.a != nil {
			return errors.New("Failed to create pure runner: agent already created")
		}

		pr.a = a
		return nil
	}
}
// PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a
// statusImageName attribute. This attribute is the name of the image to use for status checks.
// Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid
// a docker pull during status checks.
func PureRunnerWithStatusImage(imgName string) PureRunnerOption {
	return func(pr *pureRunner) error {
		if pr.status.imageName != "" {
			return fmt.Errorf("Duplicate status image configuration old=%s new=%s", pr.status.imageName, imgName)
		}
		pr.status.imageName = imgName
		return nil
	}
}
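// PureRunnerWithDetached enables detached (fire-and-forget) calls and registers the pure
// runner as its own call listener so detached calls can be acknowledged early.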
func PureRunnerWithDetached() PureRunnerOption {
	return func(pr *pureRunner) error {
		pr.AddCallListener(pr)
		pr.enableDetach = true
		return nil
	}
}
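// NewPureRunner builds a pure runner from the given options and starts its gRPC server on addr.
// The provided cancel function is invoked if the server stops serving with an error.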
func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) {

	pr := &pureRunner{}

	for _, option := range options {
		err := option(pr)
		if err != nil {
			logrus.WithError(err).Fatalf("error in pure runner options")
		}
	}

	if pr.a == nil {
		logrus.Fatal("agent not provided in pure runner options")
	}

	var opts []grpc.ServerOption

	opts = append(opts, grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor))
	opts = append(opts, grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor))

	if pr.creds != nil {
		opts = append(opts, grpc.Creds(pr.creds))
	} else {
		logrus.Warn("Running pure runner in insecure mode!")
	}

	pr.callHandleMap = make(map[string]*callHandle)
	pr.gRPCServer = grpc.NewServer(opts...)
	runner.RegisterRunnerProtocolServer(pr.gRPCServer, pr)

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		logrus.WithError(err).Fatalf("Could not listen on %s", addr)
	}

	logrus.Info("Pure Runner listening on ", addr)

	go func() {
		if err := pr.gRPCServer.Serve(lis); err != nil {
			logrus.WithError(err).Error("grpc serve error")
			cancel()
		}
	}()

	return pr, nil
}

var _ runner.RunnerProtocolServer = &pureRunner{}
var _ Agent = &pureRunner{}