diff --git a/api/datastore/mysql/mysql.go b/api/datastore/mysql/mysql.go index 5fe4a5fd4..ec0b853b1 100644 --- a/api/datastore/mysql/mysql.go +++ b/api/datastore/mysql/mysql.go @@ -23,6 +23,7 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes ( maxc int NOT NULL, memory int NOT NULL, timeout int NOT NULL, + idle_timeout int NOT NULL, type varchar(16) NOT NULL, headers text NOT NULL, config text NOT NULL, @@ -39,7 +40,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value varchar(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` type rowScanner interface { Scan(dest ...interface{}) error @@ -302,10 +303,11 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) memory, type, timeout, + idle_timeout, headers, config ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, route.AppName, route.Path, route.Image, @@ -314,6 +316,7 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) route.Memory, route.Type, route.Timeout, + route.IdleTimeout, string(hbyte), string(cbyte), ) @@ -359,6 +362,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout memory = ?, type = ?, timeout = ?, + idle_timeout = ?, headers = ?, config = ? WHERE app_name = ? AND path = ?;`, @@ -368,6 +372,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout route.Memory, route.Type, route.Timeout, + route.IdleTimeout, string(hbyte), string(cbyte), route.AppName, @@ -431,6 +436,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error { &route.Memory, &route.Type, &route.Timeout, + &route.IdleTimeout, &headerStr, &configStr, ) diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go index 3e2fe41da..6577b2635 100644 --- a/api/datastore/postgres/postgres.go +++ b/api/datastore/postgres/postgres.go @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS routes ( maxc integer NOT NULL, memory integer NOT NULL, timeout integer NOT NULL, + idle_timeout integer NOT NULL, type character varying(16) NOT NULL, headers text NOT NULL, config text NOT NULL, @@ -41,7 +42,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value character varying(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` type rowScanner interface { Scan(dest ...interface{}) error @@ -274,10 +275,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout memory, type, timeout, + idle_timeout, headers, config ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`, route.AppName, route.Path, route.Image, @@ -286,6 +288,7 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout route.Memory, route.Type, route.Timeout, + route.IdleTimeout, string(hbyte), string(cbyte), ) @@ -329,8 +332,9 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R memory = $6, type = $7, timeout = $8, - headers = $9, - config = $10 + idle_timeout = $9, + headers = $10, + config = $11 WHERE app_name = $1 AND path = $2;`, route.AppName, route.Path, @@ -340,6 +344,7 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R route.Memory, route.Type, route.Timeout, + route.IdleTimeout, string(hbyte), string(cbyte), ) @@ -398,6 +403,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error { &route.Memory, &route.Type, &route.Timeout, + &route.IdleTimeout, &headerStr, &configStr, ) diff --git a/api/models/new_task.go b/api/models/new_task.go index 414929161..248fd11dc 100644 --- a/api/models/new_task.go +++ b/api/models/new_task.go @@ -54,6 +54,11 @@ type NewTask struct { */ Timeout *int32 `json:"timeout,omitempty"` + + /* Hot function idle timeout in seconds before termination. + + */ + IdleTimeout *int32 `json:"idle_timeout,omitempty"` } // Validate validates this new task diff --git a/api/models/route.go b/api/models/route.go index 372bb131c..57faf6915 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -12,6 +12,7 @@ import ( const ( defaultRouteTimeout = 30 // seconds + htfnScaleDownTimeout = 30 // seconds ) var ( @@ -39,6 +40,7 @@ type Route struct { Format string `json:"format"` MaxConcurrency int `json:"max_concurrency"` Timeout int32 `json:"timeout"` + IdleTimeout int32 `json:"idle_timeout"` Config `json:"config"` } @@ -54,6 +56,7 @@ var ( ErrRoutesValidationMissingType = errors.New("Missing route Type") ErrRoutesValidationPathMalformed = errors.New("Path malformed") ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout") + ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout") ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency") ) @@ -86,6 +89,10 @@ func (r *Route) SetDefaults() { if r.Timeout == 0 { r.Timeout = defaultRouteTimeout } + + //if r.IdleTimeout == 0 { + // r.IdleTimeout = htfnScaleDownTimeout + //} } // Validate validates field values, skipping zeroed fields if skipZero is true. @@ -141,6 +148,10 @@ func (r *Route) Validate(skipZero bool) error { res = append(res, ErrRoutesValidationNegativeTimeout) } + if r.IdleTimeout < 0 { + res = append(res, ErrRoutesValidationNegativeIdleTimeout) + } + if len(res) > 0 { return apiErrors.CompositeValidationError(res...) } @@ -171,6 +182,9 @@ func (r *Route) Update(new *Route) { if new.Timeout != 0 { r.Timeout = new.Timeout } + if new.IdleTimeout != 0 { + r.IdleTimeout = new.IdleTimeout + } if new.Format != "" { r.Format = new.Format } diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 1909e47f5..af9df7928 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -42,14 +42,18 @@ func getTask(ctx context.Context, url string) (*models.Task, error) { } func getCfg(t *models.Task) *task.Config { + timeout := int32(30) if t.Timeout == nil { - timeout := int32(30) t.Timeout = &timeout } + if t.IdleTimeout == nil { + t.IdleTimeout = &timeout + } cfg := &task.Config{ Image: *t.Image, Timeout: time.Duration(*t.Timeout) * time.Second, + IdleTimeout: time.Duration(*t.IdleTimeout) * time.Second, ID: t.ID, AppName: t.AppName, Env: t.EnvVars, diff --git a/api/runner/task.go b/api/runner/task.go index a54c5d6fb..01b28ab6e 100644 --- a/api/runner/task.go +++ b/api/runner/task.go @@ -35,7 +35,8 @@ func (t *containerTask) Id() string { return t.cfg.ID } func (t *containerTask) Route() string { return "" } func (t *containerTask) Image() string { return t.cfg.Image } func (t *containerTask) Timeout() time.Duration { return t.cfg.Timeout } -func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr } +func (t *containerTask) IdleTimeout() time.Duration { return t.cfg.IdleTimeout } +func (t *containerTask) Logger() (io.Writer, io.Writer) { return t.cfg.Stdout, t.cfg.Stderr } func (t *containerTask) Volumes() [][2]string { return [][2]string{} } func (t *containerTask) WorkDir() string { return "" } diff --git a/api/runner/task/task.go b/api/runner/task/task.go index 1399d6c1f..a7f7d24b0 100644 --- a/api/runner/task/task.go +++ b/api/runner/task/task.go @@ -9,15 +9,16 @@ import ( ) type Config struct { - ID string - Path string - Image string - Timeout time.Duration - AppName string - Memory uint64 - Env map[string]string - Format string - MaxConcurrency int + ID string + Path string + Image string + Timeout time.Duration + IdleTimeout time.Duration + AppName string + Memory uint64 + Env map[string]string + Format string + MaxConcurrency int Stdin io.Reader Stdout io.Writer diff --git a/api/runner/worker.go b/api/runner/worker.go index 91c4ee61b..4a97b4e83 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -61,10 +61,6 @@ import ( // Terminate // (internal clock) -const ( - // Terminate hot function after this timeout - htfnScaleDownTimeout = 30 * time.Second -) // RunTask helps sending a task.Request into the common concurrency stream. // Refer to StartWorkers() to understand what this is about. @@ -264,17 +260,29 @@ func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Reques func (hc *htfn) serve(ctx context.Context) { lctx, cancel := context.WithCancel(ctx) var wg sync.WaitGroup + cfg := *hc.cfg + logger := logrus.WithFields(logrus.Fields{ + "app": cfg.AppName, + "route": cfg.Path, + "image": cfg.Image, + "memory": cfg.Memory, + "format": cfg.Format, + "max_concurrency": cfg.MaxConcurrency, + "idle_timeout": cfg.IdleTimeout, + }) + wg.Add(1) go func() { defer wg.Done() for { - inactivity := time.After(htfnScaleDownTimeout) + inactivity := time.After(cfg.IdleTimeout) select { case <-lctx.Done(): return case <-inactivity: + logger.Info("Canceling inactive hot function") cancel() case t := <-hc.tasks: @@ -295,7 +303,6 @@ func (hc *htfn) serve(ctx context.Context) { } }() - cfg := *hc.cfg cfg.Env["FN_FORMAT"] = cfg.Format cfg.Timeout = 0 // add a timeout to simulate ab.end. failure. cfg.Stdin = hc.containerIn @@ -324,14 +331,7 @@ func (hc *htfn) serve(ctx context.Context) { defer wg.Done() scanner := bufio.NewScanner(errr) for scanner.Scan() { - logrus.WithFields(logrus.Fields{ - "app": cfg.AppName, - "route": cfg.Path, - "image": cfg.Image, - "memory": cfg.Memory, - "format": cfg.Format, - "max_concurrency": cfg.MaxConcurrency, - }).Info(scanner.Text()) + logger.Info(scanner.Text()) } }() @@ -339,7 +339,6 @@ func (hc *htfn) serve(ctx context.Context) { if err != nil { logrus.WithError(err).Error("hot function failure detected") } - cancel() errw.Close() wg.Wait() logrus.WithField("result", result).Info("hot function terminated") diff --git a/api/server/runner.go b/api/server/runner.go index 5de2b1328..818222f53 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -190,17 +190,18 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun } cfg := &task.Config{ - AppName: appName, - Path: found.Path, - Env: envVars, - Format: found.Format, - ID: reqID, - Image: found.Image, - MaxConcurrency: found.MaxConcurrency, - Memory: found.Memory, - Stdin: payload, - Stdout: &stdout, - Timeout: time.Duration(found.Timeout) * time.Second, + AppName: appName, + Path: found.Path, + Env: envVars, + Format: found.Format, + ID: reqID, + Image: found.Image, + MaxConcurrency: found.MaxConcurrency, + Memory: found.Memory, + Stdin: payload, + Stdout: &stdout, + Timeout: time.Duration(found.Timeout) * time.Second, + IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, } s.Runner.Enqueue() diff --git a/docs/function-timeouts.md b/docs/function-timeouts.md new file mode 100644 index 000000000..4dd61350d --- /dev/null +++ b/docs/function-timeouts.md @@ -0,0 +1,58 @@ +# Function timeouts + +Within Function API, each functions supplied with 2 timeouts parameters, both optional, see [swagger.yaml](swagger.yml) for more details. +So, what are those timeouts and what are they used for? + +## Function call timeout + +This time of timeouts defines for how long function execution may happen before it'll be terminated along with notifying caller that function terminated with error - timed out. + +```json +{ + "route":{ + ... + "timeout": 30, + ... + } +} +``` + +This timeout parameter used with both types of functions: async and sync. +It starts at the beginning of function call. + +## Hot function idle timeout + +This type of timeout defines for how long should hot function hang around before its termination in case if there are no incoming requests. + +```json +{ + "route":{ + ... + "idle_timeout": 30, + ... + } +} +``` + +This timeout parameter is valid for hot functions, see what [hot functions](hot-functions.md) is. By default this parameter equals to 30 seconds. +It starts after last request being processed by hot function. + +## Correlation between idle and regular timeout + +This two timeouts are independent. The order of timeouts for hot functions: + + 0. start hot function be sending first timeout-bound request to it + 1. make request to function with `timeout` + 2. if call finished (no matter successful or not) check for more requests to dispatch + 3. if none - start idle timeout + 4. if new request appears - stop idle timeout and serve request + 5. if none - terminate hot function + +## Hot function idle timeout edge cases + +Having both timeouts may cause confusion while configuring hot function. +So, there are certain limitations for `idle_timeout` as well as for regular `timeout`: + + * Idle timeout might be equal to zero. Such case may lead to satiation when function would be terminated immediately after last request processing, i.e. no idle timeout at all. + * Idle timeout can't be negative. + * Idle timeout can't be changed while hot function is running. Idle timeout is permanent within hot function execution lifecycle. It means that idle timeout should be considered for changing once functions is not running. diff --git a/docs/hot-functions.md b/docs/hot-functions.md index dde40fdcb..022d63db2 100644 --- a/docs/hot-functions.md +++ b/docs/hot-functions.md @@ -91,7 +91,8 @@ requests: "type": "sync", "config": null, "format": "http", - "max_concurrency": "1" + "max_concurrency": "1", + "idle_timeout": 30 } } ``` @@ -100,4 +101,6 @@ requests: container. `max_concurrency` (optional) - the number of simultaneous hot functions for -this functions. This is a per-node configuration option. Default: 1 \ No newline at end of file +this functions. This is a per-node configuration option. Default: 1 + +`idle_timeout` (optional) - idle timeout (in seconds) before function termination. diff --git a/docs/swagger.yml b/docs/swagger.yml index 03c8db511..d9b32827d 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -378,8 +378,12 @@ definitions: type: string timeout: type: integer - default: 60 + default: 30 description: Timeout for executions of this route. Value in Seconds + idle_timeout: + type: integer + default: 30 + description: Hot functions idle timeout before termination. Value in Seconds App: type: object