mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Middleware upgrade (#554)
* Adds root level middleware * Added todo * Better way for extensions to be added. * Bad conflict merge?
This commit is contained in:
@@ -11,24 +11,19 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/datastore/cache"
|
||||
"github.com/fnproject/fn/api/extensions"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/version"
|
||||
"github.com/gin-contrib/cors"
|
||||
"github.com/fnproject/fn/fnext"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
"github.com/openzipkin/zipkin-go-opentracing"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -58,8 +53,9 @@ type Server struct {
|
||||
MQ models.MessageQueue
|
||||
LogDB models.LogStore
|
||||
|
||||
appListeners []extensions.AppListener
|
||||
middlewares []Middleware
|
||||
appListeners []fnext.AppListener
|
||||
rootMiddlewares []fnext.Middleware
|
||||
apiMiddlewares []fnext.Middleware
|
||||
}
|
||||
|
||||
// NewFromEnv creates a new Functions server based on env vars.
|
||||
@@ -96,23 +92,6 @@ func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, opts ...
|
||||
return New(ctx, ds, mq, logDB, opts...)
|
||||
}
|
||||
|
||||
func optionalCorsWrap(r *gin.Engine) {
|
||||
// By default no CORS are allowed unless one
|
||||
// or more Origins are defined by the API_CORS
|
||||
// environment variable.
|
||||
corsStr := getEnv(EnvAPICORS, "")
|
||||
if len(corsStr) > 0 {
|
||||
origins := strings.Split(strings.Replace(corsStr, " ", "", -1), ",")
|
||||
|
||||
corsConfig := cors.DefaultConfig()
|
||||
corsConfig.AllowOrigins = origins
|
||||
|
||||
logrus.Infof("CORS enabled for domains: %s", origins)
|
||||
|
||||
r.Use(cors.New(corsConfig))
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new Functions server with the passed in datastore, message queue and API URL
|
||||
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls models.LogStore, opts ...ServerOption) *Server {
|
||||
setTracer()
|
||||
@@ -125,10 +104,10 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls mo
|
||||
LogDB: ls,
|
||||
}
|
||||
|
||||
// NOTE: testServer() in tests doesn't use these
|
||||
setMachineID()
|
||||
s.Router.Use(loggerWrap, traceWrap, panicWrap)
|
||||
optionalCorsWrap(s.Router)
|
||||
|
||||
s.bindHandlers(ctx)
|
||||
|
||||
for _, opt := range opts {
|
||||
@@ -140,26 +119,6 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls mo
|
||||
return s
|
||||
}
|
||||
|
||||
// we should use http grr
|
||||
func traceWrap(c *gin.Context) {
|
||||
// try to grab a span from the request if made from another service, ignore err if not
|
||||
wireContext, _ := opentracing.GlobalTracer().Extract(
|
||||
opentracing.HTTPHeaders,
|
||||
opentracing.HTTPHeadersCarrier(c.Request.Header))
|
||||
|
||||
// Create the span referring to the RPC client if available.
|
||||
// If wireContext == nil, a root span will be created.
|
||||
// TODO we should add more tags?
|
||||
serverSpan := opentracing.StartSpan("serve_http", ext.RPCServerOption(wireContext), opentracing.Tag{Key: "path", Value: c.Request.URL.Path})
|
||||
serverSpan.SetBaggageItem("fn_appname", c.Param(api.CApp))
|
||||
serverSpan.SetBaggageItem("fn_path", c.Param(api.CRoute))
|
||||
defer serverSpan.Finish()
|
||||
|
||||
ctx := opentracing.ContextWithSpan(c.Request.Context(), serverSpan)
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
c.Next()
|
||||
}
|
||||
|
||||
func setTracer() {
|
||||
var (
|
||||
debugMode = false
|
||||
@@ -239,51 +198,6 @@ func whoAmI() net.IP {
|
||||
return nil
|
||||
}
|
||||
|
||||
func panicWrap(c *gin.Context) {
|
||||
defer func(c *gin.Context) {
|
||||
if rec := recover(); rec != nil {
|
||||
err, ok := rec.(error)
|
||||
if !ok {
|
||||
err = fmt.Errorf("fn: %v", rec)
|
||||
}
|
||||
handleErrorResponse(c, err)
|
||||
c.Abort()
|
||||
}
|
||||
}(c)
|
||||
c.Next()
|
||||
}
|
||||
|
||||
func loggerWrap(c *gin.Context) {
|
||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), extractFields(c))
|
||||
|
||||
if appName := c.Param(api.CApp); appName != "" {
|
||||
c.Set(api.AppName, appName)
|
||||
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), api.AppName, appName))
|
||||
}
|
||||
|
||||
if routePath := c.Param(api.CRoute); routePath != "" {
|
||||
c.Set(api.Path, routePath)
|
||||
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), api.Path, routePath))
|
||||
}
|
||||
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
c.Next()
|
||||
}
|
||||
|
||||
func appWrap(c *gin.Context) {
|
||||
appName := c.GetString(api.AppName)
|
||||
if appName == "" {
|
||||
handleErrorResponse(c, models.ErrAppsMissingName)
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
c.Next()
|
||||
}
|
||||
|
||||
func (s *Server) handleRunnerRequest(c *gin.Context) {
|
||||
s.handleRequest(c)
|
||||
}
|
||||
|
||||
func extractFields(c *gin.Context) logrus.Fields {
|
||||
fields := logrus.Fields{"action": path.Base(c.HandlerName())}
|
||||
for _, param := range c.Params {
|
||||
@@ -342,15 +256,18 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
|
||||
|
||||
func (s *Server) bindHandlers(ctx context.Context) {
|
||||
engine := s.Router
|
||||
// now for extendible middleware
|
||||
engine.Use(s.rootMiddlewareWrapper())
|
||||
|
||||
engine.GET("/", handlePing)
|
||||
engine.GET("/version", handleVersion)
|
||||
// TODO: move the following under v1
|
||||
engine.GET("/stats", s.handleStats)
|
||||
engine.GET("/metrics", s.handlePrometheusMetrics)
|
||||
|
||||
{
|
||||
v1 := engine.Group("/v1")
|
||||
v1.Use(s.middlewareWrapperFunc(ctx))
|
||||
v1.Use(s.apiMiddlewareWrapper())
|
||||
v1.GET("/apps", s.handleAppList)
|
||||
v1.POST("/apps", s.handleAppCreate)
|
||||
|
||||
@@ -379,8 +296,8 @@ func (s *Server) bindHandlers(ctx context.Context) {
|
||||
{
|
||||
runner := engine.Group("/r")
|
||||
runner.Use(appWrap)
|
||||
runner.Any("/:app", s.handleRunnerRequest)
|
||||
runner.Any("/:app/*route", s.handleRunnerRequest)
|
||||
runner.Any("/:app", s.handleFunctionCall)
|
||||
runner.Any("/:app/*route", s.handleFunctionCall)
|
||||
}
|
||||
|
||||
engine.NoRoute(func(c *gin.Context) {
|
||||
|
||||
Reference in New Issue
Block a user