Reject async requests in case if MQ is not reachable

This commit is contained in:
Denis Makogon
2017-07-25 10:03:04 -07:00
committed by Reed Allman
parent 565f61fe6f
commit 97b0b97bd8
5 changed files with 64 additions and 18 deletions

View File

@@ -48,7 +48,7 @@ func TestAppCreate(t *testing.T) {
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil}, {datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil},
} { } {
rnr, cancel := testRunner(t) rnr, cancel := testRunner(t)
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
router := srv.Router router := srv.Router
body := bytes.NewBuffer([]byte(test.body)) body := bytes.NewBuffer([]byte(test.body))
@@ -92,7 +92,7 @@ func TestAppDelete(t *testing.T) {
), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil}, ), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil},
} { } {
rnr, cancel := testRunner(t) rnr, cancel := testRunner(t)
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
@@ -122,7 +122,7 @@ func TestAppList(t *testing.T) {
defer cancel() defer cancel()
ds := datastore.NewMock() ds := datastore.NewMock()
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -159,7 +159,7 @@ func TestAppGet(t *testing.T) {
defer cancel() defer cancel()
ds := datastore.NewMock() ds := datastore.NewMock()
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -218,7 +218,7 @@ func TestAppUpdate(t *testing.T) {
), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil}, ), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil},
} { } {
rnr, cancel := testRunner(t) rnr, cancel := testRunner(t)
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr) srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
body := bytes.NewBuffer([]byte(test.body)) body := bytes.NewBuffer([]byte(test.body))
_, rec := routerRequest(t, srv.Router, "PATCH", test.path, body) _, rec := routerRequest(t, srv.Router, "PATCH", test.path, body)

View File

@@ -24,7 +24,7 @@ type routeTestCase struct {
func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) { func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) {
rnr, cancel := testRunner(t) rnr, cancel := testRunner(t)
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
body := bytes.NewBuffer([]byte(test.body)) body := bytes.NewBuffer([]byte(test.body))
_, rec := routerRequest(t, srv.Router, test.method, test.path, body) _, rec := routerRequest(t, srv.Router, test.method, test.path, body)
@@ -131,7 +131,7 @@ func TestRouteDelete(t *testing.T) {
), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil}, ), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil},
} { } {
rnr, cancel := testRunner(t) rnr, cancel := testRunner(t)
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr) srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil) _, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
if rec.Code != test.expectedCode { if rec.Code != test.expectedCode {
@@ -162,7 +162,7 @@ func TestRouteList(t *testing.T) {
ds := datastore.NewMock() ds := datastore.NewMock()
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -201,7 +201,7 @@ func TestRouteGet(t *testing.T) {
ds := datastore.NewMock() ds := datastore.NewMock()
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string

View File

@@ -245,8 +245,14 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout
priority := int32(0) priority := int32(0)
newTask.Priority = &priority newTask.Priority = &priority
newTask.Payload = string(pl) newTask.Payload = string(pl)
// Push to queue // Push to queue
enqueue(c, s.MQ, newTask) _, err = enqueue(c, s.MQ, newTask)
if err != nil {
handleErrorResponse(c, err)
return true
}
log.Info("Added new task to queue") log.Info("Added new task to queue")
c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID}) c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID})

View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"
"testing" "testing"
"errors"
"gitlab-odx.oracle.com/odx/functions/api/datastore" "gitlab-odx.oracle.com/odx/functions/api/datastore"
"gitlab-odx.oracle.com/odx/functions/api/logs" "gitlab-odx.oracle.com/odx/functions/api/logs"
"gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/models"
@@ -35,7 +36,7 @@ func TestRouteRunnerGet(t *testing.T) {
}, nil, nil, nil, }, nil, nil, nil,
) )
logDB := logs.NewMock() logDB := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, logDB, rnr) srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -79,7 +80,7 @@ func TestRouteRunnerPost(t *testing.T) {
}, nil, nil, nil, }, nil, nil, nil,
) )
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -131,7 +132,7 @@ func TestRouteRunnerExecution(t *testing.T) {
) )
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string
@@ -171,6 +172,45 @@ func TestRouteRunnerExecution(t *testing.T) {
} }
} }
func TestFailedEnqueue(t *testing.T) {
buf := setLogBuffer()
rnr, cancelrnr := testRunner(t)
defer cancelrnr()
ds := datastore.NewMockInit(
[]*models.App{
{Name: "myapp", Config: models.Config{}},
},
[]*models.Route{
{Path: "/dummy", AppName: "myapp", Image: "dummy/dummy", Type: "async"},
}, nil, nil,
)
fnl := logs.NewMock()
enqueue := func(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) {
return nil, errors.New("Unable to push task to queue")
}
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, enqueue)
for i, test := range []struct {
path string
body string
method string
expectedCode int
expectedHeaders map[string][]string
}{
{"/r/myapp/dummy", ``, "POST", http.StatusInternalServerError, nil},
} {
body := strings.NewReader(test.body)
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
if rec.Code != test.expectedCode {
t.Log(buf.String())
t.Errorf("Test %d: Expected status code to be %d but was %d",
i, test.expectedCode, rec.Code)
}
}
}
func TestRouteRunnerTimeout(t *testing.T) { func TestRouteRunnerTimeout(t *testing.T) {
t.Skip("doesn't work on old Ubuntu") t.Skip("doesn't work on old Ubuntu")
buf := setLogBuffer() buf := setLogBuffer()
@@ -187,7 +227,7 @@ func TestRouteRunnerTimeout(t *testing.T) {
}, nil, nil, }, nil, nil,
) )
fnl := logs.NewMock() fnl := logs.NewMock()
srv := testServer(ds, &mqs.Mock{}, fnl, rnr) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
for i, test := range []struct { for i, test := range []struct {
path string path string

View File

@@ -22,16 +22,16 @@ import (
var tmpDatastoreTests = "/tmp/func_test_datastore.db" var tmpDatastoreTests = "/tmp/func_test_datastore.db"
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner) *Server { func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner, enqueue models.Enqueue) *Server {
ctx := context.Background() ctx := context.Background()
s := &Server{ s := &Server{
Runner: rnr, Runner: rnr,
Router: gin.New(), Router: gin.New(),
Datastore: ds, Datastore: ds,
LogDB: nil, LogDB: logDB,
MQ: mq, MQ: mq,
Enqueue: DefaultEnqueue, Enqueue: enqueue,
routeCache: cache.New(60*time.Second, 5*time.Minute), routeCache: cache.New(60*time.Second, 5*time.Minute),
} }
@@ -102,7 +102,7 @@ func TestFullStack(t *testing.T) {
rnr, rnrcancel := testRunner(t) rnr, rnrcancel := testRunner(t)
defer rnrcancel() defer rnrcancel()
srv := testServer(ds, &mqs.Mock{}, logDB, rnr) srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
for _, test := range []struct { for _, test := range []struct {
name string name string