Fixes async payload passing for #68.

This commit is contained in:
Travis Reeder
2017-06-20 11:32:51 -07:00
parent c94dab3d45
commit 8c96d3ba2f
23 changed files with 276 additions and 225 deletions

View File

@@ -1,100 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/errors"
"github.com/go-openapi/validate"
)
/*NewTask new task
swagger:model NewTask
*/
type NewTask struct {
/* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds.
*/
Delay int32 `json:"delay,omitempty"`
/* Name of Docker image to use. This is optional and can be used to override the image defined at the route level.
Required: true
*/
Image *string `json:"image"`
/* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried."
*/
MaxRetries int32 `json:"max_retries,omitempty"`
/* Payload for the task. This is what you pass into each task to make it do something.
*/
Payload string `json:"payload,omitempty"`
/* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order.
Required: true
*/
Priority *int32 `json:"priority"`
/* Time in seconds to wait before retrying the task. Must be a non-negative integer.
*/
RetriesDelay *int32 `json:"retries_delay,omitempty"`
/* Maximum runtime in seconds. If a consumer retrieves the
task, but does not change it's status within timeout seconds, the task
is considered failed, with reason timeout (Titan may allow a small
grace period). The consumer should also kill the task after timeout
seconds. If a consumer tries to change status after Titan has already
timed out the task, the consumer will be ignored.
*/
Timeout *int32 `json:"timeout,omitempty"`
/* Hot function idle timeout in seconds before termination.
*/
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
}
// Validate validates this new task
func (m *NewTask) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateImage(formats); err != nil {
// prop
res = append(res, err)
}
if err := m.validatePriority(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *NewTask) validateImage(formats strfmt.Registry) error {
if err := validate.Required("image", "body", m.Image); err != nil {
return err
}
return nil
}
func (m *NewTask) validatePriority(formats strfmt.Registry) error {
if err := validate.Required("priority", "body", m.Priority); err != nil {
return err
}
return nil
}

View File

