From 9d06b6e687ba88e6033bccb39256c18bf8e601f3 Mon Sep 17 00:00:00 2001 From: C Cirello Date: Fri, 18 Nov 2016 18:23:26 +0100 Subject: [PATCH] functions: common concurrency stream for sync and async (#314) * functions: add bounded concurrency * functions: plug runners to sync and async interfaces * functions: update documentation about the new env var * functions: fix test flakiness * functions: the runner is self-regulated, no need to set a number of runners * functions: push the execution to the background on incoming requests * functions: ensure async tasks are always on * functions: add prioritization to tasks consumption Ensure that Sync tasks are consumed before Async tasks. Also, fixes termination races problems for free. * functions: remove stale comments * functions: improve mem availability calculation * functions: parallel run for async tasks * functions: check for memory availability before pulling async task * functions: comment about rnr.hasAvailableMemory and sync.Cond * functions: implement memory check for async runners using Cond vars * functions: code grooming - remove unnecessary goroutines - fix stale docs - reorganize import group * Revert "functions: implement memory check for async runners using Cond vars" This reverts commit 922e64032201a177c03ce6a46240925e3d35430d. * Revert "functions: comment about rnr.hasAvailableMemory and sync.Cond" This reverts commit 49ad7d52d341f12da9603b1a1df9d145871f0e0a. * functions: set a minimum memory availability for sync * functions: simplify the implementation by removing the priority queue * functions: code grooming - code deduplication - review waitgroups Waits --- api/runner/async_runner.go | 51 ++++++++++++++------------------ api/runner/async_runner_test.go | 48 +++++++++++++++--------------- api/runner/runner.go | 11 +++++-- api/runner/worker.go | 52 +++++++++++++++++++++++++++++++++ api/server/apps_test.go | 34 +++++++++++++++++---- api/server/routes_test.go | 24 +++++++++++---- api/server/runner.go | 9 +++--- api/server/runner_async_test.go | 4 ++- api/server/runner_test.go | 18 ++++++++++-- api/server/server.go | 11 ++++--- api/server/server_test.go | 13 ++++++--- docs/options.md | 6 +--- main.go | 22 +++++++------- 13 files changed, 205 insertions(+), 98 deletions(-) create mode 100644 api/runner/worker.go diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 907cdea7b..95597c386 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -15,7 +15,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/models" "github.com/iron-io/runner/common" - "github.com/iron-io/runner/drivers" ) func getTask(ctx context.Context, url string) (*models.Task, error) { @@ -85,30 +84,14 @@ func deleteTask(url string, task *models.Task) error { return nil } -func runTask(ctx context.Context, task *models.Task) (drivers.RunResult, error) { - // Set up runner and process task - cfg := getCfg(task) - rnr, err := New(NewMetricLogger()) - if err != nil { - return nil, err - } - return rnr.Run(ctx, cfg) -} - // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, tasksrv string, n int) { +func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan TaskRequest, rnr *Runner) { u, h := tasksrvURL(tasksrv) if isHostOpen(h) { return } - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - go startAsyncRunners(ctx, &wg, i, u, runTask) - } - - wg.Wait() + startAsyncRunners(ctx, u, tasks, rnr) <-ctx.Done() } @@ -121,16 +104,21 @@ func isHostOpen(host string) bool { return available } -// todo: not a big fan of this anonymous function for testing, should use an interface and make a Mock object for testing - TR -func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(ctx context.Context, task *models.Task) (drivers.RunResult, error)) { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"async_runner": i}) - defer wg.Done() +func startAsyncRunners(ctx context.Context, url string, tasks chan TaskRequest, rnr *Runner) { + var wg sync.WaitGroup + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"}) for { select { case <-ctx.Done(): + wg.Wait() return default: + if !rnr.hasAsyncAvailableMemory() { + log.Debug("memory full") + time.Sleep(1 * time.Second) + continue + } task, err := getTask(ctx, url) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -148,11 +136,16 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID}) log.Debug("Running task:", task.ID) - // Process Task - if _, err := runTask(ctx, task); err != nil { - log.WithError(err).Error("Cannot run task") - continue - } + + wg.Add(1) + go func() { + defer wg.Done() + // Process Task + if _, err := RunTask(tasks, ctx, getCfg(task)); err != nil { + log.WithError(err).Error("Cannot run task") + } + }() + log.Debug("Processed task") // Delete task from queue @@ -160,8 +153,8 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin log.WithError(err).Error("Cannot delete task") continue } - log.Info("Task complete") + } } } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index febe07ef8..2cadea350 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -10,7 +10,6 @@ import ( "math/rand" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -18,7 +17,6 @@ import ( "github.com/gin-gonic/gin" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" - "github.com/iron-io/runner/drivers" ) func setLogBuffer() *bytes.Buffer { @@ -92,22 +90,6 @@ func getTestServer(mockTasks []*models.Task) *httptest.Server { return httptest.NewServer(r) } -var helloImage = "iron/hello" - -func TestRunTask(t *testing.T) { - mockTask := getMockTask() - mockTask.Image = &helloImage - - result, err := runTask(context.Background(), &mockTask) - if err != nil { - t.Error(err) - } - - if result.Status() != "success" { - t.Errorf("TestRunTask failed to execute runTask") - } -} - func TestGetTask(t *testing.T) { buf := setLogBuffer() mockTask := getMockTask() @@ -206,19 +188,35 @@ func TestTasksrvURL(t *testing.T) { } } +func testRunner(t *testing.T) *Runner { + r, err := New(NewMetricLogger()) + if err != nil { + t.Fatal("Test: failed to create new runner") + } + return r +} + func TestAsyncRunnersGracefulShutdown(t *testing.T) { buf := setLogBuffer() mockTask := getMockTask() ts := getTestServer([]*models.Task{&mockTask}) defer ts.Close() - ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) - var wg sync.WaitGroup - wg.Add(1) - go startAsyncRunners(ctx, &wg, 0, ts.URL+"/tasks", func(ctx context.Context, task *models.Task) (drivers.RunResult, error) { - return nil, nil - }) - wg.Wait() + tasks := make(chan TaskRequest) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + defer close(tasks) + go func() { + for t := range tasks { + t.Response <- TaskResponse{ + Result: nil, + Err: nil, + } + + } + }() + + startAsyncRunners(ctx, ts.URL+"/tasks", tasks, testRunner(t)) if err := ctx.Err(); err != context.DeadlineExceeded { t.Log(buf.String()) diff --git a/api/runner/runner.go b/api/runner/runner.go index 6db9e4d0f..aa88f04ad 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -115,10 +115,17 @@ func (r *Runner) queueHandler() { } } +func (r *Runner) hasAsyncAvailableMemory() bool { + r.usedMemMutex.RLock() + defer r.usedMemMutex.RUnlock() + // reserve at least half of the memory for sync + return (r.availableMem/2)-r.usedMem > 0 +} + func (r *Runner) checkRequiredMem(req uint64) bool { r.usedMemMutex.RLock() defer r.usedMemMutex.RUnlock() - return r.availableMem-r.usedMem/int64(req)*1024*1024 > 0 + return (r.availableMem-r.usedMem)/int64(req)*1024*1024 > 0 } func (r *Runner) addUsedMem(used int64) { @@ -136,7 +143,7 @@ func (r *Runner) checkMemAndUse(req uint64) bool { used := int64(req) * 1024 * 1024 - if r.availableMem-r.usedMem/used < 0 { + if (r.availableMem-r.usedMem)/used < 0 { return false } diff --git a/api/runner/worker.go b/api/runner/worker.go new file mode 100644 index 000000000..0c2865d6a --- /dev/null +++ b/api/runner/worker.go @@ -0,0 +1,52 @@ +package runner + +import ( + "context" + "sync" + + "github.com/iron-io/runner/drivers" +) + +type TaskRequest struct { + Ctx context.Context + Config *Config + Response chan TaskResponse +} + +type TaskResponse struct { + Result drivers.RunResult + Err error +} + +// StartWorkers handle incoming tasks and spawns self-regulating container +// workers. +func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan TaskRequest) { + var wg sync.WaitGroup + for { + select { + case <-ctx.Done(): + wg.Wait() + return + case task := <-tasks: + wg.Add(1) + go func(task TaskRequest) { + defer wg.Done() + result, err := rnr.Run(task.Ctx, task.Config) + select { + case task.Response <- TaskResponse{result, err}: + close(task.Response) + default: + } + }(task) + } + } + +} + +func RunTask(tasks chan TaskRequest, ctx context.Context, cfg *Config) (drivers.RunResult, error) { + tresp := make(chan TaskResponse) + treq := TaskRequest{Ctx: ctx, Config: cfg, Response: tresp} + tasks <- treq + resp := <-treq.Response + return resp.Result, resp.Err +} diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 729134c72..0aea0d16d 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -12,6 +12,7 @@ import ( "github.com/iron-io/functions/api/datastore" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" + "github.com/iron-io/functions/api/runner" ) func setLogBuffer() *bytes.Buffer { @@ -24,9 +25,19 @@ func setLogBuffer() *bytes.Buffer { return &buf } +func mockTasksConduit() chan runner.TaskRequest { + tasks := make(chan runner.TaskRequest) + go func() { + for range tasks { + } + }() + return tasks +} + func TestAppCreate(t *testing.T) { buf := setLogBuffer() - + tasks := mockTasksConduit() + defer close(tasks) for i, test := range []struct { mock *datastore.Mock path string @@ -46,7 +57,7 @@ func TestAppCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusCreated, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t)) + s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) body := bytes.NewBuffer([]byte(test.body)) @@ -72,7 +83,10 @@ func TestAppCreate(t *testing.T) { func TestAppDelete(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -106,7 +120,10 @@ func TestAppDelete(t *testing.T) { func TestAppList(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -139,7 +156,10 @@ func TestAppList(t *testing.T) { func TestAppGet(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -172,6 +192,8 @@ func TestAppGet(t *testing.T) { func TestAppUpdate(t *testing.T) { buf := setLogBuffer() + tasks := mockTasksConduit() + defer close(tasks) for i, test := range []struct { mock *datastore.Mock @@ -190,7 +212,7 @@ func TestAppUpdate(t *testing.T) { }, }, "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t)) + s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) body := bytes.NewBuffer([]byte(test.body)) diff --git a/api/server/routes_test.go b/api/server/routes_test.go index a8ec4ce40..3b387baa5 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -13,6 +13,8 @@ import ( func TestRouteCreate(t *testing.T) { buf := setLogBuffer() + tasks := mockTasksConduit() + defer close(tasks) for i, test := range []struct { mock *datastore.Mock @@ -34,7 +36,7 @@ func TestRouteCreate(t *testing.T) { // success {&datastore.Mock{}, "/v1/apps/a/routes", `{ "route": { "image": "iron/hello", "path": "/myroute" } }`, http.StatusCreated, nil}, } { - s := New(test.mock, &mqs.Mock{}, testRunner(t)) + s := New(test.mock, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) body := bytes.NewBuffer([]byte(test.body)) @@ -60,7 +62,10 @@ func TestRouteCreate(t *testing.T) { func TestRouteDelete(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -94,7 +99,10 @@ func TestRouteDelete(t *testing.T) { func TestRouteList(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -127,7 +135,10 @@ func TestRouteList(t *testing.T) { func TestRouteGet(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -160,7 +171,10 @@ func TestRouteGet(t *testing.T) { func TestRouteUpdate(t *testing.T) { buf := setLogBuffer() - s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + tasks := mockTasksConduit() + defer close(tasks) + + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { diff --git a/api/server/runner.go b/api/server/runner.go index c73795075..8c6e37ccf 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -15,7 +15,6 @@ import ( "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/runner" "github.com/iron-io/runner/common" - "github.com/iron-io/runner/drivers" uuid "github.com/satori/go.uuid" ) @@ -35,7 +34,7 @@ func ToEnvName(envtype, name string) string { return fmt.Sprintf("%s_%s", envtype, name) } -func handleRequest(c *gin.Context, enqueue models.Enqueue) { +func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) { if strings.HasPrefix(c.Request.URL.Path, "/v1") { c.Status(http.StatusNotFound) return @@ -156,7 +155,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { Stdin: payload, } - var result drivers.RunResult switch found.Type { case "async": // Read payload @@ -182,7 +180,9 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { log.Info("Added new task to queue") default: - if result, err = Api.Runner.Run(c, cfg); err != nil { + + result, err := runner.RunTask(s.tasks, ctx, cfg) + if err != nil { break } for k, v := range found.Headers { @@ -194,6 +194,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { } else { c.AbortWithStatus(http.StatusInternalServerError) } + } } diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 951b9c402..175fa6769 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -30,6 +30,8 @@ func testRouterAsync(s *Server, enqueueFunc models.Enqueue) *gin.Engine { func TestRouteRunnerAsyncExecution(t *testing.T) { t.Skip() + tasks := mockTasksConduit() + // todo: I broke how this test works trying to clean up the code a bit. Is there a better way to do this test rather than having to override the default route behavior? s := New(&datastore.Mock{ FakeApps: []*models.App{ @@ -40,7 +42,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { {Type: "async", Path: "/myerror", AppName: "myapp", Image: "iron/error", Config: map[string]string{"test": "true"}}, {Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "iron/hello", Config: map[string]string{"test": "true"}}, }, - }, &mqs.Mock{}, testRunner(t)) + }, &mqs.Mock{}, testRunner(t), tasks) for i, test := range []struct { path string diff --git a/api/server/runner_test.go b/api/server/runner_test.go index dda59cdf9..b585e26c6 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "net/http" "strings" "testing" @@ -22,11 +23,13 @@ func testRunner(t *testing.T) *runner.Runner { func TestRouteRunnerGet(t *testing.T) { buf := setLogBuffer() + tasks := mockTasksConduit() + s := New(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, - }, &mqs.Mock{}, testRunner(t)) + }, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -61,11 +64,13 @@ func TestRouteRunnerGet(t *testing.T) { func TestRouteRunnerPost(t *testing.T) { buf := setLogBuffer() + tasks := mockTasksConduit() + s := New(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, }, - }, &mqs.Mock{}, testRunner(t)) + }, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -102,6 +107,13 @@ func TestRouteRunnerPost(t *testing.T) { func TestRouteRunnerExecution(t *testing.T) { buf := setLogBuffer() + + tasks := make(chan runner.TaskRequest) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go runner.StartWorkers(ctx, testRunner(t), tasks) + s := New(&datastore.Mock{ FakeApps: []*models.App{ {Name: "myapp", Config: models.Config{}}, @@ -110,7 +122,7 @@ func TestRouteRunnerExecution(t *testing.T) { {Path: "/myroute", AppName: "myapp", Image: "iron/hello", Headers: map[string][]string{"X-Function": {"Test"}}}, {Path: "/myerror", AppName: "myapp", Image: "iron/error", Headers: map[string][]string{"X-Function": {"Test"}}}, }, - }, &mqs.Mock{}, testRunner(t)) + }, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { diff --git a/api/server/server.go b/api/server/server.go index fb0ab1d6c..6007372f4 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -27,14 +27,17 @@ type Server struct { MQ models.MessageQueue AppListeners []ifaces.AppListener SpecialHandlers []ifaces.SpecialHandler + + tasks chan runner.TaskRequest } -func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner) *Server { +func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan runner.TaskRequest) *Server { Api = &Server{ + Runner: r, Router: gin.New(), Datastore: ds, MQ: mq, - Runner: r, + tasks: tasks, } return Api } @@ -80,7 +83,7 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { } } // now call the normal runner call - handleRequest(ginC, nil) + s.handleRequest(ginC, nil) return nil } @@ -90,7 +93,7 @@ func (s *Server) handleRunnerRequest(c *gin.Context) { ctx, _ := common.LoggerWithFields(c, logrus.Fields{"call_id": task.ID}) return s.MQ.Push(ctx, task) } - handleRequest(c, enqueue) + s.handleRequest(c, enqueue) } func (s *Server) handleTaskRequest(c *gin.Context) { diff --git a/api/server/server_test.go b/api/server/server_test.go index 2d27f18c0..d66d522b0 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/iron-io/functions/api/datastore" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" + "github.com/iron-io/functions/api/runner" "github.com/iron-io/runner/common" ) @@ -84,10 +85,15 @@ func prepareBolt(t *testing.T) (models.Datastore, func()) { func TestFullStack(t *testing.T) { buf := setLogBuffer() - ds, close := prepareBolt(t) - defer close() + ds, closeBolt := prepareBolt(t) + defer closeBolt() - s := New(ds, &mqs.Mock{}, testRunner(t)) + tasks := make(chan runner.TaskRequest) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go runner.StartWorkers(ctx, testRunner(t), tasks) + + s := New(ds, &mqs.Mock{}, testRunner(t), tasks) router := testRouter(s) for i, test := range []struct { @@ -119,5 +125,4 @@ func TestFullStack(t *testing.T) { i, test.expectedCode, rec.Code) } } - } diff --git a/docs/options.md b/docs/options.md index fbedd8891..6d3a4d872 100644 --- a/docs/options.md +++ b/docs/options.md @@ -31,10 +31,6 @@ docker run -e VAR_NAME=VALUE ... Sets the port to run on. Default: `8080`. -NUM_ASYNC -The number of async runners in the functions process (default 1). - - LOG_LEVEL Set to `DEBUG` to enable debugging. Default: INFO. @@ -47,7 +43,7 @@ a couple reasons why we did it this way: * It's clean. Once the container exits, there is nothing left behind including all the function images. * You can set resource restrictions for the entire IronFunctions instance. For instance, you can set `--memory` on -the docker run command to set the max memory for the IronFunctions instance AND all of the functions it's running. +the docker run command to set the max memory for the IronFunctions instance AND all of the functions it's running. There are some reasons you may not want to use dind, such as using the image cache during testing or you're running [Windows](windows.md). diff --git a/main.go b/main.go index 9df481273..1b3330650 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,6 @@ const ( envDB = "db_url" envPort = "port" // be careful, Gin expects this variable to be "port" envAPIURL = "api_url" - envNumAsync = "num_async" ) func init() { @@ -37,7 +36,6 @@ func init() { viper.SetDefault(envDB, fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) viper.SetDefault(envPort, 8080) viper.SetDefault(envAPIURL, fmt.Sprintf("http://127.0.0.1:%d", viper.GetInt(envPort))) - viper.SetDefault(envNumAsync, 1) viper.AutomaticEnv() // picks up env vars automatically logLevel, err := log.ParseLevel(viper.GetString("log_level")) if err != nil { @@ -82,18 +80,22 @@ func main() { }, } + tasks := make(chan runner.TaskRequest) + svr.AddFunc(func(ctx context.Context) { - srv := server.New(ds, mqType, rnr) + runner.StartWorkers(ctx, rnr, tasks) + }) + + svr.AddFunc(func(ctx context.Context) { + srv := server.New(ds, mqType, rnr, tasks) srv.Run(ctx) }) - apiURL, numAsync := viper.GetString(envAPIURL), viper.GetInt(envNumAsync) - log.Debug("async workers:", numAsync) - if numAsync > 0 { - svr.AddFunc(func(ctx context.Context) { - runner.RunAsyncRunner(ctx, apiURL, numAsync) - }) - } + apiURL := viper.GetString(envAPIURL) + svr.AddFunc(func(ctx context.Context) { + runner.RunAsyncRunner(ctx, apiURL, tasks, rnr) + }) svr.Serve(ctx) + close(tasks) }