From f6f160cef92a3842e9eba6027559b4b18057d23c Mon Sep 17 00:00:00 2001 From: Seif Lotfy Date: Mon, 26 Sep 2016 14:49:42 +0200 Subject: [PATCH] Move async runner to runner file --- api/runner/async_runner.go | 97 +++++++++++++++++++++++++++++++ main.go | 115 +++++-------------------------------- 2 files changed, 110 insertions(+), 102 deletions(-) create mode 100644 api/runner/async_runner.go diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go new file mode 100644 index 000000000..f4f0353b7 --- /dev/null +++ b/api/runner/async_runner.go @@ -0,0 +1,97 @@ +package runner + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + log "github.com/Sirupsen/logrus" + + "github.com/iron-io/functions/api/models" +) + +func RunAsyncRunners(mqAdr string) { + + url := fmt.Sprintf("http://%s/tasks", mqAdr) + + logAndWait := func(err error) { + log.WithError(err) + time.Sleep(1 * time.Second) + } + + for { + resp, err := http.Get(url) + if err != nil { + logAndWait(err) + continue + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logAndWait(err) + continue + } + + var task models.Task + + if err := json.Unmarshal(body, &task); err != nil { + logAndWait(err) + continue + } + + if task.ID == "" { + time.Sleep(1 * time.Second) + continue + } + + log.Info("Picked up task:", task.ID) + var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader + stderr := NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? + + if task.Timeout == nil { + timeout := int32(30) + task.Timeout = &timeout + } + cfg := &Config{ + Image: *task.Image, + Timeout: time.Duration(*task.Timeout) * time.Second, + ID: task.ID, + AppName: task.RouteName, + Stdout: &stdout, + Stderr: stderr, + Env: task.EnvVars, + } + + metricLogger := NewMetricLogger() + + rnr, err := New(metricLogger) + if err != nil { + log.WithError(err) + continue + } + + ctx := context.Background() + if _, err = rnr.Run(ctx, cfg); err != nil { + log.WithError(err) + continue + } + + log.Info("Processed task:", task.ID) + req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body)) + if err != nil { + log.WithError(err) + } + + c := &http.Client{} + if _, err := c.Do(req); err != nil { + log.WithError(err) + continue + } + + log.Info("Deleted task:", task.ID) + } +} diff --git a/main.go b/main.go index aefff88cf..788a57851 100644 --- a/main.go +++ b/main.go @@ -1,19 +1,13 @@ package main import ( - "bytes" - "encoding/json" "fmt" - "io/ioutil" - "net/http" "strconv" "strings" - "time" log "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/config" "github.com/iron-io/functions/api/datastore" - "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/runner" "github.com/iron-io/functions/api/server" @@ -36,16 +30,6 @@ func main() { log.WithError(err).Fatal("Error on init MQ") } - nasync := 1 - - if nasyncStr := strings.TrimSpace(viper.GetString("MQADR")); len(nasyncStr) > 0 { - var err error - nasync, err = strconv.Atoi(nasyncStr) - if err != nil { - log.WithError(err).Fatalln("Failed to parse number of async runners") - } - } - mqAdr := strings.TrimSpace(viper.GetString("MQADR")) port := viper.GetInt("PORT") if port == 0 { @@ -57,101 +41,28 @@ func main() { metricLogger := runner.NewMetricLogger() - runner, err := runner.New(metricLogger) + rnr, err := runner.New(metricLogger) if err != nil { log.WithError(err).Fatalln("Failed to create a runner") } - srv := server.New(ds, mqType, runner) + srv := server.New(ds, mqType, rnr) go srv.Run(ctx) + + nasync := 1 + if nasyncStr := strings.TrimSpace(viper.GetString("NASYNC")); len(nasyncStr) > 0 { + var err error + nasync, err = strconv.Atoi(nasyncStr) + if err != nil { + log.WithError(err).Fatalln("Failed to parse number of async runners") + } + } + for i := 0; i < nasync; i++ { - fmt.Println(i) - go runAsyncRunners(mqAdr) + go runner.RunAsyncRunners(mqAdr) } quit := make(chan bool) for _ = range quit { } } - -func runAsyncRunners(mqAdr string) { - - url := fmt.Sprintf("http://%s/tasks", mqAdr) - - logAndWait := func(err error) { - log.WithError(err) - time.Sleep(1 * time.Second) - } - - for { - resp, err := http.Get(url) - if err != nil { - logAndWait(err) - continue - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - logAndWait(err) - continue - } - - var task models.Task - - if err := json.Unmarshal(body, &task); err != nil { - logAndWait(err) - continue - } - - if task.ID == "" { - time.Sleep(1 * time.Second) - continue - } - - log.Info("Picked up task:", task.ID) - var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader - stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? - - if task.Timeout == nil { - timeout := int32(30) - task.Timeout = &timeout - } - cfg := &runner.Config{ - Image: *task.Image, - Timeout: time.Duration(*task.Timeout) * time.Second, - ID: task.ID, - AppName: task.RouteName, - Stdout: &stdout, - Stderr: stderr, - Env: task.EnvVars, - } - - metricLogger := runner.NewMetricLogger() - - rnr, err := runner.New(metricLogger) - if err != nil { - log.WithError(err) - continue - } - - ctx := context.Background() - if _, err = rnr.Run(ctx, cfg); err != nil { - log.WithError(err) - continue - } - - log.Info("Processed task:", task.ID) - req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body)) - if err != nil { - log.WithError(err) - } - - c := &http.Client{} - if _, err := c.Do(req); err != nil { - log.WithError(err) - continue - } - - log.Info("Deleted task:", task.ID) - } -}