mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Pushed triggers into start() and end()
This commit is contained in:
@@ -15,7 +15,7 @@ import (
|
|||||||
"github.com/fnproject/fn/api/agent/drivers/docker"
|
"github.com/fnproject/fn/api/agent/drivers/docker"
|
||||||
"github.com/fnproject/fn/api/agent/protocol"
|
"github.com/fnproject/fn/api/agent/protocol"
|
||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
"github.com/fnproject/fn/api/extenders"
|
"github.com/fnproject/fn/api/extensions"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
@@ -111,14 +111,14 @@ type Agent interface {
|
|||||||
|
|
||||||
// Return the http.Handler used to handle Prometheus metric requests
|
// Return the http.Handler used to handle Prometheus metric requests
|
||||||
PromHandler() http.Handler
|
PromHandler() http.Handler
|
||||||
AddCallListener(extenders.CallListener)
|
AddCallListener(extensions.CallListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
type agent struct {
|
type agent struct {
|
||||||
// TODO maybe these should be on GetCall? idk. was getting bloated.
|
// TODO maybe these should be on GetCall? idk. was getting bloated.
|
||||||
mq models.MessageQueue
|
mq models.MessageQueue
|
||||||
ds models.Datastore
|
ds models.Datastore
|
||||||
callListeners []extenders.CallListener
|
callListeners []extensions.CallListener
|
||||||
|
|
||||||
driver drivers.Driver
|
driver drivers.Driver
|
||||||
|
|
||||||
@@ -207,13 +207,8 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
// to make this remove the container asynchronously?
|
// to make this remove the container asynchronously?
|
||||||
defer slot.Close() // notify our slot is free once we're done
|
defer slot.Close() // notify our slot is free once we're done
|
||||||
|
|
||||||
err = a.fireBeforeCall(ctx, call.Model())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("BeforeCall: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO Start is checking the timer now, we could do it here, too.
|
// TODO Start is checking the timer now, we could do it here, too.
|
||||||
err = call.Start(ctx)
|
err = call.Start(ctx, a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.stats.Dequeue(callI.Model().Path)
|
a.stats.Dequeue(callI.Model().Path)
|
||||||
return err
|
return err
|
||||||
@@ -237,13 +232,7 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
|
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
|
||||||
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
// but this could put us over the timeout if the call did not reply yet (need better policy).
|
||||||
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
||||||
call.End(ctx, err)
|
err = call.End(ctx, err, a)
|
||||||
|
|
||||||
err = a.fireAfterCall(ctx, call.Model())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("AfterCall: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -29,13 +29,13 @@ type Call interface {
|
|||||||
// etc.
|
// etc.
|
||||||
// TODO Start and End can likely be unexported as they are only used in the agent,
|
// TODO Start and End can likely be unexported as they are only used in the agent,
|
||||||
// and on a type which is constructed in a specific agent. meh.
|
// and on a type which is constructed in a specific agent. meh.
|
||||||
Start(ctx context.Context) error
|
Start(ctx context.Context, t callTrigger) error
|
||||||
|
|
||||||
// End will be called immediately after attempting a call execution,
|
// End will be called immediately after attempting a call execution,
|
||||||
// regardless of whether the execution failed or not. An error will be passed
|
// regardless of whether the execution failed or not. An error will be passed
|
||||||
// to End, which if nil indicates a successful execution. Any error returned
|
// to End, which if nil indicates a successful execution. Any error returned
|
||||||
// from End will be returned as the error from Submit.
|
// from End will be returned as the error from Submit.
|
||||||
End(ctx context.Context, err error)
|
End(ctx context.Context, err error, t callTrigger) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO build w/o closures... lazy
|
// TODO build w/o closures... lazy
|
||||||
@@ -278,7 +278,7 @@ type call struct {
|
|||||||
|
|
||||||
func (c *call) Model() *models.Call { return c.Call }
|
func (c *call) Model() *models.Call { return c.Call }
|
||||||
|
|
||||||
func (c *call) Start(ctx context.Context) error {
|
func (c *call) Start(ctx context.Context, t callTrigger) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
@@ -316,10 +316,16 @@ func (c *call) Start(ctx context.Context) error {
|
|||||||
return err // let another thread try this
|
return err // let another thread try this
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := t.fireBeforeCall(ctx, c.Model())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BeforeCall: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *call) End(ctx context.Context, err error) {
|
func (c *call) End(ctx context.Context, err error, t callTrigger) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
@@ -331,7 +337,7 @@ func (c *call) End(ctx context.Context, err error) {
|
|||||||
case context.DeadlineExceeded:
|
case context.DeadlineExceeded:
|
||||||
c.Status = "timeout"
|
c.Status = "timeout"
|
||||||
default:
|
default:
|
||||||
// XXX (reed): should we append the error to logs? Error field? (TR) think so, otherwise it's lost looks like?
|
// XXX (reed): should we append the error to logs? Error field? (TR) yes, think so, otherwise it's lost looks like?
|
||||||
c.Status = "error"
|
c.Status = "error"
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -344,14 +350,23 @@ func (c *call) End(ctx context.Context, err error) {
|
|||||||
// TODO: this should be update, really
|
// TODO: this should be update, really
|
||||||
if err := c.ds.InsertCall(ctx, c.Call); err != nil {
|
if err := c.ds.InsertCall(ctx, c.Call); err != nil {
|
||||||
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil {
|
if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil {
|
||||||
common.Logger(ctx).WithError(err).Error("error uploading log")
|
common.Logger(ctx).WithError(err).Error("error uploading log")
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE call this after InsertLog or the buffer will get reset
|
// NOTE call this after InsertLog or the buffer will get reset
|
||||||
c.stderr.Close()
|
c.stderr.Close()
|
||||||
|
|
||||||
|
err = t.fireAfterCall(ctx, c.Model())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("AfterCall: %v", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func fakeHandler(http.ResponseWriter, *http.Request, Params) {}
|
func fakeHandler(http.ResponseWriter, *http.Request, Params) {}
|
||||||
|
|||||||
@@ -3,11 +3,16 @@ package agent
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/fnproject/fn/api/extenders"
|
"github.com/fnproject/fn/api/extensions"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (a *agent) AddCallListener(listener extenders.CallListener) {
|
type callTrigger interface {
|
||||||
|
fireBeforeCall(context.Context, *models.Call) error
|
||||||
|
fireAfterCall(context.Context, *models.Call) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *agent) AddCallListener(listener extensions.CallListener) {
|
||||||
a.callListeners = append(a.callListeners, listener)
|
a.callListeners = append(a.callListeners, listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package extenders
|
package extensions
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|||||||
@@ -3,12 +3,12 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/fnproject/fn/api/extenders"
|
"github.com/fnproject/fn/api/extensions"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddAppListener adds a listener that will be notified on App created.
|
// AddAppListener adds a listener that will be notified on App created.
|
||||||
func (s *Server) AddAppListener(listener extenders.AppListener) {
|
func (s *Server) AddAppListener(listener extensions.AppListener) {
|
||||||
s.appListeners = append(s.appListeners, listener)
|
s.appListeners = append(s.appListeners, listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import "github.com/fnproject/fn/api/extenders"
|
import "github.com/fnproject/fn/api/extensions"
|
||||||
|
|
||||||
// AddCallListener adds a listener that will be fired before and after a function is executed.
|
// AddCallListener adds a listener that will be fired before and after a function is executed.
|
||||||
func (s *Server) AddCallListener(listener extenders.CallListener) {
|
func (s *Server) AddCallListener(listener extensions.CallListener) {
|
||||||
s.Agent.AddCallListener(listener)
|
s.Agent.AddCallListener(listener)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ import (
|
|||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
"github.com/fnproject/fn/api/datastore"
|
"github.com/fnproject/fn/api/datastore"
|
||||||
"github.com/fnproject/fn/api/datastore/cache"
|
"github.com/fnproject/fn/api/datastore/cache"
|
||||||
"github.com/fnproject/fn/api/extenders"
|
"github.com/fnproject/fn/api/extensions"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
"github.com/fnproject/fn/api/logs"
|
"github.com/fnproject/fn/api/logs"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
@@ -48,7 +48,7 @@ type Server struct {
|
|||||||
MQ models.MessageQueue
|
MQ models.MessageQueue
|
||||||
LogDB models.LogStore
|
LogDB models.LogStore
|
||||||
|
|
||||||
appListeners []extenders.AppListener
|
appListeners []extensions.AppListener
|
||||||
middlewares []Middleware
|
middlewares []Middleware
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user