diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 588274f89..004fdd090 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -48,7 +48,7 @@ func TestAppCreate(t *testing.T) { {datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue) router := srv.Router body := bytes.NewBuffer([]byte(test.body)) @@ -92,7 +92,7 @@ func TestAppDelete(t *testing.T) { ), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) @@ -122,7 +122,7 @@ func TestAppList(t *testing.T) { defer cancel() ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -159,7 +159,7 @@ func TestAppGet(t *testing.T) { defer cancel() ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -218,7 +218,7 @@ func TestAppUpdate(t *testing.T) { ), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, srv.Router, "PATCH", test.path, body) diff --git a/api/server/routes_test.go b/api/server/routes_test.go index b772f628a..5842ccd97 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -24,7 +24,7 @@ type routeTestCase struct { func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, srv.Router, test.method, test.path, body) @@ -131,7 +131,7 @@ func TestRouteDelete(t *testing.T) { ), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) - srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) + srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) if rec.Code != test.expectedCode { @@ -162,7 +162,7 @@ func TestRouteList(t *testing.T) { ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -201,7 +201,7 @@ func TestRouteGet(t *testing.T) { ds := datastore.NewMock() fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string diff --git a/api/server/runner.go b/api/server/runner.go index 6873d0f6f..8b1bc0457 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -245,8 +245,14 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout priority := int32(0) newTask.Priority = &priority newTask.Payload = string(pl) + // Push to queue - enqueue(c, s.MQ, newTask) + _, err = enqueue(c, s.MQ, newTask) + if err != nil { + handleErrorResponse(c, err) + return true + } + log.Info("Added new task to queue") c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID}) diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 64d4f86bb..de12b54d2 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "errors" "gitlab-odx.oracle.com/odx/functions/api/datastore" "gitlab-odx.oracle.com/odx/functions/api/logs" "gitlab-odx.oracle.com/odx/functions/api/models" @@ -35,7 +36,7 @@ func TestRouteRunnerGet(t *testing.T) { }, nil, nil, nil, ) logDB := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, logDB, rnr) + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -79,7 +80,7 @@ func TestRouteRunnerPost(t *testing.T) { }, nil, nil, nil, ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -131,7 +132,7 @@ func TestRouteRunnerExecution(t *testing.T) { ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string @@ -171,6 +172,45 @@ func TestRouteRunnerExecution(t *testing.T) { } } +func TestFailedEnqueue(t *testing.T) { + buf := setLogBuffer() + rnr, cancelrnr := testRunner(t) + defer cancelrnr() + + ds := datastore.NewMockInit( + []*models.App{ + {Name: "myapp", Config: models.Config{}}, + }, + []*models.Route{ + {Path: "/dummy", AppName: "myapp", Image: "dummy/dummy", Type: "async"}, + }, nil, nil, + ) + fnl := logs.NewMock() + + enqueue := func(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) { + return nil, errors.New("Unable to push task to queue") + } + + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, enqueue) + for i, test := range []struct { + path string + body string + method string + expectedCode int + expectedHeaders map[string][]string + }{ + {"/r/myapp/dummy", ``, "POST", http.StatusInternalServerError, nil}, + } { + body := strings.NewReader(test.body) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + if rec.Code != test.expectedCode { + t.Log(buf.String()) + t.Errorf("Test %d: Expected status code to be %d but was %d", + i, test.expectedCode, rec.Code) + } + } +} + func TestRouteRunnerTimeout(t *testing.T) { t.Skip("doesn't work on old Ubuntu") buf := setLogBuffer() @@ -187,7 +227,7 @@ func TestRouteRunnerTimeout(t *testing.T) { }, nil, nil, ) fnl := logs.NewMock() - srv := testServer(ds, &mqs.Mock{}, fnl, rnr) + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue) for i, test := range []struct { path string diff --git a/api/server/server_test.go b/api/server/server_test.go index a2ff1c74e..78c81abaf 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -22,16 +22,16 @@ import ( var tmpDatastoreTests = "/tmp/func_test_datastore.db" -func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner) *Server { +func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner, enqueue models.Enqueue) *Server { ctx := context.Background() s := &Server{ Runner: rnr, Router: gin.New(), Datastore: ds, - LogDB: nil, + LogDB: logDB, MQ: mq, - Enqueue: DefaultEnqueue, + Enqueue: enqueue, routeCache: cache.New(60*time.Second, 5*time.Minute), } @@ -102,7 +102,7 @@ func TestFullStack(t *testing.T) { rnr, rnrcancel := testRunner(t) defer rnrcancel() - srv := testServer(ds, &mqs.Mock{}, logDB, rnr) + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue) for _, test := range []struct { name string