diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index f2e12c0bf..d54efdbac 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -189,12 +189,13 @@ func TestTasksrvURL(t *testing.T) { } } -func testRunner(t *testing.T) *Runner { - r, err := New(NewFuncLogger(), NewMetricLogger()) +func testRunner(t *testing.T) (*Runner, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + r, err := New(ctx, NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatal("Test: failed to create new runner") } - return r + return r, cancel } func TestAsyncRunnersGracefulShutdown(t *testing.T) { @@ -217,7 +218,9 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) { } }() - startAsyncRunners(ctx, ts.URL+"/tasks", tasks, testRunner(t)) + rnr, cancel := testRunner(t) + defer cancel() + startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr) if err := ctx.Err(); err != context.DeadlineExceeded { t.Log(buf.String()) diff --git a/api/runner/runner.go b/api/runner/runner.go index 2d39e2453..56911fd92 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -39,7 +39,7 @@ var ( WaitMemoryTimeout = 10 * time.Second ) -func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) { +func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, error) { // TODO: Is this really required for the container drivers? Can we remove it? env := common.NewEnvironment(func(e *common.Environment) {}) @@ -58,7 +58,7 @@ func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) { usedMem: 0, } - go r.queueHandler() + go r.queueHandler(ctx) return r, nil } @@ -67,42 +67,52 @@ func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) { // If there's memory then send signal to the task to proceed. // If there's not available memory to run the task it waits // If the task waits for more than X seconds it timeouts -func (r *Runner) queueHandler() { - var task *containerTask - var waitStart time.Time - var waitTime time.Duration - var timedOut bool +func (r *Runner) queueHandler(ctx context.Context) { +consumeQueue: for { select { - case task = <-r.taskQueue: - waitStart = time.Now() - timedOut = false + case task := <-r.taskQueue: + r.handleTask(task) + case <-ctx.Done(): + break consumeQueue } - - // Loop waiting for available memory - for !r.checkRequiredMem(task.cfg.Memory) { - waitTime = time.Since(waitStart) - if waitTime > WaitMemoryTimeout { - timedOut = true - break - } - time.Sleep(time.Microsecond) - } - - metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) - r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime) - r.mlog.LogTime(task.ctx, "run.wait_time", waitTime) - - if timedOut { - // Send to a signal to this task saying it cannot run - r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1) - task.canRun <- false - continue - } - - // Send a signal to this task saying it can run - task.canRun <- true } + + // consume remainders + for len(r.taskQueue) > 0 { + r.handleTask(<-r.taskQueue) + } +} + +func (r *Runner) handleTask(task *containerTask) { + waitStart := time.Now() + + var waitTime time.Duration + var timedOut bool + + // Loop waiting for available memory + for !r.checkRequiredMem(task.cfg.Memory) { + waitTime = time.Since(waitStart) + if waitTime > WaitMemoryTimeout { + timedOut = true + break + } + time.Sleep(time.Microsecond) + } + + metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) + r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime) + r.mlog.LogTime(task.ctx, "run.wait_time", waitTime) + + if timedOut { + // Send to a signal to this task saying it cannot run + r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1) + task.canRun <- false + return + } + + // Send a signal to this task saying it can run + task.canRun <- true } func (r *Runner) hasAsyncAvailableMemory() bool { diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index d65ca3560..2f49503c8 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -14,13 +14,14 @@ import ( func TestRunnerHello(t *testing.T) { buf := setLogBuffer() - runner, err := New(NewFuncLogger(), NewMetricLogger()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + runner, err := New(ctx, NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } - ctx := context.Background() - for i, test := range []struct { route *models.Route payload string @@ -67,13 +68,14 @@ func TestRunnerHello(t *testing.T) { func TestRunnerError(t *testing.T) { t.Skip() buf := setLogBuffer() - runner, err := New(NewFuncLogger(), NewMetricLogger()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + runner, err := New(ctx, NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } - ctx := context.Background() - for i, test := range []struct { route *models.Route payload string diff --git a/api/server/apps_test.go b/api/server/apps_test.go index c50b2cfee..a46f979f7 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -57,7 +57,8 @@ func TestAppCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusCreated, nil}, } { - router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.mock, &mqs.Mock{}, rnr, tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "POST", test.path, body) @@ -77,6 +78,7 @@ func TestAppCreate(t *testing.T) { i, test.expectedError.Error()) } } + cancel() } } @@ -99,7 +101,8 @@ func TestAppDelete(t *testing.T) { }}, }, "/v1/apps/myapp", "", http.StatusOK, nil}, } { - router := testRouter(test.ds, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.ds, &mqs.Mock{}, rnr, tasks) _, rec := routerRequest(t, router, "DELETE", test.path, nil) @@ -118,6 +121,7 @@ func TestAppDelete(t *testing.T) { i, test.expectedError.Error()) } } + cancel() } } @@ -126,7 +130,9 @@ func TestAppList(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + defer cancel() + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -161,7 +167,9 @@ func TestAppGet(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + defer cancel() + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -213,7 +221,8 @@ func TestAppUpdate(t *testing.T) { }}, }, "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil}, } { - router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.mock, &mqs.Mock{}, rnr, tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "PUT", test.path, body) @@ -233,5 +242,7 @@ func TestAppUpdate(t *testing.T) { i, test.expectedError.Error()) } } + + cancel() } } diff --git a/api/server/routes_test.go b/api/server/routes_test.go index 83f5fa3c9..a03e4a6a1 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -36,7 +36,8 @@ func TestRouteCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps/a/routes", `{ "route": { "image": "iron/hello", "path": "/myroute" } }`, http.StatusCreated, nil}, } { - router := testRouter(test.mock, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.mock, &mqs.Mock{}, rnr, tasks) body := bytes.NewBuffer([]byte(test.body)) _, rec := routerRequest(t, router, "POST", test.path, body) @@ -56,6 +57,7 @@ func TestRouteCreate(t *testing.T) { i, test.expectedError.Error(), resp.Error.Message) } } + cancel() } } @@ -79,7 +81,8 @@ func TestRouteDelete(t *testing.T) { }, }, "/v1/apps/a/routes/myroute", "", http.StatusOK, nil}, } { - router := testRouter(test.ds, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.ds, &mqs.Mock{}, rnr, tasks) _, rec := routerRequest(t, router, "DELETE", test.path, nil) if rec.Code != test.expectedCode { @@ -97,6 +100,7 @@ func TestRouteDelete(t *testing.T) { i, test.expectedError.Error()) } } + cancel() } } @@ -105,7 +109,9 @@ func TestRouteList(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + defer cancel() + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -140,7 +146,10 @@ func TestRouteGet(t *testing.T) { tasks := mockTasksConduit() defer close(tasks) - router := testRouter(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + defer cancel() + + router := testRouter(&datastore.Mock{}, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -196,7 +205,8 @@ func TestRouteUpdate(t *testing.T) { }, }, "/v1/apps/a/routes/myroute/do", `{ "route": { "image": "iron/hello", "path": "/myroute" } }`, http.StatusOK, nil}, } { - router := testRouter(test.ds, &mqs.Mock{}, testRunner(t), tasks) + rnr, cancel := testRunner(t) + router := testRouter(test.ds, &mqs.Mock{}, rnr, tasks) body := bytes.NewBuffer([]byte(test.body)) @@ -217,5 +227,6 @@ func TestRouteUpdate(t *testing.T) { i, test.expectedError.Error()) } } + cancel() } } diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 4531c16b1..cb19683d2 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -76,7 +76,8 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { wg.Add(1) fmt.Println("About to start router") - router := testRouterAsync(ds, mq, testRunner(t), tasks, func(_ context.Context, _ models.MessageQueue, task *models.Task) (*models.Task, error) { + rnr, cancel := testRunner(t) + router := testRouterAsync(ds, mq, rnr, tasks, func(_ context.Context, _ models.MessageQueue, task *models.Task) (*models.Task, error) { if test.body != task.Payload { t.Errorf("Test %d: Expected task Payload to be the same as the test body", i) } @@ -109,5 +110,6 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { } wg.Wait() + cancel() } } diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 5ea245837..ab877f3f6 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -14,23 +14,27 @@ import ( "github.com/iron-io/functions/api/runner/task" ) -func testRunner(t *testing.T) *runner.Runner { - r, err := runner.New(runner.NewFuncLogger(), runner.NewMetricLogger()) +func testRunner(t *testing.T) (*runner.Runner, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + r, err := runner.New(ctx, runner.NewFuncLogger(), runner.NewMetricLogger()) if err != nil { t.Fatal("Test: failed to create new runner") } - return r + return r, cancel } func TestRouteRunnerGet(t *testing.T) { buf := setLogBuffer() tasks := mockTasksConduit() + rnr, cancel := testRunner(t) + defer cancel() + router := testRouter(&datastore.Mock{ Apps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, - }, &mqs.Mock{}, testRunner(t), tasks) + }, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -66,11 +70,14 @@ func TestRouteRunnerPost(t *testing.T) { buf := setLogBuffer() tasks := mockTasksConduit() + rnr, cancel := testRunner(t) + defer cancel() + router := testRouter(&datastore.Mock{ Apps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, - }, &mqs.Mock{}, testRunner(t), tasks) + }, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -111,7 +118,10 @@ func TestRouteRunnerExecution(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go runner.StartWorkers(ctx, testRunner(t), tasks) + rnr, cancelrnr := testRunner(t) + defer cancelrnr() + + go runner.StartWorkers(ctx, rnr, tasks) router := testRouter(&datastore.Mock{ Apps: []*models.App{ @@ -121,7 +131,7 @@ func TestRouteRunnerExecution(t *testing.T) { {Path: "/myroute", AppName: "myapp", Image: "iron/hello", Headers: map[string][]string{"X-Function": {"Test"}}}, {Path: "/myerror", AppName: "myapp", Image: "iron/error", Headers: map[string][]string{"X-Function": {"Test"}}}, }, - }, &mqs.Mock{}, testRunner(t), tasks) + }, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string @@ -167,7 +177,9 @@ func TestRouteRunnerTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go runner.StartWorkers(ctx, testRunner(t), tasks) + rnr, cancelrnr := testRunner(t) + defer cancelrnr() + go runner.StartWorkers(ctx, rnr, tasks) router := testRouter(&datastore.Mock{ Apps: []*models.App{ @@ -176,7 +188,7 @@ func TestRouteRunnerTimeout(t *testing.T) { Routes: []*models.Route{ {Path: "/sleeper", AppName: "myapp", Image: "iron/sleeper", Timeout: 1}, }, - }, &mqs.Mock{}, testRunner(t), tasks) + }, &mqs.Mock{}, rnr, tasks) for i, test := range []struct { path string diff --git a/api/server/server_test.go b/api/server/server_test.go index da2e31d93..95a00da70 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -94,9 +94,13 @@ func TestFullStack(t *testing.T) { tasks := make(chan task.Request) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go runner.StartWorkers(ctx, testRunner(t), tasks) - router := testRouter(ds, &mqs.Mock{}, testRunner(t), tasks) + rnr, rnrcancel := testRunner(t) + defer rnrcancel() + + go runner.StartWorkers(ctx, rnr, tasks) + + router := testRouter(ds, &mqs.Mock{}, rnr, tasks) for _, test := range []struct { name string diff --git a/main.go b/main.go index 9ff3051a5..adec495e5 100644 --- a/main.go +++ b/main.go @@ -73,7 +73,7 @@ func main() { metricLogger := runner.NewMetricLogger() funcLogger := runner.NewFuncLogger() - rnr, err := runner.New(funcLogger, metricLogger) + rnr, err := runner.New(ctx, funcLogger, metricLogger) if err != nil { log.WithError(err).Fatalln("Failed to create a runner") }