mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Make PureRunner an Agent so that it encapsulates its grpc server (#834)
* Refactor PureRunner as an Agent so that it encapsulates its grpc server * Maintain a list of extra contexts for the server to select on to handle errors and cancellations
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
runner "github.com/fnproject/fn/api/agent/grpc"
|
runner "github.com/fnproject/fn/api/agent/grpc"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
|
"github.com/fnproject/fn/fnext"
|
||||||
"github.com/go-openapi/strfmt"
|
"github.com/go-openapi/strfmt"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@@ -228,6 +229,8 @@ func (prcm *pureRunnerCapacityManager) releaseCapacity(units uint64) {
|
|||||||
panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity")
|
panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
|
||||||
|
// and provides the gRPC server that implements the LB <-> Runner protocol.
|
||||||
type pureRunner struct {
|
type pureRunner struct {
|
||||||
gRPCServer *grpc.Server
|
gRPCServer *grpc.Server
|
||||||
listen string
|
listen string
|
||||||
@@ -236,6 +239,33 @@ type pureRunner struct {
|
|||||||
capacity pureRunnerCapacityManager
|
capacity pureRunnerCapacityManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
|
||||||
|
return pr.a.GetCall(opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *pureRunner) Submit(Call) error {
|
||||||
|
return errors.New("Submit cannot be called directly in a Pure Runner.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *pureRunner) Close() error {
|
||||||
|
// First stop accepting requests
|
||||||
|
pr.gRPCServer.GracefulStop()
|
||||||
|
// Then let the agent finish
|
||||||
|
err := pr.a.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
|
||||||
|
pr.a.AddCallListener(cl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
|
||||||
|
return errors.New("Enqueue cannot be called directly in a Pure Runner.")
|
||||||
|
}
|
||||||
|
|
||||||
func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) {
|
func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) {
|
||||||
// Only start it once!
|
// Only start it once!
|
||||||
state.stateMutex.Lock()
|
state.stateMutex.Lock()
|
||||||
@@ -505,18 +535,36 @@ func (pr *pureRunner) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreatePureRunner(addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
|
func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) {
|
||||||
|
var pr *pureRunner
|
||||||
|
var err error
|
||||||
if cert != "" && key != "" && ca != "" {
|
if cert != "" && key != "" && ca != "" {
|
||||||
c, err := creds(cert, key, ca)
|
c, err := creds(cert, key, ca)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
|
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return createPureRunner(addr, a, c)
|
pr, err = createPureRunner(addr, a, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warn("Running pure runner in insecure mode!")
|
||||||
|
pr, err = createPureRunner(addr, a, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Warn("Running pure runner in insecure mode!")
|
go func() {
|
||||||
return createPureRunner(addr, a, nil)
|
err := pr.Start()
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("Failed to start pure runner")
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return pr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
|
func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -107,6 +108,8 @@ type Server struct {
|
|||||||
rootMiddlewares []fnext.Middleware
|
rootMiddlewares []fnext.Middleware
|
||||||
apiMiddlewares []fnext.Middleware
|
apiMiddlewares []fnext.Middleware
|
||||||
promExporter *prometheus.Exporter
|
promExporter *prometheus.Exporter
|
||||||
|
// Extensions can append to this list of contexts so that cancellations are properly handled.
|
||||||
|
extraCtxs []context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func nodeTypeFromString(value string) ServerNodeType {
|
func nodeTypeFromString(value string) ServerNodeType {
|
||||||
@@ -392,7 +395,15 @@ func WithAgentFromEnv() ServerOption {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.agent = agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
|
grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort)
|
||||||
|
delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
|
||||||
|
cancelCtx, cancel := context.WithCancel(ctx)
|
||||||
|
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.agent = prAgent
|
||||||
|
s.extraCtxs = append(s.extraCtxs, cancelCtx)
|
||||||
case ServerTypeLB:
|
case ServerTypeLB:
|
||||||
s.nodeType = ServerTypeLB
|
s.nodeType = ServerTypeLB
|
||||||
runnerURL := getEnv(EnvRunnerURL, "")
|
runnerURL := getEnv(EnvRunnerURL, "")
|
||||||
@@ -451,6 +462,14 @@ func WithAgentFromEnv() ServerOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithExtraCtx appends a context to the list of contexts the server will watch for cancellations / errors / signals.
|
||||||
|
func WithExtraCtx(extraCtx context.Context) ServerOption {
|
||||||
|
return func(ctx context.Context, s *Server) error {
|
||||||
|
s.extraCtxs = append(s.extraCtxs, extraCtx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// New creates a new Functions server with the opts given. For convenience, users may
|
// New creates a new Functions server with the opts given. For convenience, users may
|
||||||
// prefer to use NewFromEnv but New is more flexible if needed.
|
// prefer to use NewFromEnv but New is more flexible if needed.
|
||||||
func New(ctx context.Context, opts ...ServerOption) *Server {
|
func New(ctx context.Context, opts ...ServerOption) *Server {
|
||||||
@@ -600,24 +619,6 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
|
|||||||
|
|
||||||
installChildReaper()
|
installChildReaper()
|
||||||
|
|
||||||
if s.nodeType == ServerTypePureRunner {
|
|
||||||
// Run grpc too
|
|
||||||
grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort)
|
|
||||||
pr, err := agent.CreatePureRunner(grpcAddr, s.agent, s.cert, s.certKey, s.certAuthority)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Error("Pure runner server creation error")
|
|
||||||
cancel()
|
|
||||||
} else {
|
|
||||||
go func() {
|
|
||||||
err := pr.Start()
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Error("fail to start pure runner")
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
server := http.Server{
|
server := http.Server{
|
||||||
Addr: listen,
|
Addr: listen,
|
||||||
Handler: &ochttp.Handler{Handler: s.Router},
|
Handler: &ochttp.Handler{Handler: s.Router},
|
||||||
@@ -635,8 +636,23 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// listening for signals or listener errors...
|
// listening for signals or listener errors or cancellations on all registered contexts.
|
||||||
<-ctx.Done()
|
s.extraCtxs = append(s.extraCtxs, ctx)
|
||||||
|
cases := make([]reflect.SelectCase, len(s.extraCtxs))
|
||||||
|
for i, ctx := range s.extraCtxs {
|
||||||
|
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
|
||||||
|
}
|
||||||
|
nth, recv, wasSend := reflect.Select(cases)
|
||||||
|
if wasSend {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"ctxNumber": nth,
|
||||||
|
"receivedValue": recv.String(),
|
||||||
|
}).Debug("Stopping because of received value from done context.")
|
||||||
|
} else {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"ctxNumber": nth,
|
||||||
|
}).Debug("Stopping because of closed channel from done context.")
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: do not wait forever during graceful shutdown (add graceful shutdown timeout)
|
// TODO: do not wait forever during graceful shutdown (add graceful shutdown timeout)
|
||||||
if err := server.Shutdown(context.Background()); err != nil {
|
if err := server.Shutdown(context.Background()); err != nil {
|
||||||
|
|||||||
@@ -171,7 +171,14 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
opts = append(opts, server.WithAgent(agent.NewSyncOnly(agent.NewCachedDataAccess(ds))))
|
grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
|
||||||
|
delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds))
|
||||||
|
cancelCtx, cancel := context.WithCancel(ctx)
|
||||||
|
prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx))
|
||||||
|
|
||||||
return server.New(ctx, opts...), nil
|
return server.New(ctx, opts...), nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user