mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
First draft of modifying RunnerListener to CallListener to get it closer to the action (and named better).
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/fnproject/fn/api/agent/drivers/docker"
|
"github.com/fnproject/fn/api/agent/drivers/docker"
|
||||||
"github.com/fnproject/fn/api/agent/protocol"
|
"github.com/fnproject/fn/api/agent/protocol"
|
||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
|
"github.com/fnproject/fn/api/extenders"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
@@ -110,12 +111,14 @@ type Agent interface {
|
|||||||
|
|
||||||
// Return the http.Handler used to handle Prometheus metric requests
|
// Return the http.Handler used to handle Prometheus metric requests
|
||||||
PromHandler() http.Handler
|
PromHandler() http.Handler
|
||||||
|
AddCallListener(extenders.CallListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
type agent struct {
|
type agent struct {
|
||||||
// TODO maybe these should be on GetCall? idk. was getting bloated.
|
// TODO maybe these should be on GetCall? idk. was getting bloated.
|
||||||
mq models.MessageQueue
|
mq models.MessageQueue
|
||||||
ds models.Datastore
|
ds models.Datastore
|
||||||
|
callListeners []extenders.CallListener
|
||||||
|
|
||||||
driver drivers.Driver
|
driver drivers.Driver
|
||||||
|
|
||||||
@@ -204,6 +207,11 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
// to make this remove the container asynchronously?
|
// to make this remove the container asynchronously?
|
||||||
defer slot.Close() // notify our slot is free once we're done
|
defer slot.Close() // notify our slot is free once we're done
|
||||||
|
|
||||||
|
err = a.fireBeforeCall(ctx, call.Model())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("BeforeCall: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO Start is checking the timer now, we could do it here, too.
|
// TODO Start is checking the timer now, we could do it here, too.
|
||||||
err = call.Start(ctx)
|
err = call.Start(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -231,6 +239,11 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
ctx = opentracing.ContextWithSpan(context.Background(), span)
|
||||||
call.End(ctx, err)
|
call.End(ctx, err)
|
||||||
|
|
||||||
|
err = a.fireAfterCall(ctx, call.Model())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("AfterCall: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -331,7 +331,7 @@ func (c *call) End(ctx context.Context, err error) {
|
|||||||
case context.DeadlineExceeded:
|
case context.DeadlineExceeded:
|
||||||
c.Status = "timeout"
|
c.Status = "timeout"
|
||||||
default:
|
default:
|
||||||
// XXX (reed): should we append the error to logs? Error field?
|
// XXX (reed): should we append the error to logs? Error field? (TR) think so, otherwise it's lost looks like?
|
||||||
c.Status = "error"
|
c.Status = "error"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
32
api/agent/listeners.go
Normal file
32
api/agent/listeners.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/fnproject/fn/api/extenders"
|
||||||
|
"github.com/fnproject/fn/api/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (a *agent) AddCallListener(listener extenders.CallListener) {
|
||||||
|
a.callListeners = append(a.callListeners, listener)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *agent) fireBeforeCall(ctx context.Context, call *models.Call) error {
|
||||||
|
for _, l := range a.callListeners {
|
||||||
|
err := l.BeforeCall(ctx, call)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *agent) fireAfterCall(ctx context.Context, call *models.Call) error {
|
||||||
|
for _, l := range a.callListeners {
|
||||||
|
err := l.AfterCall(ctx, call)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
31
api/extenders/listeners.go
Normal file
31
api/extenders/listeners.go
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
package extenders
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/fnproject/fn/api/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AppListener is an interface used to inject custom code at key points in app lifecycle.
|
||||||
|
type AppListener interface {
|
||||||
|
// BeforeAppCreate called right before creating App in the database
|
||||||
|
BeforeAppCreate(ctx context.Context, app *models.App) error
|
||||||
|
// AfterAppCreate called after creating App in the database
|
||||||
|
AfterAppCreate(ctx context.Context, app *models.App) error
|
||||||
|
// BeforeAppUpdate called right before updating App in the database
|
||||||
|
BeforeAppUpdate(ctx context.Context, app *models.App) error
|
||||||
|
// AfterAppUpdate called after updating App in the database
|
||||||
|
AfterAppUpdate(ctx context.Context, app *models.App) error
|
||||||
|
// BeforeAppDelete called right before deleting App in the database
|
||||||
|
BeforeAppDelete(ctx context.Context, app *models.App) error
|
||||||
|
// AfterAppDelete called after deleting App in the database
|
||||||
|
AfterAppDelete(ctx context.Context, app *models.App) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// CallListener enables callbacks around Call events
|
||||||
|
type CallListener interface {
|
||||||
|
// BeforeCall called before a function is executed
|
||||||
|
BeforeCall(ctx context.Context, call *models.Call) error
|
||||||
|
// AfterCall called after a function completes
|
||||||
|
AfterCall(ctx context.Context, call *models.Call) error
|
||||||
|
}
|
||||||
@@ -3,27 +3,12 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/fnproject/fn/api/extenders"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AppListener is an interface used to inject custom code at key points in app lifecycle.
|
|
||||||
type AppListener interface {
|
|
||||||
// BeforeAppCreate called right before creating App in the database
|
|
||||||
BeforeAppCreate(ctx context.Context, app *models.App) error
|
|
||||||
// AfterAppCreate called after creating App in the database
|
|
||||||
AfterAppCreate(ctx context.Context, app *models.App) error
|
|
||||||
// BeforeAppUpdate called right before updating App in the database
|
|
||||||
BeforeAppUpdate(ctx context.Context, app *models.App) error
|
|
||||||
// AfterAppUpdate called after updating App in the database
|
|
||||||
AfterAppUpdate(ctx context.Context, app *models.App) error
|
|
||||||
// BeforeAppDelete called right before deleting App in the database
|
|
||||||
BeforeAppDelete(ctx context.Context, app *models.App) error
|
|
||||||
// AfterAppDelete called after deleting App in the database
|
|
||||||
AfterAppDelete(ctx context.Context, app *models.App) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddAppListener adds a listener that will be notified on App created.
|
// AddAppListener adds a listener that will be notified on App created.
|
||||||
func (s *Server) AddAppListener(listener AppListener) {
|
func (s *Server) AddAppListener(listener extenders.AppListener) {
|
||||||
s.appListeners = append(s.appListeners, listener)
|
s.appListeners = append(s.appListeners, listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
8
api/server/listeners.go
Normal file
8
api/server/listeners.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import "github.com/fnproject/fn/api/extenders"
|
||||||
|
|
||||||
|
// AddCallListener adds a listener that will be fired before and after a function is executed.
|
||||||
|
func (s *Server) AddCallListener(listener extenders.CallListener) {
|
||||||
|
s.Agent.AddCallListener(listener)
|
||||||
|
}
|
||||||
@@ -27,8 +27,6 @@ func (s *Server) handleRequest(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := c.Request.Context()
|
|
||||||
|
|
||||||
r, routeExists := c.Get(api.Path)
|
r, routeExists := c.Get(api.Path)
|
||||||
if !routeExists {
|
if !routeExists {
|
||||||
r = "/"
|
r = "/"
|
||||||
@@ -39,11 +37,8 @@ func (s *Server) handleRequest(c *gin.Context) {
|
|||||||
Path: path.Clean(r.(string)),
|
Path: path.Clean(r.(string)),
|
||||||
}
|
}
|
||||||
|
|
||||||
s.FireBeforeDispatch(ctx, reqRoute)
|
|
||||||
|
|
||||||
s.serve(c, reqRoute.AppName, reqRoute.Path)
|
s.serve(c, reqRoute.AppName, reqRoute.Path)
|
||||||
|
|
||||||
s.FireAfterDispatch(ctx, reqRoute)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO it would be nice if we could make this have nothing to do with the gin.Context but meh
|
// TODO it would be nice if we could make this have nothing to do with the gin.Context but meh
|
||||||
@@ -60,7 +55,6 @@ func (s *Server) serve(c *gin.Context, appName, path string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO we could add FireBeforeDispatch right here with Call in hand
|
|
||||||
model := call.Model()
|
model := call.Model()
|
||||||
{ // scope this, to disallow ctx use outside of this scope. add id for handleErrorResponse logger
|
{ // scope this, to disallow ctx use outside of this scope. add id for handleErrorResponse logger
|
||||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
||||||
@@ -92,11 +86,11 @@ func (s *Server) serve(c *gin.Context, appName, path string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = doCall(call)
|
||||||
err = s.Agent.Submit(call)
|
err = s.Agent.Submit(call)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
||||||
// we could filter that error out here too as right now it yells a little
|
// we could filter that error out here too as right now it yells a little
|
||||||
|
|
||||||
if err == context.DeadlineExceeded {
|
if err == context.DeadlineExceeded {
|
||||||
// TODO maneuver
|
// TODO maneuver
|
||||||
// add this, since it means that start may not have been called [and it's relevant]
|
// add this, since it means that start may not have been called [and it's relevant]
|
||||||
|
|||||||
@@ -1,38 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/fnproject/fn/api/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RunnerListener interface {
|
|
||||||
// BeforeDispatch called before a function run
|
|
||||||
BeforeDispatch(ctx context.Context, route *models.Route) error
|
|
||||||
// AfterDispatch called after a function run
|
|
||||||
AfterDispatch(ctx context.Context, route *models.Route) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddRunListeners adds a listener that will be fired before and after a function run.
|
|
||||||
func (s *Server) AddRunnerListener(listener RunnerListener) {
|
|
||||||
s.runnerListeners = append(s.runnerListeners, listener)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) FireBeforeDispatch(ctx context.Context, route *models.Route) error {
|
|
||||||
for _, l := range s.runnerListeners {
|
|
||||||
err := l.BeforeDispatch(ctx, route)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) FireAfterDispatch(ctx context.Context, route *models.Route) error {
|
|
||||||
for _, l := range s.runnerListeners {
|
|
||||||
err := l.AfterDispatch(ctx, route)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -17,6 +17,7 @@ import (
|
|||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
"github.com/fnproject/fn/api/datastore"
|
"github.com/fnproject/fn/api/datastore"
|
||||||
"github.com/fnproject/fn/api/datastore/cache"
|
"github.com/fnproject/fn/api/datastore/cache"
|
||||||
|
"github.com/fnproject/fn/api/extenders"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
"github.com/fnproject/fn/api/logs"
|
"github.com/fnproject/fn/api/logs"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
@@ -47,9 +48,8 @@ type Server struct {
|
|||||||
MQ models.MessageQueue
|
MQ models.MessageQueue
|
||||||
LogDB models.LogStore
|
LogDB models.LogStore
|
||||||
|
|
||||||
appListeners []AppListener
|
appListeners []extenders.AppListener
|
||||||
middlewares []Middleware
|
middlewares []Middleware
|
||||||
runnerListeners []RunnerListener
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFromEnv creates a new Functions server based on env vars.
|
// NewFromEnv creates a new Functions server based on env vars.
|
||||||
|
|||||||
@@ -14,8 +14,8 @@ Listeners are the main way to extend Fn.
|
|||||||
|
|
||||||
The following listener types are supported:
|
The following listener types are supported:
|
||||||
|
|
||||||
* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#AppListener)
|
* App Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#AppListener)
|
||||||
* Runner Listeners - [GoDoc](https://godoc.org/github.com/fnproject/functions/api/server#RunnerListener)
|
* Call Listeners - [GoDoc](https://godoc.org/github.com/fnproject/fn/api/server#CallListener)
|
||||||
|
|
||||||
### Creating a Listener
|
### Creating a Listener
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user