functions: common concurrency stream for sync and async (#314)

* functions: add bounded concurrency

* functions: plug runners to sync and async interfaces

* functions: update documentation about the new env var

* functions: fix test flakiness

* functions: the runner is self-regulated, no need to set a number of runners

* functions: push the execution to the background on incoming requests

* functions: ensure async tasks are always on

* functions: add prioritization to task consumption

Ensure that sync tasks are consumed before async tasks. This also fixes
termination race problems for free.

* functions: remove stale comments

* functions: improve mem availability calculation

* functions: parallel run for async tasks

* functions: check for memory availability before pulling async task

* functions: comment about rnr.hasAvailableMemory and sync.Cond

* functions: implement memory check for async runners using Cond vars

* functions: code grooming

- remove unnecessary goroutines
- fix stale docs
- reorganize import group

* Revert "functions: implement memory check for async runners using Cond vars"

This reverts commit 922e64032201a177c03ce6a46240925e3d35430d.

* Revert "functions: comment about rnr.hasAvailableMemory and sync.Cond"

This reverts commit 49ad7d52d341f12da9603b1a1df9d145871f0e0a.

* functions: set a minimum memory availability for sync

* functions: simplify the implementation by removing the priority queue

* functions: code grooming

- code deduplication
- review WaitGroup Wait calls
Authored by C Cirello on 2016-11-18 18:23:26 +01:00; committed by Seif Lotfy سيف لطفي
parent c1f361dd0c
commit 9d06b6e687
13 changed files with 205 additions and 98 deletions
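
In short, the change funnels both the synchronous HTTP path and the async queue puller into a single tasks channel drained by self-regulating workers. A minimal, self-contained sketch of that pattern, condensed from the api/runner/worker.go diff below (the Name field and string result are hypothetical stand-ins for *runner.Config and drivers.RunResult):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// TaskRequest/TaskResponse mirror the types added in api/runner/worker.go.
type TaskRequest struct {
	Ctx      context.Context
	Name     string // stand-in for *runner.Config
	Response chan TaskResponse
}

type TaskResponse struct {
	Result string
	Err    error
}

// startWorkers drains the shared tasks channel; both the sync HTTP path and
// the async queue puller submit into the same channel, so concurrency is
// bounded in one place.
func startWorkers(ctx context.Context, tasks <-chan TaskRequest) {
	var wg sync.WaitGroup
	for {
		select {
		case <-ctx.Done():
			wg.Wait()
			return
		case task := <-tasks:
			wg.Add(1)
			go func(task TaskRequest) {
				defer wg.Done()
				// the real worker runs the container here; this sketch just echoes the name
				task.Response <- TaskResponse{Result: "ran " + task.Name}
				close(task.Response)
			}(task)
		}
	}
}

// runTask is the synchronous entry point: submit a task and block on the reply.
func runTask(ctx context.Context, tasks chan<- TaskRequest, name string) (string, error) {
	resp := make(chan TaskResponse)
	tasks <- TaskRequest{Ctx: ctx, Name: name, Response: resp}
	r := <-resp
	return r.Result, r.Err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	tasks := make(chan TaskRequest)
	go startWorkers(ctx, tasks)

	out, err := runTask(ctx, tasks, "iron/hello")
	fmt.Println(out, err)
}

RunTask in the real code is this same submit-and-wait helper, so the HTTP handler blocks for sync calls while startAsyncRunners invokes it from a background goroutine.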

View File

@@ -15,7 +15,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/runner/common"
"github.com/iron-io/runner/drivers"
)
func getTask(ctx context.Context, url string) (*models.Task, error) {
@@ -85,30 +84,14 @@ func deleteTask(url string, task *models.Task) error {
return nil
}
func runTask(ctx context.Context, task *models.Task) (drivers.RunResult, error) {
// Set up runner and process task
cfg := getCfg(task)
rnr, err := New(NewMetricLogger())
if err != nil {
return nil, err
}
return rnr.Run(ctx, cfg)
}
// RunAsyncRunner pulls tasks off a queue and processes them
func RunAsyncRunner(ctx context.Context, tasksrv string, n int) {
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan TaskRequest, rnr *Runner) {
u, h := tasksrvURL(tasksrv)
if isHostOpen(h) {
return
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go startAsyncRunners(ctx, &wg, i, u, runTask)
}
wg.Wait()
startAsyncRunners(ctx, u, tasks, rnr)
<-ctx.Done()
}
@@ -121,16 +104,21 @@ func isHostOpen(host string) bool {
return available
}
// todo: not a big fan of this anonymous function for testing, should use an interface and make a Mock object for testing - TR
func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(ctx context.Context, task *models.Task) (drivers.RunResult, error)) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"async_runner": i})
defer wg.Done()
func startAsyncRunners(ctx context.Context, url string, tasks chan TaskRequest, rnr *Runner) {
var wg sync.WaitGroup
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"})
for {
select {
case <-ctx.Done():
wg.Wait()
return
default:
if !rnr.hasAsyncAvailableMemory() {
log.Debug("memory full")
time.Sleep(1 * time.Second)
continue
}
task, err := getTask(ctx, url)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
@@ -148,11 +136,16 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
log.Debug("Running task:", task.ID)
// Process Task
if _, err := runTask(ctx, task); err != nil {
log.WithError(err).Error("Cannot run task")
continue
}
wg.Add(1)
go func() {
defer wg.Done()
// Process Task
if _, err := RunTask(tasks, ctx, getCfg(task)); err != nil {
log.WithError(err).Error("Cannot run task")
}
}()
log.Debug("Processed task")
// Delete task from queue
@@ -160,8 +153,8 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin
log.WithError(err).Error("Cannot delete task")
continue
}
log.Info("Task complete")
}
}
}

View File

@@ -10,7 +10,6 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -18,7 +17,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs"
"github.com/iron-io/runner/drivers"
)
func setLogBuffer() *bytes.Buffer {
@@ -92,22 +90,6 @@ func getTestServer(mockTasks []*models.Task) *httptest.Server {
return httptest.NewServer(r)
}
var helloImage = "iron/hello"
func TestRunTask(t *testing.T) {
mockTask := getMockTask()
mockTask.Image = &helloImage
result, err := runTask(context.Background(), &mockTask)
if err != nil {
t.Error(err)
}
if result.Status() != "success" {
t.Errorf("TestRunTask failed to execute runTask")
}
}
func TestGetTask(t *testing.T) {
buf := setLogBuffer()
mockTask := getMockTask()
@@ -206,19 +188,35 @@ func TestTasksrvURL(t *testing.T) {
}
}
func testRunner(t *testing.T) *Runner {
r, err := New(NewMetricLogger())
if err != nil {
t.Fatal("Test: failed to create new runner")
}
return r
}
func TestAsyncRunnersGracefulShutdown(t *testing.T) {
buf := setLogBuffer()
mockTask := getMockTask()
ts := getTestServer([]*models.Task{&mockTask})
defer ts.Close()
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
var wg sync.WaitGroup
wg.Add(1)
go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(ctx context.Context, task *models.Task) (drivers.RunResult, error) {
return nil, nil
})
wg.Wait()
tasks := make(chan TaskRequest)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
defer close(tasks)
go func() {
for t := range tasks {
t.Response <- TaskResponse{
Result: nil,
Err: nil,
}
}
}()
startAsyncRunners(ctx, ts.URL+"/tasks", tasks, testRunner(t))
if err := ctx.Err(); err != context.DeadlineExceeded {
t.Log(buf.String())

View File

@@ -115,10 +115,17 @@ func (r *Runner) queueHandler() {
}
}
func (r *Runner) hasAsyncAvailableMemory() bool {
r.usedMemMutex.RLock()
defer r.usedMemMutex.RUnlock()
// reserve at least half of the memory for sync
return (r.availableMem/2)-r.usedMem > 0
}
func (r *Runner) checkRequiredMem(req uint64) bool {
r.usedMemMutex.RLock()
defer r.usedMemMutex.RUnlock()
return r.availableMem-r.usedMem/int64(req)*1024*1024 > 0
return (r.availableMem-r.usedMem)/int64(req)*1024*1024 > 0
}
func (r *Runner) addUsedMem(used int64) {
@@ -136,7 +143,7 @@ func (r *Runner) checkMemAndUse(req uint64) bool {
used := int64(req) * 1024 * 1024
if r.availableMem-r.usedMem/used < 0 {
if (r.availableMem-r.usedMem)/used < 0 {
return false
}
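
To make the half-reservation policy concrete, here is a tiny standalone illustration; the function body matches the check added above, while the byte figures are made-up example values:

package main

import "fmt"

// hasAsyncAvailableMemory mirrors the check added in the hunk above: async
// task pulls are allowed only while used memory stays below half of the
// total, keeping the other half reserved for sync requests.
func hasAsyncAvailableMemory(availableMem, usedMem int64) bool {
	return (availableMem/2)-usedMem > 0
}

func main() {
	const mb = int64(1024 * 1024)
	total := 2048 * mb

	fmt.Println(hasAsyncAvailableMemory(total, 512*mb))  // true: 512 MB used, async budget is 1024 MB
	fmt.Println(hasAsyncAvailableMemory(total, 1536*mb)) // false: the sync reservation would be consumed
}

Sync requests therefore keep guaranteed headroom even when the async queue is saturated.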

api/runner/worker.go (new file, 52 additions)
View File

@@ -0,0 +1,52 @@
package runner
import (
"context"
"sync"
"github.com/iron-io/runner/drivers"
)
type TaskRequest struct {
Ctx context.Context
Config *Config
Response chan TaskResponse
}
type TaskResponse struct {
Result drivers.RunResult
Err error
}
// StartWorkers handles incoming tasks and spawns self-regulating container
// workers.
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan TaskRequest) {
var wg sync.WaitGroup
for {
select {
case <-ctx.Done():
wg.Wait()
return
case task := <-tasks:
wg.Add(1)
go func(task TaskRequest) {
defer wg.Done()
result, err := rnr.Run(task.Ctx, task.Config)
select {
case task.Response <- TaskResponse{result, err}:
close(task.Response)
default:
}
}(task)
}
}
}
func RunTask(tasks chan TaskRequest, ctx context.Context, cfg *Config) (drivers.RunResult, error) {
tresp := make(chan TaskResponse)
treq := TaskRequest{Ctx: ctx, Config: cfg, Response: tresp}
tasks <- treq
resp := <-treq.Response
return resp.Result, resp.Err
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/iron-io/functions/api/datastore"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs"
"github.com/iron-io/functions/api/runner"
)
func setLogBuffer() *bytes.Buffer {
@@ -24,9 +25,19 @@ func setLogBuffer() *bytes.Buffer {
return &buf
}
func mockTasksConduit() chan runner.TaskRequest {
tasks := make(chan runner.TaskRequest)
go func() {
for range tasks {
}
}()
return tasks
}
func TestAppCreate(t *testing.T) {
buf := setLogBuffer()
tasks := mockTasksConduit()
defer close(tasks)
for i, test := range []struct {
mock *datastore.Mock
path string
@@ -46,7 +57,7 @@ func TestAppCreate(t *testing.T) {
// success
{&datastore.Mock{}, "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusCreated, nil},
} {
s := New(test.mock, &mqs.Mock{}, testRunner(t))
s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
body := bytes.NewBuffer([]byte(test.body))
@@ -72,7 +83,10 @@ func TestAppCreate(t *testing.T) {
func TestAppDelete(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -106,7 +120,10 @@ func TestAppDelete(t *testing.T) {
func TestAppList(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -139,7 +156,10 @@ func TestAppList(t *testing.T) {
func TestAppGet(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -172,6 +192,8 @@ func TestAppGet(t *testing.T) {
func TestAppUpdate(t *testing.T) {
buf := setLogBuffer()
tasks := mockTasksConduit()
defer close(tasks)
for i, test := range []struct {
mock *datastore.Mock
@@ -190,7 +212,7 @@ func TestAppUpdate(t *testing.T) {
},
}, "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil},
} {
s := New(test.mock, &mqs.Mock{}, testRunner(t))
s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
body := bytes.NewBuffer([]byte(test.body))

View File

@@ -13,6 +13,8 @@ import (
func TestRouteCreate(t *testing.T) {
buf := setLogBuffer()
tasks := mockTasksConduit()
defer close(tasks)
for i, test := range []struct {
mock *datastore.Mock
@@ -34,7 +36,7 @@ func TestRouteCreate(t *testing.T) {
// success
{&datastore.Mock{}, "/v1/apps/a/routes", `{ "route": { "image": "iron/hello", "path": "/myroute" } }`, http.StatusCreated, nil},
} {
s := New(test.mock, &mqs.Mock{}, testRunner(t))
s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
body := bytes.NewBuffer([]byte(test.body))
@@ -60,7 +62,10 @@ func TestRouteCreate(t *testing.T) {
func TestRouteDelete(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -94,7 +99,10 @@ func TestRouteDelete(t *testing.T) {
func TestRouteList(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -127,7 +135,10 @@ func TestRouteList(t *testing.T) {
func TestRouteGet(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -160,7 +171,10 @@ func TestRouteGet(t *testing.T) {
func TestRouteUpdate(t *testing.T) {
buf := setLogBuffer()
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t))
tasks := mockTasksConduit()
defer close(tasks)
s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {

View File

@@ -15,7 +15,6 @@ import (
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/runner/common"
"github.com/iron-io/runner/drivers"
uuid "github.com/satori/go.uuid"
)
@@ -35,7 +34,7 @@ func ToEnvName(envtype, name string) string {
return fmt.Sprintf("%s_%s", envtype, name)
}
func handleRequest(c *gin.Context, enqueue models.Enqueue) {
func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
if strings.HasPrefix(c.Request.URL.Path, "/v1") {
c.Status(http.StatusNotFound)
return
@@ -156,7 +155,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
Stdin: payload,
}
var result drivers.RunResult
switch found.Type {
case "async":
// Read payload
@@ -182,7 +180,9 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
log.Info("Added new task to queue")
default:
if result, err = Api.Runner.Run(c, cfg); err != nil {
result, err := runner.RunTask(s.tasks, ctx, cfg)
if err != nil {
break
}
for k, v := range found.Headers {
@@ -194,6 +194,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
} else {
c.AbortWithStatus(http.StatusInternalServerError)
}
}
}

View File

@@ -30,6 +30,8 @@ func testRouterAsync(s *Server, enqueueFunc models.Enqueue) *gin.Engine {
func TestRouteRunnerAsyncExecution(t *testing.T) {
t.Skip()
tasks := mockTasksConduit()
// todo: I broke how this test works trying to clean up the code a bit. Is there a better way to do this test rather than having to override the default route behavior?
s := New(&datastore.Mock{
FakeApps: []*models.App{
@@ -40,7 +42,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) {
{Type: "async", Path: "/myerror", AppName: "myapp", Image: "iron/error", Config: map[string]string{"test": "true"}},
{Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "iron/hello", Config: map[string]string{"test": "true"}},
},
}, &mqs.Mock{}, testRunner(t))
}, &mqs.Mock{}, testRunner(t), tasks)
for i, test := range []struct {
path string

View File

@@ -2,6 +2,7 @@ package server
import (
"bytes"
"context"
"net/http"
"strings"
"testing"
@@ -22,11 +23,13 @@ func testRunner(t *testing.T) *runner.Runner {
func TestRouteRunnerGet(t *testing.T) {
buf := setLogBuffer()
tasks := mockTasksConduit()
s := New(&datastore.Mock{
FakeApps: []*models.App{
{Name: "myapp", Config: models.Config{}},
},
}, &mqs.Mock{}, testRunner(t))
}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -61,11 +64,13 @@ func TestRouteRunnerGet(t *testing.T) {
func TestRouteRunnerPost(t *testing.T) {
buf := setLogBuffer()
tasks := mockTasksConduit()
s := New(&datastore.Mock{
FakeApps: []*models.App{
{Name: "myapp", Config: models.Config{}},
},
}, &mqs.Mock{}, testRunner(t))
}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -102,6 +107,13 @@ func TestRouteRunnerPost(t *testing.T) {
func TestRouteRunnerExecution(t *testing.T) {
buf := setLogBuffer()
tasks := make(chan runner.TaskRequest)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runner.StartWorkers(ctx, testRunner(t), tasks)
s := New(&datastore.Mock{
FakeApps: []*models.App{
{Name: "myapp", Config: models.Config{}},
@@ -110,7 +122,7 @@ func TestRouteRunnerExecution(t *testing.T) {
{Path: "/myroute", AppName: "myapp", Image: "iron/hello", Headers: map[string][]string{"X-Function": {"Test"}}},
{Path: "/myerror", AppName: "myapp", Image: "iron/error", Headers: map[string][]string{"X-Function": {"Test"}}},
},
}, &mqs.Mock{}, testRunner(t))
}, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {

View File

@@ -27,14 +27,17 @@ type Server struct {
MQ models.MessageQueue
AppListeners []ifaces.AppListener
SpecialHandlers []ifaces.SpecialHandler
tasks chan runner.TaskRequest
}
func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner) *Server {
func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan runner.TaskRequest) *Server {
Api = &Server{
Runner: r,
Router: gin.New(),
Datastore: ds,
MQ: mq,
Runner: r,
tasks: tasks,
}
return Api
}
@@ -80,7 +83,7 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error {
}
}
// now call the normal runner call
handleRequest(ginC, nil)
s.handleRequest(ginC, nil)
return nil
}
@@ -90,7 +93,7 @@ func (s *Server) handleRunnerRequest(c *gin.Context) {
ctx, _ := common.LoggerWithFields(c, logrus.Fields{"call_id": task.ID})
return s.MQ.Push(ctx, task)
}
handleRequest(c, enqueue)
s.handleRequest(c, enqueue)
}
func (s *Server) handleTaskRequest(c *gin.Context) {

View File

@@ -15,6 +15,7 @@ import (
"github.com/iron-io/functions/api/datastore"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/runner/common"
)
@@ -84,10 +85,15 @@ func prepareBolt(t *testing.T) (models.Datastore, func()) {
func TestFullStack(t *testing.T) {
buf := setLogBuffer()
ds, close := prepareBolt(t)
defer close()
ds, closeBolt := prepareBolt(t)
defer closeBolt()
s := New(ds, &mqs.Mock{}, testRunner(t))
tasks := make(chan runner.TaskRequest)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runner.StartWorkers(ctx, testRunner(t), tasks)
s := New(ds, &mqs.Mock{}, testRunner(t), tasks)
router := testRouter(s)
for i, test := range []struct {
@@ -119,5 +125,4 @@ func TestFullStack(t *testing.T) {
i, test.expectedCode, rec.Code)
}
}
}

View File

@@ -31,10 +31,6 @@ docker run -e VAR_NAME=VALUE ...
<td>Sets the port to run on. Default: `8080`.</td>
</tr>
<tr>
<td>NUM_ASYNC</td>
<td>The number of async runners in the functions process (default 1).</td>
</tr>
<tr>
<td>LOG_LEVEL</td>
<td>Set to `DEBUG` to enable debugging. Default: INFO.</td>
</tr>
@@ -47,7 +43,7 @@ a couple reasons why we did it this way:
* It's clean. Once the container exits, there is nothing left behind including all the function images.
* You can set resource restrictions for the entire IronFunctions instance. For instance, you can set `--memory` on
the docker run command to set the max memory for the IronFunctions instance AND all of the functions it's running.
the docker run command to set the max memory for the IronFunctions instance AND all of the functions it's running.
There are some reasons you may not want to use dind, such as using the image cache during testing or you're running
[Windows](windows.md).

main.go
View File

@@ -23,7 +23,6 @@ const (
envDB = "db_url"
envPort = "port" // be careful, Gin expects this variable to be "port"
envAPIURL = "api_url"
envNumAsync = "num_async"
)
func init() {
@@ -37,7 +36,6 @@ func init() {
viper.SetDefault(envDB, fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd))
viper.SetDefault(envPort, 8080)
viper.SetDefault(envAPIURL, fmt.Sprintf("http://127.0.0.1:%d", viper.GetInt(envPort)))
viper.SetDefault(envNumAsync, 1)
viper.AutomaticEnv() // picks up env vars automatically
logLevel, err := log.ParseLevel(viper.GetString("log_level"))
if err != nil {
@@ -82,18 +80,22 @@ func main() {
},
}
tasks := make(chan runner.TaskRequest)
svr.AddFunc(func(ctx context.Context) {
srv := server.New(ds, mqType, rnr)
runner.StartWorkers(ctx, rnr, tasks)
})
svr.AddFunc(func(ctx context.Context) {
srv := server.New(ds, mqType, rnr, tasks)
srv.Run(ctx)
})
apiURL, numAsync := viper.GetString(envAPIURL), viper.GetInt(envNumAsync)
log.Debug("async workers:", numAsync)
if numAsync > 0 {
svr.AddFunc(func(ctx context.Context) {
runner.RunAsyncRunner(ctx, apiURL, numAsync)
})
}
apiURL := viper.GetString(envAPIURL)
svr.AddFunc(func(ctx context.Context) {
runner.RunAsyncRunner(ctx, apiURL, tasks, rnr)
})
svr.Serve(ctx)
close(tasks)
}