diff --git a/api/agent/agent.go b/api/agent/agent.go index 8a652a637..2d08eda3b 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -15,7 +15,7 @@ import ( "github.com/fnproject/fn/api/agent/drivers/docker" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/opentracing/opentracing-go" @@ -111,14 +111,14 @@ type Agent interface { // Return the http.Handler used to handle Prometheus metric requests PromHandler() http.Handler - AddCallListener(extenders.CallListener) + AddCallListener(extensions.CallListener) } type agent struct { // TODO maybe these should be on GetCall? idk. was getting bloated. mq models.MessageQueue ds models.Datastore - callListeners []extenders.CallListener + callListeners []extensions.CallListener driver drivers.Driver @@ -207,13 +207,8 @@ func (a *agent) Submit(callI Call) error { // to make this remove the container asynchronously? defer slot.Close() // notify our slot is free once we're done - err = a.fireBeforeCall(ctx, call.Model()) - if err != nil { - return fmt.Errorf("BeforeCall: %v", err) - } - // TODO Start is checking the timer now, we could do it here, too. - err = call.Start(ctx) + err = call.Start(ctx, a) if err != nil { a.stats.Dequeue(callI.Model().Path) return err @@ -237,13 +232,7 @@ func (a *agent) Submit(callI Call) error { // TODO: we need to allocate more time to store the call + logs in case the call timed out, // but this could put us over the timeout if the call did not reply yet (need better policy). ctx = opentracing.ContextWithSpan(context.Background(), span) - call.End(ctx, err) - - err = a.fireAfterCall(ctx, call.Model()) - if err != nil { - return fmt.Errorf("AfterCall: %v", err) - } - + err = call.End(ctx, err, a) return err } diff --git a/api/agent/call.go b/api/agent/call.go index f3392ed80..8ebe29c6e 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -29,13 +29,13 @@ type Call interface { // etc. // TODO Start and End can likely be unexported as they are only used in the agent, // and on a type which is constructed in a specific agent. meh. - Start(ctx context.Context) error + Start(ctx context.Context, t callTrigger) error // End will be called immediately after attempting a call execution, // regardless of whether the execution failed or not. An error will be passed // to End, which if nil indicates a successful execution. Any error returned // from End will be returned as the error from Submit. - End(ctx context.Context, err error) + End(ctx context.Context, err error, t callTrigger) error } // TODO build w/o closures... lazy @@ -278,7 +278,7 @@ type call struct { func (c *call) Model() *models.Call { return c.Call } -func (c *call) Start(ctx context.Context) error { +func (c *call) Start(ctx context.Context, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start") defer span.Finish() @@ -316,10 +316,16 @@ func (c *call) Start(ctx context.Context) error { return err // let another thread try this } } + + err := t.fireBeforeCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("BeforeCall: %v", err) + } + return nil } -func (c *call) End(ctx context.Context, err error) { +func (c *call) End(ctx context.Context, err error, t callTrigger) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end") defer span.Finish() @@ -331,7 +337,7 @@ func (c *call) End(ctx context.Context, err error) { case context.DeadlineExceeded: c.Status = "timeout" default: - // XXX (reed): should we append the error to logs? Error field? (TR) think so, otherwise it's lost looks like? + // XXX (reed): should we append the error to logs? Error field? (TR) yes, think so, otherwise it's lost looks like? c.Status = "error" } @@ -344,14 +350,23 @@ func (c *call) End(ctx context.Context, err error) { // TODO: this should be update, really if err := c.ds.InsertCall(ctx, c.Call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") + return err } if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil { common.Logger(ctx).WithError(err).Error("error uploading log") + return err } // NOTE call this after InsertLog or the buffer will get reset c.stderr.Close() + + err = t.fireAfterCall(ctx, c.Model()) + if err != nil { + return fmt.Errorf("AfterCall: %v", err) + } + return err + } func fakeHandler(http.ResponseWriter, *http.Request, Params) {} diff --git a/api/agent/listeners.go b/api/agent/listeners.go index d538079a8..d40fa3b0e 100644 --- a/api/agent/listeners.go +++ b/api/agent/listeners.go @@ -3,11 +3,16 @@ package agent import ( "context" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/models" ) -func (a *agent) AddCallListener(listener extenders.CallListener) { +type callTrigger interface { + fireBeforeCall(context.Context, *models.Call) error + fireAfterCall(context.Context, *models.Call) error +} + +func (a *agent) AddCallListener(listener extensions.CallListener) { a.callListeners = append(a.callListeners, listener) } diff --git a/api/extensions/listeners.go b/api/extensions/listeners.go index 431267efb..e8042bc86 100644 --- a/api/extensions/listeners.go +++ b/api/extensions/listeners.go @@ -1,4 +1,4 @@ -package extenders +package extensions import ( "context" diff --git a/api/server/app_listeners.go b/api/server/app_listeners.go index 798d931a4..8b78954ac 100644 --- a/api/server/app_listeners.go +++ b/api/server/app_listeners.go @@ -3,12 +3,12 @@ package server import ( "context" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/models" ) // AddAppListener adds a listener that will be notified on App created. -func (s *Server) AddAppListener(listener extenders.AppListener) { +func (s *Server) AddAppListener(listener extensions.AppListener) { s.appListeners = append(s.appListeners, listener) } diff --git a/api/server/listeners.go b/api/server/listeners.go index bfe8b8f49..fd842e583 100644 --- a/api/server/listeners.go +++ b/api/server/listeners.go @@ -1,8 +1,8 @@ package server -import "github.com/fnproject/fn/api/extenders" +import "github.com/fnproject/fn/api/extensions" // AddCallListener adds a listener that will be fired before and after a function is executed. -func (s *Server) AddCallListener(listener extenders.CallListener) { +func (s *Server) AddCallListener(listener extensions.CallListener) { s.Agent.AddCallListener(listener) } diff --git a/api/server/server.go b/api/server/server.go index c596c2fad..94f25f793 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -17,7 +17,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/datastore/cache" - "github.com/fnproject/fn/api/extenders" + "github.com/fnproject/fn/api/extensions" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -48,7 +48,7 @@ type Server struct { MQ models.MessageQueue LogDB models.LogStore - appListeners []extenders.AppListener + appListeners []extensions.AppListener middlewares []Middleware }