mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
server, examples, extensions lint compliant (#1109)
these are all automated changes suggested by golint
This commit is contained in:
23
api/const.go
23
api/const.go
@@ -1,16 +1,23 @@
|
||||
package api
|
||||
|
||||
const (
|
||||
// Gin Request context key names
|
||||
// AppName is the app name context key
|
||||
AppName string = "app_name"
|
||||
AppID string = "app_id"
|
||||
Path string = "path"
|
||||
// AppID is the app id context key
|
||||
AppID string = "app_id"
|
||||
// Path is a route's path context key
|
||||
Path string = "path"
|
||||
|
||||
// Gin URL template parameters
|
||||
ParamAppID string = "appID"
|
||||
ParamAppName string = "appName"
|
||||
// ParamAppID is the url path parameter for app id
|
||||
ParamAppID string = "appID"
|
||||
// ParamAppName is the url path parameter for app name
|
||||
ParamAppName string = "appName"
|
||||
// ParamRouteName is the url path parameter for route name
|
||||
ParamRouteName string = "route"
|
||||
// ParamTriggerID is the url path parameter for trigger id
|
||||
ParamTriggerID string = "triggerID"
|
||||
ParamCallID string = "call"
|
||||
ParamFnID string = "fnID"
|
||||
// ParamCallID is the url path parameter for call id
|
||||
ParamCallID string = "call"
|
||||
// ParamFnID is the url path parameter for fn id
|
||||
ParamFnID string = "fnID"
|
||||
)
|
||||
|
||||
@@ -16,10 +16,10 @@ import (
|
||||
// note: for backward compatibility, will go away later
|
||||
type callLogResponse struct {
|
||||
Message string `json:"message"`
|
||||
Log *CallLog `json:"log"`
|
||||
Log *callLog `json:"log"`
|
||||
}
|
||||
|
||||
type CallLog struct {
|
||||
type callLog struct {
|
||||
CallID string `json:"call_id" db:"id"`
|
||||
Log string `json:"log" db:"log"`
|
||||
}
|
||||
@@ -28,7 +28,7 @@ func writeJSON(c *gin.Context, callID string, logReader io.Reader) {
|
||||
var b bytes.Buffer
|
||||
b.ReadFrom(logReader)
|
||||
c.JSON(http.StatusOK, callLogResponse{"Successfully loaded log",
|
||||
&CallLog{
|
||||
&callLog{
|
||||
CallID: callID,
|
||||
Log: b.String(),
|
||||
}})
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
// defaultexts are the extensions that are auto-loaded in to the default fnserver binary
|
||||
// included here as a package to simplify inclusion in testing
|
||||
// Package defaultexts are the extensions that are auto-loaded in to the
|
||||
// default fnserver binary included here as a package to simplify inclusion in
|
||||
// testing
|
||||
package defaultexts
|
||||
|
||||
import (
|
||||
// import all datastore/log/mq modules for runtime config
|
||||
_ "github.com/fnproject/fn/api/datastore/sql"
|
||||
_ "github.com/fnproject/fn/api/datastore/sql/mysql"
|
||||
_ "github.com/fnproject/fn/api/datastore/sql/postgres"
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// TODO: it would be nice to move these into the top level folder so people can use these with the "functions" package, eg: functions.ApiHandler
|
||||
package server
|
||||
|
||||
// TODO: it would be nice to move these into the top level folder so people can use these with the "functions" package, eg: functions.ApiHandler
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
@@ -10,13 +11,13 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) apiHandlerWrapperFn(apiHandler fnext.ApiHandler) gin.HandlerFunc {
|
||||
func (s *Server) apiHandlerWrapperFn(apiHandler fnext.APIHandler) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
apiHandler.ServeHTTP(c.Writer, c.Request)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) apiAppHandlerWrapperFn(apiHandler fnext.ApiAppHandler) gin.HandlerFunc {
|
||||
func (s *Server) apiAppHandlerWrapperFn(apiHandler fnext.APIAppHandler) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
// get the app
|
||||
appID := c.MustGet(api.AppID).(string)
|
||||
@@ -36,7 +37,7 @@ func (s *Server) apiAppHandlerWrapperFn(apiHandler fnext.ApiAppHandler) gin.Hand
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) apiRouteHandlerWrapperFn(apiHandler fnext.ApiRouteHandler) gin.HandlerFunc {
|
||||
func (s *Server) apiRouteHandlerWrapperFn(apiHandler fnext.APIRouteHandler) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
context := c.Request.Context()
|
||||
appID := c.MustGet(api.AppID).(string)
|
||||
@@ -70,37 +71,37 @@ func (s *Server) apiRouteHandlerWrapperFn(apiHandler fnext.ApiRouteHandler) gin.
|
||||
}
|
||||
|
||||
// AddEndpoint adds an endpoint to /v1/x
|
||||
func (s *Server) AddEndpoint(method, path string, handler fnext.ApiHandler) {
|
||||
func (s *Server) AddEndpoint(method, path string, handler fnext.APIHandler) {
|
||||
v1 := s.Router.Group("/v1")
|
||||
// v1.GET("/apps/:app/log", logHandler(cfg))
|
||||
v1.Handle(method, path, s.apiHandlerWrapperFn(handler))
|
||||
}
|
||||
|
||||
// AddEndpoint adds an endpoint to /v1/x
|
||||
// AddEndpointFunc adds an endpoint to /v1/x
|
||||
func (s *Server) AddEndpointFunc(method, path string, handler func(w http.ResponseWriter, r *http.Request)) {
|
||||
s.AddEndpoint(method, path, fnext.ApiHandlerFunc(handler))
|
||||
s.AddEndpoint(method, path, fnext.APIHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// AddAppEndpoint adds an endpoints to /v1/apps/:app/x
|
||||
func (s *Server) AddAppEndpoint(method, path string, handler fnext.ApiAppHandler) {
|
||||
func (s *Server) AddAppEndpoint(method, path string, handler fnext.APIAppHandler) {
|
||||
v1 := s.Router.Group("/v1")
|
||||
v1.Use(s.checkAppPresenceByName())
|
||||
v1.Handle(method, "/apps/:app"+path, s.apiAppHandlerWrapperFn(handler))
|
||||
}
|
||||
|
||||
// AddAppEndpoint adds an endpoints to /v1/apps/:app/x
|
||||
// AddAppEndpointFunc adds an endpoints to /v1/apps/:app/x
|
||||
func (s *Server) AddAppEndpointFunc(method, path string, handler func(w http.ResponseWriter, r *http.Request, app *models.App)) {
|
||||
s.AddAppEndpoint(method, path, fnext.ApiAppHandlerFunc(handler))
|
||||
s.AddAppEndpoint(method, path, fnext.APIAppHandlerFunc(handler))
|
||||
}
|
||||
|
||||
// AddRouteEndpoint adds an endpoints to /v1/apps/:app/routes/:route/x
|
||||
func (s *Server) AddRouteEndpoint(method, path string, handler fnext.ApiRouteHandler) {
|
||||
func (s *Server) AddRouteEndpoint(method, path string, handler fnext.APIRouteHandler) {
|
||||
v1 := s.Router.Group("/v1")
|
||||
v1.Use(s.checkAppPresenceByName())
|
||||
v1.Handle(method, "/apps/:app/routes/:route"+path, s.apiRouteHandlerWrapperFn(handler)) // conflicts with existing wildcard
|
||||
}
|
||||
|
||||
// AddRouteEndpoint adds an endpoints to /v1/apps/:app/routes/:route/x
|
||||
// AddRouteEndpointFunc adds an endpoints to /v1/apps/:app/routes/:route/x
|
||||
func (s *Server) AddRouteEndpointFunc(method, path string, handler func(w http.ResponseWriter, r *http.Request, app *models.App, route *models.Route)) {
|
||||
s.AddRouteEndpoint(method, path, fnext.ApiRouteHandlerFunc(handler))
|
||||
s.AddRouteEndpoint(method, path, fnext.APIRouteHandlerFunc(handler))
|
||||
}
|
||||
|
||||
@@ -79,7 +79,6 @@ func traceWrap(c *gin.Context) {
|
||||
}
|
||||
|
||||
func apiMetricsWrap(s *Server) {
|
||||
|
||||
measure := func(engine *gin.Engine) func(*gin.Context) {
|
||||
var routes gin.RoutesInfo
|
||||
return func(c *gin.Context) {
|
||||
@@ -145,18 +144,48 @@ func loggerWrap(c *gin.Context) {
|
||||
|
||||
if appName := c.Param(api.ParamAppName); appName != "" {
|
||||
c.Set(api.AppName, appName)
|
||||
ctx = context.WithValue(ctx, api.AppName, appName)
|
||||
ctx = ContextWithApp(ctx, appName)
|
||||
}
|
||||
|
||||
if routePath := c.Param(api.ParamRouteName); routePath != "" {
|
||||
c.Set(api.Path, routePath)
|
||||
ctx = context.WithValue(ctx, api.Path, routePath)
|
||||
ctx = ContextWithPath(ctx, routePath)
|
||||
}
|
||||
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
c.Next()
|
||||
}
|
||||
|
||||
type ctxPathKey string
|
||||
|
||||
// ContextWithPath sets the routePath value on a context, it may be retrieved
|
||||
// using PathFromContext.
|
||||
// TODO this is also used as a gin.Key -- stop one of these two things.
|
||||
func ContextWithPath(ctx context.Context, routePath string) context.Context {
|
||||
return context.WithValue(ctx, ctxPathKey(api.Path), routePath)
|
||||
}
|
||||
|
||||
// PathFromContext returns the path from a context, if set.
|
||||
func PathFromContext(ctx context.Context) string {
|
||||
r, _ := ctx.Value(ctxPathKey(api.Path)).(string)
|
||||
return r
|
||||
}
|
||||
|
||||
type ctxAppKey string
|
||||
|
||||
// ContextWithApp sets the app name value on a context, it may be retrieved
|
||||
// using AppFromContext.
|
||||
// TODO this is also used as a gin.Key -- stop one of these two things.
|
||||
func ContextWithApp(ctx context.Context, app string) context.Context {
|
||||
return context.WithValue(ctx, ctxAppKey(api.AppName), app)
|
||||
}
|
||||
|
||||
// AppFromContext returns the app from a context, if set.
|
||||
func AppFromContext(ctx context.Context) string {
|
||||
r, _ := ctx.Value(ctxAppKey(api.AppName)).(string)
|
||||
return r
|
||||
}
|
||||
|
||||
func (s *Server) checkAppPresenceByNameAtRunner() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), extractFields(c))
|
||||
|
||||
@@ -32,7 +32,7 @@ func (c *middlewareController) CallFunction(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
// since we added middleware that checks the app ID
|
||||
// we need to ensure that we set it as soon as possible
|
||||
appName := ctx.Value(api.AppName).(string)
|
||||
appName := AppFromContext(ctx)
|
||||
if appName != "" {
|
||||
appID, err := c.server.datastore.GetAppID(ctx, appName)
|
||||
if err != nil {
|
||||
|
||||
@@ -67,6 +67,12 @@ func TestMiddlewareChaining(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRootMiddleware(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
defer func() {
|
||||
if t.Failed() {
|
||||
t.Log(buf.String())
|
||||
}
|
||||
}()
|
||||
|
||||
app1 := &models.App{ID: "app_id_1", Name: "myapp", Config: models.Config{}}
|
||||
app2 := &models.App{ID: "app_id_2", Name: "myapp2", Config: models.Config{}}
|
||||
@@ -92,9 +98,8 @@ func TestRootMiddleware(t *testing.T) {
|
||||
if r.Header.Get("funcit") != "" {
|
||||
t.Log("breaker breaker!")
|
||||
ctx := r.Context()
|
||||
// TODO: this is a little dicey, should have some functions to set these in case the context keys change or something.
|
||||
ctx = context.WithValue(ctx, "app_name", "myapp2")
|
||||
ctx = context.WithValue(ctx, "path", "/app2func")
|
||||
ctx = ContextWithApp(ctx, "myapp2")
|
||||
ctx = ContextWithPath(ctx, "/app2func")
|
||||
mctx := fnext.GetMiddlewareController(ctx)
|
||||
mctx.CallFunction(w, r.WithContext(ctx))
|
||||
return
|
||||
|
||||
@@ -11,6 +11,8 @@ type routeListeners []fnext.RouteListener
|
||||
|
||||
var _ fnext.RouteListener = new(routeListeners)
|
||||
|
||||
// AddRouteListener adds a route listener extension to the set of listeners
|
||||
// to be called around each route operation.
|
||||
func (s *Server) AddRouteListener(listener fnext.RouteListener) {
|
||||
*s.routeListeners = append(*s.routeListeners, listener)
|
||||
}
|
||||
|
||||
@@ -32,11 +32,11 @@ func (s *Server) handleFunctionCall(c *gin.Context) {
|
||||
func (s *Server) handleFunctionCall2(c *gin.Context) error {
|
||||
ctx := c.Request.Context()
|
||||
var p string
|
||||
r := ctx.Value(api.Path)
|
||||
if r == nil {
|
||||
r := PathFromContext(ctx)
|
||||
if r == "" {
|
||||
p = "/"
|
||||
} else {
|
||||
p = r.(string)
|
||||
p = r
|
||||
}
|
||||
|
||||
appID := c.MustGet(api.AppID).(string)
|
||||
|
||||
@@ -3,7 +3,6 @@ package server
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@@ -381,29 +380,6 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func getDockerId(respBytes []byte) (string, error) {
|
||||
|
||||
var respJs map[string]interface{}
|
||||
var data map[string]interface{}
|
||||
|
||||
err := json.Unmarshal(respBytes, &respJs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
data, ok := respJs["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
return "", errors.New("unexpected json: data map")
|
||||
}
|
||||
|
||||
id, ok := data["DockerId"].(string)
|
||||
if !ok {
|
||||
return "", errors.New("unexpected json: docker id string")
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool {
|
||||
|
||||
logReader, err := ds.GetLog(context.Background(), "myapp", callID)
|
||||
|
||||
@@ -43,51 +43,119 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
EnvLogLevel = "FN_LOG_LEVEL"
|
||||
EnvLogDest = "FN_LOG_DEST"
|
||||
EnvLogPrefix = "FN_LOG_PREFIX"
|
||||
EnvMQURL = "FN_MQ_URL"
|
||||
EnvDBURL = "FN_DB_URL"
|
||||
EnvLOGDBURL = "FN_LOGSTORE_URL"
|
||||
EnvRunnerURL = "FN_RUNNER_API_URL"
|
||||
// TODO these are kind of redundant as exported values since the env vars
|
||||
// have to be set using these values (hopefully from an env), consider
|
||||
// forcing usage through WithXxx configuration methods and documenting there vs.
|
||||
// expecting users to use os.SetEnv(EnvLogLevel, "debug") // why ?
|
||||
|
||||
// EnvLogLevel sets the stderr logging level
|
||||
EnvLogLevel = "FN_LOG_LEVEL"
|
||||
|
||||
// EnvLogDest is a url of a log destination:
|
||||
// possible schemes: { udp, tcp, file }
|
||||
// file url must contain only a path, syslog must contain only a host[:port]
|
||||
// expect: [scheme://][host][:port][/path]
|
||||
// default scheme to udp:// if none given
|
||||
EnvLogDest = "FN_LOG_DEST"
|
||||
|
||||
// EnvLogPrefix is a prefix to affix to each log line.
|
||||
EnvLogPrefix = "FN_LOG_PREFIX"
|
||||
|
||||
// EnvMQURL is a url to an MQ service:
|
||||
// possible out-of-the-box schemes: { memory, redis, bolt }
|
||||
EnvMQURL = "FN_MQ_URL"
|
||||
|
||||
// EnvDBURL is a url to a db service:
|
||||
// possible schemes: { postgres, sqlite3, mysql }
|
||||
EnvDBURL = "FN_DB_URL"
|
||||
|
||||
// EnvLogDBURL is a url to a log storage service:
|
||||
// possible schemes: { postgres, sqlite3, mysql, s3 }
|
||||
EnvLogDBURL = "FN_LOGSTORE_URL"
|
||||
|
||||
// EnvRunnerURL is a url pointing to an Fn API service.
|
||||
EnvRunnerURL = "FN_RUNNER_API_URL"
|
||||
|
||||
// EnvRunnerAddresses is a list of runner urls for an lb to use.
|
||||
EnvRunnerAddresses = "FN_RUNNER_ADDRESSES"
|
||||
|
||||
// EnvPublicLoadBalancerURL is the url to inject into trigger responses to get a public url.
|
||||
EnvPublicLoadBalancerURL = "FN_PUBLIC_LB_URL"
|
||||
|
||||
EnvRunnerAddresses = "FN_RUNNER_ADDRESSES"
|
||||
EnvNodeType = "FN_NODE_TYPE"
|
||||
EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port"
|
||||
EnvGRPCPort = "FN_GRPC_PORT"
|
||||
EnvAPICORSOrigins = "FN_API_CORS_ORIGINS"
|
||||
EnvAPICORSHeaders = "FN_API_CORS_HEADERS"
|
||||
EnvZipkinURL = "FN_ZIPKIN_URL"
|
||||
EnvJaegerURL = "FN_JAEGER_URL"
|
||||
// Certificates to communicate with other FN nodes
|
||||
EnvCert = "FN_NODE_CERT"
|
||||
EnvCertKey = "FN_NODE_CERT_KEY"
|
||||
// EnvNodeType defines the runtime mode for fn to run in, options
|
||||
// are one of: { full, api, lb, runner, pure-runner }
|
||||
EnvNodeType = "FN_NODE_TYPE"
|
||||
|
||||
// EnvPort is the port to listen on for fn http server.
|
||||
EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port"
|
||||
|
||||
// EnvGRPCPort is the port to run the grpc server on for a pure-runner node.
|
||||
EnvGRPCPort = "FN_GRPC_PORT"
|
||||
|
||||
// EnvAPICORSOrigins is the list of CORS origins to allow.
|
||||
EnvAPICORSOrigins = "FN_API_CORS_ORIGINS"
|
||||
|
||||
// EnvAPICORSHeaders is the list of CORS headers allowed.
|
||||
EnvAPICORSHeaders = "FN_API_CORS_HEADERS"
|
||||
|
||||
// EnvZipkinURL is the url of a zipkin node to send traces to.
|
||||
EnvZipkinURL = "FN_ZIPKIN_URL"
|
||||
|
||||
// EnvJaegerURL is the url of a jaeger node to send traces to.
|
||||
EnvJaegerURL = "FN_JAEGER_URL"
|
||||
|
||||
// EnvCert is the certificate used to communicate with other fn nodes.
|
||||
EnvCert = "FN_NODE_CERT"
|
||||
|
||||
// EnvCertKey is the key for the specified cert.
|
||||
EnvCertKey = "FN_NODE_CERT_KEY"
|
||||
|
||||
// EnvCertAuth is the CA for the cert provided.
|
||||
EnvCertAuth = "FN_NODE_CERT_AUTHORITY"
|
||||
// The header name of the incoming request which holds the request ID
|
||||
EnvRidHeader = "FN_RID_HEADER"
|
||||
|
||||
// EnvRIDHeader is the header name of the incoming request which holds the request ID
|
||||
EnvRIDHeader = "FN_RID_HEADER"
|
||||
|
||||
// EnvProcessCollectorList is the list of procid's to collect metrics for.
|
||||
EnvProcessCollectorList = "FN_PROCESS_COLLECTOR_LIST"
|
||||
EnvLBPlacementAlg = "FN_PLACER"
|
||||
|
||||
// Defaults
|
||||
// EnvLBPlacementAlg is the algorithm to place fn calls to fn runners in lb.[0w
|
||||
EnvLBPlacementAlg = "FN_PLACER"
|
||||
|
||||
// DefaultLogLevel is info
|
||||
DefaultLogLevel = "info"
|
||||
DefaultLogDest = "stderr"
|
||||
DefaultPort = 8080
|
||||
|
||||
// DefaultLogDest is stderr
|
||||
DefaultLogDest = "stderr"
|
||||
|
||||
// DefaultPort is 8080
|
||||
DefaultPort = 8080
|
||||
|
||||
// DefaultGRPCPort is 9190
|
||||
DefaultGRPCPort = 9190
|
||||
)
|
||||
|
||||
type ServerNodeType int32
|
||||
// NodeType is the mode to run fn in.
|
||||
type NodeType int32
|
||||
|
||||
const (
|
||||
ServerTypeFull ServerNodeType = iota
|
||||
// ServerTypeFull runs all API endpoints, including executing tasks.
|
||||
ServerTypeFull NodeType = iota
|
||||
|
||||
// ServerTypeAPI runs only /v1 endpoints, to manage resources.
|
||||
ServerTypeAPI
|
||||
|
||||
// ServerTypeLB runs only /r/ endpoints, routing to runner nodes.
|
||||
ServerTypeLB
|
||||
|
||||
// ServerTypeRunner runs only /r/ endpoints, to execute tasks.
|
||||
ServerTypeRunner
|
||||
|
||||
// ServerTypePureRunner runs only grpc server, to execute tasks.
|
||||
ServerTypePureRunner
|
||||
)
|
||||
|
||||
func (s ServerNodeType) String() string {
|
||||
func (s NodeType) String() string {
|
||||
switch s {
|
||||
default:
|
||||
return "full"
|
||||
@@ -102,8 +170,9 @@ func (s ServerNodeType) String() string {
|
||||
}
|
||||
}
|
||||
|
||||
// Server is the object which ties together all the fn things, it is the entrypoint
|
||||
// for managing fn resources and executing tasks.
|
||||
type Server struct {
|
||||
// TODO this one maybe we have `AddRoute` in extensions?
|
||||
Router *gin.Engine
|
||||
AdminRouter *gin.Engine
|
||||
|
||||
@@ -114,7 +183,7 @@ type Server struct {
|
||||
datastore models.Datastore
|
||||
mq models.MessageQueue
|
||||
logstore models.LogStore
|
||||
nodeType ServerNodeType
|
||||
nodeType NodeType
|
||||
cert string
|
||||
certKey string
|
||||
certAuthority string
|
||||
@@ -130,7 +199,7 @@ type Server struct {
|
||||
extraCtxs []context.Context
|
||||
}
|
||||
|
||||
func nodeTypeFromString(value string) ServerNodeType {
|
||||
func nodeTypeFromString(value string) NodeType {
|
||||
switch value {
|
||||
case "api":
|
||||
return ServerTypeAPI
|
||||
@@ -146,7 +215,7 @@ func nodeTypeFromString(value string) ServerNodeType {
|
||||
}
|
||||
|
||||
// NewFromEnv creates a new Functions server based on env vars.
|
||||
func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
|
||||
func NewFromEnv(ctx context.Context, opts ...Option) *Server {
|
||||
curDir := pwd()
|
||||
var defaultDB, defaultMQ string
|
||||
nodeType := nodeTypeFromString(getEnv(EnvNodeType, "")) // default to full
|
||||
@@ -168,7 +237,7 @@ func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
|
||||
opts = append(opts, WithPrometheus()) // TODO option to turn this off?
|
||||
opts = append(opts, WithDBURL(getEnv(EnvDBURL, defaultDB)))
|
||||
opts = append(opts, WithMQURL(getEnv(EnvMQURL, defaultMQ)))
|
||||
opts = append(opts, WithLogURL(getEnv(EnvLOGDBURL, "")))
|
||||
opts = append(opts, WithLogURL(getEnv(EnvLogDBURL, "")))
|
||||
opts = append(opts, WithRunnerURL(getEnv(EnvRunnerURL, "")))
|
||||
opts = append(opts, WithType(nodeType))
|
||||
opts = append(opts, WithNodeCert(getEnv(EnvCert, "")))
|
||||
@@ -204,7 +273,8 @@ func pwd() string {
|
||||
return strings.Replace(cwd, "\\", "/", -1)
|
||||
}
|
||||
|
||||
func WithWebPort(port int) ServerOption {
|
||||
// WithWebPort maps EnvPort
|
||||
func WithWebPort(port int) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if s.adminListenPort == s.webListenPort {
|
||||
s.adminListenPort = port
|
||||
@@ -214,28 +284,32 @@ func WithWebPort(port int) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithGRPCPort(port int) ServerOption {
|
||||
// WithGRPCPort maps EnvGRPCPort
|
||||
func WithGRPCPort(port int) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.grpcListenPort = port
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogLevel(ll string) ServerOption {
|
||||
// WithLogLevel maps EnvLogLevel
|
||||
func WithLogLevel(ll string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
common.SetLogLevel(ll)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogDest(dst, prefix string) ServerOption {
|
||||
// WithLogDest maps EnvLogDest
|
||||
func WithLogDest(dst, prefix string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
common.SetLogDest(dst, prefix)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithDBURL(dbURL string) ServerOption {
|
||||
// WithDBURL maps EnvDBURL
|
||||
func WithDBURL(dbURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if dbURL != "" {
|
||||
ds, err := datastore.New(ctx, dbURL)
|
||||
@@ -248,7 +322,8 @@ func WithDBURL(dbURL string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithMQURL(mqURL string) ServerOption {
|
||||
// WithMQURL maps EnvMQURL
|
||||
func WithMQURL(mqURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if mqURL != "" {
|
||||
mq, err := mqs.New(mqURL)
|
||||
@@ -261,7 +336,8 @@ func WithMQURL(mqURL string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogURL(logstoreURL string) ServerOption {
|
||||
// WithLogURL maps EnvLogURL
|
||||
func WithLogURL(logstoreURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if ldb := logstoreURL; ldb != "" {
|
||||
logDB, err := logs.New(ctx, logstoreURL)
|
||||
@@ -274,7 +350,8 @@ func WithLogURL(logstoreURL string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithRunnerURL(runnerURL string) ServerOption {
|
||||
// WithRunnerURL maps EnvRunnerURL
|
||||
func WithRunnerURL(runnerURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if runnerURL != "" {
|
||||
cl, err := hybrid.NewClient(runnerURL)
|
||||
@@ -287,14 +364,16 @@ func WithRunnerURL(runnerURL string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithType(t ServerNodeType) ServerOption {
|
||||
// WithType maps EnvNodeType
|
||||
func WithType(t NodeType) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.nodeType = t
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithNodeCert(cert string) ServerOption {
|
||||
// WithNodeCert maps EnvNodeCert
|
||||
func WithNodeCert(cert string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if cert != "" {
|
||||
abscert, err := filepath.Abs(cert)
|
||||
@@ -311,7 +390,8 @@ func WithNodeCert(cert string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithNodeCertKey(key string) ServerOption {
|
||||
// WithNodeCertKey maps EnvNodeCertKey
|
||||
func WithNodeCertKey(key string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if key != "" {
|
||||
abskey, err := filepath.Abs(key)
|
||||
@@ -328,7 +408,8 @@ func WithNodeCertKey(key string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithNodeCertAuthority(ca string) ServerOption {
|
||||
// WithNodeCertAuthority maps EnvNodeCertAuthority
|
||||
func WithNodeCertAuthority(ca string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if ca != "" {
|
||||
absca, err := filepath.Abs(ca)
|
||||
@@ -345,28 +426,32 @@ func WithNodeCertAuthority(ca string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithDatastore(ds models.Datastore) ServerOption {
|
||||
// WithDatastore allows directly setting a datastore
|
||||
func WithDatastore(ds models.Datastore) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.datastore = ds
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithMQ(mq models.MessageQueue) ServerOption {
|
||||
// WithMQ allows directly setting an MQ
|
||||
func WithMQ(mq models.MessageQueue) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.mq = mq
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogstore(ls models.LogStore) ServerOption {
|
||||
// WithLogstore allows directly setting a logstore
|
||||
func WithLogstore(ls models.LogStore) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.logstore = ls
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgent(agent agent.Agent) ServerOption {
|
||||
// WithAgent allows directly setting an agent
|
||||
func WithAgent(agent agent.Agent) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.agent = agent
|
||||
return nil
|
||||
@@ -376,12 +461,14 @@ func WithAgent(agent agent.Agent) ServerOption {
|
||||
func (s *Server) defaultRunnerPool() (pool.RunnerPool, error) {
|
||||
runnerAddresses := getEnv(EnvRunnerAddresses, "")
|
||||
if runnerAddresses == "" {
|
||||
return nil, errors.New("Must provide FN_RUNNER_ADDRESSES when running in default load-balanced mode!")
|
||||
return nil, errors.New("must provide FN_RUNNER_ADDRESSES when running in default load-balanced mode")
|
||||
}
|
||||
return agent.DefaultStaticRunnerPool(strings.Split(runnerAddresses, ",")), nil
|
||||
}
|
||||
|
||||
func WithLogstoreFromDatastore() ServerOption {
|
||||
// WithLogstoreFromDatastore sets the logstore to the datastore, iff
|
||||
// the datastore implements the logstore interface.
|
||||
func WithLogstoreFromDatastore() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
if s.datastore == nil {
|
||||
return errors.New("Need a datastore in order to use it as a logstore")
|
||||
@@ -398,7 +485,7 @@ func WithLogstoreFromDatastore() ServerOption {
|
||||
}
|
||||
|
||||
// WithFullAgent is a shorthand for WithAgent(... create a full agent here ...)
|
||||
func WithFullAgent() ServerOption {
|
||||
func WithFullAgent() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.nodeType = ServerTypeFull
|
||||
|
||||
@@ -408,7 +495,7 @@ func WithFullAgent() ServerOption {
|
||||
}
|
||||
|
||||
if s.datastore == nil || s.logstore == nil || s.mq == nil {
|
||||
return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
|
||||
return errors.New("full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL")
|
||||
}
|
||||
s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq)))
|
||||
return nil
|
||||
@@ -417,15 +504,15 @@ func WithFullAgent() ServerOption {
|
||||
|
||||
// WithAgentFromEnv must be provided as the last server option because it relies
|
||||
// on all other options being set first.
|
||||
func WithAgentFromEnv() ServerOption {
|
||||
func WithAgentFromEnv() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
switch s.nodeType {
|
||||
case ServerTypeAPI:
|
||||
return errors.New("Should not initialize an agent for an Fn API node.")
|
||||
return errors.New("should not initialize an agent for an Fn API node")
|
||||
case ServerTypeRunner:
|
||||
runnerURL := getEnv(EnvRunnerURL, "")
|
||||
if runnerURL == "" {
|
||||
return errors.New("No FN_RUNNER_API_URL provided for an Fn Runner node.")
|
||||
return errors.New("no FN_RUNNER_API_URL provided for an Fn Runner node")
|
||||
}
|
||||
cl, err := hybrid.NewClient(runnerURL)
|
||||
if err != nil {
|
||||
@@ -434,10 +521,10 @@ func WithAgentFromEnv() ServerOption {
|
||||
s.agent = agent.New(agent.NewCachedDataAccess(cl))
|
||||
case ServerTypePureRunner:
|
||||
if s.datastore != nil {
|
||||
return errors.New("Pure runner nodes must not be configured with a datastore (FN_DB_URL).")
|
||||
return errors.New("pure runner nodes must not be configured with a datastore (FN_DB_URL)")
|
||||
}
|
||||
if s.mq != nil {
|
||||
return errors.New("Pure runner nodes must not be configured with a message queue (FN_MQ_URL).")
|
||||
return errors.New("pure runner nodes must not be configured with a message queue (FN_MQ_URL)")
|
||||
}
|
||||
ds, err := hybrid.NewNopDataStore()
|
||||
if err != nil {
|
||||
@@ -455,13 +542,13 @@ func WithAgentFromEnv() ServerOption {
|
||||
s.nodeType = ServerTypeLB
|
||||
runnerURL := getEnv(EnvRunnerURL, "")
|
||||
if runnerURL == "" {
|
||||
return errors.New("No FN_RUNNER_API_URL provided for an Fn NuLB node.")
|
||||
return errors.New("no FN_RUNNER_API_URL provided for an Fn NuLB node")
|
||||
}
|
||||
if s.datastore != nil {
|
||||
return errors.New("NuLB nodes must not be configured with a datastore (FN_DB_URL).")
|
||||
return errors.New("lb nodes must not be configured with a datastore (FN_DB_URL)")
|
||||
}
|
||||
if s.mq != nil {
|
||||
return errors.New("NuLB nodes must not be configured with a message queue (FN_MQ_URL).")
|
||||
return errors.New("lb nodes must not be configured with a message queue (FN_MQ_URL)")
|
||||
}
|
||||
|
||||
cl, err := hybrid.NewClient(runnerURL)
|
||||
@@ -499,7 +586,7 @@ func WithAgentFromEnv() ServerOption {
|
||||
}
|
||||
|
||||
// WithExtraCtx appends a context to the list of contexts the server will watch for cancellations / errors / signals.
|
||||
func WithExtraCtx(extraCtx context.Context) ServerOption {
|
||||
func WithExtraCtx(extraCtx context.Context) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.extraCtxs = append(s.extraCtxs, extraCtx)
|
||||
return nil
|
||||
@@ -507,7 +594,7 @@ func WithExtraCtx(extraCtx context.Context) ServerOption {
|
||||
}
|
||||
|
||||
//WithTriggerAnnotator adds a trigggerEndpoint provider to the server
|
||||
func WithTriggerAnnotator(provider TriggerAnnotator) ServerOption {
|
||||
func WithTriggerAnnotator(provider TriggerAnnotator) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.triggerAnnotator = provider
|
||||
return nil
|
||||
@@ -515,7 +602,7 @@ func WithTriggerAnnotator(provider TriggerAnnotator) ServerOption {
|
||||
}
|
||||
|
||||
// WithAdminServer starts the admin server on the specified port.
|
||||
func WithAdminServer(port int) ServerOption {
|
||||
func WithAdminServer(port int) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.AdminRouter = gin.New()
|
||||
s.adminListenPort = port
|
||||
@@ -525,7 +612,7 @@ func WithAdminServer(port int) ServerOption {
|
||||
|
||||
// New creates a new Functions server with the opts given. For convenience, users may
|
||||
// prefer to use NewFromEnv but New is more flexible if needed.
|
||||
func New(ctx context.Context, opts ...ServerOption) *Server {
|
||||
func New(ctx context.Context, opts ...Option) *Server {
|
||||
ctx, span := trace.StartSpan(ctx, "server_init")
|
||||
defer span.End()
|
||||
|
||||
@@ -584,7 +671,8 @@ func New(ctx context.Context, opts ...ServerOption) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
func WithPrometheus() ServerOption {
|
||||
// WithPrometheus activates the prometheus collection and /metrics endpoint
|
||||
func WithPrometheus() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
reg := promclient.NewRegistry()
|
||||
reg.MustRegister(promclient.NewProcessCollector(os.Getpid(), "fn"),
|
||||
@@ -614,7 +702,8 @@ func WithPrometheus() ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithJaeger(jaegerURL string) ServerOption {
|
||||
// WithJaeger maps EnvJaegerURL
|
||||
func WithJaeger(jaegerURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
// ex: "http://localhost:14268"
|
||||
if jaegerURL == "" {
|
||||
@@ -637,7 +726,8 @@ func WithJaeger(jaegerURL string) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithZipkin(zipkinURL string) ServerOption {
|
||||
// WithZipkin maps EnvZipkinURL
|
||||
func WithZipkin(zipkinURL string) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
// ex: "http://zipkin:9411/api/v2/spans"
|
||||
|
||||
@@ -794,6 +884,8 @@ func extractFields(c *gin.Context) logrus.Fields {
|
||||
return fields
|
||||
}
|
||||
|
||||
// Start runs any configured machinery, including the http server, agent, etc.
|
||||
// Start will block until the context is cancelled or times out.
|
||||
func (s *Server) Start(ctx context.Context) {
|
||||
newctx, cancel := contextWithSignal(ctx, os.Interrupt, syscall.SIGTERM)
|
||||
s.startGears(newctx, cancel)
|
||||
@@ -1008,11 +1100,12 @@ func (s *Server) bindHandlers(ctx context.Context) {
|
||||
|
||||
}
|
||||
|
||||
// implements fnext.ExtServer
|
||||
// Datastore implements fnext.ExtServer
|
||||
func (s *Server) Datastore() models.Datastore {
|
||||
return s.datastore
|
||||
}
|
||||
|
||||
// Agent implements fnext.ExtServer
|
||||
func (s *Server) Agent() agent.Agent {
|
||||
return s.agent
|
||||
}
|
||||
|
||||
@@ -10,15 +10,18 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ServerOption func(context.Context, *Server) error
|
||||
// Option is a func that allows configuring a Server
|
||||
type Option func(context.Context, *Server) error
|
||||
|
||||
//RIDProvider is used to manage request ID
|
||||
// RIDProvider is used to generate request IDs
|
||||
type RIDProvider struct {
|
||||
HeaderName string //The name of the header where the reques id is stored in the incoming request
|
||||
HeaderName string // The name of the header where the reques id is stored in the incoming request
|
||||
RIDGenerator func(string) string // Function to generate the requestID
|
||||
}
|
||||
|
||||
func WithRIDProvider(ridProvider *RIDProvider) ServerOption {
|
||||
// WithRIDProvider will generate request ids for each http request using the
|
||||
// given generator.
|
||||
func WithRIDProvider(ridProvider *RIDProvider) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.Router.Use(withRIDProvider(ridProvider))
|
||||
return nil
|
||||
@@ -37,14 +40,16 @@ func withRIDProvider(ridp *RIDProvider) func(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func EnableShutdownEndpoint(ctx context.Context, halt context.CancelFunc) ServerOption {
|
||||
// EnableShutdownEndpoint adds /shutdown to initiate a shutdown of an fn server.
|
||||
func EnableShutdownEndpoint(ctx context.Context, halt context.CancelFunc) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.Router.GET("/shutdown", s.handleShutdown(halt))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func LimitRequestBody(max int64) ServerOption {
|
||||
// LimitRequestBody wraps every http request to limit its size to the specified max bytes.
|
||||
func LimitRequestBody(max int64) Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.Router.Use(limitRequestBody(max))
|
||||
return nil
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
|
||||
var tmpDatastoreTests = "/tmp/func_test_datastore.db"
|
||||
|
||||
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, rnr agent.Agent, nodeType ServerNodeType) *Server {
|
||||
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, rnr agent.Agent, nodeType NodeType) *Server {
|
||||
return New(context.Background(),
|
||||
WithLogLevel(getEnv(EnvLogLevel, DefaultLogLevel)),
|
||||
WithDatastore(ds),
|
||||
|
||||
@@ -23,7 +23,7 @@ var (
|
||||
)
|
||||
|
||||
var (
|
||||
ApiRequestCountView = &view.View{
|
||||
apiRequestCountView = &view.View{
|
||||
Name: "api/request_count",
|
||||
Description: "Count of API requests started",
|
||||
Measure: apiRequestCount,
|
||||
@@ -31,7 +31,7 @@ var (
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
ApiResponseCountView = &view.View{
|
||||
apiResponseCountView = &view.View{
|
||||
Name: "api/response_count",
|
||||
Description: "API response count",
|
||||
TagKeys: []tag.Key{pathKey, methodKey, statusKey},
|
||||
@@ -39,7 +39,7 @@ var (
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
ApiLatencyView = &view.View{
|
||||
apiLatencyView = &view.View{
|
||||
Name: "api/latency",
|
||||
Description: "Latency distribution of API requests",
|
||||
Measure: apiLatency,
|
||||
@@ -50,9 +50,9 @@ var (
|
||||
|
||||
func registerViews() {
|
||||
err := view.Register(
|
||||
ApiRequestCountView,
|
||||
ApiResponseCountView,
|
||||
ApiLatencyView,
|
||||
apiRequestCountView,
|
||||
apiResponseCountView,
|
||||
apiLatencyView,
|
||||
)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatal("cannot register view")
|
||||
|
||||
Reference in New Issue
Block a user