mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Add graceful shutdown support for async runners (#125)
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
@@ -95,28 +96,50 @@ func runTask(task *models.Task) error {
|
||||
}
|
||||
|
||||
// RunAsyncRunner pulls tasks off a queue and processes them
|
||||
func RunAsyncRunner(tasksrv, port string) {
|
||||
func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port string, n int) {
|
||||
u := tasksrvURL(tasksrv, port)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go startAsyncRunners(ctx, &wg, i, u, runTask)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
<-ctx.Done()
|
||||
wgAsync.Done()
|
||||
}
|
||||
|
||||
func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) error) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
task, err := getTask(u)
|
||||
if err != nil {
|
||||
log.WithError(err).Info("Cannot get task")
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
log.Info("Picked up task:", task.ID)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
// Process Task
|
||||
if err := runTask(task); err != nil {
|
||||
log.WithError(err)
|
||||
continue
|
||||
}
|
||||
log.Info("Processed task:", task.ID)
|
||||
default:
|
||||
task, err := getTask(url)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not fetch task")
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
log.Info("Picked up task:", task.ID)
|
||||
|
||||
log.Info("Running task:", task.ID)
|
||||
// Process Task
|
||||
if err := runTask(task); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task")
|
||||
continue
|
||||
}
|
||||
log.Info("Processed task:", task.ID)
|
||||
|
||||
// Delete task from queue
|
||||
if err := deleteTask(url, task); err != nil {
|
||||
log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot delete task")
|
||||
continue
|
||||
}
|
||||
|
||||
// Delete task from queue
|
||||
if err := deleteTask(u, task); err != nil {
|
||||
log.WithError(err)
|
||||
} else {
|
||||
log.Info("Deleted task:", task.ID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,9 @@ import (
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -167,3 +169,21 @@ func TestTasksrvURL(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsyncRunnersGracefulShutdown(t *testing.T) {
|
||||
mockTask := getMockTask()
|
||||
ts := getTestServer([]*models.Task{&mockTask})
|
||||
defer ts.Close()
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(task *models.Task) error {
|
||||
return nil
|
||||
})
|
||||
wg.Wait()
|
||||
|
||||
if err := ctx.Err(); err != context.DeadlineExceeded {
|
||||
t.Errorf("async runners stopped unexpectedly. context error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user