add runTask test (#131)

This commit is contained in:
Pedro Nasser
2016-10-12 17:32:06 -03:00
committed by C Cirello
parent 2e12e2c700
commit 1ff480561a
2 changed files with 25 additions and 8 deletions

View File

@@ -14,6 +14,7 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/models"
"github.com/iron-io/runner/drivers"
) )
func getTask(url string) (*models.Task, error) { func getTask(url string) (*models.Task, error) {
@@ -83,16 +84,15 @@ func deleteTask(url string, task *models.Task) error {
return nil return nil
} }
func runTask(task *models.Task) error { func runTask(task *models.Task) (drivers.RunResult, error) {
// Set up runner and process task // Set up runner and process task
cfg := getCfg(task) cfg := getCfg(task)
ctx := context.Background() ctx := context.Background()
rnr, err := New(NewMetricLogger()) rnr, err := New(NewMetricLogger())
if err != nil { if err != nil {
return err return nil, err
} }
_, err = rnr.Run(ctx, cfg) return rnr.Run(ctx, cfg)
return err
} }
// RunAsyncRunner pulls tasks off a queue and processes them // RunAsyncRunner pulls tasks off a queue and processes them
@@ -110,7 +110,7 @@ func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port
wgAsync.Done() wgAsync.Done()
} }
func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) error) { func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
@@ -132,7 +132,7 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin
log.Info("Running task:", task.ID) log.Info("Running task:", task.ID)
// Process Task // Process Task
if err := runTask(task); err != nil { if _, err := runTask(task); err != nil {
log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task") log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task")
continue continue
} }

View File

@@ -16,6 +16,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/mqs"
"github.com/iron-io/runner/drivers"
) )
func getMockTask() models.Task { func getMockTask() models.Task {
@@ -79,6 +80,22 @@ func getTestServer(mockTasks []*models.Task) *httptest.Server {
return httptest.NewServer(r) return httptest.NewServer(r)
} }
var helloImage = "iron/hello"
func TestRunTask(t *testing.T) {
mockTask := getMockTask()
mockTask.Image = &helloImage
result, err := runTask(&mockTask)
if err != nil {
t.Error(err)
}
if result.Status() != "success" {
t.Errorf("TestRunTask failed to execute runTask")
}
}
func TestGetTask(t *testing.T) { func TestGetTask(t *testing.T) {
mockTask := getMockTask() mockTask := getMockTask()
@@ -178,8 +195,8 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(task *models.Task) error { go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(task *models.Task) (drivers.RunResult, error) {
return nil return nil, nil
}) })
wg.Wait() wg.Wait()