@@ -35,19 +35,18 @@ type FnCall struct {
StartedAt strfmt.DateTime `json:"started_at,omitempty"`
AppName string `json:"app_name,omitempty"`
Path string `json:"path"`
}
func (fnCall *FnCall) FromTask(task *Task) *FnCall {
return &FnCall{
CreatedAt:task.CreatedAt,
StartedAt:task.StartedAt,
CompletedAt:task.CompletedAt,
AppName:task.AppName,
Path:task.Path,
CreatedAt: task.CreatedAt,
StartedAt: task.StartedAt,
CompletedAt: task.CompletedAt,
AppName: task.AppName,
Path: task.Path,
IDStatus: IDStatus{
ID:task.ID,
Status:task.Status,
ID: task.ID,
Status: task.Status,
},
}
}
@@ -57,10 +56,52 @@ func (fnCall *FnCall) FromTask(task *Task) *FnCall {
swagger:model Task
*/
type Task struct {
NewTask
IDStatus
/* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds.
*/
Delay int32 `json:"delay,omitempty"`
/* Name of Docker image to use. This is optional and can be used to override the image defined at the route level.
Required: true
*/
Image *string `json:"image"`
/* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried."
*/
MaxRetries int32 `json:"max_retries,omitempty"`
/* Payload for the task. This is what you pass into each task to make it do something.
*/
Payload string `json:"payload,omitempty"`
/* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order.
Required: true
*/
Priority *int32 `json:"priority"`
/* Time in seconds to wait before retrying the task. Must be a non-negative integer.
*/
RetriesDelay *int32 `json:"retries_delay,omitempty"`
/* Maximum runtime in seconds. If a consumer retrieves the
task, but does not change it's status within timeout seconds, the task
is considered failed, with reason timeout (Titan may allow a small
grace period). The consumer should also kill the task after timeout
seconds. If a consumer tries to change status after Titan has already
timed out the task, the consumer will be ignored.
*/
Timeout *int32 `json:"timeout,omitempty"`
/* Hot function idle timeout in seconds before termination.
*/
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
/* Time when task completed, whether it was successul or failed. Always in UTC.
*/
CompletedAt strfmt.DateTime `json:"completed_at,omitempty"`
@@ -116,9 +157,7 @@ type Task struct {
func (m *Task) Validate(formats strfmt.Registry) error {
var res []error
if err := m.NewTask.Validate(formats); err != nil {
res = append(res, err)
}
// NewTask validations will get generated automatically
if err := m.IDStatus.Validate(formats); err != nil {
res = append(res, err)

View File

@@ -127,7 +127,8 @@ func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.D
}
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
log.Debug("Running task:", task.ID)
log.Info("Running task:", task.ID)
// log.Infof("Task: %+v", task)
wg.Add(1)

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
@@ -63,6 +64,10 @@ func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg
startedAt := strfmt.DateTime(time.Now())
newTask.StartedAt = startedAt
// set payload as Stdin
// fmt.Printf("ABOUT TO PASS IN PAYLOAD: %v", newTask.Payload)
cfg.Stdin = strings.NewReader(newTask.Payload)
result, err := rnr.RunTask(ctx, cfg)
completedAt := strfmt.DateTime(time.Now())

View File

@@ -90,7 +90,7 @@ func (s *Server) handleRouteCreate(c *gin.Context) {
return
}
s.cacherefresh(route)
s.cacheRefresh(route)
c.JSON(http.StatusOK, routeResponse{"Route successfully created", route})
}

View File

@@ -36,6 +36,8 @@ func (s *Server) handleRouteUpdate(c *gin.Context) {
return
}
// fmt.Printf("ROUTE BOUND: %+v", *wroute.Route)
wroute.Route.AppName = c.MustGet(api.AppName).(string)
wroute.Route.Path = path.Clean(c.MustGet(api.Path).(string))
@@ -69,7 +71,7 @@ func (s *Server) handleRouteUpdate(c *gin.Context) {
return
}
s.cacherefresh(route)
s.cacheRefresh(route)
c.JSON(http.StatusOK, routeResponse{"Route successfully updated", route})
}

View File

@@ -171,7 +171,7 @@ func (s *Server) cacheget(appname, path string) (*models.Route, bool) {
return route, ok
}
func (s *Server) cacherefresh(route *models.Route) {
func (s *Server) cacheRefresh(route *models.Route) {
s.mu.Lock()
defer s.mu.Unlock()
s.hotroutes.Refresh(route)

View File

@@ -13,6 +13,7 @@ If you are a developer using Oracle Functions through the API, this section is f
* [Open Function Format](function-format.md)
* [API Reference](http://petstore.swagger.io/?url=https://raw.githubusercontent.com/treeder/functions/master/docs/swagger.yml)
* [Hot functions](hot-functions.md)
* [Async functions](async.md)
* [FAQ](faq.md)
## For Operators

18
docs/async.md Normal file
View File

@@ -0,0 +1,18 @@
# Asynchronous Functions
Asynchronous (async) functions will run your function at some point in the future. The default mode is synchronous which means
a function is executed and the caller blocks while waiting for the response. Asynchronous on the other, puts the request into a
message queue and responds immediately to the caller. The function will then be executed at some point in the future, upon resource availability giving priority
to synchronous calls. Also, since it is using a message queue, you can safely queue up millions of function calls without worrying about
capacity.
Async will return immediately with a `call_id`, for example:
```json
{"call_id": "abc123"}
```
The `call_id` can then be used to retrieve the status at a later time.
Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete.
For instance, image processing, video processing, data processing, ETL, etc.

View File

@@ -555,10 +555,19 @@ definitions:
readOnly: true
Task:
allOf:
- $ref: "#/definitions/NewTask"
- type: object
type: object
required:
- image
properties:
image:
type: string
description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
payload:
type: string
# 256k
# maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272
# maxLength: 268435456
description: Payload for the task. This is what you pass into each task to make it do something.
group_name:
type: string
description: "Group this task belongs to."
@@ -624,21 +633,6 @@ definitions:
error:
$ref: '#/definitions/ErrorBody'
NewTask:
type: object
required:
- image
properties:
image:
type: string
description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
payload:
type: string
# 256k
# maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272
# maxLength: 268435456
description: Payload for the task. This is what you pass into each task to make it do something.
TaskWrapper:
type: object
required:

View File

@@ -87,7 +87,7 @@ You should see it say `Hello Johnny!` now instead of `Hello World!`.
Oracle Functions supports synchronous function calls like we just tried above, and asynchronous for background processing.
Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete.
[Asynchronous functions](async.md) are great for tasks that are CPU heavy or take more than a few seconds to complete.
For instance, image processing, video processing, data processing, ETL, etc.
Architecturally, the main difference between synchronous and asynchronous is that requests
to asynchronous functions are put in a queue and executed on upon resource availability so that they do not interfere with the fast synchronous responses required for an API.
@@ -106,6 +106,8 @@ curl -H "Content-Type: application/json" -X POST -d '{
}' http://localhost:8080/v1/apps/myapp/routes
```
or set `type: async` in your `func.yaml`.
Now if you request this route:
```sh

6
examples/tutorial/async/go/.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
vendor/
/go
/app
/__uberscript__
func.yaml

View File

@@ -0,0 +1,28 @@
# Aynchronous Function Example
This is an example of an [asynchronous function](/docs/async.md).
### First, run the following commands:
```sh
# Initialize your function creating a func.yaml file
fn init --type async <DOCKERHUB_USERNAME>/hello-go-async
# Test your function. This will run inside a container exactly how it will on the server
fn run
# Now try with an input
cat sample.payload.json | fn run
# Deploy your functions to the Oracle Functions server (default localhost:8080)
# This will create a route to your function as well
fn deploy myapp
```
### Now call your function:
```sh
cat payload.json | fn call myapp hello-go-async
```
That's it!

View File

@@ -0,0 +1,27 @@
package main
import (
"encoding/json"
"fmt"
"log"
"os"
)
type Person struct {
Name string
}
func main() {
// b, err := ioutil.ReadAll(os.Stdin)
// if err != nil {
// log.Fatal(err)
// }
// fmt.Printf("BODY!!! %s", string(b))
p := &Person{Name: "World"}
json.NewDecoder(os.Stdin).Decode(p)
fmt.Printf("Hello %v!\n", p.Name)
log.Println("---> stderr goes to the server logs.")
}

View File

@@ -0,0 +1,3 @@
{
"name": "Johnny"
}

View File

@@ -1,12 +1,12 @@
package main
import (
"fmt"
"context"
"fmt"
"github.com/funcy/functions_go/models"
fnclient "github.com/funcy/functions_go/client"
apicall "github.com/funcy/functions_go/client/call"
"github.com/funcy/functions_go/models"
"github.com/urfave/cli"
)
@@ -40,14 +40,14 @@ func calls() cli.Command {
}
func printCalls(calls []*models.Call) {
for _, call := range calls{
for _, call := range calls {
fmt.Println(fmt.Sprintf(
"ID: %v\n" +
"App: %v\n" +
"Route: %v\n" +
"Created At: %v\n" +
"Started At: %v\n" +
"Completed At: %v\n" +
"ID: %v\n"+
"App: %v\n"+
"Route: %v\n"+
"Created At: %v\n"+
"Started At: %v\n"+
"Completed At: %v\n"+
"Status: %v\n",
call.ID, call.AppName, call.Path, call.CreatedAt,
call.StartedAt, call.CompletedAt, call.Status))
@@ -55,17 +55,16 @@ func printCalls(calls []*models.Call) {
}
func (call *callsCmd) get(ctx *cli.Context) error {
call_id := ctx.Args().Get(0)
callID := ctx.Args().Get(0)
params := apicall.GetCallsCallParams{
Call: call_id,
Call: callID,
Context: context.Background(),
}
resp, err := call.client.Call.GetCallsCall(&params)
if err != nil {
switch err.(type) {
case *apicall.GetCallsCallNotFound:
return fmt.Errorf("error: %v", err.(
*apicall.GetCallsCallNotFound).Payload.Error.Message)
return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message)
}
return fmt.Errorf("unexpected error: %v", err)
@@ -85,8 +84,7 @@ func (call *callsCmd) list(ctx *cli.Context) error {
if err != nil {
switch err.(type) {
case *apicall.GetCallsCallNotFound:
return fmt.Errorf("error: %v", err.(
*apicall.GetCallsCallNotFound).Payload.Error.Message)
return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message)
}
return fmt.Errorf("unexpected error: %v", err)

View File

@@ -129,9 +129,8 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error {
if err != nil {
return err
}
if funcfile.Path == nil || *funcfile.Path == "" {
dirName := "/" + path.Base(path.Dir(funcFilePath))
funcfile.Path = &dirName
if funcfile.Path == "" {
funcfile.Path = "/" + path.Base(path.Dir(funcFilePath))
}
if p.skippush {
@@ -146,18 +145,18 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error {
}
func (p *deploycmd) route(c *cli.Context, ff *funcfile) error {
fmt.Printf("Updating route, setting %s -> %s...", ff.Path, ff.Name)
fmt.Printf("Updating route %s using image %s...\n", ff.Path, ff.FullName())
fmt.Printf("%+v\ntype: %v\n", ff, *ff.Type)
if err := resetBasePath(p.Configuration); err != nil {
return fmt.Errorf("error setting endpoint: %v", err)
}
routesCmd := routesCmd{client: apiClient()}
rt := &models.Route{}
if err := routeWithFuncFile(c, ff, rt); err != nil {
return fmt.Errorf("error getting route with funcfile: %s", err)
}
return routesCmd.patchRoute(c, p.appName, *ff.Path, rt)
return routesCmd.patchRoute(c, p.appName, ff.Path, rt)
}
func expandEnvConfig(configs map[string]string) map[string]string {

View File

@@ -45,7 +45,7 @@ type funcfile struct {
Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"`
Build []string `yaml:"build,omitempty" json:"build,omitempty"`
Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"`
Path *string `yaml:"path,omitempty" json:"path,omitempty"`
Path string `yaml:"path,omitempty" json:"path,omitempty"`
}
func (ff *funcfile) FullName() string {

View File

@@ -50,6 +50,7 @@ type initFnCmd struct {
entrypoint string
cmd string
format string
typeS string
}
func initFn() cli.Command {
@@ -83,6 +84,12 @@ func initFn() cli.Command {
Destination: &a.format,
Value: "",
},
cli.StringFlag{
Name: "type",
Usage: "sync or async",
Destination: &a.typeS,
Value: "",
},
},
}
}
@@ -124,10 +131,14 @@ func (a *initFnCmd) init(c *cli.Context) error {
Entrypoint: a.entrypoint,
Cmd: a.cmd,
Format: ffmt,
Type: &a.typeS,
}
if ff.Type != nil && *ff.Type == "" {
ff.Type = nil
}
_, path := appNamePath(ff.FullName())
ff.Path = &path
ff.Path = path
if err := encodeFuncfileYAML("func.yaml", ff); err != nil {
return err

View File

@@ -180,7 +180,7 @@ func createFunctionYaml(opts createImageOptions, functionName string) error {
funcDesc := &funcfile{
Name: opts.Name,
Path: &path,
Path: path,
Config: opts.Config,
Version: "0.0.1",
Runtime: &opts.Base,

View File

@@ -296,9 +296,13 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error {
to := int32(ff.Timeout.Seconds())
rt.Timeout = &to
}
if rt.Path == "" && ff.Path != nil {
rt.Path = *ff.Path
if rt.Path == "" && ff.Path != "" {
rt.Path = ff.Path
}
if rt.Type == nil && ff.Type != nil && *ff.Type != "" {
rt.Type = *ff.Type
}
return nil
}

View File

@@ -48,22 +48,28 @@ func (r *runCmd) run(c *cli.Context) error {
if err != nil {
return err
}
var ff *funcfile
// if image name is passed in, it will run that image
image := c.Args().First()
if image == "" {
ff, err := loadFuncfile()
ff, err = loadFuncfile()
if err != nil {
if _, ok := err.(*notFoundError); ok {
return errors.New("error: image name is missing or no function file found")
}
return err
}
image = ff.FullName()
} else {
ff = &funcfile{
Name: image,
}
}
return runff(image, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"))
return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"))
}
func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error {
// TODO: THIS SHOULD USE THE RUNNER DRIVERS FROM THE SERVER SO IT'S ESSENTIALLY THE SAME PROCESS (MINUS DATABASE AND ALL THAT)
func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error {
sh := []string{"docker", "run", "--rm", "-i"}
var env []string // env for the shelled out docker run command
@@ -80,7 +86,7 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin
runEnv = append(runEnv, kvEq("METHOD", method))
runEnv = append(runEnv, kvEq("REQUEST_URL", "http://localhost:8080/myapp/hello"))
runEnv = append(runEnv, kvEq("APP_NAME", "myapp"))
runEnv = append(runEnv, kvEq("ROUTE", "/hello"))
runEnv = append(runEnv, kvEq("ROUTE", "/hello")) // TODO: should we change this to PATH ?
// add user defined envs
runEnv = append(runEnv, envVars...)
@@ -97,7 +103,13 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin
sh = append(sh, "-e", e)
}
sh = append(sh, image)
if ff.Type != nil && *ff.Type == "async" {
// if async, we'll run this in a separate thread and wait for it to complete
// reqID := id.New().String()
// I'm starting to think maybe `fn run` locally should work the same whether sync or async? Or how would we allow to test the output?
}
sh = append(sh, ff.FullName())
cmd := exec.Command(sh[0], sh[1:]...)
cmd.Stdin = stdin
cmd.Stdout = stdout

View File

@@ -68,8 +68,8 @@ func (t *testcmd) test(c *cli.Context) error {
target := ff.FullName()
runtest := runlocaltest
if t.remote != "" {
if ff.Path == nil || *ff.Path == "" {
return errors.New("execution of tests on remote server demand that this function to have a `path`.")
if ff.Path == "" {
return errors.New("execution of tests on remote server demand that this function has a `path`.")
}
if err := resetBasePath(t.Configuration); err != nil {
return fmt.Errorf("error setting endpoint: %v", err)
@@ -80,7 +80,7 @@ func (t *testcmd) test(c *cli.Context) error {
}
u, err := url.Parse("../")
u.Path = path.Join(u.Path, "r", t.remote, *ff.Path)
u.Path = path.Join(u.Path, "r", t.remote, ff.Path)
target = baseURL.ResolveReference(u).String()
runtest = runremotetest
}
@@ -134,7 +134,8 @@ func runlocaltest(target string, in, expectedOut, expectedErr *string, env map[s
restrictedEnv = append(restrictedEnv, k)
}
if err := runff(target, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil {
ff := &funcfile{Name: target}
if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil {
return fmt.Errorf("%v\nstdout:%s\nstderr:%s\n", err, stdout.String(), stderr.String())
}