From 0343c4990c9c1fc0d66ea4ef877d40879c17773e Mon Sep 17 00:00:00 2001 From: Pedro Nasser Date: Mon, 21 Nov 2016 14:11:01 -0200 Subject: [PATCH] server.New signature changes and test fixes. (#324) * ctx middleware should always be the first added to router * plugable enqueue func, changed server.New signature * fix tests * remove ctx/ctx.Done from server --- api/models/mq.go | 2 +- api/server/apps_test.go | 15 +++++---------- api/server/routes_test.go | 15 +++++---------- api/server/runner.go | 3 ++- api/server/runner_async_test.go | 16 +++++++++------- api/server/runner_test.go | 9 +++------ api/server/server.go | 33 +++++++++++++++++---------------- api/server/server_test.go | 9 +++++---- main.go | 8 +++++--- 9 files changed, 52 insertions(+), 58 deletions(-) diff --git a/api/models/mq.go b/api/models/mq.go index 8528f3a98..b754c4666 100644 --- a/api/models/mq.go +++ b/api/models/mq.go @@ -51,4 +51,4 @@ type MessageQueue interface { Delete(context.Context, *Task) error } -type Enqueue func(*Task) (*Task, error) +type Enqueue func(context.Context, MessageQueue, *Task) (*Task, error) diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 0aea0d16d..689b13523 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -57,8 +57,7 @@ func TestAppCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusCreated, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "POST", test.path, body) @@ -86,8 +85,7 @@ func TestAppDelete(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -123,8 +121,7 @@ func TestAppList(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -159,8 +156,7 @@ func TestAppGet(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -212,8 +208,7 @@ func TestAppUpdate(t *testing.T) { }, }, "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "PUT", test.path, body) diff --git a/api/server/routes_test.go b/api/server/routes_test.go index 3b387baa5..f056691ed 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -36,8 +36,7 @@ func TestRouteCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps/a/routes", `{ "route": { "image": "iron/hello", "path": "/myroute" } }`, http.StatusCreated, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "POST", test.path, body) @@ -65,8 +64,7 @@ func TestRouteDelete(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -102,8 +100,7 @@ func TestRouteList(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -138,8 +135,7 @@ func TestRouteGet(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string @@ -174,8 +170,7 @@ func TestRouteUpdate(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string diff --git a/api/server/runner.go b/api/server/runner.go index 8c6e37ccf..5ee69cd36 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -176,8 +176,9 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) { task.EnvVars = cfg.Env task.Payload = string(pl) // Push to queue - enqueue(task) + enqueue(ctx, s.MQ, task) log.Info("Added new task to queue") + c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) default: diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 175fa6769..c578eaef2 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -12,13 +12,16 @@ import ( "github.com/iron-io/functions/api/datastore" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" + "github.com/iron-io/functions/api/runner" "github.com/iron-io/runner/common" ) -func testRouterAsync(s *Server, enqueueFunc models.Enqueue) *gin.Engine { +func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan runner.TaskRequest, enqueue models.Enqueue) *gin.Engine { + ctx := context.Background() + s := New(ctx, ds, mq, rnr, tasks, enqueue) r := s.Router r.Use(gin.Logger()) - ctx := context.Background() + r.Use(func(c *gin.Context) { ctx, _ := common.LoggerWithFields(ctx, extractFields(c)) c.Set("ctx", ctx) @@ -31,9 +34,7 @@ func testRouterAsync(s *Server, enqueueFunc models.Enqueue) *gin.Engine { func TestRouteRunnerAsyncExecution(t *testing.T) { t.Skip() tasks := mockTasksConduit() - - // todo: I broke how this test works trying to clean up the code a bit. Is there a better way to do this test rather than having to override the default route behavior? - s := New(&datastore.Mock{ + ds := &datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: map[string]string{"app": "true"}}, }, @@ -42,7 +43,8 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { {Type: "async", Path: "/myerror", AppName: "myapp", Image: "iron/error", Config: map[string]string{"test": "true"}}, {Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "iron/hello", Config: map[string]string{"test": "true"}}, }, - }, &mqs.Mock{}, testRunner(t), tasks) + } + mq := &mqs.Mock{} for i, test := range []struct { path string @@ -73,7 +75,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { wg.Add(1) fmt.Println("About to start router") - router := testRouterAsync(s, func(task *models.Task) (*models.Task, error) { + router := testRouterAsync(ds, mq, testRunner(t), tasks, func(_ context.Context, _ models.MessageQueue, task *models.Task) (*models.Task, error) { if test.body != task.Payload { t.Errorf("Test %d: Expected task Payload to be the same as the test body", i) } diff --git a/api/server/runner_test.go b/api/server/runner_test.go index b585e26c6..7cdd48031 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -25,12 +25,11 @@ func TestRouteRunnerGet(t *testing.T) { buf := setLogBuffer() tasks := mockTasksConduit() - s := New(&datastore.Mock{ + router := testRouter(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, }, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) for i, test := range []struct { path string @@ -66,12 +65,11 @@ func TestRouteRunnerPost(t *testing.T) { buf := setLogBuffer() tasks := mockTasksConduit() - s := New(&datastore.Mock{ + router := testRouter(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, }, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) for i, test := range []struct { path string @@ -114,7 +112,7 @@ func TestRouteRunnerExecution(t *testing.T) { go runner.StartWorkers(ctx, testRunner(t), tasks) - s := New(&datastore.Mock{ + router := testRouter(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, @@ -123,7 +121,6 @@ func TestRouteRunnerExecution(t *testing.T) { {Path: "/myerror", AppName: "myapp", Image: "iron/error", Headers: map[string][]string{"X-Function": {"Test"}}}, }, }, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) for i, test := range []struct { path string diff --git a/api/server/server.go b/api/server/server.go index 6007372f4..fe9d2c38f 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -27,18 +27,27 @@ type Server struct { MQ models.MessageQueue AppListeners []ifaces.AppListener SpecialHandlers []ifaces.SpecialHandler + Enqueue models.Enqueue tasks chan runner.TaskRequest } -func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan runner.TaskRequest) *Server { +func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan runner.TaskRequest, enqueue models.Enqueue) *Server { Api = &Server{ Runner: r, Router: gin.New(), Datastore: ds, MQ: mq, tasks: tasks, + Enqueue: enqueue, } + + Api.Router.Use(func(c *gin.Context) { + ctx, _ := common.LoggerWithFields(ctx, extractFields(c)) + c.Set("ctx", ctx) + c.Next() + }) + return Api } @@ -87,13 +96,13 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { return nil } +func DefaultEnqueue(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) { + ctx, _ = common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID}) + return mq.Push(ctx, task) +} + func (s *Server) handleRunnerRequest(c *gin.Context) { - enqueue := func(task *models.Task) (*models.Task, error) { - c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) - ctx, _ := common.LoggerWithFields(c, logrus.Fields{"call_id": task.ID}) - return s.MQ.Push(ctx, task) - } - s.handleRequest(c, enqueue) + s.handleRequest(c, s.Enqueue) } func (s *Server) handleTaskRequest(c *gin.Context) { @@ -138,23 +147,15 @@ func extractFields(c *gin.Context) logrus.Fields { return fields } -func (s *Server) Run(ctx context.Context) { - s.Router.Use(func(c *gin.Context) { - ctx, _ := common.LoggerWithFields(ctx, extractFields(c)) - c.Set("ctx", ctx) - c.Next() - }) - +func (s *Server) Run() { s.bindHandlers() // By default it serves on :8080 unless a // PORT environment variable was defined. go s.Router.Run() - <-ctx.Done() } func (s *Server) bindHandlers() { - engine := s.Router engine.GET("/", handlePing) diff --git a/api/server/server_test.go b/api/server/server_test.go index d66d522b0..30db340c1 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -21,10 +21,12 @@ import ( var tmpBolt = "/tmp/func_test_bolt.db" -func testRouter(s *Server) *gin.Engine { +func testRouter(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, tasks chan runner.TaskRequest) *gin.Engine { + ctx := context.Background() + s := New(ctx, ds, mq, rnr, tasks, DefaultEnqueue) r := s.Router r.Use(gin.Logger()) - ctx := context.Background() + r.Use(func(c *gin.Context) { ctx, _ := common.LoggerWithFields(ctx, extractFields(c)) c.Set("ctx", ctx) @@ -93,8 +95,7 @@ func TestFullStack(t *testing.T) { defer cancel() go runner.StartWorkers(ctx, testRunner(t), tasks) - s := New(ds, &mqs.Mock{}, testRunner(t), tasks) - router := testRouter(s) + router := testRouter(ds, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { method string diff --git a/main.go b/main.go index 1b3330650..f3da4303e 100644 --- a/main.go +++ b/main.go @@ -63,7 +63,8 @@ func main() { if err != nil { log.WithError(err).Fatalln("Invalid DB url.") } - mqType, err := mqs.New(viper.GetString(envMQ)) + + mq, err := mqs.New(viper.GetString(envMQ)) if err != nil { log.WithError(err).Fatal("Error on init MQ") } @@ -87,8 +88,9 @@ func main() { }) svr.AddFunc(func(ctx context.Context) { - srv := server.New(ds, mqType, rnr, tasks) - srv.Run(ctx) + srv := server.New(ctx, ds, mq, rnr, tasks, server.DefaultEnqueue) + srv.Run() + <-ctx.Done() }) apiURL := viper.GetString(envAPIURL)