mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'async-fixes' into 'master'
Fixes async payload passing for #68. See merge request !67
This commit is contained in:
@@ -1,100 +0,0 @@
|
||||
package models
|
||||
|
||||
// This file was generated by the swagger tool.
|
||||
// Editing this file might prove futile when you re-run the swagger generate command
|
||||
|
||||
import (
|
||||
strfmt "github.com/go-openapi/strfmt"
|
||||
|
||||
"github.com/go-openapi/errors"
|
||||
"github.com/go-openapi/validate"
|
||||
)
|
||||
|
||||
/*NewTask new task
|
||||
|
||||
swagger:model NewTask
|
||||
*/
|
||||
type NewTask struct {
|
||||
|
||||
/* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds.
|
||||
*/
|
||||
Delay int32 `json:"delay,omitempty"`
|
||||
|
||||
/* Name of Docker image to use. This is optional and can be used to override the image defined at the route level.
|
||||
|
||||
Required: true
|
||||
*/
|
||||
Image *string `json:"image"`
|
||||
|
||||
/* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried."
|
||||
|
||||
*/
|
||||
MaxRetries int32 `json:"max_retries,omitempty"`
|
||||
|
||||
/* Payload for the task. This is what you pass into each task to make it do something.
|
||||
*/
|
||||
Payload string `json:"payload,omitempty"`
|
||||
|
||||
/* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order.
|
||||
|
||||
Required: true
|
||||
*/
|
||||
Priority *int32 `json:"priority"`
|
||||
|
||||
/* Time in seconds to wait before retrying the task. Must be a non-negative integer.
|
||||
*/
|
||||
RetriesDelay *int32 `json:"retries_delay,omitempty"`
|
||||
|
||||
/* Maximum runtime in seconds. If a consumer retrieves the
|
||||
task, but does not change it's status within timeout seconds, the task
|
||||
is considered failed, with reason timeout (Titan may allow a small
|
||||
grace period). The consumer should also kill the task after timeout
|
||||
seconds. If a consumer tries to change status after Titan has already
|
||||
timed out the task, the consumer will be ignored.
|
||||
|
||||
*/
|
||||
Timeout *int32 `json:"timeout,omitempty"`
|
||||
|
||||
/* Hot function idle timeout in seconds before termination.
|
||||
|
||||
*/
|
||||
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
|
||||
}
|
||||
|
||||
// Validate validates this new task
|
||||
func (m *NewTask) Validate(formats strfmt.Registry) error {
|
||||
var res []error
|
||||
|
||||
if err := m.validateImage(formats); err != nil {
|
||||
// prop
|
||||
res = append(res, err)
|
||||
}
|
||||
|
||||
if err := m.validatePriority(formats); err != nil {
|
||||
// prop
|
||||
res = append(res, err)
|
||||
}
|
||||
|
||||
if len(res) > 0 {
|
||||
return errors.CompositeValidationError(res...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *NewTask) validateImage(formats strfmt.Registry) error {
|
||||
|
||||
if err := validate.Required("image", "body", m.Image); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *NewTask) validatePriority(formats strfmt.Registry) error {
|
||||
|
||||
if err := validate.Required("priority", "body", m.Priority); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -31,23 +31,22 @@ const (
|
||||
type FnCall struct {
|
||||
IDStatus
|
||||
CompletedAt strfmt.DateTime `json:"completed_at,omitempty"`
|
||||
CreatedAt strfmt.DateTime `json:"created_at,omitempty"`
|
||||
StartedAt strfmt.DateTime `json:"started_at,omitempty"`
|
||||
AppName string `json:"app_name,omitempty"`
|
||||
Path string `json:"path"`
|
||||
|
||||
CreatedAt strfmt.DateTime `json:"created_at,omitempty"`
|
||||
StartedAt strfmt.DateTime `json:"started_at,omitempty"`
|
||||
AppName string `json:"app_name,omitempty"`
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
func (fnCall *FnCall) FromTask(task *Task) *FnCall {
|
||||
return &FnCall{
|
||||
CreatedAt:task.CreatedAt,
|
||||
StartedAt:task.StartedAt,
|
||||
CompletedAt:task.CompletedAt,
|
||||
AppName:task.AppName,
|
||||
Path:task.Path,
|
||||
CreatedAt: task.CreatedAt,
|
||||
StartedAt: task.StartedAt,
|
||||
CompletedAt: task.CompletedAt,
|
||||
AppName: task.AppName,
|
||||
Path: task.Path,
|
||||
IDStatus: IDStatus{
|
||||
ID:task.ID,
|
||||
Status:task.Status,
|
||||
ID: task.ID,
|
||||
Status: task.Status,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -57,10 +56,52 @@ func (fnCall *FnCall) FromTask(task *Task) *FnCall {
|
||||
swagger:model Task
|
||||
*/
|
||||
type Task struct {
|
||||
NewTask
|
||||
|
||||
IDStatus
|
||||
|
||||
/* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds.
|
||||
*/
|
||||
Delay int32 `json:"delay,omitempty"`
|
||||
|
||||
/* Name of Docker image to use. This is optional and can be used to override the image defined at the route level.
|
||||
|
||||
Required: true
|
||||
*/
|
||||
Image *string `json:"image"`
|
||||
|
||||
/* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried."
|
||||
|
||||
*/
|
||||
MaxRetries int32 `json:"max_retries,omitempty"`
|
||||
|
||||
/* Payload for the task. This is what you pass into each task to make it do something.
|
||||
*/
|
||||
Payload string `json:"payload,omitempty"`
|
||||
|
||||
/* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order.
|
||||
|
||||
Required: true
|
||||
*/
|
||||
Priority *int32 `json:"priority"`
|
||||
|
||||
/* Time in seconds to wait before retrying the task. Must be a non-negative integer.
|
||||
*/
|
||||
RetriesDelay *int32 `json:"retries_delay,omitempty"`
|
||||
|
||||
/* Maximum runtime in seconds. If a consumer retrieves the
|
||||
task, but does not change it's status within timeout seconds, the task
|
||||
is considered failed, with reason timeout (Titan may allow a small
|
||||
grace period). The consumer should also kill the task after timeout
|
||||
seconds. If a consumer tries to change status after Titan has already
|
||||
timed out the task, the consumer will be ignored.
|
||||
|
||||
*/
|
||||
Timeout *int32 `json:"timeout,omitempty"`
|
||||
|
||||
/* Hot function idle timeout in seconds before termination.
|
||||
|
||||
*/
|
||||
IdleTimeout *int32 `json:"idle_timeout,omitempty"`
|
||||
|
||||
/* Time when task completed, whether it was successul or failed. Always in UTC.
|
||||
*/
|
||||
CompletedAt strfmt.DateTime `json:"completed_at,omitempty"`
|
||||
@@ -116,9 +157,7 @@ type Task struct {
|
||||
func (m *Task) Validate(formats strfmt.Registry) error {
|
||||
var res []error
|
||||
|
||||
if err := m.NewTask.Validate(formats); err != nil {
|
||||
res = append(res, err)
|
||||
}
|
||||
// NewTask validations will get generated automatically
|
||||
|
||||
if err := m.IDStatus.Validate(formats); err != nil {
|
||||
res = append(res, err)
|
||||
|
||||
@@ -127,7 +127,8 @@ func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.D
|
||||
}
|
||||
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
|
||||
log.Debug("Running task:", task.ID)
|
||||
log.Info("Running task:", task.ID)
|
||||
// log.Infof("Task: %+v", task)
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -63,6 +64,10 @@ func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg
|
||||
startedAt := strfmt.DateTime(time.Now())
|
||||
newTask.StartedAt = startedAt
|
||||
|
||||
// set payload as Stdin
|
||||
// fmt.Printf("ABOUT TO PASS IN PAYLOAD: %v", newTask.Payload)
|
||||
cfg.Stdin = strings.NewReader(newTask.Payload)
|
||||
|
||||
result, err := rnr.RunTask(ctx, cfg)
|
||||
|
||||
completedAt := strfmt.DateTime(time.Now())
|
||||
|
||||
@@ -90,7 +90,7 @@ func (s *Server) handleRouteCreate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
s.cacherefresh(route)
|
||||
s.cacheRefresh(route)
|
||||
|
||||
c.JSON(http.StatusOK, routeResponse{"Route successfully created", route})
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ func (s *Server) handleRouteUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// fmt.Printf("ROUTE BOUND: %+v", *wroute.Route)
|
||||
|
||||
wroute.Route.AppName = c.MustGet(api.AppName).(string)
|
||||
wroute.Route.Path = path.Clean(c.MustGet(api.Path).(string))
|
||||
|
||||
@@ -69,7 +71,7 @@ func (s *Server) handleRouteUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
s.cacherefresh(route)
|
||||
s.cacheRefresh(route)
|
||||
|
||||
c.JSON(http.StatusOK, routeResponse{"Route successfully updated", route})
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func (s *Server) cacheget(appname, path string) (*models.Route, bool) {
|
||||
return route, ok
|
||||
}
|
||||
|
||||
func (s *Server) cacherefresh(route *models.Route) {
|
||||
func (s *Server) cacheRefresh(route *models.Route) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.hotroutes.Refresh(route)
|
||||
|
||||
@@ -13,6 +13,7 @@ If you are a developer using Oracle Functions through the API, this section is f
|
||||
* [Open Function Format](function-format.md)
|
||||
* [API Reference](http://petstore.swagger.io/?url=https://raw.githubusercontent.com/treeder/functions/master/docs/swagger.yml)
|
||||
* [Hot functions](hot-functions.md)
|
||||
* [Async functions](async.md)
|
||||
* [FAQ](faq.md)
|
||||
|
||||
## For Operators
|
||||
|
||||
18
docs/async.md
Normal file
18
docs/async.md
Normal file
@@ -0,0 +1,18 @@
|
||||
# Asynchronous Functions
|
||||
|
||||
Asynchronous (async) functions will run your function at some point in the future. The default mode is synchronous which means
|
||||
a function is executed and the caller blocks while waiting for the response. Asynchronous on the other, puts the request into a
|
||||
message queue and responds immediately to the caller. The function will then be executed at some point in the future, upon resource availability giving priority
|
||||
to synchronous calls. Also, since it is using a message queue, you can safely queue up millions of function calls without worrying about
|
||||
capacity.
|
||||
|
||||
Async will return immediately with a `call_id`, for example:
|
||||
|
||||
```json
|
||||
{"call_id": "abc123"}
|
||||
```
|
||||
|
||||
The `call_id` can then be used to retrieve the status at a later time.
|
||||
|
||||
Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete.
|
||||
For instance, image processing, video processing, data processing, ETL, etc.
|
||||
128
docs/swagger.yml
128
docs/swagger.yml
@@ -555,58 +555,67 @@ definitions:
|
||||
readOnly: true
|
||||
|
||||
Task:
|
||||
allOf:
|
||||
- $ref: "#/definitions/NewTask"
|
||||
- type: object
|
||||
properties:
|
||||
group_name:
|
||||
type: string
|
||||
description: "Group this task belongs to."
|
||||
readOnly: true
|
||||
error:
|
||||
type: string
|
||||
description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log."
|
||||
reason:
|
||||
type: string
|
||||
description: |
|
||||
Machine usable reason for task being in this state.
|
||||
Valid values for error status are `timeout | killed | bad_exit`.
|
||||
Valid values for cancelled status are `client_request`.
|
||||
For everything else, this is undefined.
|
||||
enum:
|
||||
- timeout
|
||||
- killed
|
||||
- bad_exit
|
||||
- client_request
|
||||
created_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task was submitted. Always in UTC.
|
||||
readOnly: true
|
||||
started_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task started execution. Always in UTC.
|
||||
completed_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task completed, whether it was successul or failed. Always in UTC.
|
||||
# We maintain a doubly linked list of the retried task to the
|
||||
# original task.
|
||||
retry_of:
|
||||
type: string
|
||||
description: If this field is set, then this task is a retry of the ID in this field.
|
||||
readOnly: true
|
||||
retry_at:
|
||||
type: string
|
||||
description: If this field is set, then this task was retried by the task referenced in this field.
|
||||
readOnly: true
|
||||
env_vars:
|
||||
# this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties
|
||||
type: object
|
||||
description: Env vars for the task. Comes from the ones set on the Group.
|
||||
additionalProperties:
|
||||
type: string
|
||||
type: object
|
||||
required:
|
||||
- image
|
||||
properties:
|
||||
image:
|
||||
type: string
|
||||
description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
|
||||
payload:
|
||||
type: string
|
||||
# 256k
|
||||
# maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272
|
||||
# maxLength: 268435456
|
||||
description: Payload for the task. This is what you pass into each task to make it do something.
|
||||
group_name:
|
||||
type: string
|
||||
description: "Group this task belongs to."
|
||||
readOnly: true
|
||||
error:
|
||||
type: string
|
||||
description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log."
|
||||
reason:
|
||||
type: string
|
||||
description: |
|
||||
Machine usable reason for task being in this state.
|
||||
Valid values for error status are `timeout | killed | bad_exit`.
|
||||
Valid values for cancelled status are `client_request`.
|
||||
For everything else, this is undefined.
|
||||
enum:
|
||||
- timeout
|
||||
- killed
|
||||
- bad_exit
|
||||
- client_request
|
||||
created_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task was submitted. Always in UTC.
|
||||
readOnly: true
|
||||
started_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task started execution. Always in UTC.
|
||||
completed_at:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Time when task completed, whether it was successul or failed. Always in UTC.
|
||||
# We maintain a doubly linked list of the retried task to the
|
||||
# original task.
|
||||
retry_of:
|
||||
type: string
|
||||
description: If this field is set, then this task is a retry of the ID in this field.
|
||||
readOnly: true
|
||||
retry_at:
|
||||
type: string
|
||||
description: If this field is set, then this task was retried by the task referenced in this field.
|
||||
readOnly: true
|
||||
env_vars:
|
||||
# this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties
|
||||
type: object
|
||||
description: Env vars for the task. Comes from the ones set on the Group.
|
||||
additionalProperties:
|
||||
type: string
|
||||
|
||||
ErrorBody:
|
||||
type: object
|
||||
@@ -624,21 +633,6 @@ definitions:
|
||||
error:
|
||||
$ref: '#/definitions/ErrorBody'
|
||||
|
||||
NewTask:
|
||||
type: object
|
||||
required:
|
||||
- image
|
||||
properties:
|
||||
image:
|
||||
type: string
|
||||
description: Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
|
||||
payload:
|
||||
type: string
|
||||
# 256k
|
||||
# maxLength breaks ruby generator too: https://github.com/treeder/worker_ruby/blob/0aa9236ce5060af3f15758937712973f80dd54fe/lib/iron_titan/models/task.rb#L272
|
||||
# maxLength: 268435456
|
||||
description: Payload for the task. This is what you pass into each task to make it do something.
|
||||
|
||||
TaskWrapper:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -87,7 +87,7 @@ You should see it say `Hello Johnny!` now instead of `Hello World!`.
|
||||
|
||||
Oracle Functions supports synchronous function calls like we just tried above, and asynchronous for background processing.
|
||||
|
||||
Asynchronous function calls are great for tasks that are CPU heavy or take more than a few seconds to complete.
|
||||
[Asynchronous functions](async.md) are great for tasks that are CPU heavy or take more than a few seconds to complete.
|
||||
For instance, image processing, video processing, data processing, ETL, etc.
|
||||
Architecturally, the main difference between synchronous and asynchronous is that requests
|
||||
to asynchronous functions are put in a queue and executed on upon resource availability so that they do not interfere with the fast synchronous responses required for an API.
|
||||
@@ -106,6 +106,8 @@ curl -H "Content-Type: application/json" -X POST -d '{
|
||||
}' http://localhost:8080/v1/apps/myapp/routes
|
||||
```
|
||||
|
||||
or set `type: async` in your `func.yaml`.
|
||||
|
||||
Now if you request this route:
|
||||
|
||||
```sh
|
||||
|
||||
6
examples/tutorial/async/go/.gitignore
vendored
Normal file
6
examples/tutorial/async/go/.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
vendor/
|
||||
/go
|
||||
/app
|
||||
/__uberscript__
|
||||
|
||||
func.yaml
|
||||
28
examples/tutorial/async/go/README.md
Normal file
28
examples/tutorial/async/go/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# Aynchronous Function Example
|
||||
|
||||
This is an example of an [asynchronous function](/docs/async.md).
|
||||
|
||||
### First, run the following commands:
|
||||
|
||||
```sh
|
||||
# Initialize your function creating a func.yaml file
|
||||
fn init --type async <DOCKERHUB_USERNAME>/hello-go-async
|
||||
|
||||
# Test your function. This will run inside a container exactly how it will on the server
|
||||
fn run
|
||||
|
||||
# Now try with an input
|
||||
cat sample.payload.json | fn run
|
||||
|
||||
# Deploy your functions to the Oracle Functions server (default localhost:8080)
|
||||
# This will create a route to your function as well
|
||||
fn deploy myapp
|
||||
```
|
||||
|
||||
### Now call your function:
|
||||
|
||||
```sh
|
||||
cat payload.json | fn call myapp hello-go-async
|
||||
```
|
||||
|
||||
That's it!
|
||||
27
examples/tutorial/async/go/func.go
Normal file
27
examples/tutorial/async/go/func.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
type Person struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
// b, err := ioutil.ReadAll(os.Stdin)
|
||||
// if err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// fmt.Printf("BODY!!! %s", string(b))
|
||||
|
||||
p := &Person{Name: "World"}
|
||||
json.NewDecoder(os.Stdin).Decode(p)
|
||||
fmt.Printf("Hello %v!\n", p.Name)
|
||||
|
||||
log.Println("---> stderr goes to the server logs.")
|
||||
}
|
||||
3
examples/tutorial/async/go/sample.payload.json
Normal file
3
examples/tutorial/async/go/sample.payload.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"name": "Johnny"
|
||||
}
|
||||
28
fn/calls.go
28
fn/calls.go
@@ -1,12 +1,12 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/funcy/functions_go/models"
|
||||
fnclient "github.com/funcy/functions_go/client"
|
||||
apicall "github.com/funcy/functions_go/client/call"
|
||||
"github.com/funcy/functions_go/models"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
@@ -40,14 +40,14 @@ func calls() cli.Command {
|
||||
}
|
||||
|
||||
func printCalls(calls []*models.Call) {
|
||||
for _, call := range calls{
|
||||
for _, call := range calls {
|
||||
fmt.Println(fmt.Sprintf(
|
||||
"ID: %v\n" +
|
||||
"App: %v\n" +
|
||||
"Route: %v\n" +
|
||||
"Created At: %v\n" +
|
||||
"Started At: %v\n" +
|
||||
"Completed At: %v\n" +
|
||||
"ID: %v\n"+
|
||||
"App: %v\n"+
|
||||
"Route: %v\n"+
|
||||
"Created At: %v\n"+
|
||||
"Started At: %v\n"+
|
||||
"Completed At: %v\n"+
|
||||
"Status: %v\n",
|
||||
call.ID, call.AppName, call.Path, call.CreatedAt,
|
||||
call.StartedAt, call.CompletedAt, call.Status))
|
||||
@@ -55,17 +55,16 @@ func printCalls(calls []*models.Call) {
|
||||
}
|
||||
|
||||
func (call *callsCmd) get(ctx *cli.Context) error {
|
||||
call_id := ctx.Args().Get(0)
|
||||
callID := ctx.Args().Get(0)
|
||||
params := apicall.GetCallsCallParams{
|
||||
Call: call_id,
|
||||
Call: callID,
|
||||
Context: context.Background(),
|
||||
}
|
||||
resp, err := call.client.Call.GetCallsCall(¶ms)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case *apicall.GetCallsCallNotFound:
|
||||
return fmt.Errorf("error: %v", err.(
|
||||
*apicall.GetCallsCallNotFound).Payload.Error.Message)
|
||||
return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message)
|
||||
}
|
||||
return fmt.Errorf("unexpected error: %v", err)
|
||||
|
||||
@@ -85,8 +84,7 @@ func (call *callsCmd) list(ctx *cli.Context) error {
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case *apicall.GetCallsCallNotFound:
|
||||
return fmt.Errorf("error: %v", err.(
|
||||
*apicall.GetCallsCallNotFound).Payload.Error.Message)
|
||||
return fmt.Errorf("error: %v", err.(*apicall.GetCallsCallNotFound).Payload.Error.Message)
|
||||
}
|
||||
return fmt.Errorf("unexpected error: %v", err)
|
||||
|
||||
|
||||
11
fn/deploy.go
11
fn/deploy.go
@@ -129,9 +129,8 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if funcfile.Path == nil || *funcfile.Path == "" {
|
||||
dirName := "/" + path.Base(path.Dir(funcFilePath))
|
||||
funcfile.Path = &dirName
|
||||
if funcfile.Path == "" {
|
||||
funcfile.Path = "/" + path.Base(path.Dir(funcFilePath))
|
||||
}
|
||||
|
||||
if p.skippush {
|
||||
@@ -146,18 +145,18 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error {
|
||||
}
|
||||
|
||||
func (p *deploycmd) route(c *cli.Context, ff *funcfile) error {
|
||||
fmt.Printf("Updating route, setting %s -> %s...", ff.Path, ff.Name)
|
||||
fmt.Printf("Updating route %s using image %s...\n", ff.Path, ff.FullName())
|
||||
fmt.Printf("%+v\ntype: %v\n", ff, *ff.Type)
|
||||
if err := resetBasePath(p.Configuration); err != nil {
|
||||
return fmt.Errorf("error setting endpoint: %v", err)
|
||||
}
|
||||
|
||||
routesCmd := routesCmd{client: apiClient()}
|
||||
rt := &models.Route{}
|
||||
|
||||
if err := routeWithFuncFile(c, ff, rt); err != nil {
|
||||
return fmt.Errorf("error getting route with funcfile: %s", err)
|
||||
}
|
||||
return routesCmd.patchRoute(c, p.appName, *ff.Path, rt)
|
||||
return routesCmd.patchRoute(c, p.appName, ff.Path, rt)
|
||||
}
|
||||
|
||||
func expandEnvConfig(configs map[string]string) map[string]string {
|
||||
|
||||
@@ -45,7 +45,7 @@ type funcfile struct {
|
||||
Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"`
|
||||
Build []string `yaml:"build,omitempty" json:"build,omitempty"`
|
||||
Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"`
|
||||
Path *string `yaml:"path,omitempty" json:"path,omitempty"`
|
||||
Path string `yaml:"path,omitempty" json:"path,omitempty"`
|
||||
}
|
||||
|
||||
func (ff *funcfile) FullName() string {
|
||||
|
||||
13
fn/init.go
13
fn/init.go
@@ -50,6 +50,7 @@ type initFnCmd struct {
|
||||
entrypoint string
|
||||
cmd string
|
||||
format string
|
||||
typeS string
|
||||
}
|
||||
|
||||
func initFn() cli.Command {
|
||||
@@ -83,6 +84,12 @@ func initFn() cli.Command {
|
||||
Destination: &a.format,
|
||||
Value: "",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "type",
|
||||
Usage: "sync or async",
|
||||
Destination: &a.typeS,
|
||||
Value: "",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -124,10 +131,14 @@ func (a *initFnCmd) init(c *cli.Context) error {
|
||||
Entrypoint: a.entrypoint,
|
||||
Cmd: a.cmd,
|
||||
Format: ffmt,
|
||||
Type: &a.typeS,
|
||||
}
|
||||
if ff.Type != nil && *ff.Type == "" {
|
||||
ff.Type = nil
|
||||
}
|
||||
|
||||
_, path := appNamePath(ff.FullName())
|
||||
ff.Path = &path
|
||||
ff.Path = path
|
||||
|
||||
if err := encodeFuncfileYAML("func.yaml", ff); err != nil {
|
||||
return err
|
||||
|
||||
@@ -180,7 +180,7 @@ func createFunctionYaml(opts createImageOptions, functionName string) error {
|
||||
|
||||
funcDesc := &funcfile{
|
||||
Name: opts.Name,
|
||||
Path: &path,
|
||||
Path: path,
|
||||
Config: opts.Config,
|
||||
Version: "0.0.1",
|
||||
Runtime: &opts.Base,
|
||||
|
||||
@@ -296,9 +296,13 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error {
|
||||
to := int32(ff.Timeout.Seconds())
|
||||
rt.Timeout = &to
|
||||
}
|
||||
if rt.Path == "" && ff.Path != nil {
|
||||
rt.Path = *ff.Path
|
||||
if rt.Path == "" && ff.Path != "" {
|
||||
rt.Path = ff.Path
|
||||
}
|
||||
if rt.Type == nil && ff.Type != nil && *ff.Type != "" {
|
||||
rt.Type = *ff.Type
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
24
fn/run.go
24
fn/run.go
@@ -48,22 +48,28 @@ func (r *runCmd) run(c *cli.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var ff *funcfile
|
||||
// if image name is passed in, it will run that image
|
||||
image := c.Args().First()
|
||||
if image == "" {
|
||||
ff, err := loadFuncfile()
|
||||
ff, err = loadFuncfile()
|
||||
if err != nil {
|
||||
if _, ok := err.(*notFoundError); ok {
|
||||
return errors.New("error: image name is missing or no function file found")
|
||||
}
|
||||
return err
|
||||
}
|
||||
image = ff.FullName()
|
||||
} else {
|
||||
ff = &funcfile{
|
||||
Name: image,
|
||||
}
|
||||
}
|
||||
|
||||
return runff(image, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"))
|
||||
return runff(ff, stdin(), os.Stdout, os.Stderr, c.String("method"), c.StringSlice("e"), c.StringSlice("link"))
|
||||
}
|
||||
|
||||
func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error {
|
||||
// TODO: THIS SHOULD USE THE RUNNER DRIVERS FROM THE SERVER SO IT'S ESSENTIALLY THE SAME PROCESS (MINUS DATABASE AND ALL THAT)
|
||||
func runff(ff *funcfile, stdin io.Reader, stdout, stderr io.Writer, method string, envVars []string, links []string) error {
|
||||
sh := []string{"docker", "run", "--rm", "-i"}
|
||||
|
||||
var env []string // env for the shelled out docker run command
|
||||
@@ -80,7 +86,7 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin
|
||||
runEnv = append(runEnv, kvEq("METHOD", method))
|
||||
runEnv = append(runEnv, kvEq("REQUEST_URL", "http://localhost:8080/myapp/hello"))
|
||||
runEnv = append(runEnv, kvEq("APP_NAME", "myapp"))
|
||||
runEnv = append(runEnv, kvEq("ROUTE", "/hello"))
|
||||
runEnv = append(runEnv, kvEq("ROUTE", "/hello")) // TODO: should we change this to PATH ?
|
||||
// add user defined envs
|
||||
runEnv = append(runEnv, envVars...)
|
||||
|
||||
@@ -97,7 +103,13 @@ func runff(image string, stdin io.Reader, stdout, stderr io.Writer, method strin
|
||||
sh = append(sh, "-e", e)
|
||||
}
|
||||
|
||||
sh = append(sh, image)
|
||||
if ff.Type != nil && *ff.Type == "async" {
|
||||
// if async, we'll run this in a separate thread and wait for it to complete
|
||||
// reqID := id.New().String()
|
||||
// I'm starting to think maybe `fn run` locally should work the same whether sync or async? Or how would we allow to test the output?
|
||||
}
|
||||
|
||||
sh = append(sh, ff.FullName())
|
||||
cmd := exec.Command(sh[0], sh[1:]...)
|
||||
cmd.Stdin = stdin
|
||||
cmd.Stdout = stdout
|
||||
|
||||
@@ -68,8 +68,8 @@ func (t *testcmd) test(c *cli.Context) error {
|
||||
target := ff.FullName()
|
||||
runtest := runlocaltest
|
||||
if t.remote != "" {
|
||||
if ff.Path == nil || *ff.Path == "" {
|
||||
return errors.New("execution of tests on remote server demand that this function to have a `path`.")
|
||||
if ff.Path == "" {
|
||||
return errors.New("execution of tests on remote server demand that this function has a `path`.")
|
||||
}
|
||||
if err := resetBasePath(t.Configuration); err != nil {
|
||||
return fmt.Errorf("error setting endpoint: %v", err)
|
||||
@@ -80,7 +80,7 @@ func (t *testcmd) test(c *cli.Context) error {
|
||||
}
|
||||
|
||||
u, err := url.Parse("../")
|
||||
u.Path = path.Join(u.Path, "r", t.remote, *ff.Path)
|
||||
u.Path = path.Join(u.Path, "r", t.remote, ff.Path)
|
||||
target = baseURL.ResolveReference(u).String()
|
||||
runtest = runremotetest
|
||||
}
|
||||
@@ -134,7 +134,8 @@ func runlocaltest(target string, in, expectedOut, expectedErr *string, env map[s
|
||||
restrictedEnv = append(restrictedEnv, k)
|
||||
}
|
||||
|
||||
if err := runff(target, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil {
|
||||
ff := &funcfile{Name: target}
|
||||
if err := runff(ff, stdin, &stdout, &stderr, "", restrictedEnv, nil); err != nil {
|
||||
return fmt.Errorf("%v\nstdout:%s\nstderr:%s\n", err, stdout.String(), stderr.String())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user