diff --git a/api/common/ctx.go b/api/common/ctx.go new file mode 100644 index 000000000..1955e9da9 --- /dev/null +++ b/api/common/ctx.go @@ -0,0 +1,29 @@ +package common + +import ( + "github.com/Sirupsen/logrus" + "golang.org/x/net/context" +) + +// WithLogger stores the logger. +func WithLogger(ctx context.Context, l logrus.FieldLogger) context.Context { + return context.WithValue(ctx, "logger", l) +} + +// Logger returns the structured logger. +func Logger(ctx context.Context) logrus.FieldLogger { + l, ok := ctx.Value("logger").(logrus.FieldLogger) + if !ok { + return logrus.StandardLogger() + } + return l +} + +// Attempt at simplifying this whole logger in the context thing +// Could even make this take a generic map, then the logger that gets returned could be used just like the stdlib too, since it's compatible +func LoggerWithFields(ctx context.Context, fields logrus.Fields) (context.Context, logrus.FieldLogger) { + l := Logger(ctx) + l = l.WithFields(fields) + ctx = WithLogger(ctx, l) + return ctx, l +} diff --git a/api/mqs/memory.go b/api/mqs/memory.go index 57d99fd03..5f8265322 100644 --- a/api/mqs/memory.go +++ b/api/mqs/memory.go @@ -131,6 +131,7 @@ func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) { // Push the work onto the queue. return mq.pushForce(job) } + func (mq *MemoryMQ) pushTimeout(job *models.Task) error { ji := &TaskItem{ diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 07ccbb311..a3f3d33e7 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -3,6 +3,7 @@ package runner import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -15,84 +16,109 @@ import ( "github.com/iron-io/functions/api/models" ) -func RunAsyncRunners(mqAdr string) { - - url := fmt.Sprintf("http://%s/tasks", mqAdr) - - logAndWait := func(err error) { - log.WithError(err) - time.Sleep(1 * time.Second) +func getTask(url string) (*models.Task, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err } - for { - resp, err := http.Get(url) - if err != nil { - logAndWait(err) - continue - } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var task models.Task + + if err := json.Unmarshal(body, &task); err != nil { + return nil, err + } + + if task.ID == "" { + return nil, errors.New("Invalid Task: ID empty") + } + return &task, nil +} + +func getCfg(task *models.Task) *Config { + var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader + stderr := NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? + if task.Timeout == nil { + timeout := int32(30) + task.Timeout = &timeout + } + cfg := &Config{ + Image: *task.Image, + Timeout: time.Duration(*task.Timeout) * time.Second, + ID: task.ID, + AppName: task.RouteName, + Stdout: &stdout, + Stderr: stderr, + Env: task.EnvVars, + } + return cfg +} + +func deleteTask(url string, task *models.Task) error { + // Unmarshal task to be sent over as a json + body, err := json.Marshal(task) + if err != nil { + return err + } + + // Send out Delete request to delete task from queue + req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body)) + if err != nil { + return err + } + c := &http.Client{} + if resp, err := c.Do(req); err != nil { + return err + } else if resp.StatusCode != http.StatusAccepted { body, err := ioutil.ReadAll(resp.Body) if err != nil { - logAndWait(err) - continue + return err } + return errors.New(string(body)) + } + return nil +} - var task models.Task +func runTask(task *models.Task) error { + // Set up runner and process task + cfg := getCfg(task) + ctx := context.Background() + rnr, err := New(NewMetricLogger()) + if err != nil { + return err + } + _, err = rnr.Run(ctx, cfg) + return err +} - if err := json.Unmarshal(body, &task); err != nil { - logAndWait(err) - continue - } - - if task.ID == "" { +// RunAsyncRunner pulls tasks off a queue and processes them +func RunAsyncRunner(mqAdr string) { + url := fmt.Sprintf("http://%s/tasks", mqAdr) + for { + task, err := getTask(url) + if err != nil { + log.WithError(err) time.Sleep(1 * time.Second) continue } - log.Info("Picked up task:", task.ID) - var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader - stderr := NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? - if task.Timeout == nil { - timeout := int32(30) - task.Timeout = &timeout - } - cfg := &Config{ - Image: *task.Image, - Timeout: time.Duration(*task.Timeout) * time.Second, - ID: task.ID, - AppName: task.RouteName, - Stdout: &stdout, - Stderr: stderr, - Env: task.EnvVars, - } - - metricLogger := NewMetricLogger() - - rnr, err := New(metricLogger) - if err != nil { + // Process Task + if err := runTask(task); err != nil { log.WithError(err) continue } - - ctx := context.Background() - if _, err = rnr.Run(ctx, cfg); err != nil { - log.WithError(err) - continue - } - log.Info("Processed task:", task.ID) - req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body)) - if err != nil { - log.WithError(err) - } - c := &http.Client{} - if _, err := c.Do(req); err != nil { + // Delete task from queue + if err := deleteTask(url, task); err != nil { log.WithError(err) - continue + } else { + log.Info("Deleted task:", task.ID) } - - log.Info("Deleted task:", task.ID) } } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go new file mode 100644 index 000000000..8c4f2fdf3 --- /dev/null +++ b/api/runner/async_runner_test.go @@ -0,0 +1,144 @@ +package runner + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Sirupsen/logrus" + "github.com/gin-gonic/gin" + "github.com/iron-io/functions/api/models" + "github.com/iron-io/functions/api/mqs" +) + +func getMockTask() models.Task { + priority := int32(0) + image := fmt.Sprintf("Image-%d", rand.Int31()%1000) + task := &models.Task{} + task.Image = &image + task.ID = fmt.Sprintf("ID-%d", rand.Int31()%1000) + task.RouteName = fmt.Sprintf("RouteName-%d", rand.Int31()%1000) + task.Priority = &priority + return *task +} + +func getTestServer(mockTasks []*models.Task) *httptest.Server { + mq, err := mqs.New("memory://test") + if err != nil { + panic(err) + } + + for _, mt := range mockTasks { + mq.Push(mt) + } + + getHandler := func(c *gin.Context) { + task, err := mq.Reserve() + if err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusAccepted, task) + } + + delHandler := func(c *gin.Context) { + body, err := ioutil.ReadAll(c.Request.Body) + if err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err.Error()) + return + } + var task models.Task + if err = json.Unmarshal(body, &task); err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err.Error()) + return + } + + if err := mq.Delete(&task); err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusAccepted, task) + } + + r := gin.Default() + r.GET("/tasks", getHandler) + r.DELETE("/tasks", delHandler) + return httptest.NewServer(r) +} + +func TestGetTask(t *testing.T) { + mockTask := getMockTask() + + ts := getTestServer([]*models.Task{&mockTask}) + defer ts.Close() + + url := ts.URL + "/tasks" + task, err := getTask(url) + if err != nil { + t.Error("expected no error, got", err) + } + if task.ID != mockTask.ID { + t.Errorf("expected task ID '%s', got '%s'", task.ID, mockTask.ID) + } +} + +func TestGetTaskError(t *testing.T) { + tests := []map[string]interface{}{ + map[string]interface{}{ + "url": "/invalid", + "task": getMockTask(), + "error": "invalid character 'p' after top-level value", + }, + } + + var tasks []*models.Task + for _, v := range tests { + task := v["task"].(models.Task) + tasks = append(tasks, &task) + } + + ts := getTestServer(tasks) + defer ts.Close() + + for i, test := range tests { + url := ts.URL + test["url"].(string) + _, err := getTask(url) + if err == nil { + t.Errorf("expected error '%s'", test["error"].(string)) + } + if err.Error() != test["error"].(string) { + t.Errorf("test %d: expected error '%s', got '%s'", i, test["error"].(string), err) + } + } +} + +func TestDeleteTask(t *testing.T) { + mockTask := getMockTask() + + ts := getTestServer([]*models.Task{&mockTask}) + defer ts.Close() + + url := ts.URL + "/tasks" + err := deleteTask(url, &mockTask) + if err == nil { + t.Error("expected error 'Not reserver', got", err) + } + + _, err = getTask(url) + if err != nil { + t.Error("expected no error, got", err) + } + + err = deleteTask(url, &mockTask) + if err != nil { + t.Error("expected no error, got", err) + } +} diff --git a/api/server/runner.go b/api/server/runner.go index 958b7a40b..9584f8425 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -14,16 +14,16 @@ import ( "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" + "github.com/iron-io/functions/api/common" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/runner" - titancommon "github.com/iron-io/worker/common" "github.com/iron-io/worker/runner/drivers" "github.com/satori/go.uuid" ) func handleSpecial(c *gin.Context) { ctx := c.MustGet("ctx").(context.Context) - log := titancommon.Logger(ctx) + log := common.Logger(ctx) err := Api.UseSpecialHandlers(c) if err != nil { @@ -39,12 +39,12 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { } ctx := c.MustGet("ctx").(context.Context) - log := titancommon.Logger(ctx) + log := common.Logger(ctx) reqID := uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", c.Request.RemoteAddr, c.Request.URL.Path, time.Now().Unix())).String() c.Set("reqID", reqID) // todo: put this in the ctx instead of gin's - log = log.WithFields(logrus.Fields{"request_id": reqID}) + ctx, log = common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID}) var err error @@ -107,7 +107,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { log.WithField("routes", routes).Debug("Got routes from datastore") for _, el := range routes { log = log.WithFields(logrus.Fields{ - "app": appName, "route": el.Path, "image": el.Image, "request_id": reqID}) + "app": appName, "route": el.Path, "image": el.Image}) if params, match := matchRoute(el.Path, route); match { @@ -156,15 +156,17 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { var result drivers.RunResult switch el.Type { case "async": - // TODO: Create Task + // Create Task priority := int32(0) task := &models.Task{} task.Image = &cfg.Image task.ID = cfg.ID task.RouteName = cfg.AppName task.Priority = &priority - // TODO: Push to queue + // Push to queue enqueue(task) + log.Info("Added new task to queue") + default: if result, err = Api.Runner.Run(c, cfg); err != nil { break diff --git a/api/server/server.go b/api/server/server.go index c2d529fb8..8c3e45742 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -86,6 +86,7 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { func (s *Server) handleRunnerRequest(c *gin.Context) { enqueue := func(task *models.Task) (*models.Task, error) { + c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) return s.MQ.Push(task) } handleRequest(c, enqueue) diff --git a/main.go b/main.go index 788a57851..205c0af87 100644 --- a/main.go +++ b/main.go @@ -59,7 +59,7 @@ func main() { } for i := 0; i < nasync; i++ { - go runner.RunAsyncRunners(mqAdr) + go runner.RunAsyncRunner(mqAdr) } quit := make(chan bool) diff --git a/hello-async.sh b/tool/examples/hello-async.sh similarity index 100% rename from hello-async.sh rename to tool/examples/hello-async.sh diff --git a/hello-sync.sh b/tool/examples/hello-sync.sh similarity index 100% rename from hello-sync.sh rename to tool/examples/hello-sync.sh