diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index b5dd96491..63bb3ae1c 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -17,6 +17,7 @@ import ( runner "github.com/fnproject/fn/api/agent/grpc" "github.com/fnproject/fn/api/models" + "github.com/fnproject/fn/fnext" "github.com/go-openapi/strfmt" "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" @@ -228,6 +229,8 @@ func (prcm *pureRunnerCapacityManager) releaseCapacity(units uint64) { panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity") } +// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it +// and provides the gRPC server that implements the LB <-> Runner protocol. type pureRunner struct { gRPCServer *grpc.Server listen string @@ -236,6 +239,33 @@ type pureRunner struct { capacity pureRunnerCapacityManager } +func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) { + return pr.a.GetCall(opts...) +} + +func (pr *pureRunner) Submit(Call) error { + return errors.New("Submit cannot be called directly in a Pure Runner.") +} + +func (pr *pureRunner) Close() error { + // First stop accepting requests + pr.gRPCServer.GracefulStop() + // Then let the agent finish + err := pr.a.Close() + if err != nil { + return err + } + return nil +} + +func (pr *pureRunner) AddCallListener(cl fnext.CallListener) { + pr.a.AddCallListener(cl) +} + +func (pr *pureRunner) Enqueue(context.Context, *models.Call) error { + return errors.New("Enqueue cannot be called directly in a Pure Runner.") +} + func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) { // Only start it once! state.stateMutex.Lock() @@ -505,18 +535,36 @@ func (pr *pureRunner) Start() error { return err } -func CreatePureRunner(addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) { +func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) { + var pr *pureRunner + var err error if cert != "" && key != "" && ca != "" { c, err := creds(cert, key, ca) if err != nil { logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!") return nil, err } - return createPureRunner(addr, a, c) + pr, err = createPureRunner(addr, a, c) + if err != nil { + return nil, err + } + } else { + logrus.Warn("Running pure runner in insecure mode!") + pr, err = createPureRunner(addr, a, nil) + if err != nil { + return nil, err + } } - logrus.Warn("Running pure runner in insecure mode!") - return createPureRunner(addr, a, nil) + go func() { + err := pr.Start() + if err != nil { + logrus.WithError(err).Error("Failed to start pure runner") + cancel() + } + }() + + return pr, nil } func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) { diff --git a/api/server/server.go b/api/server/server.go index 0f9d545fb..c2d0f1c26 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -11,6 +11,7 @@ import ( "os" "path" "path/filepath" + "reflect" "strconv" "strings" "syscall" @@ -107,6 +108,8 @@ type Server struct { rootMiddlewares []fnext.Middleware apiMiddlewares []fnext.Middleware promExporter *prometheus.Exporter + // Extensions can append to this list of contexts so that cancellations are properly handled. + extraCtxs []context.Context } func nodeTypeFromString(value string) ServerNodeType { @@ -392,7 +395,15 @@ func WithAgentFromEnv() ServerOption { if err != nil { return err } - s.agent = agent.NewSyncOnly(agent.NewCachedDataAccess(ds)) + grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort) + delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds)) + cancelCtx, cancel := context.WithCancel(ctx) + prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority) + if err != nil { + return err + } + s.agent = prAgent + s.extraCtxs = append(s.extraCtxs, cancelCtx) case ServerTypeLB: s.nodeType = ServerTypeLB runnerURL := getEnv(EnvRunnerURL, "") @@ -451,6 +462,14 @@ func WithAgentFromEnv() ServerOption { } } +// WithExtraCtx appends a context to the list of contexts the server will watch for cancellations / errors / signals. +func WithExtraCtx(extraCtx context.Context) ServerOption { + return func(ctx context.Context, s *Server) error { + s.extraCtxs = append(s.extraCtxs, extraCtx) + return nil + } +} + // New creates a new Functions server with the opts given. For convenience, users may // prefer to use NewFromEnv but New is more flexible if needed. func New(ctx context.Context, opts ...ServerOption) *Server { @@ -600,24 +619,6 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { installChildReaper() - if s.nodeType == ServerTypePureRunner { - // Run grpc too - grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort) - pr, err := agent.CreatePureRunner(grpcAddr, s.agent, s.cert, s.certKey, s.certAuthority) - if err != nil { - logrus.WithError(err).Error("Pure runner server creation error") - cancel() - } else { - go func() { - err := pr.Start() - if err != nil { - logrus.WithError(err).Error("fail to start pure runner") - cancel() - } - }() - } - } - server := http.Server{ Addr: listen, Handler: &ochttp.Handler{Handler: s.Router}, @@ -635,8 +636,23 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { } }() - // listening for signals or listener errors... - <-ctx.Done() + // listening for signals or listener errors or cancellations on all registered contexts. + s.extraCtxs = append(s.extraCtxs, ctx) + cases := make([]reflect.SelectCase, len(s.extraCtxs)) + for i, ctx := range s.extraCtxs { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())} + } + nth, recv, wasSend := reflect.Select(cases) + if wasSend { + logrus.WithFields(logrus.Fields{ + "ctxNumber": nth, + "receivedValue": recv.String(), + }).Debug("Stopping because of received value from done context.") + } else { + logrus.WithFields(logrus.Fields{ + "ctxNumber": nth, + }).Debug("Stopping because of closed channel from done context.") + } // TODO: do not wait forever during graceful shutdown (add graceful shutdown timeout) if err := server.Shutdown(context.Background()); err != nil { diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 1cd35dfe6..6b03170dd 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -171,7 +171,14 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro if err != nil { return nil, err } - opts = append(opts, server.WithAgent(agent.NewSyncOnly(agent.NewCachedDataAccess(ds)))) + grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) + delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds)) + cancelCtx, cancel := context.WithCancel(ctx) + prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "") + if err != nil { + return nil, err + } + opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx)) return server.New(ctx, opts...), nil }