Feature/acksync response writer (#1267)

This implements a "detached" mechanism to get an ack from the runner
once it actually starts to run a function. In this scenario the response
returned back is just a 202 if we placed the function in a specific
time-frame. If we hit some errors or we fail to place the fn in time we
return back different errors.
This commit is contained in:
Andrea Rosa
2018-11-09 18:25:43 +00:00
committed by Tolga Ceylan
parent 2df6c8d349
commit 182db94fad
17 changed files with 453 additions and 58 deletions

View File

@@ -44,16 +44,25 @@ import (
The flow of events is as follows:
1) LB sends ClientMsg_Try to runner
2) Runner allocates its resources and sends an ACK: RunnerMsg_Acknowledged
3) LB sends ClientMsg_Data messages with an EOF for last message set.
4) Runner upon receiving with ClientMsg_Data calls agent.Submit()
5) agent.Submit starts reading data from callHandle io.PipeReader, this reads
data from LB via gRPC receiver (inQueue).
6) agent.Submit starts sending data via callHandle http.ResponseWriter interface,
which is pushed to gRPC sender (outQueue) to the LB.
7) agent.Submit() completes, this means, the Function I/O is now completed.
8) Runner finalizes gRPC session with RunnerMsg_Finished to LB.
LB:
1) LB sends ClientMsg_TryCall to runner
2) LB sends ClientMsg_DataFrame messages with an EOF for last message set.
3) LB receives RunnerMsg_CallResultStart for http status and headers
4) LB receives RunnerMsg_DataFrame messages for http body with an EOF for last message set.
8) LB receives RunnerMsg_CallFinished as the final message.
LB can be interrupted with RunnerMsg_CallFinished anytime. If this is a NACK, presence of 503
means LB can retry the call.
Runner:
1) Runner upon receiving ClientMsg_TryCall calls agent.Submit()
2) Runner allocates its resources but can send a NACK: RunnerMsg_Finished if it cannot service the call in time.
3) agent.Submit starts reading data from callHandle io.PipeReader, this reads
data from LB via gRPC receiver (inQueue). The http reader detects headers/data
and sends RunnerMsg_CallResultStart and/or RunnerMsg_DataFrame messages to LB.
4) agent.Submit() completes, this means, the Function I/O is now completed. Runner sends RunnerMsg_Finished
*/
@@ -199,6 +208,25 @@ func (ch *callHandle) enqueueMsgStrict(msg *runner.RunnerMsg) error {
return err
}
func (ch *callHandle) enqueueDetached(err error) {
statusCode := http.StatusAccepted
if err != nil {
if models.IsAPIError(err) {
statusCode = models.GetAPIErrorCode(err)
} else {
statusCode = http.StatusInternalServerError
}
}
err = ch.enqueueMsg(&runner.RunnerMsg{
Body: &runner.RunnerMsg_ResultStart{
ResultStart: &runner.CallResultStart{
Meta: &runner.CallResultStart_Http{
Http: &runner.HttpRespMeta{
Headers: ch.prepHeaders(),
StatusCode: int32(statusCode)}}}}})
}
// enqueueCallResponse enqueues a Submit() response to the LB
// and initiates a graceful shutdown of the session.
func (ch *callHandle) enqueueCallResponse(err error) {
@@ -385,6 +413,10 @@ func (ch *callHandle) prepHeaders() []*runner.HttpHeader {
// received data is pushed to LB via gRPC sender queue.
// Write also sends http headers/state to the LB.
func (ch *callHandle) Write(data []byte) (int, error) {
if ch.c.Model().Type == models.TypeDetached {
//If it is an detached call we just /dev/null the data coming back from the container
return len(data), nil
}
var err error
ch.headerOnce.Do(func() {
// WARNING: we do fetch Status and Headers without
@@ -410,7 +442,6 @@ func (ch *callHandle) Write(data []byte) (int, error) {
if err != nil {
return 0, err
}
total := 0
// split up data into gRPC chunks
for {
@@ -526,10 +557,13 @@ type statusTracker struct {
// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
// and provides the gRPC server that implements the LB <-> Runner protocol.
type pureRunner struct {
gRPCServer *grpc.Server
creds credentials.TransportCredentials
a Agent
status statusTracker
gRPCServer *grpc.Server
creds credentials.TransportCredentials
a Agent
status statusTracker
callHandleMap map[string]*callHandle
callHandleLock sync.Mutex
enableDetach bool
}
// implements Agent
@@ -559,6 +593,19 @@ func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
pr.a.AddCallListener(cl)
}
func (pr *pureRunner) saveCallHandle(ch *callHandle) {
pr.callHandleLock.Lock()
pr.callHandleMap[ch.c.Model().ID] = ch
pr.callHandleLock.Unlock()
}
func (pr *pureRunner) removeCallHandle(cID string) {
pr.callHandleLock.Lock()
delete(pr.callHandleMap, cID)
pr.callHandleLock.Unlock()
}
func (pr *pureRunner) spawnSubmit(state *callHandle) {
go func() {
err := pr.a.Submit(state.c)
@@ -566,6 +613,15 @@ func (pr *pureRunner) spawnSubmit(state *callHandle) {
}()
}
func (pr *pureRunner) spawnDetachSubmit(state *callHandle) {
go func() {
pr.saveCallHandle(state)
err := pr.a.Submit(state.c)
pr.removeCallHandle(state.c.Model().ID)
state.enqueueCallResponse(err)
}()
}
// handleTryCall based on the TryCall message, tries to place the call on NBIO Agent
func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error {
@@ -612,8 +668,17 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
}
state.c.slotHashId = string(hashId[:])
}
pr.spawnSubmit(state)
if state.c.Type == models.TypeDetached {
if !pr.enableDetach {
err = models.ErrDetachUnsupported
state.enqueueCallResponse(err)
return err
}
pr.spawnDetachSubmit(state)
return nil
}
pr.spawnSubmit(state)
return nil
}
@@ -862,6 +927,29 @@ func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.Runne
return pr.handleStatusCall(ctx)
}
// BeforeCall called before a function is executed
func (pr *pureRunner) BeforeCall(ctx context.Context, call *models.Call) error {
if call.Type != models.TypeDetached {
return nil
}
var err error
// it is an ack sync we send ResultStart message back
pr.callHandleLock.Lock()
ch := pr.callHandleMap[call.ID]
pr.callHandleLock.Unlock()
if ch == nil {
err = models.ErrCallHandlerNotFound
return err
}
ch.enqueueDetached(err)
return nil
}
// AfterCall called after a funcion is executed
func (pr *pureRunner) AfterCall(ctx context.Context, call *models.Call) error {
return nil
}
func DefaultPureRunner(cancel context.CancelFunc, addr string, da CallHandler, tlsCfg *tls.Config) (Agent, error) {
agent := New(da)
@@ -907,6 +995,14 @@ func PureRunnerWithStatusImage(imgName string) PureRunnerOption {
}
}
func PureRunnerWithDetached() PureRunnerOption {
return func(pr *pureRunner) error {
pr.AddCallListener(pr)
pr.enableDetach = true
return nil
}
}
func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) {
pr := &pureRunner{}
@@ -933,6 +1029,7 @@ func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunner
logrus.Warn("Running pure runner in insecure mode!")
}
pr.callHandleMap = make(map[string]*callHandle)
pr.gRPCServer = grpc.NewServer(opts...)
runner.RegisterRunnerProtocolServer(pr.gRPCServer, pr)