diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 118bc23d2..66dbe510e 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -14,6 +14,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/models" + "github.com/iron-io/runner/drivers" ) func getTask(url string) (*models.Task, error) { @@ -83,16 +84,15 @@ func deleteTask(url string, task *models.Task) error { return nil } -func runTask(task *models.Task) error { +func runTask(task *models.Task) (drivers.RunResult, error) { // Set up runner and process task cfg := getCfg(task) ctx := context.Background() rnr, err := New(NewMetricLogger()) if err != nil { - return err + return nil, err } - _, err = rnr.Run(ctx, cfg) - return err + return rnr.Run(ctx, cfg) } // RunAsyncRunner pulls tasks off a queue and processes them @@ -110,7 +110,7 @@ func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port wgAsync.Done() } -func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) error) { +func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) { defer wg.Done() for { select { @@ -132,7 +132,7 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin log.Info("Running task:", task.ID) // Process Task - if err := runTask(task); err != nil { + if _, err := runTask(task); err != nil { log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task") continue } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 196358ae2..4b9b2dfae 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -16,6 +16,7 @@ import ( "github.com/gin-gonic/gin" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" + "github.com/iron-io/runner/drivers" ) func getMockTask() models.Task { @@ -79,6 +80,22 @@ func getTestServer(mockTasks []*models.Task) *httptest.Server { return httptest.NewServer(r) } +var helloImage = "iron/hello" + +func TestRunTask(t *testing.T) { + mockTask := getMockTask() + mockTask.Image = &helloImage + + result, err := runTask(&mockTask) + if err != nil { + t.Error(err) + } + + if result.Status() != "success" { + t.Errorf("TestRunTask failed to execute runTask") + } +} + func TestGetTask(t *testing.T) { mockTask := getMockTask() @@ -178,8 +195,8 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) { ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) var wg sync.WaitGroup wg.Add(1) - go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(task *models.Task) error { - return nil + go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(task *models.Task) (drivers.RunResult, error) { + return nil, nil }) wg.Wait()