diff --git a/api/agent/agent.go b/api/agent/agent.go index 5936bfa1f..201d92bba 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -15,6 +15,7 @@ import ( "github.com/fnproject/fn/api/agent/drivers/docker" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/opentracing/opentracing-go" @@ -110,12 +111,14 @@ type Agent interface { // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler + AddCallListener(extensions.CallListener) } type agent struct { // TODO maybe these should be on GetCall? idk. was getting bloated. - mq models.MessageQueue - ds models.Datastore + mq models.MessageQueue + ds models.Datastore + callListeners []extensions.CallListener driver drivers.Driver @@ -207,7 +210,7 @@ func (a *agent) Submit(callI Call) error { defer slot.Close() // notify our slot is free once we're done // TODO Start is checking the timer now, we could do it here, too. - err = call.Start(ctx) + err = call.Start(ctx, a) if err != nil { a.stats.Dequeue(callI.Model().Path) return err @@ -231,8 +234,7 @@ func (a *agent) Submit(callI Call) error { // TODO: we need to allocate more time to store the call + logs in case the call timed out, // but this could put us over the timeout if the call did not reply yet (need better policy). ctx = opentracing.ContextWithSpan(context.Background(), span) - call.End(ctx, err) - + err = call.End(ctx, err, a) return err } diff --git a/api/agent/call.go b/api/agent/call.go index 07a69e8d9..f8a29b60d 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -29,13 +29,13 @@ type Call interface { // etc. // TODO Start and End can likely be unexported as they are only used in the agent, // and on a type which is constructed in a specific agent. meh. - Start(ctx context.Context) error + Start(ctx context.Context, t callTrigger) error // End will be called immediately after attempting a call execution, // regardless of whether the execution failed or not. An error will be passed // to End, which if nil indicates a successful execution. Any error returned // from End will be returned as the error from Submit. - End(ctx context.Context, err error) + End(ctx context.Context, err error, t callTrigger) error } // TODO build w/o closures... lazy @@ -278,7 +278,7 @@ type call struct { func (c *call) Model() *models.Call { return c.Call } -func (c *call) Start(ctx context.Context) error { +func (c *call) Start(ctx context.Context, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start") defer span.Finish() @@ -316,22 +316,28 @@ func (c *call) Start(ctx context.Context) error { return err // let another thread try this } } + + err := t.fireBeforeCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("BeforeCall: %v", err) + } + return nil } -func (c *call) End(ctx context.Context, err error) { +func (c *call) End(ctx context.Context, errIn error, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end") defer span.Finish() c.CompletedAt = strfmt.DateTime(time.Now()) - switch err { + switch errIn { case nil: c.Status = "success" case context.DeadlineExceeded: c.Status = "timeout" default: - // XXX (reed): should we append the error to logs? Error field? + // XXX (reed): should we append the error to logs? Error field? (TR) yes, think so, otherwise it's lost looks like? c.Status = "error" } @@ -344,14 +350,21 @@ func (c *call) End(ctx context.Context, err error) { // TODO: this should be update, really if err := c.ds.InsertCall(ctx, c.Call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + // note: Not returning err here since the job could have already finished successfully. } if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") + // note: Not returning err here since the job could have already finished successfully. } // NOTE call this after InsertLog or the buffer will get reset c.stderr.Close() + err := t.fireAfterCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("AfterCall: %v", err) + } + return errIn } func fakeHandler(http.ResponseWriter, *http.Request, Params) {} diff --git a/api/agent/listeners.go b/api/agent/listeners.go new file mode 100644 index 000000000..d40fa3b0e --- /dev/null +++ b/api/agent/listeners.go @@ -0,0 +1,37 @@ +package agent + +import ( + "context" + + "github.com/fnproject/fn/api/extensions" + "github.com/fnproject/fn/api/models" +) + +type callTrigger interface { + fireBeforeCall(context.Context, *models.Call) error + fireAfterCall(context.Context, *models.Call) error +} + +func (a *agent) AddCallListener(listener extensions.CallListener) { + a.callListeners = append(a.callListeners, listener) +} + +func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { + for _, l := range a.callListeners { + err := l.BeforeCall(ctx, call) + if err != nil { + return err + } + } + return nil +} + +func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { + for _, l := range a.callListeners { + err := l.AfterCall(ctx, call) + if err != nil { + return err + } + } + return nil +} diff --git a/api/extensions/listeners.go b/api/extensions/listeners.go new file mode 100644 index 000000000..e8042bc86 --- /dev/null +++ b/api/extensions/listeners.go @@ -0,0 +1,31 @@ +package extensions + +import ( + "context" + + "github.com/fnproject/fn/api/models" +) + +// AppListener is an interface used to inject custom code at key points in app lifecycle. +type AppListener interface { + // BeforeAppCreate called right before creating App in the database + BeforeAppCreate(ctx context.Context, app *models.App) error + // AfterAppCreate called after creating App in the database + AfterAppCreate(ctx context.Context, app *models.App) error + // BeforeAppUpdate called right before updating App in the database + BeforeAppUpdate(ctx context.Context, app *models.App) error + // AfterAppUpdate called after updating App in the database + AfterAppUpdate(ctx context.Context, app *models.App) error + // BeforeAppDelete called right before deleting App in the database + BeforeAppDelete(ctx context.Context, app *models.App) error + // AfterAppDelete called after deleting App in the database + AfterAppDelete(ctx context.Context, app *models.App) error +} + +// CallListener enables callbacks around Call events +type CallListener interface { + // BeforeCall called before a function is executed + BeforeCall(ctx context.Context, call *models.Call) error + // AfterCall called after a function completes + AfterCall(ctx context.Context, call *models.Call) error +} diff --git a/api/server/app_listeners.go b/api/server/app_listeners.go index fdfa3505d..8b78954ac 100644 --- a/api/server/app_listeners.go +++ b/api/server/app_listeners.go @@ -3,27 +3,12 @@ package server import ( "context" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/models" ) -// AppListener is an interface used to inject custom code at key points in app lifecycle. -type AppListener interface { - // BeforeAppCreate called right before creating App in the database - BeforeAppCreate(ctx context.Context, app *models.App) error - // AfterAppCreate called after creating App in the database - AfterAppCreate(ctx context.Context, app *models.App) error - // BeforeAppUpdate called right before updating App in the database - BeforeAppUpdate(ctx context.Context, app *models.App) error - // AfterAppUpdate called after updating App in the database - AfterAppUpdate(ctx context.Context, app *models.App) error - // BeforeAppDelete called right before deleting App in the database - BeforeAppDelete(ctx context.Context, app *models.App) error - // AfterAppDelete called after deleting App in the database - AfterAppDelete(ctx context.Context, app *models.App) error -} - // AddAppListener adds a listener that will be notified on App created. -func (s *Server) AddAppListener(listener AppListener) { +func (s *Server) AddAppListener(listener extensions.AppListener) { s.appListeners = append(s.appListeners, listener) } diff --git a/api/server/listeners.go b/api/server/listeners.go new file mode 100644 index 000000000..fd842e583 --- /dev/null +++ b/api/server/listeners.go @@ -0,0 +1,8 @@ +package server + +import "github.com/fnproject/fn/api/extensions" + +// AddCallListener adds a listener that will be fired before and after a function is executed. +func (s *Server) AddCallListener(listener extensions.CallListener) { + s.Agent.AddCallListener(listener) +} diff --git a/api/server/runner.go b/api/server/runner.go index d3791da40..48237fe02 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -27,8 +27,6 @@ func (s *Server) handleRequest(c *gin.Context) { return } - ctx := c.Request.Context() - r, routeExists := c.Get(api.Path) if !routeExists { r = "/" @@ -39,11 +37,8 @@ func (s *Server) handleRequest(c *gin.Context) { Path: path.Clean(r.(string)), } - s.FireBeforeDispatch(ctx, reqRoute) - s.serve(c, reqRoute.AppName, reqRoute.Path) - s.FireAfterDispatch(ctx, reqRoute) } // TODO it would be nice if we could make this have nothing to do with the gin.Context but meh @@ -60,7 +55,6 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - // TODO we could add FireBeforeDispatch right here with Call in hand model := call.Model() { // scope this, to disallow ctx use outside of this scope. add id for handleErrorResponse logger ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID}) @@ -96,7 +90,6 @@ func (s *Server) serve(c *gin.Context, appName, path string) { if err != nil { // NOTE if they cancel the request then it will stop the call (kind of cool), // we could filter that error out here too as right now it yells a little - if err == context.DeadlineExceeded { // TODO maneuver // add this, since it means that start may not have been called [and it's relevant] diff --git a/api/server/runner_listeners.go b/api/server/runner_listeners.go deleted file mode 100644 index 2934ae2ca..000000000 --- a/api/server/runner_listeners.go +++ /dev/null @@ -1,38 +0,0 @@ -package server - -import ( - "context" - "github.com/fnproject/fn/api/models" -) - -type RunnerListener interface { - // BeforeDispatch called before a function run - BeforeDispatch(ctx context.Context, route *models.Route) error - // AfterDispatch called after a function run - AfterDispatch(ctx context.Context, route *models.Route) error -} - -// AddRunListeners adds a listener that will be fired before and after a function run. -func (s *Server) AddRunnerListener(listener RunnerListener) { - s.runnerListeners = append(s.runnerListeners, listener) -} - -func (s *Server) FireBeforeDispatch(ctx context.Context, route *models.Route) error { - for _, l := range s.runnerListeners { - err := l.BeforeDispatch(ctx, route) - if err != nil { - return err - } - } - return nil -} - -func (s *Server) FireAfterDispatch(ctx context.Context, route *models.Route) error { - for _, l := range s.runnerListeners { - err := l.AfterDispatch(ctx, route) - if err != nil { - return err - } - } - return nil -} diff --git a/api/server/server.go b/api/server/server.go index 24610a50d..9f05bc41f 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/datastore/cache" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -47,9 +48,8 @@ type Server struct { MQ models.MessageQueue LogDB models.LogStore - appListeners []AppListener - middlewares []Middleware - runnerListeners []RunnerListener + appListeners []extensions.AppListener + middlewares []Middleware } // NewFromEnv creates a new Functions server based on env vars. diff --git a/docs/operating/extending.md b/docs/operating/extending.md index 0921f173b..e17c48276 100644 --- a/docs/operating/extending.md +++ b/docs/operating/extending.md @@ -14,12 +14,13 @@ Listeners are the main way to extend Fn. The following listener types are supported: -* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#AppListener) -* Runner Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#RunnerListener) +* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#AppListener) +* Call Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#CallListener) ### Creating a Listener -You can easily use app and runner listeners by creating a struct with valid methods satisfying the interface for the respective listener and adding it to the Fn API +You can easily use add listeners by creating a struct with valid methods satisfying the interface +for the respective listener, adding it to `main.go` then compiling. Example: