mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Cleanup main (#470)
* main: clean up * server: replace magical constants and use them for app name tracking
This commit is contained in:
7
api/const.go
Normal file
7
api/const.go
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
// Request context key names
|
||||||
|
const (
|
||||||
|
AppName string = "app_name"
|
||||||
|
Path string = "path"
|
||||||
|
)
|
||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -13,7 +14,7 @@ func (s *Server) handleAppDelete(c *gin.Context) {
|
|||||||
ctx := c.MustGet("ctx").(context.Context)
|
ctx := c.MustGet("ctx").(context.Context)
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
|
|
||||||
app := &models.App{Name: ctx.Value("appName").(string)}
|
app := &models.App{Name: c.MustGet(api.AppName).(string)}
|
||||||
|
|
||||||
routes, err := s.Datastore.GetRoutesByApp(ctx, app.Name, &models.RouteFilter{})
|
routes, err := s.Datastore.GetRoutesByApp(ctx, app.Name, &models.RouteFilter{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -13,7 +14,7 @@ func (s *Server) handleAppGet(c *gin.Context) {
|
|||||||
ctx := c.MustGet("ctx").(context.Context)
|
ctx := c.MustGet("ctx").(context.Context)
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
|
|
||||||
appName := ctx.Value("appName").(string)
|
appName := c.MustGet(api.AppName).(string)
|
||||||
app, err := s.Datastore.GetApp(ctx, appName)
|
app, err := s.Datastore.GetApp(ctx, appName)
|
||||||
|
|
||||||
if err != nil && err != models.ErrAppsNotFound {
|
if err != nil && err != models.ErrAppsNotFound {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -34,7 +35,7 @@ func (s *Server) handleAppUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wapp.App.Name = ctx.Value("appName").(string)
|
wapp.App.Name = c.MustGet(api.AppName).(string)
|
||||||
|
|
||||||
err = s.FireAfterAppUpdate(ctx, wapp.App)
|
err = s.FireAfterAppUpdate(ctx, wapp.App)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/functions/api/runner/task"
|
"github.com/iron-io/functions/api/runner/task"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
@@ -29,7 +30,7 @@ func (s *Server) handleRouteCreate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wroute.Route.AppName = ctx.Value("appName").(string)
|
wroute.Route.AppName = c.MustGet(api.AppName).(string)
|
||||||
|
|
||||||
if err := wroute.Validate(); err != nil {
|
if err := wroute.Validate(); err != nil {
|
||||||
log.WithError(err).Debug(models.ErrRoutesCreate)
|
log.WithError(err).Debug(models.ErrRoutesCreate)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -14,8 +15,8 @@ func (s *Server) handleRouteDelete(c *gin.Context) {
|
|||||||
ctx := c.MustGet("ctx").(context.Context)
|
ctx := c.MustGet("ctx").(context.Context)
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
|
|
||||||
appName := ctx.Value("appName").(string)
|
appName := c.MustGet(api.AppName).(string)
|
||||||
routePath := path.Clean(ctx.Value("routePath").(string))
|
routePath := path.Clean(c.MustGet(api.Path).(string))
|
||||||
|
|
||||||
if err := s.Datastore.RemoveRoute(ctx, appName, routePath); err != nil {
|
if err := s.Datastore.RemoveRoute(ctx, appName, routePath); err != nil {
|
||||||
if err == models.ErrRoutesNotFound {
|
if err == models.ErrRoutesNotFound {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -14,8 +15,8 @@ func (s *Server) handleRouteGet(c *gin.Context) {
|
|||||||
ctx := c.MustGet("ctx").(context.Context)
|
ctx := c.MustGet("ctx").(context.Context)
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
|
|
||||||
appName := ctx.Value("appName").(string)
|
appName := c.MustGet(api.AppName).(string)
|
||||||
routePath := path.Clean(ctx.Value("routePath").(string))
|
routePath := path.Clean(c.MustGet(api.Path).(string))
|
||||||
|
|
||||||
route, err := s.Datastore.GetRoute(ctx, appName, routePath)
|
route, err := s.Datastore.GetRoute(ctx, appName, routePath)
|
||||||
if err != nil && err != models.ErrRoutesNotFound {
|
if err != nil && err != models.ErrRoutesNotFound {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
@@ -21,7 +22,7 @@ func (s *Server) handleRouteList(c *gin.Context) {
|
|||||||
|
|
||||||
var routes []*models.Route
|
var routes []*models.Route
|
||||||
var err error
|
var err error
|
||||||
if appName, ok := ctx.Value("appName").(string); ok && appName != "" {
|
if appName, ok := c.MustGet(api.AppName).(string); ok && appName != "" {
|
||||||
routes, err = s.Datastore.GetRoutesByApp(ctx, appName, filter)
|
routes, err = s.Datastore.GetRoutesByApp(ctx, appName, filter)
|
||||||
} else {
|
} else {
|
||||||
routes, err = s.Datastore.GetRoutes(ctx, filter)
|
routes, err = s.Datastore.GetRoutes(ctx, filter)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/functions/api/runner/task"
|
"github.com/iron-io/functions/api/runner/task"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
@@ -36,8 +37,8 @@ func (s *Server) handleRouteUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wroute.Route.AppName = ctx.Value("appName").(string)
|
wroute.Route.AppName = c.MustGet(api.AppName).(string)
|
||||||
wroute.Route.Path = path.Clean(ctx.Value("routePath").(string))
|
wroute.Route.Path = path.Clean(c.MustGet(api.Path).(string))
|
||||||
|
|
||||||
if wroute.Route.Image != "" {
|
if wroute.Route.Image != "" {
|
||||||
err = s.Runner.EnsureImageExists(ctx, &task.Config{
|
err = s.Runner.EnsureImageExists(ctx, &task.Config{
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/functions/api/runner"
|
"github.com/iron-io/functions/api/runner"
|
||||||
"github.com/iron-io/functions/api/runner/task"
|
"github.com/iron-io/functions/api/runner/task"
|
||||||
@@ -25,18 +26,25 @@ func (s *Server) handleSpecial(c *gin.Context) {
|
|||||||
ctx := c.MustGet("ctx").(context.Context)
|
ctx := c.MustGet("ctx").(context.Context)
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
|
|
||||||
ctx = context.WithValue(ctx, "appName", "")
|
ctx = context.WithValue(ctx, api.AppName, "")
|
||||||
ctx = context.WithValue(ctx, "routePath", c.Request.URL.Path)
|
c.Set(api.AppName, "")
|
||||||
|
ctx = context.WithValue(ctx, api.Path, c.Request.URL.Path)
|
||||||
|
c.Set(api.Path, c.Request.URL.Path)
|
||||||
|
|
||||||
ctx, err := s.UseSpecialHandlers(ctx, c.Request, c.Writer)
|
ctx, err := s.UseSpecialHandlers(ctx, c.Request, c.Writer)
|
||||||
if err != nil {
|
if err == ErrNoSpecialHandlerFound {
|
||||||
|
log.WithError(err).Errorln("Not special handler found")
|
||||||
|
c.JSON(http.StatusNotFound, http.StatusText(http.StatusNotFound))
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
log.WithError(err).Errorln("Error using special handler!")
|
log.WithError(err).Errorln("Error using special handler!")
|
||||||
c.JSON(http.StatusInternalServerError, simpleError(errors.New("Failed to run function")))
|
c.JSON(http.StatusInternalServerError, simpleError(errors.New("Failed to run function")))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Set("ctx", ctx)
|
c.Set("ctx", ctx)
|
||||||
if ctx.Value("appName").(string) == "" {
|
c.Set(api.AppName, ctx.Value(api.AppName).(string))
|
||||||
|
if c.MustGet(api.AppName).(string) == "" {
|
||||||
log.WithError(err).Errorln("Specialhandler returned empty app name")
|
log.WithError(err).Errorln("Specialhandler returned empty app name")
|
||||||
c.JSON(http.StatusBadRequest, simpleError(models.ErrRunnerRouteNotFound))
|
c.JSON(http.StatusBadRequest, simpleError(models.ErrRunnerRouteNotFound))
|
||||||
return
|
return
|
||||||
@@ -78,8 +86,8 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
reqRoute := &models.Route{
|
reqRoute := &models.Route{
|
||||||
AppName: ctx.Value("appName").(string),
|
AppName: c.MustGet(api.AppName).(string),
|
||||||
Path: path.Clean(ctx.Value("routePath").(string)),
|
Path: path.Clean(c.MustGet(api.Path).(string)),
|
||||||
}
|
}
|
||||||
|
|
||||||
s.FireBeforeDispatch(ctx, reqRoute)
|
s.FireBeforeDispatch(ctx, reqRoute)
|
||||||
|
|||||||
@@ -18,7 +18,16 @@ import (
|
|||||||
|
|
||||||
func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan task.Request, enqueue models.Enqueue) *gin.Engine {
|
func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan task.Request, enqueue models.Enqueue) *gin.Engine {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
s := New(ctx, ds, mq, rnr, tasks, enqueue)
|
|
||||||
|
s := &Server{
|
||||||
|
Runner: rnr,
|
||||||
|
Router: gin.New(),
|
||||||
|
Datastore: ds,
|
||||||
|
MQ: mq,
|
||||||
|
tasks: tasks,
|
||||||
|
Enqueue: enqueue,
|
||||||
|
}
|
||||||
|
|
||||||
r := s.Router
|
r := s.Router
|
||||||
r.Use(gin.Logger())
|
r.Use(gin.Logger())
|
||||||
|
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ func TestRouteRunnerGet(t *testing.T) {
|
|||||||
expectedCode int
|
expectedCode int
|
||||||
expectedError error
|
expectedError error
|
||||||
}{
|
}{
|
||||||
{"/route", "", http.StatusBadRequest, models.ErrRunnerRouteNotFound},
|
{"/route", "", http.StatusNotFound, nil},
|
||||||
{"/r/app/route", "", http.StatusNotFound, models.ErrAppsNotFound},
|
{"/r/app/route", "", http.StatusNotFound, models.ErrAppsNotFound},
|
||||||
{"/r/myapp/route", "", http.StatusNotFound, models.ErrRunnerRouteNotFound},
|
{"/r/myapp/route", "", http.StatusNotFound, models.ErrRunnerRouteNotFound},
|
||||||
} {
|
} {
|
||||||
@@ -85,7 +85,7 @@ func TestRouteRunnerPost(t *testing.T) {
|
|||||||
expectedCode int
|
expectedCode int
|
||||||
expectedError error
|
expectedError error
|
||||||
}{
|
}{
|
||||||
{"/route", `{ "payload": "" }`, http.StatusBadRequest, models.ErrRunnerRouteNotFound},
|
{"/route", `{ "payload": "" }`, http.StatusNotFound, nil},
|
||||||
{"/r/app/route", `{ "payload": "" }`, http.StatusNotFound, models.ErrAppsNotFound},
|
{"/r/app/route", `{ "payload": "" }`, http.StatusNotFound, models.ErrAppsNotFound},
|
||||||
{"/r/myapp/route", `{ "payload": "" }`, http.StatusNotFound, models.ErrRunnerRouteNotFound},
|
{"/r/myapp/route", `{ "payload": "" }`, http.StatusNotFound, models.ErrRunnerRouteNotFound},
|
||||||
} {
|
} {
|
||||||
|
|||||||
@@ -9,13 +9,23 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/ccirello/supervisor"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/functions/api/runner"
|
"github.com/iron-io/functions/api/runner"
|
||||||
"github.com/iron-io/functions/api/runner/task"
|
"github.com/iron-io/functions/api/runner/task"
|
||||||
"github.com/iron-io/runner/common"
|
"github.com/iron-io/runner/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
EnvLogLevel = "log_level"
|
||||||
|
EnvMQURL = "mq_url"
|
||||||
|
EnvDBURL = "db_url"
|
||||||
|
EnvPort = "port" // be careful, Gin expects this variable to be "port"
|
||||||
|
EnvAPIURL = "api_url"
|
||||||
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
Datastore models.Datastore
|
Datastore models.Datastore
|
||||||
Runner *runner.Runner
|
Runner *runner.Runner
|
||||||
@@ -23,6 +33,8 @@ type Server struct {
|
|||||||
MQ models.MessageQueue
|
MQ models.MessageQueue
|
||||||
Enqueue models.Enqueue
|
Enqueue models.Enqueue
|
||||||
|
|
||||||
|
apiURL string
|
||||||
|
|
||||||
specialHandlers []SpecialHandler
|
specialHandlers []SpecialHandler
|
||||||
appCreateListeners []AppCreateListener
|
appCreateListeners []AppCreateListener
|
||||||
appUpdateListeners []AppUpdateListener
|
appUpdateListeners []AppUpdateListener
|
||||||
@@ -33,14 +45,26 @@ type Server struct {
|
|||||||
singleflight singleflight // singleflight assists Datastore
|
singleflight singleflight // singleflight assists Datastore
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan task.Request, enqueue models.Enqueue, opts ...ServerOption) *Server {
|
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, apiURL string, opts ...ServerOption) *Server {
|
||||||
|
metricLogger := runner.NewMetricLogger()
|
||||||
|
funcLogger := runner.NewFuncLogger()
|
||||||
|
|
||||||
|
rnr, err := runner.New(ctx, funcLogger, metricLogger)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Fatalln("Failed to create a runner")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tasks := make(chan task.Request)
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Runner: r,
|
Runner: rnr,
|
||||||
Router: gin.New(),
|
Router: gin.New(),
|
||||||
Datastore: ds,
|
Datastore: ds,
|
||||||
MQ: mq,
|
MQ: mq,
|
||||||
tasks: tasks,
|
tasks: tasks,
|
||||||
Enqueue: enqueue,
|
Enqueue: DefaultEnqueue,
|
||||||
|
apiURL: apiURL,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.Router.Use(prepareMiddleware(ctx))
|
s.Router.Use(prepareMiddleware(ctx))
|
||||||
@@ -57,11 +81,11 @@ func prepareMiddleware(ctx context.Context) gin.HandlerFunc {
|
|||||||
ctx, _ := common.LoggerWithFields(ctx, extractFields(c))
|
ctx, _ := common.LoggerWithFields(ctx, extractFields(c))
|
||||||
|
|
||||||
if appName := c.Param("app"); appName != "" {
|
if appName := c.Param("app"); appName != "" {
|
||||||
ctx = context.WithValue(ctx, "appName", appName)
|
c.Set(api.AppName, appName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if routePath := c.Param("route"); routePath != "" {
|
if routePath := c.Param("route"); routePath != "" {
|
||||||
ctx = context.WithValue(ctx, "routePath", routePath)
|
c.Set(api.Path, routePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Set("ctx", ctx)
|
c.Set("ctx", ctx)
|
||||||
@@ -120,12 +144,36 @@ func extractFields(c *gin.Context) logrus.Fields {
|
|||||||
return fields
|
return fields
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Run() {
|
func (s *Server) Start(ctx context.Context) {
|
||||||
s.bindHandlers()
|
s.bindHandlers()
|
||||||
|
s.startGears(ctx)
|
||||||
|
close(s.tasks)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) startGears(ctx context.Context) {
|
||||||
|
svr := &supervisor.Supervisor{
|
||||||
|
MaxRestarts: supervisor.AlwaysRestart,
|
||||||
|
Log: func(msg interface{}) {
|
||||||
|
logrus.Debug("supervisor: ", msg)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// By default it serves on :8080 unless a
|
// By default it serves on :8080 unless a
|
||||||
// PORT environment variable was defined.
|
// PORT environment variable was defined.
|
||||||
|
svr.AddFunc(func(ctx context.Context) {
|
||||||
go s.Router.Run()
|
go s.Router.Run()
|
||||||
|
<-ctx.Done()
|
||||||
|
})
|
||||||
|
|
||||||
|
svr.AddFunc(func(ctx context.Context) {
|
||||||
|
runner.StartWorkers(ctx, s.Runner, s.tasks)
|
||||||
|
})
|
||||||
|
|
||||||
|
svr.AddFunc(func(ctx context.Context) {
|
||||||
|
runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner)
|
||||||
|
})
|
||||||
|
|
||||||
|
svr.Serve(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) bindHandlers() {
|
func (s *Server) bindHandlers() {
|
||||||
|
|||||||
@@ -23,7 +23,16 @@ var tmpBolt = "/tmp/func_test_bolt.db"
|
|||||||
|
|
||||||
func testRouter(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan task.Request) *gin.Engine {
|
func testRouter(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan task.Request) *gin.Engine {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
s := New(ctx, ds, mq, rnr, tasks, DefaultEnqueue)
|
|
||||||
|
s := &Server{
|
||||||
|
Runner: rnr,
|
||||||
|
Router: gin.New(),
|
||||||
|
Datastore: ds,
|
||||||
|
MQ: mq,
|
||||||
|
tasks: tasks,
|
||||||
|
Enqueue: DefaultEnqueue,
|
||||||
|
}
|
||||||
|
|
||||||
r := s.Router
|
r := s.Router
|
||||||
r.Use(gin.Logger())
|
r.Use(gin.Logger())
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,13 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrNoSpecialHandlerFound = errors.New("Path not found")
|
||||||
|
|
||||||
type SpecialHandler interface {
|
type SpecialHandler interface {
|
||||||
Handle(c HandlerContext) error
|
Handle(c HandlerContext) error
|
||||||
}
|
}
|
||||||
@@ -53,6 +55,10 @@ func (s *Server) AddSpecialHandler(handler SpecialHandler) {
|
|||||||
|
|
||||||
// UseSpecialHandlers execute all special handlers
|
// UseSpecialHandlers execute all special handlers
|
||||||
func (s *Server) UseSpecialHandlers(ctx context.Context, req *http.Request, resp http.ResponseWriter) (context.Context, error) {
|
func (s *Server) UseSpecialHandlers(ctx context.Context, req *http.Request, resp http.ResponseWriter) (context.Context, error) {
|
||||||
|
if len(s.specialHandlers) == 0 {
|
||||||
|
return ctx, ErrNoSpecialHandlerFound
|
||||||
|
}
|
||||||
|
|
||||||
c := &SpecialHandlerContext{
|
c := &SpecialHandlerContext{
|
||||||
request: req,
|
request: req,
|
||||||
response: resp,
|
response: resp,
|
||||||
|
|||||||
@@ -2,8 +2,11 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net/http/httputil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api"
|
||||||
"github.com/iron-io/functions/api/datastore"
|
"github.com/iron-io/functions/api/datastore"
|
||||||
"github.com/iron-io/functions/api/models"
|
"github.com/iron-io/functions/api/models"
|
||||||
"github.com/iron-io/functions/api/mqs"
|
"github.com/iron-io/functions/api/mqs"
|
||||||
@@ -14,7 +17,7 @@ import (
|
|||||||
type testSpecialHandler struct{}
|
type testSpecialHandler struct{}
|
||||||
|
|
||||||
func (h *testSpecialHandler) Handle(c HandlerContext) error {
|
func (h *testSpecialHandler) Handle(c HandlerContext) error {
|
||||||
c.Set("appName", "test")
|
c.Set(api.AppName, "test")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -30,14 +33,22 @@ func TestSpecialHandlerSet(t *testing.T) {
|
|||||||
|
|
||||||
go runner.StartWorkers(ctx, rnr, tasks)
|
go runner.StartWorkers(ctx, rnr, tasks)
|
||||||
|
|
||||||
s := New(ctx, &datastore.Mock{
|
s := &Server{
|
||||||
|
Runner: rnr,
|
||||||
|
Router: gin.New(),
|
||||||
|
Datastore: &datastore.Mock{
|
||||||
Apps: []*models.App{
|
Apps: []*models.App{
|
||||||
{Name: "test"},
|
{Name: "test"},
|
||||||
},
|
},
|
||||||
Routes: []*models.Route{
|
Routes: []*models.Route{
|
||||||
{Path: "/test", Image: "iron/hello", AppName: "test"},
|
{Path: "/test", Image: "iron/hello", AppName: "test"},
|
||||||
},
|
},
|
||||||
}, &mqs.Mock{}, rnr, tasks, DefaultEnqueue)
|
},
|
||||||
|
MQ: &mqs.Mock{},
|
||||||
|
tasks: tasks,
|
||||||
|
Enqueue: DefaultEnqueue,
|
||||||
|
}
|
||||||
|
|
||||||
router := s.Router
|
router := s.Router
|
||||||
router.Use(prepareMiddleware(ctx))
|
router.Use(prepareMiddleware(ctx))
|
||||||
s.bindHandlers()
|
s.bindHandlers()
|
||||||
@@ -45,6 +56,7 @@ func TestSpecialHandlerSet(t *testing.T) {
|
|||||||
|
|
||||||
_, rec := routerRequest(t, router, "GET", "/test", nil)
|
_, rec := routerRequest(t, router, "GET", "/test", nil)
|
||||||
if rec.Code != 200 {
|
if rec.Code != 200 {
|
||||||
t.Fatal("Test SpecialHandler: expected special handler to run functions successfully")
|
dump, _ := httputil.DumpResponse(rec.Result(), true)
|
||||||
|
t.Fatalf("Test SpecialHandler: expected special handler to run functions successfully. Response:\n%s", dump)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
50
init.go
Normal file
50
init.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/iron-io/functions/api/server"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cwd, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Fatalln("")
|
||||||
|
}
|
||||||
|
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||||
|
viper.SetDefault(server.EnvLogLevel, "info")
|
||||||
|
viper.SetDefault(server.EnvMQURL, fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd))
|
||||||
|
viper.SetDefault(server.EnvDBURL, fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd))
|
||||||
|
viper.SetDefault(server.EnvPort, 8080)
|
||||||
|
viper.SetDefault(server.EnvAPIURL, fmt.Sprintf("http://127.0.0.1:%d", viper.GetInt(server.EnvPort)))
|
||||||
|
viper.AutomaticEnv() // picks up env vars automatically
|
||||||
|
logLevel, err := logrus.ParseLevel(viper.GetString(server.EnvLogLevel))
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Fatalln("Invalid log level.")
|
||||||
|
}
|
||||||
|
logrus.SetLevel(logLevel)
|
||||||
|
|
||||||
|
gin.SetMode(gin.ReleaseMode)
|
||||||
|
if logLevel == logrus.DebugLevel {
|
||||||
|
gin.SetMode(gin.DebugMode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func contextWithSignal(ctx context.Context, signals ...os.Signal) context.Context {
|
||||||
|
ctx, halt := context.WithCancel(context.Background())
|
||||||
|
c := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(c, signals...)
|
||||||
|
go func() {
|
||||||
|
<-c
|
||||||
|
logrus.Info("Halting...")
|
||||||
|
halt()
|
||||||
|
}()
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
97
main.go
97
main.go
@@ -2,114 +2,31 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/ccirello/supervisor"
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/iron-io/functions/api/datastore"
|
"github.com/iron-io/functions/api/datastore"
|
||||||
"github.com/iron-io/functions/api/mqs"
|
"github.com/iron-io/functions/api/mqs"
|
||||||
"github.com/iron-io/functions/api/runner"
|
|
||||||
"github.com/iron-io/functions/api/runner/task"
|
|
||||||
"github.com/iron-io/functions/api/server"
|
"github.com/iron-io/functions/api/server"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
envLogLevel = "log_level"
|
|
||||||
envMQ = "mq_url"
|
|
||||||
envDB = "db_url"
|
|
||||||
envPort = "port" // be careful, Gin expects this variable to be "port"
|
|
||||||
envAPIURL = "api_url"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
cwd, err := os.Getwd()
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Fatalln("")
|
|
||||||
}
|
|
||||||
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
||||||
viper.SetDefault(envLogLevel, "info")
|
|
||||||
viper.SetDefault(envMQ, fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd))
|
|
||||||
viper.SetDefault(envDB, fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd))
|
|
||||||
viper.SetDefault(envPort, 8080)
|
|
||||||
viper.SetDefault(envAPIURL, fmt.Sprintf("http://127.0.0.1:%d", viper.GetInt(envPort)))
|
|
||||||
viper.AutomaticEnv() // picks up env vars automatically
|
|
||||||
logLevel, err := log.ParseLevel(viper.GetString("log_level"))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Fatalln("Invalid log level.")
|
|
||||||
}
|
|
||||||
log.SetLevel(logLevel)
|
|
||||||
|
|
||||||
gin.SetMode(gin.ReleaseMode)
|
|
||||||
if logLevel == log.DebugLevel {
|
|
||||||
gin.SetMode(gin.DebugMode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ctx, halt := context.WithCancel(context.Background())
|
ctx := contextWithSignal(context.Background(), os.Interrupt)
|
||||||
c := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(c, os.Interrupt)
|
|
||||||
go func() {
|
|
||||||
<-c
|
|
||||||
log.Info("Halting...")
|
|
||||||
halt()
|
|
||||||
}()
|
|
||||||
|
|
||||||
ds, err := datastore.New(viper.GetString(envDB))
|
ds, err := datastore.New(viper.GetString(server.EnvDBURL))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Fatalln("Invalid DB url.")
|
log.WithError(err).Fatalln("Invalid DB url.")
|
||||||
}
|
}
|
||||||
|
|
||||||
mq, err := mqs.New(viper.GetString(envMQ))
|
mq, err := mqs.New(viper.GetString(server.EnvMQURL))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Fatal("Error on init MQ")
|
log.WithError(err).Fatal("Error on init MQ")
|
||||||
}
|
}
|
||||||
|
|
||||||
metricLogger := runner.NewMetricLogger()
|
apiURL := viper.GetString(server.EnvAPIURL)
|
||||||
funcLogger := runner.NewFuncLogger()
|
|
||||||
|
|
||||||
rnr, err := runner.New(ctx, funcLogger, metricLogger)
|
funcServer := server.New(ctx, ds, mq, apiURL)
|
||||||
if err != nil {
|
// Setup your custom extensions, listeners, etc here
|
||||||
log.WithError(err).Fatalln("Failed to create a runner")
|
funcServer.Start(ctx)
|
||||||
}
|
|
||||||
|
|
||||||
svr := &supervisor.Supervisor{
|
|
||||||
MaxRestarts: supervisor.AlwaysRestart,
|
|
||||||
Log: func(msg interface{}) {
|
|
||||||
log.Debug("supervisor: ", msg)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks := make(chan task.Request)
|
|
||||||
|
|
||||||
svr.AddFunc(func(ctx context.Context) {
|
|
||||||
runner.StartWorkers(ctx, rnr, tasks)
|
|
||||||
})
|
|
||||||
|
|
||||||
svr.AddFunc(func(ctx context.Context) {
|
|
||||||
srv := server.New(
|
|
||||||
ctx,
|
|
||||||
ds,
|
|
||||||
mq,
|
|
||||||
rnr,
|
|
||||||
tasks,
|
|
||||||
server.DefaultEnqueue,
|
|
||||||
server.EnableShutdownEndpoint(halt),
|
|
||||||
)
|
|
||||||
srv.Run()
|
|
||||||
<-ctx.Done()
|
|
||||||
})
|
|
||||||
|
|
||||||
apiURL := viper.GetString(envAPIURL)
|
|
||||||
svr.AddFunc(func(ctx context.Context) {
|
|
||||||
runner.RunAsyncRunner(ctx, apiURL, tasks, rnr)
|
|
||||||
})
|
|
||||||
|
|
||||||
svr.Serve(ctx)
|
|
||||||
close(tasks)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user