From 8c96d3ba2fad56704bf2bcd3c0f3ea7bb32ae08f Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Tue, 20 Jun 2017 11:32:51 -0700 Subject: [PATCH] Fixes async payload passing for #68. --- api/models/new_task.go | 100 -------------- api/models/task.go | 73 +++++++--- api/runner/async_runner.go | 3 +- api/runner/worker.go | 5 + api/server/routes_create.go | 2 +- api/server/routes_update.go | 4 +- api/server/server.go | 2 +- docs/README.md | 1 + docs/async.md | 18 +++ docs/swagger.yml | 128 +++++++++--------- docs/usage.md | 4 +- examples/tutorial/async/go/.gitignore | 6 + examples/tutorial/async/go/README.md | 28 ++++ examples/tutorial/async/go/func.go | 27 ++++ .../tutorial/async/go/sample.payload.json | 3 + fn/calls.go | 28 ++-- fn/deploy.go | 11 +- fn/funcfile.go | 2 +- fn/init.go | 13 +- fn/lambda.go | 2 +- fn/routes.go | 8 +- fn/run.go | 24 +++- fn/testfn.go | 9 +- 23 files changed, 276 insertions(+), 225 deletions(-) delete mode 100644 api/models/new_task.go create mode 100644 docs/async.md create mode 100644 examples/tutorial/async/go/.gitignore create mode 100644 examples/tutorial/async/go/README.md create mode 100644 examples/tutorial/async/go/func.go create mode 100644 examples/tutorial/async/go/sample.payload.json diff --git a/api/models/new_task.go b/api/models/new_task.go deleted file mode 100644 index 248fd11dc..000000000 --- a/api/models/new_task.go +++ /dev/null @@ -1,100 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - - "github.com/go-openapi/errors" - "github.com/go-openapi/validate" -) - -/*NewTask new task - -swagger:model NewTask -*/ -type NewTask struct { - - /* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds. - */ - Delay int32 `json:"delay,omitempty"` - - /* Name of Docker image to use. This is optional and can be used to override the image defined at the route level. - - Required: true - */ - Image *string `json:"image"` - - /* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried." - - */ - MaxRetries int32 `json:"max_retries,omitempty"` - - /* Payload for the task. This is what you pass into each task to make it do something. - */ - Payload string `json:"payload,omitempty"` - - /* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order. - - Required: true - */ - Priority *int32 `json:"priority"` - - /* Time in seconds to wait before retrying the task. Must be a non-negative integer. - */ - RetriesDelay *int32 `json:"retries_delay,omitempty"` - - /* Maximum runtime in seconds. If a consumer retrieves the - task, but does not change it's status within timeout seconds, the task - is considered failed, with reason timeout (Titan may allow a small - grace period). The consumer should also kill the task after timeout - seconds. If a consumer tries to change status after Titan has already - timed out the task, the consumer will be ignored. - - */ - Timeout *int32 `json:"timeout,omitempty"` - - /* Hot function idle timeout in seconds before termination. - - */ - IdleTimeout *int32 `json:"idle_timeout,omitempty"` -} - -// Validate validates this new task -func (m *NewTask) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateImage(formats); err != nil { - // prop - res = append(res, err) - } - - if err := m.validatePriority(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *NewTask) validateImage(formats strfmt.Registry) error { - - if err := validate.Required("image", "body", m.Image); err != nil { - return err - } - - return nil -} - -func (m *NewTask) validatePriority(formats strfmt.Registry) error { - - if err := validate.Required("priority", "body", m.Priority); err != nil { - return err - } - - return nil -} diff --git a/api/models/task.go b/api/models/task.go index 164557b44..92649e52d 100644 --- a/api/models/task.go +++ b/api/models/task.go @@ -31,23 +31,22 @@ const ( type FnCall struct { IDStatus CompletedAt strfmt.DateTime `json:"completed_at,omitempty"` - CreatedAt strfmt.DateTime `json:"created_at,omitempty"` - StartedAt strfmt.DateTime `json:"started_at,omitempty"` - AppName string `json:"app_name,omitempty"` - Path string `json:"path"` - + CreatedAt strfmt.DateTime `json:"created_at,omitempty"` + StartedAt strfmt.DateTime `json:"started_at,omitempty"` + AppName string `json:"app_name,omitempty"` + Path string `json:"path"` } func (fnCall *FnCall) FromTask(task *Task) *FnCall { return &FnCall{ - CreatedAt:task.CreatedAt, - StartedAt:task.StartedAt, - CompletedAt:task.CompletedAt, - AppName:task.AppName, - Path:task.Path, + CreatedAt: task.CreatedAt, + StartedAt: task.StartedAt, + CompletedAt: task.CompletedAt, + AppName: task.AppName, + Path: task.Path, IDStatus: IDStatus{ - ID:task.ID, - Status:task.Status, + ID: task.ID, + Status: task.Status, }, } } @@ -57,10 +56,52 @@ func (fnCall *FnCall) FromTask(task *Task) *FnCall { swagger:model Task */ type Task struct { - NewTask - IDStatus + /* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds. + */ + Delay int32 `json:"delay,omitempty"` + + /* Name of Docker image to use. This is optional and can be used to override the image defined at the route level. + + Required: true + */ + Image *string `json:"image"` + + /* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried." + + */ + MaxRetries int32 `json:"max_retries,omitempty"` + + /* Payload for the task. This is what you pass into each task to make it do something. + */ + Payload string `json:"payload,omitempty"` + + /* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order. + + Required: true + */ + Priority *int32 `json:"priority"` + + /* Time in seconds to wait before retrying the task. Must be a non-negative integer. + */ + RetriesDelay *int32 `json:"retries_delay,omitempty"` + + /* Maximum runtime in seconds. If a consumer retrieves the + task, but does not change it's status within timeout seconds, the task + is considered failed, with reason timeout (Titan may allow a small + grace period). The consumer should also kill the task after timeout + seconds. If a consumer tries to change status after Titan has already + timed out the task, the consumer will be ignored. + + */ + Timeout *int32 `json:"timeout,omitempty"` + + /* Hot function idle timeout in seconds before termination. + + */ + IdleTimeout *int32 `json:"idle_timeout,omitempty"` + /* Time when task completed, whether it was successul or failed. Always in UTC. */ CompletedAt strfmt.DateTime `json:"completed_at,omitempty"` @@ -116,9 +157,7 @@ type Task struct { func (m *Task) Validate(formats strfmt.Registry) error { var res []error - if err := m.NewTask.Validate(formats); err != nil { - res = append(res, err) - } + // NewTask validations will get generated automatically if err := m.IDStatus.Validate(formats); err != nil { res = append(res, err) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 714a4f9d8..a616eb9ab 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -127,7 +127,8 @@ func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.D } ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID}) - log.Debug("Running task:", task.ID) + log.Info("Running task:", task.ID) + // log.Infof("Task: %+v", task) wg.Add(1) diff --git a/api/runner/worker.go b/api/runner/worker.go index 527643203..2e5112a64 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "strings" "sync" "time" @@ -63,6 +64,10 @@ func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg startedAt := strfmt.DateTime(time.Now()) newTask.StartedAt = startedAt + // set payload as Stdin + // fmt.Printf("ABOUT TO PASS IN PAYLOAD: %v", newTask.Payload) + cfg.Stdin = strings.NewReader(newTask.Payload) + result, err := rnr.RunTask(ctx, cfg) completedAt := strfmt.DateTime(time.Now()) diff --git a/api/server/routes_create.go b/api/server/routes_create.go index 561cd6071..ddbaa2140 100644 --- a/api/server/routes_create.go +++ b/api/server/routes_create.go @@ -90,7 +90,7 @@ func (s *Server) handleRouteCreate(c *gin.Context) { return } - s.cacherefresh(route) + s.cacheRefresh(route) c.JSON(http.StatusOK, routeResponse{"Route successfully created", route}) } diff --git a/api/server/routes_update.go b/api/server/routes_update.go index 9142707f9..dc38e4ec0 100644 --- a/api/server/routes_update.go +++ b/api/server/routes_update.go @@ -36,6 +36,8 @@ func (s *Server) handleRouteUpdate(c *gin.Context) { return } + // fmt.Printf("ROUTE BOUND: %+v", *wroute.Route) + wroute.Route.AppName = c.MustGet(api.AppName).(string) wroute.Route.Path = path.Clean(c.MustGet(api.Path).(string)) @@ -69,7 +71,7 @@ func (s *Server) handleRouteUpdate(c *gin.Context) { return } - s.cacherefresh(route) + s.cacheRefresh(route) c.JSON(http.StatusOK, routeResponse{"Route successfully updated", route}) } diff --git a/api/server/server.go b/api/server/server.go index 9c6d48a3d..714a4892b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -171,7 +171,7 @@ func (s *Server) cacheget(appname, path string) (*models.Route, bool) { return route, ok } -func (s *Server) cacherefresh(route *models.Route) { +func (s *Server) cacheRefresh(route *models.Route) { s.mu.Lock() defer s.mu.Unlock() s.hotroutes.Refresh(route) diff --git a/docs/README.md b/docs/README.md index 41529dc22..c6bac5344 100644 --- a/docs/README.md +++ b/docs/README.md @@ -13,6 +13,7 @@ If you are a developer using Oracle Functions through the API, this section is f * [Open Function Format](function-format.md) * [API Reference](http://petstore.swagger.io/?url=https://raw.githubusercontent.com/treeder/functions/master/docs/swagger.yml) * [Hot functions](hot-functions.md) +* [Async functions](async.md) * [FAQ](faq.md) ## For Operators diff --git a/docs/async.md b/docs/async.md new file mode 100644 index 000000000..cbec81d96 --- /dev/null +++ b/docs/async.md @@ -0,0 +1,18 @@ +# Asynchronous Functions + +Asynchronous (async) functions will run your function at some point in the future. The default mode is synchronous which means +a function is executed and the caller blocks while waiting for the response. Asynchronous on the other, puts the request into a +message queue and responds immediately to the caller. The function will then be executed at some point in the future, upon resource availability giving priority +to synchronous calls. Also, since it is using a message queue, you can safely queue up millions of function calls without worrying about +capacity. + +Async will return immediately with a `call_id`, for example: + +```json +{"call_id": "abc123"} +``` + +The `call_id` can then be used to retrieve the status at a later time. + +Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete. +For instance, image processing, video processing, data processing, ETL, etc. diff --git a/docs/swagger.yml b/docs/swagger.yml index 97ae2ab80..1dd783d49 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -555,58 +555,67 @@ definitions: readOnly: true Task: - allOf: - - $ref: "#/definitions/NewTask" - - type: object - properties: - group_name: - type: string - description: "Group this task belongs to." - readOnly: true - error: - type: string - description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log." - reason: - type: string - description: | - Machine usable reason for task being in this state. - Valid values for error status are `timeout | killed | bad_exit`. - Valid values for cancelled status are `client_request`. - For everything else, this is undefined. - enum: - - timeout - - killed - - bad_exit - - client_request - created_at: - type: string - format: date-time - description: Time when task was submitted. Always in UTC. - readOnly: true - started_at: - type: string - format: date-time - description: Time when task started execution. Always in UTC. - completed_at: - type: string - format: date-time - description: Time when task completed, whether it was successul or failed. Always in UTC. - # We maintain a doubly linked list of the retried task to the - # original task. - retry_of: - type: string - description: If this field is set, then this task is a retry of the ID in this field. - readOnly: true - retry_at: - type: string - description: If this field is set, then this task was retried by the task referenced in this field. - readOnly: true - env_vars: - # this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties - type: object - description: Env vars for the task. Comes from the ones set on the Group. - additionalProperties: - type: string + type: object + required: + - image + properties: + image: + type: string + description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level. + payload: + type: string + # 256k + # maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272 + # maxLength: 268435456 + description: Payload for the task. This is what you pass into each task to make it do something. + group_name: + type: string + description: "Group this task belongs to." + readOnly: true + error: + type: string + description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log." + reason: + type: string + description: | + Machine usable reason for task being in this state. + Valid values for error status are `timeout | killed | bad_exit`. + Valid values for cancelled status are `client_request`. + For everything else, this is undefined. + enum: + - timeout + - killed + - bad_exit + - client_request + created_at: + type: string + format: date-time + description: Time when task was submitted. Always in UTC. + readOnly: true + started_at: + type: string + format: date-time + description: Time when task started execution. Always in UTC. + completed_at: + type: string + format: date-time + description: Time when task completed, whether it was successul or failed. Always in UTC. + # We maintain a doubly linked list of the retried task to the + # original task. + retry_of: + type: string + description: If this field is set, then this task is a retry of the ID in this field. + readOnly: true + retry_at: + type: string + description: If this field is set, then this task was retried by the task referenced in this field. + readOnly: true + env_vars: + # this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties + type: object + description: Env vars for the task. Comes from the ones set on the Group. + additionalProperties: + type: string ErrorBody: type: object @@ -624,21 +633,6 @@ definitions: error: $ref: '#/definitions/ErrorBody' - NewTask: - type: object - required: - - image - properties: - image: - type: string - description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level. - payload: - type: string - # 256k - # maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272 - # maxLength: 268435456 - description: Payload for the task. This is what you pass into each task to make it do something. - TaskWrapper: type: object required: diff --git a/docs/usage.md b/docs/usage.md index bd5f2baaf..4d44bc193 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -87,7 +87,7 @@ You should see it say `Hello Johnny!` now instead of `Hello World!`. Oracle Functions supports synchronous function calls like we just tried above, and asynchronous for background processing. -Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete. +[Asynchronous functions](async.md) are great for tasks that are CPU heavy or take more than a few seconds to complete. For instance, image processing, video processing, data processing, ETL, etc. Architecturally, the main difference between synchronous and asynchronous is that requests to asynchronous functions are put in a queue and executed on upon resource availability so that they do not interfere with the fast synchronous responses required for an API. @@ -106,6 +106,8 @@ curl -H "Content-Type: application/json" -X POST -d '{ }' http://localhost:8080/v1/apps/myapp/routes ``` +or set `type: async` in your `func.yaml`. + Now if you request this route: ```sh diff --git a/examples/tutorial/async/go/.gitignore b/examples/tutorial/async/go/.gitignore new file mode 100644 index 000000000..d450e309c --- /dev/null +++ b/examples/tutorial/async/go/.gitignore @@ -0,0 +1,6 @@ +vendor/ +/go +/app +/__uberscript__ + +func.yaml diff --git a/examples/tutorial/async/go/README.md b/examples/tutorial/async/go/README.md new file mode 100644 index 000000000..ab01765fe --- /dev/null +++ b/examples/tutorial/async/go/README.md @@ -0,0 +1,28 @@ +# Aynchronous Function Example + +This is an example of an [asynchronous function](/docs/async.md). + +### First, run the following commands: + +```sh +# Initialize your function creating a func.yaml file +fn init --type async /hello-go-async + +# Test your function. This will run inside a container exactly how it will on the server +fn run + +# Now try with an input +cat sample.payload.json | fn run + +# Deploy your functions to the Oracle Functions server (default localhost:8080) +# This will create a route to your function as well +fn deploy myapp +``` + +### Now call your function: + +```sh +cat payload.json | fn call myapp hello-go-async +``` + +That's it! diff --git a/examples/tutorial/async/go/func.go b/examples/tutorial/async/go/func.go new file mode 100644 index 000000000..8e73bd0ca --- /dev/null +++ b/examples/tutorial/async/go/func.go @@ -0,0 +1,27 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "os" +) + +type Person struct { + Name string +} + +func main() { + + // b, err := ioutil.ReadAll(os.Stdin) + // if err != nil { + // log.Fatal(err) + // } + // fmt.Printf("BODY!!! %s", string(b)) + + p := &Person{Name: "World"} + json.NewDecoder(os.Stdin).Decode(p) + fmt.Printf("Hello %v!\n", p.Name) + + log.Println("---> stderr goes to the server logs.") +} diff --git a/examples/tutorial/async/go/sample.payload.json b/examples/tutorial/async/go/sample.payload.json new file mode 100644 index 000000000..97e136b69 --- /dev/null +++ b/examples/tutorial/async/go/sample.payload.json @@ -0,0 +1,3 @@ +{ + "name": "Johnny" +} diff --git a/fn/calls.go b/fn/calls.go index 33b473ecc..312623f53 100644 --- a/fn/calls.go +++ b/fn/calls.go @@ -1,12 +1,12 @@ package main import ( - "fmt" "context" + "fmt" - "github.com/funcy/functions_go/models" fnclient "github.com/funcy/functions_go/client" apicall "github.com/funcy/functions_go/client/call" + "github.com/funcy/functions_go/models" "github.com/urfave/cli" ) @@ -40,14 +40,14 @@ func calls() cli.Command { } func printCalls(calls []*models.Call) { - for _, call := range calls{ + for _, call := range calls { fmt.Println(fmt.Sprintf( - "ID: %v\n" + - "App: %v\n" + - "Route: %v\n" + - "Created At: %v\n" + - "Started At: %v\n" + - "Completed At: %v\n" + + "ID: %v\n"+ + "App: %v\n"+ + "Route: %v\n"+ + "Created At: %v\n"+ + "Started At: %v\n"+ + "Completed At: %v\n"+ "Status: %v\n", call.ID, call.AppName, call.Path, call.CreatedAt, call.StartedAt, call.CompletedAt, call.Status)) @@ -55,17 +55,16 @@ func printCalls(calls []*models.Call) { } func (call *callsCmd) get(ctx *cli.Context) error { - call_id := ctx.Args().Get(0) + callID := ctx.Args().Get(0) params := apicall.GetCallsCallParams{ - Call: call_id, + Call: callID, Context: context.Background(), } resp, err := call.client.Call.GetCallsCall(¶ms) if err != nil { switch err.(type) { case *apicall.GetCallsCallNotFound: - return fmt.Errorf("error: %v", err.( - *apicall.GetCallsCallNotFound).Payload.Error.Message) + return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message) } return fmt.Errorf("unexpected error: %v", err) @@ -85,8 +84,7 @@ func (call *callsCmd) list(ctx *cli.Context) error { if err != nil { switch err.(type) { case *apicall.GetCallsCallNotFound: - return fmt.Errorf("error: %v", err.( - *apicall.GetCallsCallNotFound).Payload.Error.Message) + return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message) } return fmt.Errorf("unexpected error: %v", err) diff --git a/fn/deploy.go b/fn/deploy.go index c6d3b6909..1b9cb2dc0 100644 --- a/fn/deploy.go +++ b/fn/deploy.go @@ -129,9 +129,8 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error { if err != nil { return err } - if funcfile.Path == nil || *funcfile.Path == "" { - dirName := "/" + path.Base(path.Dir(funcFilePath)) - funcfile.Path = &dirName + if funcfile.Path == "" { + funcfile.Path = "/" + path.Base(path.Dir(funcFilePath)) } if p.skippush { @@ -146,18 +145,18 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error { } func (p *deploycmd) route(c *cli.Context, ff *funcfile) error { - fmt.Printf("Updating route, setting %s -> %s...", ff.Path, ff.Name) + fmt.Printf("Updating route %s using image %s...\n", ff.Path, ff.FullName()) + fmt.Printf("%+v\ntype: %v\n", ff, *ff.Type) if err := resetBasePath(p.Configuration); err != nil { return fmt.Errorf("error setting endpoint: %v", err) } routesCmd := routesCmd{client: apiClient()} rt := &models.Route{} - if err := routeWithFuncFile(c, ff, rt); err != nil { return fmt.Errorf("error getting route with funcfile: %s", err) } - return routesCmd.patchRoute(c, p.appName, *ff.Path, rt) + return routesCmd.patchRoute(c, p.appName, ff.Path, rt) } func expandEnvConfig(configs map[string]string) map[string]string { diff --git a/fn/funcfile.go b/fn/funcfile.go index a5a8204de..d1114372e 100644 --- a/fn/funcfile.go +++ b/fn/funcfile.go @@ -45,7 +45,7 @@ type funcfile struct { Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"` Build []string `yaml:"build,omitempty" json:"build,omitempty"` Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"` - Path *string `yaml:"path,omitempty" json:"path,omitempty"` + Path string `yaml:"path,omitempty" json:"path,omitempty"` } func (ff *funcfile) FullName() string { diff --git a/fn/init.go b/fn/init.go index 903b71bde..ab94b3df5 100644 --- a/fn/init.go +++ b/fn/init.go @@ -50,6 +50,7 @@ type initFnCmd struct { entrypoint string cmd string format string + typeS string } func initFn() cli.Command { @@ -83,6 +84,12 @@ func initFn() cli.Command { Destination: &a.format, Value: "", }, + cli.StringFlag{ + Name: "type", + Usage: "sync or async", + Destination: &a.typeS, + Value: "", + }, }, } } @@ -124,10 +131,14 @@ func (a *initFnCmd) init(c *cli.Context) error { Entrypoint: a.entrypoint, Cmd: a.cmd, Format: ffmt, + Type: &a.typeS, + } + if ff.Type != nil && *ff.Type == "" { + ff.Type = nil } _, path := appNamePath(ff.FullName()) - ff.Path = &path + ff.Path = path if err := encodeFuncfileYAML("func.yaml", ff); err != nil { return err diff --git a/fn/lambda.go b/fn/lambda.go index b2913d020..47020c5bc 100644 --- a/fn/lambda.go +++ b/fn/lambda.go @@ -180,7 +180,7 @@ func createFunctionYaml(opts createImageOptions, functionName string) error { funcDesc := &funcfile{ Name: opts.Name, - Path: &path, + Path: path, Config: opts.Config, Version: "0.0.1", Runtime: &opts.Base, diff --git a/fn/routes.go b/fn/routes.go index 609e61d9a..d18809877 100644 --- a/fn/routes.go +++ b/fn/routes.go @@ -296,9 +296,13 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error { to := int32(ff.Timeout.Seconds()) rt.Timeout = &to } - if rt.Path == "" && ff.Path != nil { - rt.Path = *ff.Path + if rt.Path == "" && ff.Path != "" { + rt.Path = ff.Path } + if rt.Type == nil && ff.Type != nil && *ff.Type != "" { + rt.Type = *ff.Type + } + return nil } diff --git a/fn/run.go b/fn/run.go index 43eb06abc..a58c67ed7 100644 --- a/fn/run.go +++ b/fn/run.go @@ -48,22 +48,28 @@ func (r *runCmd) run(c *cli.Context) error { if err != nil { return err } + var ff *funcfile + // if image name is passed in, it will run that image image := c.Args().First() if image == "" { - ff, err := loadFuncfile() + ff, err = loadFuncfile() if err != nil { if _, ok := err.(*notFoundError); ok { return errors.New("error: image name is missing or no function file found") } return err } - image = ff.FullName() + } else { + ff = &funcfile{ + Name: image, + } } - return runff(image, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link")) + return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link")) } -func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error { +// TODO: THIS SHOULD USE THE RUNNER DRIVERS FROM THE SERVER SO IT'S ESSENTIALLY THE SAME PROCESS (MINUS DATABASE AND ALL THAT) +func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error { sh := []string{"docker", "run", "--rm", "-i"} var env []string // env for the shelled out docker run command @@ -80,7 +86,7 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin runEnv = append(runEnv, kvEq("METHOD", method)) runEnv = append(runEnv, kvEq("REQUEST_URL", "http://localhost:8080/myapp/hello")) runEnv = append(runEnv, kvEq("APP_NAME", "myapp")) - runEnv = append(runEnv, kvEq("ROUTE", "/hello")) + runEnv = append(runEnv, kvEq("ROUTE", "/hello")) // TODO: should we change this to PATH ? // add user defined envs runEnv = append(runEnv, envVars...) @@ -97,7 +103,13 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin sh = append(sh, "-e", e) } - sh = append(sh, image) + if ff.Type != nil && *ff.Type == "async" { + // if async, we'll run this in a separate thread and wait for it to complete + // reqID := id.New().String() + // I'm starting to think maybe `fn run` locally should work the same whether sync or async? Or how would we allow to test the output? + } + + sh = append(sh, ff.FullName()) cmd := exec.Command(sh[0], sh[1:]...) cmd.Stdin = stdin cmd.Stdout = stdout diff --git a/fn/testfn.go b/fn/testfn.go index 23f5d9340..d64ded0d7 100644 --- a/fn/testfn.go +++ b/fn/testfn.go @@ -68,8 +68,8 @@ func (t *testcmd) test(c *cli.Context) error { target := ff.FullName() runtest := runlocaltest if t.remote != "" { - if ff.Path == nil || *ff.Path == "" { - return errors.New("execution of tests on remote server demand that this function to have a `path`.") + if ff.Path == "" { + return errors.New("execution of tests on remote server demand that this function has a `path`.") } if err := resetBasePath(t.Configuration); err != nil { return fmt.Errorf("error setting endpoint: %v", err) @@ -80,7 +80,7 @@ func (t *testcmd) test(c *cli.Context) error { } u, err := url.Parse("../") - u.Path = path.Join(u.Path, "r", t.remote, *ff.Path) + u.Path = path.Join(u.Path, "r", t.remote, ff.Path) target = baseURL.ResolveReference(u).String() runtest = runremotetest } @@ -134,7 +134,8 @@ func runlocaltest(target string, in, expectedOut, expectedErr *string, env map[s restrictedEnv = append(restrictedEnv, k) } - if err := runff(target, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil { + ff := &funcfile{Name: target} + if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil { return fmt.Errorf("%v\nstdout:%s\nstderr:%s\n", err, stdout.String(), stderr.String()) }