functions: fix route timeout (#349)

* functions: add route-level timeout configuration options

* functions: harmonize defaults
This commit is contained in:
C Cirello
2016-11-28 23:53:11 +01:00
committed by Pedro Nasser
parent 73075cc605
commit a7a466f968
8 changed files with 122 additions and 24 deletions

View File

@@ -342,6 +342,9 @@ func (ds *BoltDatastore) UpdateRoute(ctx context.Context, newroute *models.Route
if route.Type != "" { if route.Type != "" {
route.Type = newroute.Type route.Type = newroute.Type
} }
if newroute.Timeout != 0 {
route.Timeout = newroute.Timeout
}
route.Format = newroute.Format route.Format = newroute.Format
route.MaxConcurrency = newroute.MaxConcurrency route.MaxConcurrency = newroute.MaxConcurrency
if newroute.Headers != nil { if newroute.Headers != nil {

View File

@@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS routes (
format character varying(16) NOT NULL, format character varying(16) NOT NULL,
maxc integer NOT NULL, maxc integer NOT NULL,
memory integer NOT NULL, memory integer NOT NULL,
timeout integer NOT NULL,
type character varying(16) NOT NULL, type character varying(16) NOT NULL,
headers text NOT NULL, headers text NOT NULL,
config text NOT NULL, config text NOT NULL,
@@ -38,7 +39,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value character varying(256) NOT NULL value character varying(256) NOT NULL
);` );`
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, headers, config FROM routes` const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes`
type rowScanner interface { type rowScanner interface {
Scan(dest ...interface{}) error Scan(dest ...interface{}) error
@@ -267,10 +268,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
maxc, maxc,
memory, memory,
type, type,
timeout,
headers, headers,
config config
) )
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);`, VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
route.AppName, route.AppName,
route.Path, route.Path,
route.Image, route.Image,
@@ -278,6 +280,7 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
route.MaxConcurrency, route.MaxConcurrency,
route.Memory, route.Memory,
route.Type, route.Type,
route.Timeout,
string(hbyte), string(hbyte),
string(cbyte), string(cbyte),
) )
@@ -314,8 +317,9 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, route *models.Rout
memory = $5, memory = $5,
maxc = $6, maxc = $6,
type = $7, type = $7,
headers = $8, timeout = $8,
config = $9 headers = $9,
config = $10
WHERE app_name = $1 AND path = $2;`, WHERE app_name = $1 AND path = $2;`,
route.AppName, route.AppName,
route.Path, route.Path,
@@ -324,6 +328,7 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, route *models.Rout
route.Memory, route.Memory,
route.MaxConcurrency, route.MaxConcurrency,
route.Type, route.Type,
route.Timeout,
string(hbyte), string(hbyte),
string(cbyte), string(cbyte),
) )
@@ -386,6 +391,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
&route.Memory, &route.Memory,
&route.MaxConcurrency, &route.MaxConcurrency,
&route.Type, &route.Type,
&route.Timeout,
&headerStr, &headerStr,
&configStr, &configStr,
) )

View File

@@ -10,6 +10,10 @@ import (
apiErrors "github.com/go-openapi/errors" apiErrors "github.com/go-openapi/errors"
) )
const (
defaultRouteTimeout = 30 // seconds
)
var ( var (
ErrRoutesCreate = errors.New("Could not create route") ErrRoutesCreate = errors.New("Could not create route")
ErrRoutesUpdate = errors.New("Could not update route") ErrRoutesUpdate = errors.New("Could not update route")
@@ -33,6 +37,7 @@ type Route struct {
Type string `json:"type,omitempty"` Type string `json:"type,omitempty"`
Format string `json:"format,omitempty"` Format string `json:"format,omitempty"`
MaxConcurrency int `json:"max_concurrency,omitempty"` MaxConcurrency int `json:"max_concurrency,omitempty"`
Timeout int32 `json:"timeout,omitempty"`
Config `json:"config"` Config `json:"config"`
} }
@@ -47,6 +52,7 @@ var (
ErrRoutesValidationMissingPath = errors.New("Missing route Path") ErrRoutesValidationMissingPath = errors.New("Missing route Path")
ErrRoutesValidationMissingType = errors.New("Missing route Type") ErrRoutesValidationMissingType = errors.New("Missing route Type")
ErrRoutesValidationPathMalformed = errors.New("Path malformed") ErrRoutesValidationPathMalformed = errors.New("Path malformed")
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
) )
func (r *Route) Validate() error { func (r *Route) Validate() error {
@@ -93,6 +99,12 @@ func (r *Route) Validate() error {
r.MaxConcurrency = 1 r.MaxConcurrency = 1
} }
if r.Timeout == 0 {
r.Timeout = defaultRouteTimeout
} else if r.Timeout < 0 {
res = append(res, ErrRoutesValidationNegativeTimeout)
}
if len(res) > 0 { if len(res) > 0 {
return apiErrors.CompositeValidationError(res...) return apiErrors.CompositeValidationError(res...)
} }

View File

@@ -179,7 +179,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
Memory: found.Memory, Memory: found.Memory,
Stdin: payload, Stdin: payload,
Stdout: &stdout, Stdout: &stdout,
Timeout: 30 * time.Second, Timeout: time.Duration(found.Timeout) * time.Second,
} }
switch found.Type { switch found.Type {
@@ -216,12 +216,14 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
c.Header(k, v[0]) c.Header(k, v[0])
} }
if result.Status() == "success" { switch result.Status() {
case "success":
c.Data(http.StatusOK, "", stdout.Bytes()) c.Data(http.StatusOK, "", stdout.Bytes())
} else { case "timeout":
c.AbortWithStatus(http.StatusGatewayTimeout)
default:
c.AbortWithStatus(http.StatusInternalServerError) c.AbortWithStatus(http.StatusInternalServerError)
} }
} }
return true return true

View File

@@ -126,18 +126,19 @@ func TestRouteRunnerExecution(t *testing.T) {
for i, test := range []struct { for i, test := range []struct {
path string path string
body string body string
method string
expectedCode int expectedCode int
expectedHeaders map[string][]string expectedHeaders map[string][]string
}{ }{
{"/r/myapp/myroute", ``, http.StatusOK, map[string][]string{"X-Function": {"Test"}}}, {"/r/myapp/myroute", ``, "GET", http.StatusOK, map[string][]string{"X-Function": {"Test"}}},
{"/r/myapp/myerror", ``, http.StatusInternalServerError, map[string][]string{"X-Function": {"Test"}}}, {"/r/myapp/myerror", ``, "GET", http.StatusInternalServerError, map[string][]string{"X-Function": {"Test"}}},
// Added same tests again to check if time is reduced by the auth cache // Added same tests again to check if time is reduced by the auth cache
{"/r/myapp/myroute", ``, http.StatusOK, map[string][]string{"X-Function": {"Test"}}}, {"/r/myapp/myroute", ``, "GET", http.StatusOK, map[string][]string{"X-Function": {"Test"}}},
{"/r/myapp/myerror", ``, http.StatusInternalServerError, map[string][]string{"X-Function": {"Test"}}}, {"/r/myapp/myerror", ``, "GET", http.StatusInternalServerError, map[string][]string{"X-Function": {"Test"}}},
} { } {
body := bytes.NewBuffer([]byte(test.body)) body := strings.NewReader(test.body)
_, rec := routerRequest(t, router, "GET", test.path, body) _, rec := routerRequest(t, router, test.method, test.path, body)
if rec.Code != test.expectedCode { if rec.Code != test.expectedCode {
t.Log(buf.String()) t.Log(buf.String())
@@ -145,7 +146,9 @@ func TestRouteRunnerExecution(t *testing.T) {
i, test.expectedCode, rec.Code) i, test.expectedCode, rec.Code)
} }
if test.expectedHeaders != nil { if test.expectedHeaders == nil {
continue
}
for name, header := range test.expectedHeaders { for name, header := range test.expectedHeaders {
if header[0] != rec.Header().Get(name) { if header[0] != rec.Header().Get(name) {
t.Log(buf.String()) t.Log(buf.String())
@@ -154,6 +157,56 @@ func TestRouteRunnerExecution(t *testing.T) {
} }
} }
} }
}
func TestRouteRunnerTimeout(t *testing.T) {
t.Skip("doesn't work on old Ubuntu")
buf := setLogBuffer()
tasks := make(chan task.Request)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go runner.StartWorkers(ctx, testRunner(t), tasks)
router := testRouter(&datastore.Mock{
Apps: []*models.App{
{Name: "myapp", Config: models.Config{}},
},
Routes: []*models.Route{
{Path: "/sleeper", AppName: "myapp", Image: "iron/sleeper", Timeout: 1},
},
}, &mqs.Mock{}, testRunner(t), tasks)
for i, test := range []struct {
path string
body string
method string
expectedCode int
expectedHeaders map[string][]string
}{
{"/r/myapp/sleeper", `{"sleep": 0}`, "POST", http.StatusOK, nil},
{"/r/myapp/sleeper", `{"sleep": 2}`, "POST", http.StatusGatewayTimeout, nil},
} {
body := strings.NewReader(test.body)
_, rec := routerRequest(t, router, test.method, test.path, body)
if rec.Code != test.expectedCode {
t.Log(buf.String())
t.Errorf("Test %d: Expected status code to be %d but was %d",
i, test.expectedCode, rec.Code)
}
if test.expectedHeaders == nil {
continue
}
for name, header := range test.expectedHeaders {
if header[0] != rec.Header().Get(name) {
t.Log(buf.String())
t.Errorf("Test %d: Expected header `%s` to be %s but was %s",
i, name, header[0], rec.Header().Get(name))
}
}
} }
} }

View File

@@ -6,7 +6,7 @@ swagger: '2.0'
info: info:
title: IronFunctions title: IronFunctions
description: The open source serverless platform. description: The open source serverless platform.
version: "0.1.18" version: "0.1.19"
# the domain of the service # the domain of the service
host: "127.0.0.1:8080" host: "127.0.0.1:8080"
# array of all schemes that your API supports # array of all schemes that your API supports
@@ -353,6 +353,10 @@ definitions:
description: Route configuration - overrides application configuration description: Route configuration - overrides application configuration
additionalProperties: additionalProperties:
type: string type: string
timeout:
type: integer
default: 60
description: Timeout for executions of this route. Value in Seconds
App: App:
type: object type: object

View File

@@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
) )
@@ -32,6 +33,7 @@ type funcfile struct {
Type *string `yaml:"type,omitempty",json:"type,omitempty"` Type *string `yaml:"type,omitempty",json:"type,omitempty"`
Memory *int64 `yaml:"memory,omitempty",json:"memory,omitempty"` Memory *int64 `yaml:"memory,omitempty",json:"memory,omitempty"`
Format *string `yaml:"format,omitempty",json:"format,omitempty"` Format *string `yaml:"format,omitempty",json:"format,omitempty"`
Timeout *time.Duration `yaml:"timeout,omitempty",json:"timeout,omitempty"`
MaxConcurrency *int `yaml:"int,omitempty",json:"int,omitempty"` MaxConcurrency *int `yaml:"int,omitempty",json:"int,omitempty"`
Config map[string]string `yaml:"config,omitempty",json:"config,omitempty"` Config map[string]string `yaml:"config,omitempty",json:"config,omitempty"`
Build []string `yaml:"build,omitempty",json:"build,omitempty"` Build []string `yaml:"build,omitempty",json:"build,omitempty"`

View File

@@ -11,6 +11,7 @@ import (
"path" "path"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"time"
functions "github.com/iron-io/functions_go" functions "github.com/iron-io/functions_go"
"github.com/urfave/cli" "github.com/urfave/cli"
@@ -74,6 +75,11 @@ func routes() cli.Command {
Usage: "maximum concurrency for hot container", Usage: "maximum concurrency for hot container",
Value: 1, Value: 1,
}, },
cli.DurationFlag{
Name: "timeout",
Usage: "route timeout",
Value: 30 * time.Second,
},
}, },
}, },
{ {
@@ -239,8 +245,11 @@ func (a *routesCmd) create(c *cli.Context) error {
appName := c.Args().Get(0) appName := c.Args().Get(0)
route := c.Args().Get(1) route := c.Args().Get(1)
image := c.Args().Get(2) image := c.Args().Get(2)
var format string var (
var maxC int format string
maxC int
timeout time.Duration
)
if image == "" { if image == "" {
ff, err := findFuncfile() ff, err := findFuncfile()
if err != nil { if err != nil {
@@ -257,6 +266,9 @@ func (a *routesCmd) create(c *cli.Context) error {
if ff.MaxConcurrency != nil { if ff.MaxConcurrency != nil {
maxC = *ff.MaxConcurrency maxC = *ff.MaxConcurrency
} }
if ff.Timeout != nil {
timeout = *ff.Timeout
}
} }
if f := c.String("format"); f != "" { if f := c.String("format"); f != "" {
@@ -265,6 +277,9 @@ func (a *routesCmd) create(c *cli.Context) error {
if m := c.Int("max-concurrency"); m > 0 { if m := c.Int("max-concurrency"); m > 0 {
maxC = m maxC = m
} }
if t := c.Duration("timeout"); t > 0 {
timeout = t
}
body := functions.RouteWrapper{ body := functions.RouteWrapper{
Route: functions.Route{ Route: functions.Route{
@@ -276,6 +291,7 @@ func (a *routesCmd) create(c *cli.Context) error {
Config: extractEnvConfig(c.StringSlice("config")), Config: extractEnvConfig(c.StringSlice("config")),
Format: format, Format: format,
MaxConcurrency: int32(maxC), MaxConcurrency: int32(maxC),
Timeout: int32(timeout.Seconds()),
}, },
} }