diff --git a/README.md b/README.md index 69272a807..f0438f348 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,9 @@ The app `myapp` that we created above along with the `/hello` route we added wou curl http://localhost:8080/r/myapp/hello ``` -### To pass in data to your function +### Passing data into a function -Your function will get the body of the request as is, and the headers of the request will be passed in as env vars. Try this: +Your function will get the body of the HTTP request via STDIN, and the headers of the request will be passed in as env vars. Try this: ```sh curl -H "Content-Type: application/json" -X POST -d '{ @@ -57,42 +57,44 @@ curl -H "Content-Type: application/json" -X POST -d '{ }' http://localhost:8080/r/myapp/hello ``` -### Add an asynchronous route +You should see it say `Hello Johnny!` now instead of `Hello World!`. -### Adding a route with URL params +### Add an asynchronous function -You can create a route with dynamic URL parameters that will be available inside your function by prefixing path segments with a `:`, for example: +IronFunctions supports synchronous function calls like we just tried above, and asynchronous calls for background processing. + +Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete. +For instance, image processing, video processing, data processing, ETL, etc. +Architecturally, the main difference between synchronous and asynchronous is that requests +to asynchronous functions are put in a queue and executed upon resource availability so that they do not interfere with the fast synchronous responses required for an API. +Also, since it uses a message queue, you can queue up millions of function calls without worrying about capacity as requests will +just be queued up and run at some point in the future. 
+ +To add an asynchronous function, create another route with the `"type":"async"`, for example: ```sh -$ curl -H "Content-Type: application/json" -X POST -d '{ - "route": { - "path":"/comments/:author_id/:num_page", - "image":"IMAGE_NAME" - } +curl -H "Content-Type: application/json" -X POST -d '{ + "route": { + "type": "async", + "path":"/hello-async", + "image":"iron/hello" + } }' http://localhost:8080/v1/apps/myapp/routes ``` -`:author_id` and `:num_page` in the path will be passed into your function as `PARAM_AUTHOR_ID` and `PARAM_NUM_PAGE`. +Now if you request this route, you will just get a `call_id` response: +```json +{"call_id":"572415fd-e26e-542b-846f-f1f5870034f2"} +``` -See the [Blog Example](https://github.com/iron-io/functions/blob/master/examples/blog/README.md#creating-our-blog-application-in-your-ironfunctions). +If you watch the logs, you will see the function actually runs in the background. +## Writing Functions -## Adding Asynchronous Data Processing Support +TODO: -Data processing is for functions that run in the background. This type of functionality is good for functions -that are CPU heavy or take more than a few seconds to complete. -Architecturally, the main difference between synchronous you tried above and asynchronous is that requests -to asynchronous functions are put in a queue and executed on upon resource availablitiy on the same process -or a remote functions process so that they do not interfere with the fast synchronous responses required by an API. -Also, since it uses a queue, you can queue up millions of jobs without worrying about capacity as requests will -just be queued up and run at some point in the future. - -TODO: Add link to differences here in README.io docs here. - -#### Running remote functions process - -Coming soon... +See examples for now. 
## Using IronFunctions Hosted by Iron.io @@ -112,10 +114,6 @@ myapp.USER_ID.ironfunctions.com/hello https://swaggerhub.com/api/iron/functions -## Full Documentation - -http://docs-new.iron.io/docs - ## Join Our Community [![Slack Status](https://open-iron.herokuapp.com/badge.svg)](https://open-iron.herokuapp.com) diff --git a/api/models/task.go b/api/models/task.go index 1a9a17bcc..6c294764f 100644 --- a/api/models/task.go +++ b/api/models/task.go @@ -48,11 +48,13 @@ type Task struct { */ Error string `json:"error,omitempty"` - /* Route this task belongs to. + /* App this task belongs to. Read Only: true */ - RouteName string `json:"route_name,omitempty"` + AppName string `json:"route_name,omitempty"` + + Path string `json:"path"` /* Machine usable reason for task being in this state. Valid values for error status are `timeout | killed | bad_exit`. diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 4b7a48825..eb87b152c 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -12,12 +12,16 @@ import ( "sync" "time" - log "github.com/Sirupsen/logrus" + "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/models" + "github.com/iron-io/runner/common" "github.com/iron-io/runner/drivers" ) -func getTask(url string) (*models.Task, error) { +func getTask(ctx context.Context, url string) (*models.Task, error) { + // log := common.Logger(ctx) + // log.Infoln("Getting task from URL:", url) + resp, err := http.Get(url) if err != nil { return nil, err @@ -41,8 +45,8 @@ func getTask(url string) (*models.Task, error) { } func getCfg(task *models.Task) *Config { - var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader - stderr := NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? + // TODO: should limit the size of this, error if gets too big. 
akin to: https://golang.org/pkg/io/#LimitReader + stderr := NewFuncLogger(task.AppName, task.Path, *task.Image, task.ID) // TODO: missing path here, how do i get that? if task.Timeout == nil { timeout := int32(30) task.Timeout = &timeout @@ -51,8 +55,8 @@ func getCfg(task *models.Task) *Config { Image: *task.Image, Timeout: time.Duration(*task.Timeout) * time.Second, ID: task.ID, - AppName: task.RouteName, - Stdout: &stdout, + AppName: task.AppName, + Stdout: stderr, Stderr: stderr, Env: task.EnvVars, } @@ -84,10 +88,9 @@ func deleteTask(url string, task *models.Task) error { return nil } -func runTask(task *models.Task) (drivers.RunResult, error) { +func runTask(ctx context.Context, task *models.Task) (drivers.RunResult, error) { // Set up runner and process task cfg := getCfg(task) - ctx := context.Background() rnr, err := New(NewMetricLogger()) if err != nil { return nil, err @@ -96,8 +99,8 @@ func runTask(task *models.Task) (drivers.RunResult, error) { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) { - u, h := tasksrvURL(tasksrv, port) +func RunAsyncRunner(ctx context.Context, tasksrv string, n int) { + u, h := tasksrvURL(tasksrv) if isHostOpen(h) { return } @@ -105,7 +108,7 @@ func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) - go startAsyncRunners(ctx, &wg, i, u, runTask) + go startAsyncRunners(ctx, &wg, i, u) } wg.Wait() @@ -121,7 +124,8 @@ func isHostOpen(host string) bool { return available } -func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) { +func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string) { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"async_runner": i}) defer wg.Done() for { select { @@ -129,8 +133,12 @@ func startAsyncRunners(ctx context.Context, wg 
*sync.WaitGroup, i int, url strin return default: - task, err := getTask(url) + task, err := getTask(ctx, url) if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + log.Infoln("Could not fetch task, timeout. Probably no tasks to run.") + continue + } log.WithError(err).Error("Could not fetch task") time.Sleep(1 * time.Second) continue @@ -139,33 +147,36 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin time.Sleep(1 * time.Second) continue } - log.Debug("Picked up task:", task.ID) + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID}) log.Debug("Running task:", task.ID) // Process Task - if _, err := runTask(task); err != nil { - log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot run task") + if _, err := runTask(ctx, task); err != nil { + log.WithError(err).Error("Cannot run task") continue } - log.Debug("Processed task:", task.ID) + log.Debug("Processed task") // Delete task from queue if err := deleteTask(url, task); err != nil { - log.WithError(err).WithFields(log.Fields{"async runner": i, "task_id": task.ID}).Error("Cannot delete task") + log.WithError(err).Error("Cannot delete task") continue } - log.Debug("Deleted task:", task.ID) - log.Info("Task complete:", task.ID) + log.Info("Task complete:") } } } -func tasksrvURL(tasksrv, port string) (parsedURL, host string) { +func tasksrvURL(tasksrv string) (parsedURL, host string) { parsed, err := url.Parse(tasksrv) if err != nil { - log.Fatalf("cannot parse TASKSRV endpoint: %v", err) + logrus.WithError(err).Fatalln("cannot parse TASKSRV endpoint") } + // host, port, err := net.SplitHostPort(parsed.Host) + // if err != nil { + // log.WithError(err).Fatalln("net.SplitHostPort") + // } if parsed.Scheme == "" { parsed.Scheme = "http" @@ -175,9 +186,9 @@ func tasksrvURL(tasksrv, port string) (parsedURL, host string) { parsed.Path = "/tasks" } - if _, _, err := net.SplitHostPort(parsed.Host); err != 
nil { - parsed.Host = net.JoinHostPort(parsed.Host, port) - } + // if _, _, err := net.SplitHostPort(parsed.Host); err != nil { + // parsed.Host = net.JoinHostPort(parsed.Host, parsed) + // } return parsed.String(), parsed.Host } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index dfb9da8f5..f05a1229f 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -37,7 +37,7 @@ func getMockTask() models.Task { task := &models.Task{} task.Image = &image task.ID = fmt.Sprintf("ID-%d", rand.Int31()%1000) - task.RouteName = fmt.Sprintf("RouteName-%d", rand.Int31()%1000) + task.AppName = fmt.Sprintf("RouteName-%d", rand.Int31()%1000) task.Priority = &priority return *task } diff --git a/api/runner/logger.go b/api/runner/logger.go index ad8042de4..52a39deee 100644 --- a/api/runner/logger.go +++ b/api/runner/logger.go @@ -19,7 +19,7 @@ func NewFuncLogger(appName, path, function, requestID string) io.Writer { r: r, w: w, } - log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "request_id": requestID}) + log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "call_id": requestID}) go func(reader io.Reader) { scanner := bufio.NewScanner(reader) for scanner.Scan() { diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 06902dad0..790295b3d 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -27,7 +27,8 @@ func setLogBuffer() *bytes.Buffer { func TestAppCreate(t *testing.T) { buf := setLogBuffer() New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) - router := testRouter() + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + router := testRouter(s) for i, test := range []struct { path string @@ -70,8 +71,8 @@ func TestAppCreate(t *testing.T) { func TestAppDelete(t *testing.T) { buf := setLogBuffer() - New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) - router := 
testRouter() + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + router := testRouter(s) for i, test := range []struct { path string @@ -104,8 +105,8 @@ func TestAppDelete(t *testing.T) { func TestAppList(t *testing.T) { buf := setLogBuffer() - New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) - router := testRouter() + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + router := testRouter(s) for i, test := range []struct { path string @@ -137,8 +138,8 @@ func TestAppList(t *testing.T) { func TestAppGet(t *testing.T) { buf := setLogBuffer() - New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) - router := testRouter() + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + router := testRouter(s) for i, test := range []struct { path string @@ -170,8 +171,8 @@ func TestAppGet(t *testing.T) { func TestAppUpdate(t *testing.T) { buf := setLogBuffer() - New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) - router := testRouter() + s := New(&datastore.Mock{}, &mqs.Mock{}, testRunner(t)) + router := testRouter(s) for i, test := range []struct { path string diff --git a/api/server/helpers.go b/api/server/helpers.go new file mode 100644 index 000000000..362157fcd --- /dev/null +++ b/api/server/helpers.go @@ -0,0 +1,90 @@ +package server + +// TODO: this whole file shouldn't be in a non test file + +import ( + "context" + "encoding/json" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/iron-io/functions/api/models" + "github.com/iron-io/functions/api/runner" + "github.com/iron-io/runner/common" +) + +type appResponse struct { + Message string `json:"message"` + App *models.App `json:"app"` +} + +type appsResponse struct { + Message string `json:"message"` + Apps models.Apps `json:"apps"` +} + +type routeResponse struct { + Message string `json:"message"` + Route *models.Route `json:"route"` +} + +type routesResponse struct { + Message string `json:"message"` + Routes models.Routes 
`json:"routes"` +} + +type tasksResponse struct { + Message string `json:"message"` + Task models.Task `json:"tasksResponse"` +} + +func testRouter(s *Server) *gin.Engine { + r := gin.Default() + ctx := context.Background() + r.Use(func(c *gin.Context) { + ctx, _ := common.LoggerWithFields(ctx, extractFields(c)) + c.Set("ctx", ctx) + c.Next() + }) + s.bindHandlers() + return r +} + +func testRunner(t *testing.T) *runner.Runner { + r, err := runner.New(runner.NewMetricLogger()) + if err != nil { + t.Fatal("Test: failed to create new runner") + } + return r +} + +func routerRequest(t *testing.T, router *gin.Engine, method, path string, body io.Reader) (*http.Request, *httptest.ResponseRecorder) { + req, err := http.NewRequest(method, "http://localhost:8080"+path, body) + if err != nil { + t.Fatalf("Test: Could not create %s request to %s: %v", method, path, err) + } + + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + return req, rec +} + +func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) models.Error { + respBody, err := ioutil.ReadAll(rec.Body) + if err != nil { + t.Error("Test: Expected not empty response body") + } + + var errResp models.Error + err = json.Unmarshal(respBody, &errResp) + if err != nil { + t.Error("Test: Expected response body to be a valid models.Error object") + } + + return errResp +} diff --git a/api/server/runner.go b/api/server/runner.go index 900e9e6cb..03c9524ba 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -42,12 +42,11 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { } ctx := c.MustGet("ctx").(context.Context) - log := common.Logger(ctx) reqID := uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", c.Request.RemoteAddr, c.Request.URL.Path, time.Now().Unix())).String() c.Set("reqID", reqID) // todo: put this in the ctx instead of gin's - ctx, log = common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID}) + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID}) 
var err error var payload io.Reader @@ -164,7 +163,8 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { task := &models.Task{} task.Image = &cfg.Image task.ID = cfg.ID - task.RouteName = cfg.AppName + task.Path = el.Path + task.AppName = cfg.AppName task.Priority = &priority task.EnvVars = cfg.Env task.Payload = string(pl) diff --git a/api/server/server.go b/api/server/server.go index 1e00d3016..19635367a 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -131,7 +131,6 @@ func extractFields(c *gin.Context) logrus.Fields { for _, param := range c.Params { fields[param.Key] = param.Value } - return fields } @@ -142,7 +141,7 @@ func (s *Server) Run(ctx context.Context) { c.Next() }) - bindHandlers(s.Router, s.handleRunnerRequest, s.handleTaskRequest) + s.bindHandlers() // By default it serves on :8080 unless a // PORT environment variable was defined. @@ -150,8 +149,9 @@ func (s *Server) Run(ctx context.Context) { <-ctx.Done() } -func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) { - engine.Use(gin.Logger()) +func (s *Server) bindHandlers() { + + engine := s.Router engine.GET("/", handlePing) engine.GET("/version", handleVersion) @@ -177,9 +177,9 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHa } } - engine.DELETE("/tasks", taskHandler) - engine.GET("/tasks", taskHandler) - engine.Any("/r/:app/*route", reqHandler) + engine.DELETE("/tasks", s.handleTaskRequest) + engine.GET("/tasks", s.handleTaskRequest) + engine.Any("/r/:app/*route", s.handleRunnerRequest) // This final route is used for extensions, see Server.Add engine.NoRoute(handleSpecial) diff --git a/api/swagger.yml b/api/swagger.yml index 92768c618..517d20da1 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -263,10 +263,6 @@ definitions: allOf: - type: object properties: - name: - type: string - description: "Route name" - readOnly: true app_name: type: string description: "App this 
route belongs to." diff --git a/main.go b/main.go index d73934cb2..0c9852400 100644 --- a/main.go +++ b/main.go @@ -90,11 +90,11 @@ func main() { srv.Run(ctx) }) - apiURL, port, numAsync := viper.GetString(envAPIURL), viper.GetString(envPort), viper.GetInt(envNumAsync) + apiURL, numAsync := viper.GetString(envAPIURL), viper.GetInt(envNumAsync) log.Debug("async workers:", numAsync) if numAsync > 0 { svr.AddFunc(func(ctx context.Context) { - runner.RunAsyncRunner(ctx, apiURL, port, numAsync) + runner.RunAsyncRunner(ctx, apiURL, numAsync) }) }