mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'enqueue-error' into 'master'
Reject async requests in case if MQ is not reachable Closes #141 See merge request !127
This commit is contained in:
@@ -48,7 +48,7 @@ func TestAppCreate(t *testing.T) {
|
|||||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil},
|
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil},
|
||||||
} {
|
} {
|
||||||
rnr, cancel := testRunner(t)
|
rnr, cancel := testRunner(t)
|
||||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr)
|
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||||
router := srv.Router
|
router := srv.Router
|
||||||
|
|
||||||
body := bytes.NewBuffer([]byte(test.body))
|
body := bytes.NewBuffer([]byte(test.body))
|
||||||
@@ -92,7 +92,7 @@ func TestAppDelete(t *testing.T) {
|
|||||||
), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil},
|
), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil},
|
||||||
} {
|
} {
|
||||||
rnr, cancel := testRunner(t)
|
rnr, cancel := testRunner(t)
|
||||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
||||||
|
|
||||||
@@ -122,7 +122,7 @@ func TestAppList(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
ds := datastore.NewMock()
|
ds := datastore.NewMock()
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -159,7 +159,7 @@ func TestAppGet(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
ds := datastore.NewMock()
|
ds := datastore.NewMock()
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -218,7 +218,7 @@ func TestAppUpdate(t *testing.T) {
|
|||||||
), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil},
|
), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil},
|
||||||
} {
|
} {
|
||||||
rnr, cancel := testRunner(t)
|
rnr, cancel := testRunner(t)
|
||||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr)
|
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
body := bytes.NewBuffer([]byte(test.body))
|
body := bytes.NewBuffer([]byte(test.body))
|
||||||
_, rec := routerRequest(t, srv.Router, "PATCH", test.path, body)
|
_, rec := routerRequest(t, srv.Router, "PATCH", test.path, body)
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ type routeTestCase struct {
|
|||||||
|
|
||||||
func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) {
|
func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) {
|
||||||
rnr, cancel := testRunner(t)
|
rnr, cancel := testRunner(t)
|
||||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
body := bytes.NewBuffer([]byte(test.body))
|
body := bytes.NewBuffer([]byte(test.body))
|
||||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||||
@@ -131,7 +131,7 @@ func TestRouteDelete(t *testing.T) {
|
|||||||
), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil},
|
), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil},
|
||||||
} {
|
} {
|
||||||
rnr, cancel := testRunner(t)
|
rnr, cancel := testRunner(t)
|
||||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||||
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
||||||
|
|
||||||
if rec.Code != test.expectedCode {
|
if rec.Code != test.expectedCode {
|
||||||
@@ -162,7 +162,7 @@ func TestRouteList(t *testing.T) {
|
|||||||
ds := datastore.NewMock()
|
ds := datastore.NewMock()
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -201,7 +201,7 @@ func TestRouteGet(t *testing.T) {
|
|||||||
ds := datastore.NewMock()
|
ds := datastore.NewMock()
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
|
|||||||
@@ -245,8 +245,14 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout
|
|||||||
priority := int32(0)
|
priority := int32(0)
|
||||||
newTask.Priority = &priority
|
newTask.Priority = &priority
|
||||||
newTask.Payload = string(pl)
|
newTask.Payload = string(pl)
|
||||||
|
|
||||||
// Push to queue
|
// Push to queue
|
||||||
enqueue(c, s.MQ, newTask)
|
_, err = enqueue(c, s.MQ, newTask)
|
||||||
|
if err != nil {
|
||||||
|
handleErrorResponse(c, err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("Added new task to queue")
|
log.Info("Added new task to queue")
|
||||||
c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID})
|
c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID})
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"errors"
|
||||||
"gitlab-odx.oracle.com/odx/functions/api/datastore"
|
"gitlab-odx.oracle.com/odx/functions/api/datastore"
|
||||||
"gitlab-odx.oracle.com/odx/functions/api/logs"
|
"gitlab-odx.oracle.com/odx/functions/api/logs"
|
||||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||||
@@ -35,7 +36,7 @@ func TestRouteRunnerGet(t *testing.T) {
|
|||||||
}, nil, nil, nil,
|
}, nil, nil, nil,
|
||||||
)
|
)
|
||||||
logDB := logs.NewMock()
|
logDB := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr)
|
srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -79,7 +80,7 @@ func TestRouteRunnerPost(t *testing.T) {
|
|||||||
}, nil, nil, nil,
|
}, nil, nil, nil,
|
||||||
)
|
)
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -131,7 +132,7 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
@@ -171,6 +172,45 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFailedEnqueue(t *testing.T) {
|
||||||
|
buf := setLogBuffer()
|
||||||
|
rnr, cancelrnr := testRunner(t)
|
||||||
|
defer cancelrnr()
|
||||||
|
|
||||||
|
ds := datastore.NewMockInit(
|
||||||
|
[]*models.App{
|
||||||
|
{Name: "myapp", Config: models.Config{}},
|
||||||
|
},
|
||||||
|
[]*models.Route{
|
||||||
|
{Path: "/dummy", AppName: "myapp", Image: "dummy/dummy", Type: "async"},
|
||||||
|
}, nil, nil,
|
||||||
|
)
|
||||||
|
fnl := logs.NewMock()
|
||||||
|
|
||||||
|
enqueue := func(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) {
|
||||||
|
return nil, errors.New("Unable to push task to queue")
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, enqueue)
|
||||||
|
for i, test := range []struct {
|
||||||
|
path string
|
||||||
|
body string
|
||||||
|
method string
|
||||||
|
expectedCode int
|
||||||
|
expectedHeaders map[string][]string
|
||||||
|
}{
|
||||||
|
{"/r/myapp/dummy", ``, "POST", http.StatusInternalServerError, nil},
|
||||||
|
} {
|
||||||
|
body := strings.NewReader(test.body)
|
||||||
|
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||||
|
if rec.Code != test.expectedCode {
|
||||||
|
t.Log(buf.String())
|
||||||
|
t.Errorf("Test %d: Expected status code to be %d but was %d",
|
||||||
|
i, test.expectedCode, rec.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRouteRunnerTimeout(t *testing.T) {
|
func TestRouteRunnerTimeout(t *testing.T) {
|
||||||
t.Skip("doesn't work on old Ubuntu")
|
t.Skip("doesn't work on old Ubuntu")
|
||||||
buf := setLogBuffer()
|
buf := setLogBuffer()
|
||||||
@@ -187,7 +227,7 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
|||||||
}, nil, nil,
|
}, nil, nil,
|
||||||
)
|
)
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
path string
|
path string
|
||||||
|
|||||||
@@ -22,16 +22,16 @@ import (
|
|||||||
|
|
||||||
var tmpDatastoreTests = "/tmp/func_test_datastore.db"
|
var tmpDatastoreTests = "/tmp/func_test_datastore.db"
|
||||||
|
|
||||||
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner) *Server {
|
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner, enqueue models.Enqueue) *Server {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
Runner: rnr,
|
Runner: rnr,
|
||||||
Router: gin.New(),
|
Router: gin.New(),
|
||||||
Datastore: ds,
|
Datastore: ds,
|
||||||
LogDB: nil,
|
LogDB: logDB,
|
||||||
MQ: mq,
|
MQ: mq,
|
||||||
Enqueue: DefaultEnqueue,
|
Enqueue: enqueue,
|
||||||
routeCache: cache.New(60*time.Second, 5*time.Minute),
|
routeCache: cache.New(60*time.Second, 5*time.Minute),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +102,7 @@ func TestFullStack(t *testing.T) {
|
|||||||
rnr, rnrcancel := testRunner(t)
|
rnr, rnrcancel := testRunner(t)
|
||||||
defer rnrcancel()
|
defer rnrcancel()
|
||||||
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr)
|
srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
|
||||||
|
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
name string
|
name string
|
||||||
|
|||||||
Reference in New Issue
Block a user