From 9edacae9288d8069795b70a5dd4baf9eb4ecf6a2 Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Wed, 31 May 2017 00:05:25 -0700 Subject: [PATCH] clean up hotf(x) concurrency, rm max c this patch gets rid of max concurrency for functions altogether, as discussed, since it will be challenging to support across functions nodes. as a result of doing so, the previous version of functions would fall over when offered 1000 functions, so there was some work needed in order to push this through. further work is necessary as docker basically falls over when trying to start enough containers at the same time, and with this patch essentially every function can scale infinitely. it seems like we could add some kind of adaptive restrictions based on task run length and configured wait time so that fast running functions will line up to run in a hot container instead of them all creating new hot containers. this patch takes a first cut at whacking out some of the insanity that was the previous concurrency model, which was problematic in that it limited concurrency significantly across all functions since every task went through the same unbuffered channel, which could create blocking issues for all functions if the channel is not picked off fast enough (it's not apparent that this was impossible in the previous implementation). in any event, each request has a goroutine already, there's no reason not to use it. not too hard to wrap a map in a lock, not sure what the benefits were (added insanity?) in effect this is marginally easier to understand and less insane (marginally). after getting rid of max c this adds a blocking mechanism for the first invocation of any function so that all other hot functions will wait on the first one to finish to avoid a herd issue (was making docker die...) -- this could be slightly improved, but works in a pinch. reduced some memory usage by having redundant maps of htfnsvr's and task.Requests (by a factor of 2!). cleaned up some of the protocol stuff, need to clean this up further. anyway, it's a first cut. have another patch that rewrites all of it but was getting into rabbit hole territory, would be happy to oblige if anybody else has problems understanding this rat's nest of channels. there is a good bit of work left to make this prod ready (regardless of removing max c). a warning that this will break the db schemas, didn't put the effort in to add migration stuff since this isn't deployed anywhere in prod... TODO need to clean out the htfnmgr bucket with LRU TODO need to clean up runner interface TODO need to unify the task running paths across protocols TODO need to move the ram checking stuff into worker for noted reasons TODO need better elasticity of hot f(x) containers --- .../internal/datastoreutil/shared.go | 9 +- api/datastore/mysql/mysql.go | 15 +- api/datastore/postgres/postgres.go | 29 +- api/models/route.go | 57 ++-- api/runner/async_runner.go | 8 +- api/runner/protocol/factory.go | 46 ++- api/runner/runner.go | 12 +- api/runner/stats.go | 6 + api/runner/task/task.go | 24 +- api/runner/worker.go | 286 +++++++----------- api/server/runner.go | 28 +- api/server/runner_test.go | 3 - api/server/server.go | 18 +- api/server/server_test.go | 2 - api/server/special_handler_test.go | 2 - docs/function-file.md | 6 - docs/hot-functions.md | 4 - docs/swagger.yml | 6 +- .../tutorial/hotfunctions/http/hotroute.json | 3 - examples/tutorial/params/func.yaml | 1 - fn/funcfile.go | 29 +- fn/init.go | 32 +- fn/routes.go | 14 - test/fnlb-test-harness/primes-func/func.yaml | 1 - 24 files changed, 250 insertions(+), 391 deletions(-) diff --git a/api/datastore/internal/datastoreutil/shared.go b/api/datastore/internal/datastoreutil/shared.go index f1c57747a..5f6c0b71e 100644 --- a/api/datastore/internal/datastoreutil/shared.go +++ b/api/datastore/internal/datastoreutil/shared.go @@ -4,8 +4,8 @@ import ( "bytes" "database/sql" "encoding/json" - "github.com/Sirupsen/logrus" "fmt" + "github.com/Sirupsen/logrus" "strings" "gitlab-odx.oracle.com/odx/functions/api/models" @@ -15,7 +15,6 @@ type RowScanner interface { Scan(dest ...interface{}) error } - func ScanRoute(scanner RowScanner, route *models.Route) error { var headerStr string var configStr string @@ -25,7 +24,6 @@ func ScanRoute(scanner RowScanner, route *models.Route) error { &route.Path, &route.Image, &route.Format, - &route.MaxConcurrency, &route.Memory, &route.Type, &route.Timeout, @@ -107,7 +105,6 @@ func BuildFilterRouteQuery(filter *models.RouteFilter, whereStm, andStm string) return b.String(), args } - func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []interface{}) { if filter == nil { return "", nil @@ -120,7 +117,6 @@ func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []i return "", nil } - func BuildFilterCallQuery(filter *models.CallFilter, whereStm, andStm string) (string, []interface{}) { if filter == nil { return "", nil @@ -269,7 +265,6 @@ func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, err logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info( fmt.Sprintf("%v datastore dialed", dialect)) - for _, v := range tables { _, err = db.Exec(v) if err != nil { @@ -372,4 +367,4 @@ func SQLRemoveRoute(db *sql.DB, appName, routePath, deleteStm string) error { return nil -} \ No newline at end of file +} diff --git a/api/datastore/mysql/mysql.go b/api/datastore/mysql/mysql.go index e6f50e565..60bab963f 100644 --- a/api/datastore/mysql/mysql.go +++ b/api/datastore/mysql/mysql.go @@ -18,7 +18,6 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes ( path varchar(256) NOT NULL, image varchar(256) NOT NULL, format varchar(16) NOT NULL, - maxc int NOT NULL, memory int NOT NULL, timeout int NOT NULL, idle_timeout int NOT NULL, @@ -38,7 +37,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value varchar(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( created_at varchar(256) NOT NULL, @@ -53,7 +52,6 @@ const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` - /* MySQLDatastore defines a basic MySQL Datastore struct. */ @@ -237,7 +235,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) path, image, format, - maxc, memory, type, timeout, @@ -250,7 +247,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -296,7 +292,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout UPDATE routes SET image = ?, format = ?, - maxc = ?, memory = ?, type = ?, timeout = ?, @@ -306,7 +301,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout WHERE app_name = ? AND path = ?;`, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -341,7 +335,6 @@ RemoveRoute removes an existing route on MySQL. */ func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } @@ -429,7 +422,7 @@ func (ds *MySQLDatastore) Tx(f func(*sql.Tx) error) error { return tx.Commit() } -func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { +func (ds *MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { stmt, err := ds.db.Prepare("INSERT calls SET id=?,created_at=?,started_at=?,completed_at=?,status=?,app_name=?,path=?") if err != nil { return err @@ -445,13 +438,13 @@ func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) er return nil } -func (ds * MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { +func (ds *MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { whereStm := "%s WHERE id=?" return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) } -func (ds * MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { +func (ds *MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { whereStm := "WHERE %s ?" andStm := " AND %s ?" diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go index f598ea4d7..cf2193eed 100644 --- a/api/datastore/postgres/postgres.go +++ b/api/datastore/postgres/postgres.go @@ -1,17 +1,16 @@ package postgres import ( + "context" "database/sql" "encoding/json" "fmt" "net/url" - "context" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" "github.com/lib/pq" _ "github.com/lib/pq" + "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" + "gitlab-odx.oracle.com/odx/functions/api/models" ) const routesTableCreate = ` @@ -20,7 +19,6 @@ CREATE TABLE IF NOT EXISTS routes ( path text NOT NULL, image character varying(256) NOT NULL, format character varying(16) NOT NULL, - maxc integer NOT NULL, memory integer NOT NULL, timeout integer NOT NULL, idle_timeout integer NOT NULL, @@ -40,7 +38,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value character varying(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls ( created_at character varying(256) NOT NULL, @@ -203,7 +201,6 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout path, image, format, - maxc, memory, type, timeout, @@ -211,12 +208,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout headers, config ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`, + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, route.AppName, route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -259,19 +255,17 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R UPDATE routes SET image = $3, format = $4, - maxc = $5, - memory = $6, - type = $7, - timeout = $8, - idle_timeout = $9, - headers = $10, - config = $11 + memory = $5, + type = $6, + timeout = $7, + idle_timeout = $8, + headers = $9, + config = $10 WHERE app_name = $1 AND path = $2;`, route.AppName, route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -301,7 +295,6 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } diff --git a/api/models/route.go b/api/models/route.go index b8c560bd8..61b2a5428 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -31,33 +31,31 @@ var ( type Routes []*Route type Route struct { - AppName string `json:"app_name"` - Path string `json:"path"` - Image string `json:"image"` - Memory uint64 `json:"memory"` - Headers http.Header `json:"headers"` - Type string `json:"type"` - Format string `json:"format"` - MaxConcurrency int `json:"max_concurrency"` - Timeout int32 `json:"timeout"` - IdleTimeout int32 `json:"idle_timeout"` - Config `json:"config"` + AppName string `json:"app_name"` + Path string `json:"path"` + Image string `json:"image"` + Memory uint64 `json:"memory"` + Headers http.Header `json:"headers"` + Type string `json:"type"` + Format string `json:"format"` + Timeout int32 `json:"timeout"` + IdleTimeout int32 `json:"idle_timeout"` + Config `json:"config"` } var ( - ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed") - ErrRoutesValidationInvalidPath = errors.New("Invalid Path format") - ErrRoutesValidationInvalidType = errors.New("Invalid route Type") - ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format") - ErrRoutesValidationMissingAppName = errors.New("Missing route AppName") - ErrRoutesValidationMissingImage = errors.New("Missing route Image") - ErrRoutesValidationMissingName = errors.New("Missing route Name") - ErrRoutesValidationMissingPath = errors.New("Missing route Path") - ErrRoutesValidationMissingType = errors.New("Missing route Type") - ErrRoutesValidationPathMalformed = errors.New("Path malformed") - ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout") - ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout") - ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency") + ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed") + ErrRoutesValidationInvalidPath = errors.New("Invalid Path format") + ErrRoutesValidationInvalidType = errors.New("Invalid route Type") + ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format") + ErrRoutesValidationMissingAppName = errors.New("Missing route AppName") + ErrRoutesValidationMissingImage = errors.New("Missing route Image") + ErrRoutesValidationMissingName = errors.New("Missing route Name") + ErrRoutesValidationMissingPath = errors.New("Missing route Path") + ErrRoutesValidationMissingType = errors.New("Missing route Type") + ErrRoutesValidationPathMalformed = errors.New("Path malformed") + ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout") + ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout") ) // SetDefaults sets zeroed field to defaults. @@ -74,10 +72,6 @@ func (r *Route) SetDefaults() { r.Format = FormatDefault } - if r.MaxConcurrency == 0 { - r.MaxConcurrency = 1 - } - if r.Headers == nil { r.Headers = http.Header{} } @@ -140,10 +134,6 @@ func (r *Route) Validate(skipZero bool) error { } } - if r.MaxConcurrency < 0 { - res = append(res, ErrRoutesValidationNegativeMaxConcurrency) - } - if r.Timeout < 0 { res = append(res, ErrRoutesValidationNegativeTimeout) } @@ -188,9 +178,6 @@ func (r *Route) Update(new *Route) { if new.Format != "" { r.Format = new.Format } - if new.MaxConcurrency != 0 { - r.MaxConcurrency = new.MaxConcurrency - } if new.Headers != nil { if r.Headers == nil { r.Headers = make(http.Header) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 80aeb2cb5..bf45652aa 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { +func RunAsyncRunner(ctx context.Context, tasksrv string, rnr *Runner, ds models.Datastore) { u := tasksrvURL(tasksrv) - startAsyncRunners(ctx, u, tasks, rnr, ds) + startAsyncRunners(ctx, u, rnr, ds) <-ctx.Done() } -func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { +func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.Datastore) { var wg sync.WaitGroup ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"}) for { @@ -133,7 +133,7 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, go func() { defer wg.Done() // Process Task - _, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds) + _, err := rnr.RunTrackedTask(task, ctx, getCfg(task), ds) if err != nil { log.WithError(err).Error("Cannot run task") } diff --git a/api/runner/protocol/factory.go b/api/runner/protocol/factory.go index 4b2ae9892..6f3078741 100644 --- a/api/runner/protocol/factory.go +++ b/api/runner/protocol/factory.go @@ -29,25 +29,49 @@ const ( Empty Protocol = "" ) +func (p *Protocol) UnmarshalJSON(b []byte) error { + switch Protocol(b) { + case Empty, Default: + *p = Default + case HTTP: + *p = HTTP + default: + return errInvalidProtocol + } + return nil +} + +func (p Protocol) MarshalJSON() ([]byte, error) { + switch p { + case Empty, Default: + return []byte(Default), nil + case HTTP: + return []byte(HTTP), nil + } + return nil, errInvalidProtocol +} + +// implements ContainerIO +type errorProto struct{} + +func (e *errorProto) IsStreamable() bool { return false } +func (e *errorProto) Dispatch(ctx context.Context, t task.Request) error { return errInvalidProtocol } + // New creates a valid protocol handler from a I/O pipe representing containers // stdin/stdout. -func New(p Protocol, in io.Writer, out io.Reader) (ContainerIO, error) { +func New(p Protocol, in io.Writer, out io.Reader) ContainerIO { switch p { case HTTP: - return &HTTPProtocol{in, out}, nil + return &HTTPProtocol{in, out} case Default, Empty: - return &DefaultProtocol{}, nil - default: - return nil, errInvalidProtocol + return &DefaultProtocol{} } + return &errorProto{} // shouldn't make it past testing... } // IsStreamable says whether the given protocol can be used for streaming into // hot functions. -func IsStreamable(p string) (bool, error) { - proto, err := New(Protocol(p), nil, nil) - if err != nil { - return false, err - } - return proto.IsStreamable(), nil +// TODO get rid of ContainerIO and just use Protocol +func IsStreamable(p Protocol) bool { + return New(p, nil, nil).IsStreamable() } diff --git a/api/runner/runner.go b/api/runner/runner.go index 440670f17..598b33022 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -16,12 +16,14 @@ import ( "github.com/Sirupsen/logrus" "gitlab-odx.oracle.com/odx/functions/api/runner/common" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" - driverscommon "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers/docker" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers/mock" "gitlab-odx.oracle.com/odx/functions/api/runner/task" ) +// TODO clean all of this up, the exposed API is huge and incohesive, +// we need 1 thing that runs 1 thing and 1 thing that runs those things; +// right now this is all the things. type Runner struct { driver drivers.Driver taskQueue chan *containerTask @@ -30,6 +32,7 @@ type Runner struct { availableMem int64 usedMem int64 usedMemMutex sync.RWMutex + hcmgr htfnmgr stats } @@ -50,7 +53,7 @@ func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, erro env := common.NewEnvironment(func(e *common.Environment) {}) // TODO: Create a drivers.New(runnerConfig) in Titan - driver, err := selectDriver("docker", env, &driverscommon.Config{}) + driver, err := selectDriver("docker", env, &drivers.Config{}) if err != nil { return nil, err } @@ -158,7 +161,8 @@ func (r *Runner) checkMemAndUse(req uint64) bool { return true } -func (r *Runner) Run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { +// run is responsible for running 1 instance of a docker container +func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { var err error if cfg.Memory == 0 { @@ -241,7 +245,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error { return err } -func selectDriver(driver string, env *common.Environment, conf *driverscommon.Config) (drivers.Driver, error) { +func selectDriver(driver string, env *common.Environment, conf *drivers.Config) (drivers.Driver, error) { switch driver { case "docker": docker := docker.NewDocker(env, *conf) diff --git a/api/runner/stats.go b/api/runner/stats.go index 5c9a4351f..8ebc86725 100644 --- a/api/runner/stats.go +++ b/api/runner/stats.go @@ -7,6 +7,8 @@ type stats struct { queue uint64 running uint64 complete uint64 + + wait sync.WaitGroup } type Stats struct { @@ -22,6 +24,7 @@ func (s *stats) Enqueue() { } func (s *stats) Start() { + s.wait.Add(1) s.mu.Lock() s.queue-- s.running++ @@ -29,6 +32,7 @@ func (s *stats) Start() { } func (s *stats) Complete() { + s.wait.Done() s.mu.Lock() s.running-- s.complete++ @@ -44,3 +48,5 @@ func (s *stats) Stats() Stats { s.mu.Unlock() return stats } + +func (s *stats) Wait() { s.wait.Wait() } diff --git a/api/runner/task/task.go b/api/runner/task/task.go index f3bfa2fb5..4cb8bbbcb 100644 --- a/api/runner/task/task.go +++ b/api/runner/task/task.go @@ -9,25 +9,23 @@ import ( ) type Config struct { - ID string - Path string - Image string - Timeout time.Duration - IdleTimeout time.Duration - AppName string - Memory uint64 - Env map[string]string - Format string - MaxConcurrency int + ID string + Path string + Image string + Timeout time.Duration + IdleTimeout time.Duration + AppName string + Memory uint64 + Env map[string]string + Format string Stdin io.Reader Stdout io.Writer Stderr io.Writer } -// Request stores the task to be executed by the common concurrency stream, -// whatever type the ask actually is, either sync or async. It holds in itself -// the channel to return its response to its caller. +// Request stores the task to be executed, It holds in itself the channel to +// return its response to its caller. type Request struct { Ctx context.Context Config *Config diff --git a/api/runner/worker.go b/api/runner/worker.go index c8ace58e0..fa139d693 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -1,7 +1,6 @@ package runner import ( - "bufio" "context" "fmt" "io" @@ -9,12 +8,12 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/go-openapi/strfmt" uuid "github.com/satori/go.uuid" "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" "gitlab-odx.oracle.com/odx/functions/api/runner/protocol" "gitlab-odx.oracle.com/odx/functions/api/runner/task" - "github.com/go-openapi/strfmt" ) // hot functions - theory of operation @@ -42,12 +41,6 @@ import ( // Incoming // Task // │ -// ▼ -// ┌───────────────┐ -// │ Task Request │ -// │ Main Loop │ -// └───────────────┘ -// │ // ┌──────▼────────┐ // ┌┴──────────────┐│ // │ Per Function ││ non-streamable f() @@ -64,13 +57,12 @@ import ( // Terminate // (internal clock) - // RunTrackedTask is just a wrapper for shared logic for async/sync runners -func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) { +func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) { startedAt := strfmt.DateTime(time.Now()) newTask.StartedAt = startedAt - result, err := RunTask(tasks, ctx, cfg) + result, err := rnr.RunTask(ctx, cfg) completedAt := strfmt.DateTime(time.Now()) status := result.Status() @@ -78,89 +70,72 @@ func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.C newTask.Status = status err = ds.InsertTask(ctx, newTask) + // TODO we should just log this error not return it to user? just issue storing task status but task is run return result, err } +// RunTask will dispatch a task specified by cfg to a hot container, if possible, +// that already exists or will create a new container to run a task and then run it. +// TODO XXX (reed): merge this and RunTrackedTask to reduce surface area... +func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { + rnr.Start() // TODO layering issue ??? + defer rnr.Complete() -// RunTask helps sending a task.Request into the common concurrency stream. -// Refer to StartWorkers() to understand what this is about. -func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { tresp := make(chan task.Response) treq := task.Request{Ctx: ctx, Config: cfg, Response: tresp} - tasks <- treq + tasks := rnr.hcmgr.getPipe(ctx, rnr, cfg) + if tasks == nil { + // TODO get rid of this to use herd stuff + go runTaskReq(rnr, treq) + } else { + tasks <- treq + } resp := <-treq.Response return resp.Result, resp.Err } -// StartWorkers operates the common concurrency stream, ie, it will process all -// functions tasks, either sync or async. In the process, it also dispatches -// the workload to either regular or hot functions. -func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) { - var wg sync.WaitGroup - defer wg.Wait() - var hcmgr htfnmgr - - for { - select { - case <-ctx.Done(): - return - case task := <-tasks: - p := hcmgr.getPipe(ctx, rnr, task.Config) - if p == nil { - wg.Add(1) - go runTaskReq(rnr, &wg, task) - continue - } - - rnr.Start() - select { - case <-ctx.Done(): - return - case p <- task: - rnr.Complete() - } - } - } -} - -// htfnmgr is the intermediate between the common concurrency stream and -// hot functions. All hot functions share a single task.Request stream per -// function (chn), but each function may have more than one hot function (hc). +// htfnmgr tracks all hot functions, used to funnel kittens into existing tubes +// XXX (reed): this map grows unbounded, need to add LRU but need to make +// sure that no functions are running when we evict type htfnmgr struct { - chn map[string]chan task.Request - hc map[string]*htfnsvr + sync.RWMutex + hc map[string]*htfnsvr } -func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request { - isStream, err := protocol.IsStreamable(cfg.Format) - if err != nil { - logrus.WithError(err).Info("could not detect container IO protocol") - return nil - } else if !isStream { +func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan<- task.Request { + isStream := protocol.IsStreamable(protocol.Protocol(cfg.Format)) + if !isStream { + // TODO stop doing this, to prevent herds return nil } - if h.chn == nil { - h.chn = make(map[string]chan task.Request) - h.hc = make(map[string]*htfnsvr) + h.RLock() + if h.hc == nil { + h.RUnlock() + h.Lock() + if h.hc == nil { + h.hc = make(map[string]*htfnsvr) + } + h.Unlock() + h.RLock() } // TODO(ccirello): re-implement this without memory allocation (fmt.Sprint) - fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format, cfg.MaxConcurrency) - tasks, ok := h.chn[fn] + fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format) + svr, ok := h.hc[fn] + h.RUnlock() if !ok { - h.chn[fn] = make(chan task.Request) - tasks = h.chn[fn] - svr := newhtfnsvr(ctx, cfg, rnr, tasks) - if err := svr.launch(ctx); err != nil { - logrus.WithError(err).Error("cannot start hot function supervisor") - return nil + h.Lock() + svr, ok = h.hc[fn] + if !ok { + svr = newhtfnsvr(ctx, cfg, rnr) + h.hc[fn] = svr } - h.hc[fn] = svr + h.Unlock() } - return tasks + return svr.tasksin } // htfnsvr is part of htfnmgr, abstracted apart for simplicity, its only @@ -168,21 +143,28 @@ func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) ch // needed. In case of absence of workload, it will stop trying to start new hot // containers. type htfnsvr struct { - cfg *task.Config - rnr *Runner - tasksin <-chan task.Request + cfg *task.Config + rnr *Runner + // TODO sharing with only a channel among hot containers will result in + // inefficient recycling of containers, we need a stack not a queue, so that + // when a lot of hot containers are up and throughput drops they don't all + // find a task every few seconds and stay up for a lot longer than we really + // need them. + tasksin chan task.Request tasksout chan task.Request - maxc chan struct{} + first chan struct{} + once sync.Once // TODO this really needs to happen any time runner count goes to 0 } -func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htfnsvr { +func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner) *htfnsvr { svr := &htfnsvr{ cfg: cfg, rnr: rnr, - tasksin: tasks, + tasksin: make(chan task.Request), tasksout: make(chan task.Request, 1), - maxc: make(chan struct{}, cfg.MaxConcurrency), + first: make(chan struct{}, 1), } + svr.first <- struct{}{} // prime so that 1 thread will start the first container, others will wait // This pipe will take all incoming tasks and just forward them to the // started hot functions. The catch here is that it feeds a buffered @@ -190,7 +172,7 @@ func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan // then used to determine the presence of running hot functions. // If no hot function is available, tasksout will fill up to its // capacity and pipe() will start them. - go svr.pipe(ctx) + go svr.pipe(context.Background()) // XXX (reed): real context for adding consuela return svr } @@ -199,10 +181,18 @@ func (svr *htfnsvr) pipe(ctx context.Context) { select { case t := <-svr.tasksin: svr.tasksout <- t - if len(svr.tasksout) > 0 { - if err := svr.launch(ctx); err != nil { - logrus.WithError(err).Error("cannot start more hot functions") + + // TODO move checking for ram up here? then we can wait for hot functions to open up instead of always + // trying to make new ones if all hot functions are busy (and if machine is full and all functions are + // hot then most new hot functions are going to time out waiting to get available ram) + // TODO need to add some kind of metering here, we could track average run time and # of runners + select { + case _, ok := <-svr.first: // wait for >= 1 to be up to avoid herd + if ok || len(svr.tasksout) > 0 { + svr.launch(ctx) } + case <-ctx.Done(): // TODO we should prob watch the task timeout not just the pipe... + return } case <-ctx.Done(): return @@ -210,41 +200,25 @@ func (svr *htfnsvr) pipe(ctx context.Context) { } } -func (svr *htfnsvr) launch(ctx context.Context) error { - select { - case svr.maxc <- struct{}{}: - hc, err := newhtfn( - svr.cfg, - protocol.Protocol(svr.cfg.Format), - svr.tasksout, - svr.rnr, - ) - if err != nil { - return err - } - go func() { - hc.serve(ctx) - <-svr.maxc - }() - default: - } - - return nil +func (svr *htfnsvr) launch(ctx context.Context) { + hc := newhtfn( + svr.cfg, + svr.tasksout, + svr.rnr, + func() { svr.once.Do(func() { close(svr.first) }) }, + ) + go hc.serve(ctx) } -// htfn actually interfaces an incoming task from the common concurrency -// stream into a long lived container. If idle long enough, it will stop. It -// uses route configuration to determine which protocol to use. +// htfn is one instance of a hot container, which may or may not be running a +// task. If idle long enough, it will stop. It uses route configuration to +// determine which protocol to use. type htfn struct { id string cfg *task.Config proto protocol.ContainerIO tasks <-chan task.Request - - // Side of the pipe that takes information from outer world - // and injects into the container. - in io.Writer - out io.Reader + once func() // Receiving side of the container. containerIn io.Reader @@ -253,76 +227,50 @@ type htfn struct { rnr *Runner } -func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htfn, error) { +func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func()) *htfn { stdinr, stdinw := io.Pipe() stdoutr, stdoutw := io.Pipe() - p, err := protocol.New(proto, stdinw, stdoutr) - if err != nil { - return nil, err - } - - hc := &htfn{ + return &htfn{ id: uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", cfg.AppName, cfg.Path, time.Now().Unix())).String(), cfg: cfg, - proto: p, + proto: protocol.New(protocol.Protocol(cfg.Format), stdinw, stdoutr), tasks: tasks, - - in: stdinw, - out: stdoutr, + once: once, containerIn: stdinr, containerOut: stdoutw, rnr: rnr, } - - return hc, nil } func (hc *htfn) serve(ctx context.Context) { lctx, cancel := context.WithCancel(ctx) - var wg sync.WaitGroup + defer cancel() cfg := *hc.cfg - logFields := logrus.Fields{ - "hot_id": hc.id, - "app": cfg.AppName, - "route": cfg.Path, - "image": cfg.Image, - "memory": cfg.Memory, - "format": cfg.Format, - "max_concurrency": cfg.MaxConcurrency, - "idle_timeout": cfg.IdleTimeout, - } - logger := logrus.WithFields(logFields) + logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout}) - wg.Add(1) go func() { - defer wg.Done() for { - inactivity := time.After(cfg.IdleTimeout) - select { case <-lctx.Done(): return - - case <-inactivity: + case <-time.After(cfg.IdleTimeout): logger.Info("Canceling inactive hot function") cancel() - case t := <-hc.tasks: - if err := hc.proto.Dispatch(lctx, t); err != nil { + err := hc.proto.Dispatch(lctx, t) + status := "success" + if err != nil { + status = "error" logrus.WithField("ctx", lctx).Info("task failed") - t.Response <- task.Response{ - &runResult{StatusValue: "error", error: err}, - err, - } - continue } + hc.once() t.Response <- task.Response{ - &runResult{StatusValue: "success"}, - nil, + &runResult{StatusValue: status, error: err}, + err, } } } @@ -332,48 +280,18 @@ func (hc *htfn) serve(ctx context.Context) { cfg.Timeout = 0 // add a timeout to simulate ab.end. failure. cfg.Stdin = hc.containerIn cfg.Stdout = hc.containerOut + // NOTE: cfg.Stderr is overwritten in rnr.Run() - // Why can we not attach stderr to the task like we do for stdin and - // stdout? - // - // Stdin/Stdout are completely known to the scope of the task. You must - // have a task stdin to feed containers stdin, and also the other way - // around when reading from stdout. So both are directly related to the - // life cycle of the request. - // - // Stderr, on the other hand, can be written by anything any time: - // failure between requests, failures inside requests and messages send - // right after stdout has been finished being transmitted. Thus, with - // hot functions, there is not a 1:1 relation between stderr and tasks. - // - // Still, we do pass - at protocol level - a Task-ID header, from which - // the application running inside the hot function can use to identify - // its own stderr output. - errr, errw := io.Pipe() - cfg.Stderr = errw - wg.Add(1) - go func() { - defer wg.Done() - scanner := bufio.NewScanner(errr) - for scanner.Scan() { - logger.Info(scanner.Text()) - } - }() - - result, err := hc.rnr.Run(lctx, &cfg) + result, err := hc.rnr.run(lctx, &cfg) if err != nil { logger.WithError(err).Error("hot function failure detected") } - errw.Close() - wg.Wait() logger.WithField("result", result).Info("hot function terminated") } -func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) { - defer wg.Done() - rnr.Start() - defer rnr.Complete() - result, err := rnr.Run(t.Ctx, t.Config) +// TODO make Default protocol a real thing and get rid of this in favor of Dispatch +func runTaskReq(rnr *Runner, t task.Request) { + result, err := rnr.run(t.Ctx, t.Config) select { case t.Response <- task.Response{result, err}: close(t.Response) diff --git a/api/server/runner.go b/api/server/runner.go index fbfe0f321..a305cb2f5 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -152,7 +152,7 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]* } // TODO: Should remove *gin.Context from these functions, should use only context.Context -func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue, ) (ok bool) { +func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) params, match := matchRoute(found.Path, route) @@ -193,18 +193,17 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun } cfg := &task.Config{ - AppName: appName, - Path: found.Path, - Env: envVars, - Format: found.Format, - ID: reqID, - Image: found.Image, - MaxConcurrency: found.MaxConcurrency, - Memory: found.Memory, - Stdin: payload, - Stdout: &stdout, - Timeout: time.Duration(found.Timeout) * time.Second, - IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, + AppName: appName, + Path: found.Path, + Env: envVars, + Format: found.Format, + ID: reqID, + Image: found.Image, + Memory: found.Memory, + Stdin: payload, + Stdout: &stdout, + Timeout: time.Duration(found.Timeout) * time.Second, + IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, } // ensure valid values @@ -244,8 +243,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID}) default: - result, err := runner.RunTrackedTask(newTask, s.tasks, ctx, cfg, s.Datastore) - + result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg, s.Datastore) if err != nil { c.JSON(http.StatusInternalServerError, runnerResponse{ RequestID: cfg.ID, diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 37f4409a1..3bb923c30 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -121,8 +121,6 @@ func TestRouteRunnerExecution(t *testing.T) { rnr, cancelrnr := testRunner(t) defer cancelrnr() - go runner.StartWorkers(ctx, rnr, tasks) - srv := testServer(datastore.NewMockInit( []*models.App{ {Name: "myapp", Config: models.Config{}}, @@ -179,7 +177,6 @@ func TestRouteRunnerTimeout(t *testing.T) { rnr, cancelrnr := testRunner(t) defer cancelrnr() - go runner.StartWorkers(ctx, rnr, tasks) srv := testServer(datastore.NewMockInit( []*models.App{ diff --git a/api/server/server.go b/api/server/server.go index bf1de836e..0ce248442 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -21,7 +21,6 @@ import ( "gitlab-odx.oracle.com/odx/functions/api/mqs" "gitlab-odx.oracle.com/odx/functions/api/runner" "gitlab-odx.oracle.com/odx/functions/api/runner/common" - "gitlab-odx.oracle.com/odx/functions/api/runner/task" "gitlab-odx.oracle.com/odx/functions/api/server/internal/routecache" ) @@ -49,7 +48,6 @@ type Server struct { mu sync.Mutex // protects hotroutes hotroutes *routecache.Cache - tasks chan task.Request singleflight singleflight // singleflight assists Datastore } @@ -83,14 +81,12 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, apiUR return nil } - tasks := make(chan task.Request) s := &Server{ Runner: rnr, Router: gin.New(), Datastore: ds, MQ: mq, hotroutes: routecache.New(cacheSize), - tasks: tasks, Enqueue: DefaultEnqueue, apiURL: apiURL, } @@ -200,7 +196,6 @@ func extractFields(c *gin.Context) logrus.Fields { func (s *Server) Start(ctx context.Context) { ctx = contextWithSignal(ctx, os.Interrupt) s.startGears(ctx) - close(s.tasks) } func (s *Server) startGears(ctx context.Context) { @@ -245,14 +240,11 @@ func (s *Server) startGears(ctx context.Context) { }) svr.AddFunc(func(ctx context.Context) { - runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner, s.Datastore) - }) - - svr.AddFunc(func(ctx context.Context) { - runner.StartWorkers(ctx, s.Runner, s.tasks) + runner.RunAsyncRunner(ctx, s.apiURL, s.Runner, s.Datastore) }) svr.Serve(ctx) + s.Runner.Wait() // wait for tasks to finish (safe shutdown) } func (s *Server) bindHandlers(ctx context.Context) { @@ -321,11 +313,11 @@ type tasksResponse struct { } type fnCallResponse struct { - Message string `json:"message"` + Message string `json:"message"` Call *models.FnCall `json:"call"` } type fnCallsResponse struct { - Message string `json:"message"` - Calls models.FnCalls `json:"calls"` + Message string `json:"message"` + Calls models.FnCalls `json:"calls"` } diff --git a/api/server/server_test.go b/api/server/server_test.go index b4746a50a..9260e8b69 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -104,8 +104,6 @@ func TestFullStack(t *testing.T) { rnr, rnrcancel := testRunner(t) defer rnrcancel() - go runner.StartWorkers(ctx, rnr, tasks) - srv := testServer(ds, &mqs.Mock{}, rnr, tasks) srv.hotroutes = routecache.New(2) diff --git a/api/server/special_handler_test.go b/api/server/special_handler_test.go index c424b4b43..85ddf23f8 100644 --- a/api/server/special_handler_test.go +++ b/api/server/special_handler_test.go @@ -20,8 +20,6 @@ func TestSpecialHandlerSet(t *testing.T) { // rnr, cancelrnr := testRunner(t) // defer cancelrnr() - // go runner.StartWorkers(ctx, rnr, tasks) - // s := &Server{ // Runner: rnr, // Router: gin.New(), diff --git a/docs/function-file.md b/docs/function-file.md index 6ae65cc31..7a53e9a54 100644 --- a/docs/function-file.md +++ b/docs/function-file.md @@ -66,12 +66,6 @@ hot functions support also adds two extra options to this configuration file. `idle_timeout` (optional) is the time in seconds a container will remain alive without receiving any new requests; hot functions will stay alive as long as they receive a request in this interval. Default: `30`. -`max_concurrency` (optional) is the maximum of hot functions per node to be -started for a certain function. It defaults to one per function. If you -understand you need more processing power, make sure to raise this number. -Keep in mind that if there is not available memory to execute the configured -workload, it will fail to start new hot functions. - ## Testing functions `tests` (optional) is an array of tests that can be used to valid functions both diff --git a/docs/hot-functions.md b/docs/hot-functions.md index 01842f1eb..5300c6184 100644 --- a/docs/hot-functions.md +++ b/docs/hot-functions.md @@ -97,7 +97,6 @@ requests: "type": "sync", "config": null, "format": "http", - "max_concurrency": "1", "idle_timeout": 30 } } @@ -108,8 +107,5 @@ requests: `format` (mandatory) either "default" or "http". If "http", then it is a hot container. -`max_concurrency` (optional) - the number of simultaneous hot functions for -this functions. This is a per-node configuration option. Default: 1 - `idle_timeout` (optional) - idle timeout (in seconds) before function termination. diff --git a/docs/swagger.yml b/docs/swagger.yml index f61c629ce..4b0926621 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -416,10 +416,6 @@ definitions: - http - json description: Payload format sent into function. - max_concurrency: - type: integer - format: int32 - description: Maximum number of hot functions concurrency config: type: object description: Route configuration - overrides application configuration @@ -653,4 +649,4 @@ definitions: - task properties: task: - $ref: '#/definitions/Task' \ No newline at end of file + $ref: '#/definitions/Task' diff --git a/examples/tutorial/hotfunctions/http/hotroute.json b/examples/tutorial/hotfunctions/http/hotroute.json index af7e65172..58438a235 100644 --- a/examples/tutorial/hotfunctions/http/hotroute.json +++ b/examples/tutorial/hotfunctions/http/hotroute.json @@ -1,7 +1,4 @@ {"route":{ - "app_name": "myapp", - "path": "/hot", - "image": "USERNAME/hchttp", "memory": 64, "type": "sync", "config": null, diff --git a/examples/tutorial/params/func.yaml b/examples/tutorial/params/func.yaml index f26e7a126..2f147a155 100644 --- a/examples/tutorial/params/func.yaml +++ b/examples/tutorial/params/func.yaml @@ -3,4 +3,3 @@ version: 0.0.6 runtime: go entrypoint: ./func path: /fn3 -max_concurrency: 1 diff --git a/fn/funcfile.go b/fn/funcfile.go index 1545eb1f2..a5a8204de 100644 --- a/fn/funcfile.go +++ b/fn/funcfile.go @@ -32,21 +32,20 @@ type fftest struct { } type funcfile struct { - Name string `yaml:"name,omitempty" json:"name,omitempty"` - Version string `yaml:"version,omitempty" json:"version,omitempty"` - Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"` - Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"` - Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"` - Type *string `yaml:"type,omitempty" json:"type,omitempty"` - Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"` - Format *string `yaml:"format,omitempty" json:"format,omitempty"` - Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` - Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"` - Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"` - Build []string `yaml:"build,omitempty" json:"build,omitempty"` - Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"` - Path *string `yaml:"path,omitempty" json:"path,omitempty"` - MaxConcurrency *int `yaml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` + Name string `yaml:"name,omitempty" json:"name,omitempty"` + Version string `yaml:"version,omitempty" json:"version,omitempty"` + Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"` + Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"` + Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"` + Type *string `yaml:"type,omitempty" json:"type,omitempty"` + Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"` + Format *string `yaml:"format,omitempty" json:"format,omitempty"` + Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` + Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"` + Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"` + Build []string `yaml:"build,omitempty" json:"build,omitempty"` + Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"` + Path *string `yaml:"path,omitempty" json:"path,omitempty"` } func (ff *funcfile) FullName() string { diff --git a/fn/init.go b/fn/init.go index 06ac61e01..903b71bde 100644 --- a/fn/init.go +++ b/fn/init.go @@ -44,13 +44,12 @@ func init() { } type initFnCmd struct { - name string - force bool - runtime string - entrypoint string - cmd string - format string - maxConcurrency int + name string + force bool + runtime string + entrypoint string + cmd string + format string } func initFn() cli.Command { @@ -84,12 +83,6 @@ func initFn() cli.Command { Destination: &a.format, Value: "", }, - cli.IntFlag{ - Name: "max-concurrency", - Usage: "maximum concurrency for hot function", - Destination: &a.maxConcurrency, - Value: 1, - }, }, } } @@ -125,13 +118,12 @@ func (a *initFnCmd) init(c *cli.Context) error { } ff := &funcfile{ - Name: a.name, - Runtime: &a.runtime, - Version: initialVersion, - Entrypoint: a.entrypoint, - Cmd: a.cmd, - Format: ffmt, - MaxConcurrency: &a.maxConcurrency, + Name: a.name, + Runtime: &a.runtime, + Version: initialVersion, + Entrypoint: a.entrypoint, + Cmd: a.cmd, + Format: ffmt, } _, path := appNamePath(ff.FullName()) diff --git a/fn/routes.go b/fn/routes.go index 6662b7feb..7c18af29e 100644 --- a/fn/routes.go +++ b/fn/routes.go @@ -49,10 +49,6 @@ var routeFlags = []cli.Flag{ Name: "format,f", Usage: "hot container IO format - json or http", }, - cli.IntFlag{ - Name: "max-concurrency,mc", - Usage: "maximum concurrency for hot container", - }, cli.DurationFlag{ Name: "timeout", Usage: "route timeout (eg. 30s)", @@ -259,10 +255,6 @@ func routeWithFlags(c *cli.Context, rt *fnmodels.Route) { rt.Type = t } - if m := c.Int("max-concurrency"); m > 0 { - rt.MaxConcurrency = int32(m) - } - if m := c.Int64("memory"); m > 0 { rt.Memory = m } @@ -300,9 +292,6 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error { if ff.Format != nil { rt.Format = *ff.Format } - if ff.MaxConcurrency != nil { - rt.MaxConcurrency = int32(*ff.MaxConcurrency) - } if ff.Timeout != nil { to := int64(ff.Timeout.Seconds()) rt.Timeout = &to @@ -422,9 +411,6 @@ func (a *routesCmd) patchRoute(c *cli.Context, appName, routePath string, r *fnm if r.Type != "" { resp.Payload.Route.Type = r.Type } - if r.MaxConcurrency > 0 { - resp.Payload.Route.MaxConcurrency = r.MaxConcurrency - } if r.Memory > 0 { resp.Payload.Route.Memory = r.Memory } diff --git a/test/fnlb-test-harness/primes-func/func.yaml b/test/fnlb-test-harness/primes-func/func.yaml index 71aa5f03f..32b3ff10d 100644 --- a/test/fnlb-test-harness/primes-func/func.yaml +++ b/test/fnlb-test-harness/primes-func/func.yaml @@ -3,4 +3,3 @@ version: 0.0.1 runtime: go entrypoint: ./func path: /primes -max_concurrency: 1