From d080c23981f6528877749e65aad8d37adbbd3009 Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Mon, 9 Oct 2017 18:38:29 -0600 Subject: [PATCH 1/6] First draft of modifying RunnerListener to CallListener to get it closer to the action (and named better). --- api/agent/agent.go | 17 +++++++++++++-- api/agent/call.go | 2 +- api/agent/listeners.go | 32 ++++++++++++++++++++++++++++ api/extenders/listeners.go | 31 +++++++++++++++++++++++++++ api/server/app_listeners.go | 19 ++--------------- api/server/listeners.go | 8 +++++++ api/server/runner.go | 8 +------ api/server/runner_listeners.go | 38 ---------------------------------- api/server/server.go | 6 +++--- docs/operating/extending.md | 4 ++-- 10 files changed, 95 insertions(+), 70 deletions(-) create mode 100644 api/agent/listeners.go create mode 100644 api/extenders/listeners.go create mode 100644 api/server/listeners.go delete mode 100644 api/server/runner_listeners.go diff --git a/api/agent/agent.go b/api/agent/agent.go index 2f6ecf681..8a652a637 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -15,6 +15,7 @@ import ( "github.com/fnproject/fn/api/agent/drivers/docker" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/extenders" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/opentracing/opentracing-go" @@ -110,12 +111,14 @@ type Agent interface { // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler + AddCallListener(extenders.CallListener) } type agent struct { // TODO maybe these should be on GetCall? idk. was getting bloated. - mq models.MessageQueue - ds models.Datastore + mq models.MessageQueue + ds models.Datastore + callListeners []extenders.CallListener driver drivers.Driver @@ -204,6 +207,11 @@ func (a *agent) Submit(callI Call) error { // to make this remove the container asynchronously? defer slot.Close() // notify our slot is free once we're done + err = a.fireBeforeCall(ctx, call.Model()) + if err != nil { + return fmt.Errorf("BeforeCall: %v", err) + } + // TODO Start is checking the timer now, we could do it here, too. err = call.Start(ctx) if err != nil { @@ -231,6 +239,11 @@ func (a *agent) Submit(callI Call) error { ctx = opentracing.ContextWithSpan(context.Background(), span) call.End(ctx, err) + err = a.fireAfterCall(ctx, call.Model()) + if err != nil { + return fmt.Errorf("AfterCall: %v", err) + } + return err } diff --git a/api/agent/call.go b/api/agent/call.go index 07a69e8d9..f3392ed80 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -331,7 +331,7 @@ func (c *call) End(ctx context.Context, err error) { case context.DeadlineExceeded: c.Status = "timeout" default: - // XXX (reed): should we append the error to logs? Error field? + // XXX (reed): should we append the error to logs? Error field? (TR) think so, otherwise it's lost looks like? c.Status = "error" } diff --git a/api/agent/listeners.go b/api/agent/listeners.go new file mode 100644 index 000000000..d538079a8 --- /dev/null +++ b/api/agent/listeners.go @@ -0,0 +1,32 @@ +package agent + +import ( + "context" + + "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/models" +) + +func (a *agent) AddCallListener(listener extenders.CallListener) { + a.callListeners = append(a.callListeners, listener) +} + +func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error { + for _, l := range a.callListeners { + err := l.BeforeCall(ctx, call) + if err != nil { + return err + } + } + return nil +} + +func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error { + for _, l := range a.callListeners { + err := l.AfterCall(ctx, call) + if err != nil { + return err + } + } + return nil +} diff --git a/api/extenders/listeners.go b/api/extenders/listeners.go new file mode 100644 index 000000000..431267efb --- /dev/null +++ b/api/extenders/listeners.go @@ -0,0 +1,31 @@ +package extenders + +import ( + "context" + + "github.com/fnproject/fn/api/models" +) + +// AppListener is an interface used to inject custom code at key points in app lifecycle. +type AppListener interface { + // BeforeAppCreate called right before creating App in the database + BeforeAppCreate(ctx context.Context, app *models.App) error + // AfterAppCreate called after creating App in the database + AfterAppCreate(ctx context.Context, app *models.App) error + // BeforeAppUpdate called right before updating App in the database + BeforeAppUpdate(ctx context.Context, app *models.App) error + // AfterAppUpdate called after updating App in the database + AfterAppUpdate(ctx context.Context, app *models.App) error + // BeforeAppDelete called right before deleting App in the database + BeforeAppDelete(ctx context.Context, app *models.App) error + // AfterAppDelete called after deleting App in the database + AfterAppDelete(ctx context.Context, app *models.App) error +} + +// CallListener enables callbacks around Call events +type CallListener interface { + // BeforeCall called before a function is executed + BeforeCall(ctx context.Context, call *models.Call) error + // AfterCall called after a function completes + AfterCall(ctx context.Context, call *models.Call) error +} diff --git a/api/server/app_listeners.go b/api/server/app_listeners.go index fdfa3505d..798d931a4 100644 --- a/api/server/app_listeners.go +++ b/api/server/app_listeners.go @@ -3,27 +3,12 @@ package server import ( "context" + "github.com/fnproject/fn/api/extenders" "github.com/fnproject/fn/api/models" ) -// AppListener is an interface used to inject custom code at key points in app lifecycle. -type AppListener interface { - // BeforeAppCreate called right before creating App in the database - BeforeAppCreate(ctx context.Context, app *models.App) error - // AfterAppCreate called after creating App in the database - AfterAppCreate(ctx context.Context, app *models.App) error - // BeforeAppUpdate called right before updating App in the database - BeforeAppUpdate(ctx context.Context, app *models.App) error - // AfterAppUpdate called after updating App in the database - AfterAppUpdate(ctx context.Context, app *models.App) error - // BeforeAppDelete called right before deleting App in the database - BeforeAppDelete(ctx context.Context, app *models.App) error - // AfterAppDelete called after deleting App in the database - AfterAppDelete(ctx context.Context, app *models.App) error -} - // AddAppListener adds a listener that will be notified on App created. -func (s *Server) AddAppListener(listener AppListener) { +func (s *Server) AddAppListener(listener extenders.AppListener) { s.appListeners = append(s.appListeners, listener) } diff --git a/api/server/listeners.go b/api/server/listeners.go new file mode 100644 index 000000000..bfe8b8f49 --- /dev/null +++ b/api/server/listeners.go @@ -0,0 +1,8 @@ +package server + +import "github.com/fnproject/fn/api/extenders" + +// AddCallListener adds a listener that will be fired before and after a function is executed. +func (s *Server) AddCallListener(listener extenders.CallListener) { + s.Agent.AddCallListener(listener) +} diff --git a/api/server/runner.go b/api/server/runner.go index d3791da40..70bdb0263 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -27,8 +27,6 @@ func (s *Server) handleRequest(c *gin.Context) { return } - ctx := c.Request.Context() - r, routeExists := c.Get(api.Path) if !routeExists { r = "/" @@ -39,11 +37,8 @@ func (s *Server) handleRequest(c *gin.Context) { Path: path.Clean(r.(string)), } - s.FireBeforeDispatch(ctx, reqRoute) - s.serve(c, reqRoute.AppName, reqRoute.Path) - s.FireAfterDispatch(ctx, reqRoute) } // TODO it would be nice if we could make this have nothing to do with the gin.Context but meh @@ -60,7 +55,6 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - // TODO we could add FireBeforeDispatch right here with Call in hand model := call.Model() { // scope this, to disallow ctx use outside of this scope. add id for handleErrorResponse logger ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID}) @@ -92,11 +86,11 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } + err = doCall(call) err = s.Agent.Submit(call) if err != nil { // NOTE if they cancel the request then it will stop the call (kind of cool), // we could filter that error out here too as right now it yells a little - if err == context.DeadlineExceeded { // TODO maneuver // add this, since it means that start may not have been called [and it's relevant] diff --git a/api/server/runner_listeners.go b/api/server/runner_listeners.go deleted file mode 100644 index 2934ae2ca..000000000 --- a/api/server/runner_listeners.go +++ /dev/null @@ -1,38 +0,0 @@ -package server - -import ( - "context" - "github.com/fnproject/fn/api/models" -) - -type RunnerListener interface { - // BeforeDispatch called before a function run - BeforeDispatch(ctx context.Context, route *models.Route) error - // AfterDispatch called after a function run - AfterDispatch(ctx context.Context, route *models.Route) error -} - -// AddRunListeners adds a listener that will be fired before and after a function run. -func (s *Server) AddRunnerListener(listener RunnerListener) { - s.runnerListeners = append(s.runnerListeners, listener) -} - -func (s *Server) FireBeforeDispatch(ctx context.Context, route *models.Route) error { - for _, l := range s.runnerListeners { - err := l.BeforeDispatch(ctx, route) - if err != nil { - return err - } - } - return nil -} - -func (s *Server) FireAfterDispatch(ctx context.Context, route *models.Route) error { - for _, l := range s.runnerListeners { - err := l.AfterDispatch(ctx, route) - if err != nil { - return err - } - } - return nil -} diff --git a/api/server/server.go b/api/server/server.go index 6ea1432b7..c596c2fad 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/datastore/cache" + "github.com/fnproject/fn/api/extenders" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -47,9 +48,8 @@ type Server struct { MQ models.MessageQueue LogDB models.LogStore - appListeners []AppListener - middlewares []Middleware - runnerListeners []RunnerListener + appListeners []extenders.AppListener + middlewares []Middleware } // NewFromEnv creates a new Functions server based on env vars. diff --git a/docs/operating/extending.md b/docs/operating/extending.md index 0921f173b..59de6ed62 100644 --- a/docs/operating/extending.md +++ b/docs/operating/extending.md @@ -14,8 +14,8 @@ Listeners are the main way to extend Fn. The following listener types are supported: -* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#AppListener) -* Runner Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#RunnerListener) +* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#AppListener) +* Call Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#CallListener) ### Creating a Listener From 633b26594ebfbb33ac92fcc555d14e0aa2ea8b7f Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Mon, 9 Oct 2017 19:26:40 -0600 Subject: [PATCH 2/6] minor --- api/server/runner.go | 1 - docs/operating/extending.md | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/server/runner.go b/api/server/runner.go index 70bdb0263..48237fe02 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -86,7 +86,6 @@ func (s *Server) serve(c *gin.Context, appName, path string) { return } - err = doCall(call) err = s.Agent.Submit(call) if err != nil { // NOTE if they cancel the request then it will stop the call (kind of cool), diff --git a/docs/operating/extending.md b/docs/operating/extending.md index 59de6ed62..e17c48276 100644 --- a/docs/operating/extending.md +++ b/docs/operating/extending.md @@ -19,7 +19,8 @@ The following listener types are supported: ### Creating a Listener -You can easily use app and runner listeners by creating a struct with valid methods satisfying the interface for the respective listener and adding it to the Fn API +You can easily use add listeners by creating a struct with valid methods satisfying the interface +for the respective listener, adding it to `main.go` then compiling. Example: From fb386b286470aaafc006cb20151d509e119b8814 Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Wed, 25 Oct 2017 13:50:36 +0200 Subject: [PATCH 3/6] Change extenders -> extensions --- api/{extenders => extensions}/listeners.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename api/{extenders => extensions}/listeners.go (100%) diff --git a/api/extenders/listeners.go b/api/extensions/listeners.go similarity index 100% rename from api/extenders/listeners.go rename to api/extensions/listeners.go From de04562b8e19901b6639907f2f915885b855422a Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Wed, 25 Oct 2017 14:11:27 +0200 Subject: [PATCH 4/6] Pushed triggers into start() and end() --- api/agent/agent.go | 21 +++++---------------- api/agent/call.go | 25 ++++++++++++++++++++----- api/agent/listeners.go | 9 +++++++-- api/extensions/listeners.go | 2 +- api/server/app_listeners.go | 4 ++-- api/server/listeners.go | 4 ++-- api/server/server.go | 4 ++-- 7 files changed, 39 insertions(+), 30 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 8a652a637..2d08eda3b 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -15,7 +15,7 @@ import ( "github.com/fnproject/fn/api/agent/drivers/docker" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/opentracing/opentracing-go" @@ -111,14 +111,14 @@ type Agent interface { // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler - AddCallListener(extenders.CallListener) + AddCallListener(extensions.CallListener) } type agent struct { // TODO maybe these should be on GetCall? idk. was getting bloated. mq models.MessageQueue ds models.Datastore - callListeners []extenders.CallListener + callListeners []extensions.CallListener driver drivers.Driver @@ -207,13 +207,8 @@ func (a *agent) Submit(callI Call) error { // to make this remove the container asynchronously? defer slot.Close() // notify our slot is free once we're done - err = a.fireBeforeCall(ctx, call.Model()) - if err != nil { - return fmt.Errorf("BeforeCall: %v", err) - } - // TODO Start is checking the timer now, we could do it here, too. - err = call.Start(ctx) + err = call.Start(ctx, a) if err != nil { a.stats.Dequeue(callI.Model().Path) return err @@ -237,13 +232,7 @@ func (a *agent) Submit(callI Call) error { // TODO: we need to allocate more time to store the call + logs in case the call timed out, // but this could put us over the timeout if the call did not reply yet (need better policy). ctx = opentracing.ContextWithSpan(context.Background(), span) - call.End(ctx, err) - - err = a.fireAfterCall(ctx, call.Model()) - if err != nil { - return fmt.Errorf("AfterCall: %v", err) - } - + err = call.End(ctx, err, a) return err } diff --git a/api/agent/call.go b/api/agent/call.go index f3392ed80..8ebe29c6e 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -29,13 +29,13 @@ type Call interface { // etc. // TODO Start and End can likely be unexported as they are only used in the agent, // and on a type which is constructed in a specific agent. meh. - Start(ctx context.Context) error + Start(ctx context.Context, t callTrigger) error // End will be called immediately after attempting a call execution, // regardless of whether the execution failed or not. An error will be passed // to End, which if nil indicates a successful execution. Any error returned // from End will be returned as the error from Submit. - End(ctx context.Context, err error) + End(ctx context.Context, err error, t callTrigger) error } // TODO build w/o closures... lazy @@ -278,7 +278,7 @@ type call struct { func (c *call) Model() *models.Call { return c.Call } -func (c *call) Start(ctx context.Context) error { +func (c *call) Start(ctx context.Context, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start") defer span.Finish() @@ -316,10 +316,16 @@ func (c *call) Start(ctx context.Context) error { return err // let another thread try this } } + + err := t.fireBeforeCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("BeforeCall: %v", err) + } + return nil } -func (c *call) End(ctx context.Context, err error) { +func (c *call) End(ctx context.Context, err error, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end") defer span.Finish() @@ -331,7 +337,7 @@ func (c *call) End(ctx context.Context, err error) { case context.DeadlineExceeded: c.Status = "timeout" default: - // XXX (reed): should we append the error to logs? Error field? (TR) think so, otherwise it's lost looks like? + // XXX (reed): should we append the error to logs? Error field? (TR) yes, think so, otherwise it's lost looks like? c.Status = "error" } @@ -344,14 +350,23 @@ func (c *call) End(ctx context.Context, err error) { // TODO: this should be update, really if err := c.ds.InsertCall(ctx, c.Call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + return err } if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") + return err } // NOTE call this after InsertLog or the buffer will get reset c.stderr.Close() + + err = t.fireAfterCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("AfterCall: %v", err) + } + return err + } func fakeHandler(http.ResponseWriter, *http.Request, Params) {} diff --git a/api/agent/listeners.go b/api/agent/listeners.go index d538079a8..d40fa3b0e 100644 --- a/api/agent/listeners.go +++ b/api/agent/listeners.go @@ -3,11 +3,16 @@ package agent import ( "context" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/models" ) -func (a *agent) AddCallListener(listener extenders.CallListener) { +type callTrigger interface { + fireBeforeCall(context.Context, *models.Call) error + fireAfterCall(context.Context, *models.Call) error +} + +func (a *agent) AddCallListener(listener extensions.CallListener) { a.callListeners = append(a.callListeners, listener) } diff --git a/api/extensions/listeners.go b/api/extensions/listeners.go index 431267efb..e8042bc86 100644 --- a/api/extensions/listeners.go +++ b/api/extensions/listeners.go @@ -1,4 +1,4 @@ -package extenders +package extensions import ( "context" diff --git a/api/server/app_listeners.go b/api/server/app_listeners.go index 798d931a4..8b78954ac 100644 --- a/api/server/app_listeners.go +++ b/api/server/app_listeners.go @@ -3,12 +3,12 @@ package server import ( "context" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/models" ) // AddAppListener adds a listener that will be notified on App created. -func (s *Server) AddAppListener(listener extenders.AppListener) { +func (s *Server) AddAppListener(listener extensions.AppListener) { s.appListeners = append(s.appListeners, listener) } diff --git a/api/server/listeners.go b/api/server/listeners.go index bfe8b8f49..fd842e583 100644 --- a/api/server/listeners.go +++ b/api/server/listeners.go @@ -1,8 +1,8 @@ package server -import "github.com/fnproject/fn/api/extenders" +import "github.com/fnproject/fn/api/extensions" // AddCallListener adds a listener that will be fired before and after a function is executed. -func (s *Server) AddCallListener(listener extenders.CallListener) { +func (s *Server) AddCallListener(listener extensions.CallListener) { s.Agent.AddCallListener(listener) } diff --git a/api/server/server.go b/api/server/server.go index c596c2fad..94f25f793 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -17,7 +17,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/datastore/cache" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -48,7 +48,7 @@ type Server struct { MQ models.MessageQueue LogDB models.LogStore - appListeners []extenders.AppListener + appListeners []extensions.AppListener middlewares []Middleware } From d30bcb03973c88d493d0f12e149cc9a0da90f36f Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Wed, 25 Oct 2017 14:41:18 +0200 Subject: [PATCH 5/6] Fix lost error --- api/agent/call.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/api/agent/call.go b/api/agent/call.go index 8ebe29c6e..155b02af1 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -325,13 +325,13 @@ func (c *call) Start(ctx context.Context, t callTrigger) error { return nil } -func (c *call) End(ctx context.Context, err error, t callTrigger) error { +func (c *call) End(ctx context.Context, errIn error, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end") defer span.Finish() c.CompletedAt = strfmt.DateTime(time.Now()) - switch err { + switch errIn { case nil: c.Status = "success" case context.DeadlineExceeded: @@ -360,13 +360,11 @@ func (c *call) End(ctx context.Context, err error, t callTrigger) error { // NOTE call this after InsertLog or the buffer will get reset c.stderr.Close() - - err = t.fireAfterCall(ctx, c.Model()) + err := t.fireAfterCall(ctx, c.Model()) if err != nil { return fmt.Errorf("AfterCall: %v", err) } - return err - + return errIn } func fakeHandler(http.ResponseWriter, *http.Request, Params) {} From 965630af1561de242d32af9bdc0ef5b614f4c54c Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Thu, 26 Oct 2017 11:12:08 +0200 Subject: [PATCH 6/6] Remove error returns. --- api/agent/call.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/agent/call.go b/api/agent/call.go index 155b02af1..f8a29b60d 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -350,12 +350,12 @@ func (c *call) End(ctx context.Context, errIn error, t callTrigger) error { // TODO: this should be update, really if err := c.ds.InsertCall(ctx, c.Call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") - return err + // note: Not returning err here since the job could have already finished successfully. } if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") - return err + // note: Not returning err here since the job could have already finished successfully. } // NOTE call this after InsertLog or the buffer will get reset