functions: fix goroutine leak in runner (#394)

* functions: fix goroutine leak in runner

* functions: ensure taskQueue is consumed after context cancellation
This commit is contained in:
C Cirello
2016-12-06 16:11:06 +01:00
committed by GitHub
parent f0fc85b85a
commit 0cdd1db3e1
9 changed files with 122 additions and 67 deletions

View File

@@ -189,12 +189,13 @@ func TestTasksrvURL(t *testing.T) {
}
}
func testRunner(t *testing.T) *Runner {
r, err := New(NewFuncLogger(), NewMetricLogger())
func testRunner(t *testing.T) (*Runner, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
r, err := New(ctx, NewFuncLogger(), NewMetricLogger())
if err != nil {
t.Fatal("Test: failed to create new runner")
}
return r
return r, cancel
}
func TestAsyncRunnersGracefulShutdown(t *testing.T) {
@@ -217,7 +218,9 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) {
}
}()
startAsyncRunners(ctx, ts.URL+"/tasks", tasks, testRunner(t))
rnr, cancel := testRunner(t)
defer cancel()
startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr)
if err := ctx.Err(); err != context.DeadlineExceeded {
t.Log(buf.String())

View File

@@ -39,7 +39,7 @@ var (
WaitMemoryTimeout = 10 * time.Second
)
func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) {
func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, error) {
// TODO: Is this really required for the container drivers? Can we remove it?
env := common.NewEnvironment(func(e *common.Environment) {})
@@ -58,7 +58,7 @@ func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) {
usedMem: 0,
}
go r.queueHandler()
go r.queueHandler(ctx)
return r, nil
}
@@ -67,42 +67,52 @@ func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) {
// If there's memory then send signal to the task to proceed.
// If there's not available memory to run the task it waits
// If the task waits for more than X seconds it timeouts
func (r *Runner) queueHandler() {
var task *containerTask
var waitStart time.Time
var waitTime time.Duration
var timedOut bool
func (r *Runner) queueHandler(ctx context.Context) {
consumeQueue:
for {
select {
case task = <-r.taskQueue:
waitStart = time.Now()
timedOut = false
case task := <-r.taskQueue:
r.handleTask(task)
case <-ctx.Done():
break consumeQueue
}
// Loop waiting for available memory
for !r.checkRequiredMem(task.cfg.Memory) {
waitTime = time.Since(waitStart)
if waitTime > WaitMemoryTimeout {
timedOut = true
break
}
time.Sleep(time.Microsecond)
}
metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName)
r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime)
r.mlog.LogTime(task.ctx, "run.wait_time", waitTime)
if timedOut {
// Send to a signal to this task saying it cannot run
r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1)
task.canRun <- false
continue
}
// Send a signal to this task saying it can run
task.canRun <- true
}
// consume remainders
for len(r.taskQueue) > 0 {
r.handleTask(<-r.taskQueue)
}
}
func (r *Runner) handleTask(task *containerTask) {
waitStart := time.Now()
var waitTime time.Duration
var timedOut bool
// Loop waiting for available memory
for !r.checkRequiredMem(task.cfg.Memory) {
waitTime = time.Since(waitStart)
if waitTime > WaitMemoryTimeout {
timedOut = true
break
}
time.Sleep(time.Microsecond)
}
metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName)
r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime)
r.mlog.LogTime(task.ctx, "run.wait_time", waitTime)
if timedOut {
// Send to a signal to this task saying it cannot run
r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1)
task.canRun <- false
return
}
// Send a signal to this task saying it can run
task.canRun <- true
}
func (r *Runner) hasAsyncAvailableMemory() bool {

View File

@@ -14,13 +14,14 @@ import (
func TestRunnerHello(t *testing.T) {
buf := setLogBuffer()
runner, err := New(NewFuncLogger(), NewMetricLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
runner, err := New(ctx, NewFuncLogger(), NewMetricLogger())
if err != nil {
t.Fatalf("Test error during New() - %s", err)
}
ctx := context.Background()
for i, test := range []struct {
route *models.Route
payload string
@@ -67,13 +68,14 @@ func TestRunnerHello(t *testing.T) {
func TestRunnerError(t *testing.T) {
t.Skip()
buf := setLogBuffer()
runner, err := New(NewFuncLogger(), NewMetricLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
runner, err := New(ctx, NewFuncLogger(), NewMetricLogger())
if err != nil {
t.Fatalf("Test error during New() - %s", err)
}
ctx := context.Background()
for i, test := range []struct {
route *models.Route
payload string