diff --git a/api/datastore/internal/datastoreutil/shared.go b/api/datastore/internal/datastoreutil/shared.go index f1c57747a..5f6c0b71e 100644 --- a/api/datastore/internal/datastoreutil/shared.go +++ b/api/datastore/internal/datastoreutil/shared.go @@ -4,8 +4,8 @@ import ( "bytes" "database/sql" "encoding/json" - "github.com/Sirupsen/logrus" "fmt" + "github.com/Sirupsen/logrus" "strings" "gitlab-odx.oracle.com/odx/functions/api/models" @@ -15,7 +15,6 @@ type RowScanner interface { Scan(dest ...interface{}) error } - func ScanRoute(scanner RowScanner, route *models.Route) error { var headerStr string var configStr string @@ -25,7 +24,6 @@ func ScanRoute(scanner RowScanner, route *models.Route) error { &route.Path, &route.Image, &route.Format, - &route.MaxConcurrency, &route.Memory, &route.Type, &route.Timeout, @@ -107,7 +105,6 @@ func BuildFilterRouteQuery(filter *models.RouteFilter, whereStm, andStm string) return b.String(), args } - func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []interface{}) { if filter == nil { return "", nil @@ -120,7 +117,6 @@ func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []i return "", nil } - func BuildFilterCallQuery(filter *models.CallFilter, whereStm, andStm string) (string, []interface{}) { if filter == nil { return "", nil @@ -269,7 +265,6 @@ func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, err logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info( fmt.Sprintf("%v datastore dialed", dialect)) - for _, v := range tables { _, err = db.Exec(v) if err != nil { @@ -372,4 +367,4 @@ func SQLRemoveRoute(db *sql.DB, appName, routePath, deleteStm string) error { return nil -} \ No newline at end of file +} diff --git a/api/datastore/mysql/mysql.go b/api/datastore/mysql/mysql.go index e6f50e565..60bab963f 100644 --- a/api/datastore/mysql/mysql.go +++ b/api/datastore/mysql/mysql.go @@ -18,7 +18,6 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes ( path varchar(256) NOT NULL, image varchar(256) NOT NULL, format varchar(16) NOT NULL, - maxc int NOT NULL, memory int NOT NULL, timeout int NOT NULL, idle_timeout int NOT NULL, @@ -38,7 +37,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value varchar(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( created_at varchar(256) NOT NULL, @@ -53,7 +52,6 @@ const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` - /* MySQLDatastore defines a basic MySQL Datastore struct. */ @@ -237,7 +235,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) path, image, format, - maxc, memory, type, timeout, @@ -250,7 +247,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -296,7 +292,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout UPDATE routes SET image = ?, format = ?, - maxc = ?, memory = ?, type = ?, timeout = ?, @@ -306,7 +301,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout WHERE app_name = ? AND path = ?;`, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -341,7 +335,6 @@ RemoveRoute removes an existing route on MySQL. */ func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } @@ -429,7 +422,7 @@ func (ds *MySQLDatastore) Tx(f func(*sql.Tx) error) error { return tx.Commit() } -func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { +func (ds *MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { stmt, err := ds.db.Prepare("INSERT calls SET id=?,created_at=?,started_at=?,completed_at=?,status=?,app_name=?,path=?") if err != nil { return err @@ -445,13 +438,13 @@ func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) er return nil } -func (ds * MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { +func (ds *MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { whereStm := "%s WHERE id=?" return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) } -func (ds * MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { +func (ds *MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { whereStm := "WHERE %s ?" andStm := " AND %s ?" diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go index f598ea4d7..cf2193eed 100644 --- a/api/datastore/postgres/postgres.go +++ b/api/datastore/postgres/postgres.go @@ -1,17 +1,16 @@ package postgres import ( + "context" "database/sql" "encoding/json" "fmt" "net/url" - "context" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" "github.com/lib/pq" _ "github.com/lib/pq" + "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" + "gitlab-odx.oracle.com/odx/functions/api/models" ) const routesTableCreate = ` @@ -20,7 +19,6 @@ CREATE TABLE IF NOT EXISTS routes ( path text NOT NULL, image character varying(256) NOT NULL, format character varying(16) NOT NULL, - maxc integer NOT NULL, memory integer NOT NULL, timeout integer NOT NULL, idle_timeout integer NOT NULL, @@ -40,7 +38,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value character varying(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls ( created_at character varying(256) NOT NULL, @@ -203,7 +201,6 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout path, image, format, - maxc, memory, type, timeout, @@ -211,12 +208,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout headers, config ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`, + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, route.AppName, route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -259,19 +255,17 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R UPDATE routes SET image = $3, format = $4, - maxc = $5, - memory = $6, - type = $7, - timeout = $8, - idle_timeout = $9, - headers = $10, - config = $11 + memory = $5, + type = $6, + timeout = $7, + idle_timeout = $8, + headers = $9, + config = $10 WHERE app_name = $1 AND path = $2;`, route.AppName, route.Path, route.Image, route.Format, - route.MaxConcurrency, route.Memory, route.Type, route.Timeout, @@ -301,7 +295,6 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } diff --git a/api/models/route.go b/api/models/route.go index b8c560bd8..61b2a5428 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -31,33 +31,31 @@ var ( type Routes []*Route type Route struct { - AppName string `json:"app_name"` - Path string `json:"path"` - Image string `json:"image"` - Memory uint64 `json:"memory"` - Headers http.Header `json:"headers"` - Type string `json:"type"` - Format string `json:"format"` - MaxConcurrency int `json:"max_concurrency"` - Timeout int32 `json:"timeout"` - IdleTimeout int32 `json:"idle_timeout"` - Config `json:"config"` + AppName string `json:"app_name"` + Path string `json:"path"` + Image string `json:"image"` + Memory uint64 `json:"memory"` + Headers http.Header `json:"headers"` + Type string `json:"type"` + Format string `json:"format"` + Timeout int32 `json:"timeout"` + IdleTimeout int32 `json:"idle_timeout"` + Config `json:"config"` } var ( - ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed") - ErrRoutesValidationInvalidPath = errors.New("Invalid Path format") - ErrRoutesValidationInvalidType = errors.New("Invalid route Type") - ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format") - ErrRoutesValidationMissingAppName = errors.New("Missing route AppName") - ErrRoutesValidationMissingImage = errors.New("Missing route Image") - ErrRoutesValidationMissingName = errors.New("Missing route Name") - ErrRoutesValidationMissingPath = errors.New("Missing route Path") - ErrRoutesValidationMissingType = errors.New("Missing route Type") - ErrRoutesValidationPathMalformed = errors.New("Path malformed") - ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout") - ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout") - ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency") + ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed") + ErrRoutesValidationInvalidPath = errors.New("Invalid Path format") + ErrRoutesValidationInvalidType = errors.New("Invalid route Type") + ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format") + ErrRoutesValidationMissingAppName = errors.New("Missing route AppName") + ErrRoutesValidationMissingImage = errors.New("Missing route Image") + ErrRoutesValidationMissingName = errors.New("Missing route Name") + ErrRoutesValidationMissingPath = errors.New("Missing route Path") + ErrRoutesValidationMissingType = errors.New("Missing route Type") + ErrRoutesValidationPathMalformed = errors.New("Path malformed") + ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout") + ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout") ) // SetDefaults sets zeroed field to defaults. @@ -74,10 +72,6 @@ func (r *Route) SetDefaults() { r.Format = FormatDefault } - if r.MaxConcurrency == 0 { - r.MaxConcurrency = 1 - } - if r.Headers == nil { r.Headers = http.Header{} } @@ -140,10 +134,6 @@ func (r *Route) Validate(skipZero bool) error { } } - if r.MaxConcurrency < 0 { - res = append(res, ErrRoutesValidationNegativeMaxConcurrency) - } - if r.Timeout < 0 { res = append(res, ErrRoutesValidationNegativeTimeout) } @@ -188,9 +178,6 @@ func (r *Route) Update(new *Route) { if new.Format != "" { r.Format = new.Format } - if new.MaxConcurrency != 0 { - r.MaxConcurrency = new.MaxConcurrency - } if new.Headers != nil { if r.Headers == nil { r.Headers = make(http.Header) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 80aeb2cb5..bf45652aa 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { +func RunAsyncRunner(ctx context.Context, tasksrv string, rnr *Runner, ds models.Datastore) { u := tasksrvURL(tasksrv) - startAsyncRunners(ctx, u, tasks, rnr, ds) + startAsyncRunners(ctx, u, rnr, ds) <-ctx.Done() } -func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { +func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.Datastore) { var wg sync.WaitGroup ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"}) for { @@ -133,7 +133,7 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, go func() { defer wg.Done() // Process Task - _, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds) + _, err := rnr.RunTrackedTask(task, ctx, getCfg(task), ds) if err != nil { log.WithError(err).Error("Cannot run task") } diff --git a/api/runner/protocol/factory.go b/api/runner/protocol/factory.go index 4b2ae9892..6f3078741 100644 --- a/api/runner/protocol/factory.go +++ b/api/runner/protocol/factory.go @@ -29,25 +29,49 @@ const ( Empty Protocol = "" ) +func (p *Protocol) UnmarshalJSON(b []byte) error { + switch Protocol(b) { + case Empty, Default: + *p = Default + case HTTP: + *p = HTTP + default: + return errInvalidProtocol + } + return nil +} + +func (p Protocol) MarshalJSON() ([]byte, error) { + switch p { + case Empty, Default: + return []byte(Default), nil + case HTTP: + return []byte(HTTP), nil + } + return nil, errInvalidProtocol +} + +// implements ContainerIO +type errorProto struct{} + +func (e *errorProto) IsStreamable() bool { return false } +func (e *errorProto) Dispatch(ctx context.Context, t task.Request) error { return errInvalidProtocol } + // New creates a valid protocol handler from a I/O pipe representing containers // stdin/stdout. -func New(p Protocol, in io.Writer, out io.Reader) (ContainerIO, error) { +func New(p Protocol, in io.Writer, out io.Reader) ContainerIO { switch p { case HTTP: - return &HTTPProtocol{in, out}, nil + return &HTTPProtocol{in, out} case Default, Empty: - return &DefaultProtocol{}, nil - default: - return nil, errInvalidProtocol + return &DefaultProtocol{} } + return &errorProto{} // shouldn't make it past testing... } // IsStreamable says whether the given protocol can be used for streaming into // hot functions. -func IsStreamable(p string) (bool, error) { - proto, err := New(Protocol(p), nil, nil) - if err != nil { - return false, err - } - return proto.IsStreamable(), nil +// TODO get rid of ContainerIO and just use Protocol +func IsStreamable(p Protocol) bool { + return New(p, nil, nil).IsStreamable() } diff --git a/api/runner/runner.go b/api/runner/runner.go index 440670f17..598b33022 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -16,12 +16,14 @@ import ( "github.com/Sirupsen/logrus" "gitlab-odx.oracle.com/odx/functions/api/runner/common" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" - driverscommon "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers/docker" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers/mock" "gitlab-odx.oracle.com/odx/functions/api/runner/task" ) +// TODO clean all of this up, the exposed API is huge and incohesive, +// we need 1 thing that runs 1 thing and 1 thing that runs those things; +// right now this is all the things. type Runner struct { driver drivers.Driver taskQueue chan *containerTask @@ -30,6 +32,7 @@ type Runner struct { availableMem int64 usedMem int64 usedMemMutex sync.RWMutex + hcmgr htfnmgr stats } @@ -50,7 +53,7 @@ func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, erro env := common.NewEnvironment(func(e *common.Environment) {}) // TODO: Create a drivers.New(runnerConfig) in Titan - driver, err := selectDriver("docker", env, &driverscommon.Config{}) + driver, err := selectDriver("docker", env, &drivers.Config{}) if err != nil { return nil, err } @@ -158,7 +161,8 @@ func (r *Runner) checkMemAndUse(req uint64) bool { return true } -func (r *Runner) Run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { +// run is responsible for running 1 instance of a docker container +func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { var err error if cfg.Memory == 0 { @@ -241,7 +245,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error { return err } -func selectDriver(driver string, env *common.Environment, conf *driverscommon.Config) (drivers.Driver, error) { +func selectDriver(driver string, env *common.Environment, conf *drivers.Config) (drivers.Driver, error) { switch driver { case "docker": docker := docker.NewDocker(env, *conf) diff --git a/api/runner/stats.go b/api/runner/stats.go index 5c9a4351f..8ebc86725 100644 --- a/api/runner/stats.go +++ b/api/runner/stats.go @@ -7,6 +7,8 @@ type stats struct { queue uint64 running uint64 complete uint64 + + wait sync.WaitGroup } type Stats struct { @@ -22,6 +24,7 @@ func (s *stats) Enqueue() { } func (s *stats) Start() { + s.wait.Add(1) s.mu.Lock() s.queue-- s.running++ @@ -29,6 +32,7 @@ func (s *stats) Start() { } func (s *stats) Complete() { + s.wait.Done() s.mu.Lock() s.running-- s.complete++ @@ -44,3 +48,5 @@ func (s *stats) Stats() Stats { s.mu.Unlock() return stats } + +func (s *stats) Wait() { s.wait.Wait() } diff --git a/api/runner/task/task.go b/api/runner/task/task.go index f3bfa2fb5..4cb8bbbcb 100644 --- a/api/runner/task/task.go +++ b/api/runner/task/task.go @@ -9,25 +9,23 @@ import ( ) type Config struct { - ID string - Path string - Image string - Timeout time.Duration - IdleTimeout time.Duration - AppName string - Memory uint64 - Env map[string]string - Format string - MaxConcurrency int + ID string + Path string + Image string + Timeout time.Duration + IdleTimeout time.Duration + AppName string + Memory uint64 + Env map[string]string + Format string Stdin io.Reader Stdout io.Writer Stderr io.Writer } -// Request stores the task to be executed by the common concurrency stream, -// whatever type the ask actually is, either sync or async. It holds in itself -// the channel to return its response to its caller. +// Request stores the task to be executed, It holds in itself the channel to +// return its response to its caller. type Request struct { Ctx context.Context Config *Config diff --git a/api/runner/worker.go b/api/runner/worker.go index c8ace58e0..fa139d693 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -1,7 +1,6 @@ package runner import ( - "bufio" "context" "fmt" "io" @@ -9,12 +8,12 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/go-openapi/strfmt" uuid "github.com/satori/go.uuid" "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" "gitlab-odx.oracle.com/odx/functions/api/runner/protocol" "gitlab-odx.oracle.com/odx/functions/api/runner/task" - "github.com/go-openapi/strfmt" ) // hot functions - theory of operation @@ -42,12 +41,6 @@ import ( // Incoming // Task // │ -// ▼ -// ┌───────────────┐ -// │ Task Request │ -// │ Main Loop │ -// └───────────────┘ -// │ // ┌──────▼────────┐ // ┌┴──────────────┐│ // │ Per Function ││ non-streamable f() @@ -64,13 +57,12 @@ import ( // Terminate // (internal clock) - // RunTrackedTask is just a wrapper for shared logic for async/sync runners -func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) { +func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) { startedAt := strfmt.DateTime(time.Now()) newTask.StartedAt = startedAt - result, err := RunTask(tasks, ctx, cfg) + result, err := rnr.RunTask(ctx, cfg) completedAt := strfmt.DateTime(time.Now()) status := result.Status() @@ -78,89 +70,72 @@ func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.C newTask.Status = status err = ds.InsertTask(ctx, newTask) + // TODO we should just log this error not return it to user? just issue storing task status but task is run return result, err } +// RunTask will dispatch a task specified by cfg to a hot container, if possible, +// that already exists or will create a new container to run a task and then run it. +// TODO XXX (reed): merge this and RunTrackedTask to reduce surface area... +func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { + rnr.Start() // TODO layering issue ??? + defer rnr.Complete() -// RunTask helps sending a task.Request into the common concurrency stream. -// Refer to StartWorkers() to understand what this is about. -func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { tresp := make(chan task.Response) treq := task.Request{Ctx: ctx, Config: cfg, Response: tresp} - tasks <- treq + tasks := rnr.hcmgr.getPipe(ctx, rnr, cfg) + if tasks == nil { + // TODO get rid of this to use herd stuff + go runTaskReq(rnr, treq) + } else { + tasks <- treq + } resp := <-treq.Response return resp.Result, resp.Err } -// StartWorkers operates the common concurrency stream, ie, it will process all -// functions tasks, either sync or async. In the process, it also dispatches -// the workload to either regular or hot functions. -func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) { - var wg sync.WaitGroup - defer wg.Wait() - var hcmgr htfnmgr - - for { - select { - case <-ctx.Done(): - return - case task := <-tasks: - p := hcmgr.getPipe(ctx, rnr, task.Config) - if p == nil { - wg.Add(1) - go runTaskReq(rnr, &wg, task) - continue - } - - rnr.Start() - select { - case <-ctx.Done(): - return - case p <- task: - rnr.Complete() - } - } - } -} - -// htfnmgr is the intermediate between the common concurrency stream and -// hot functions. All hot functions share a single task.Request stream per -// function (chn), but each function may have more than one hot function (hc). +// htfnmgr tracks all hot functions, used to funnel kittens into existing tubes +// XXX (reed): this map grows unbounded, need to add LRU but need to make +// sure that no functions are running when we evict type htfnmgr struct { - chn map[string]chan task.Request - hc map[string]*htfnsvr + sync.RWMutex + hc map[string]*htfnsvr } -func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request { - isStream, err := protocol.IsStreamable(cfg.Format) - if err != nil { - logrus.WithError(err).Info("could not detect container IO protocol") - return nil - } else if !isStream { +func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan<- task.Request { + isStream := protocol.IsStreamable(protocol.Protocol(cfg.Format)) + if !isStream { + // TODO stop doing this, to prevent herds return nil } - if h.chn == nil { - h.chn = make(map[string]chan task.Request) - h.hc = make(map[string]*htfnsvr) + h.RLock() + if h.hc == nil { + h.RUnlock() + h.Lock() + if h.hc == nil { + h.hc = make(map[string]*htfnsvr) + } + h.Unlock() + h.RLock() } // TODO(ccirello): re-implement this without memory allocation (fmt.Sprint) - fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format, cfg.MaxConcurrency) - tasks, ok := h.chn[fn] + fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format) + svr, ok := h.hc[fn] + h.RUnlock() if !ok { - h.chn[fn] = make(chan task.Request) - tasks = h.chn[fn] - svr := newhtfnsvr(ctx, cfg, rnr, tasks) - if err := svr.launch(ctx); err != nil { - logrus.WithError(err).Error("cannot start hot function supervisor") - return nil + h.Lock() + svr, ok = h.hc[fn] + if !ok { + svr = newhtfnsvr(ctx, cfg, rnr) + h.hc[fn] = svr } - h.hc[fn] = svr + h.Unlock() } - return tasks + return svr.tasksin } // htfnsvr is part of htfnmgr, abstracted apart for simplicity, its only @@ -168,21 +143,28 @@ func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) ch // needed. In case of absence of workload, it will stop trying to start new hot // containers. type htfnsvr struct { - cfg *task.Config - rnr *Runner - tasksin <-chan task.Request + cfg *task.Config + rnr *Runner + // TODO sharing with only a channel among hot containers will result in + // inefficient recycling of containers, we need a stack not a queue, so that + // when a lot of hot containers are up and throughput drops they don't all + // find a task every few seconds and stay up for a lot longer than we really + // need them. + tasksin chan task.Request tasksout chan task.Request - maxc chan struct{} + first chan struct{} + once sync.Once // TODO this really needs to happen any time runner count goes to 0 } -func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htfnsvr { +func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner) *htfnsvr { svr := &htfnsvr{ cfg: cfg, rnr: rnr, - tasksin: tasks, + tasksin: make(chan task.Request), tasksout: make(chan task.Request, 1), - maxc: make(chan struct{}, cfg.MaxConcurrency), + first: make(chan struct{}, 1), } + svr.first <- struct{}{} // prime so that 1 thread will start the first container, others will wait // This pipe will take all incoming tasks and just forward them to the // started hot functions. The catch here is that it feeds a buffered @@ -190,7 +172,7 @@ func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan // then used to determine the presence of running hot functions. // If no hot function is available, tasksout will fill up to its // capacity and pipe() will start them. - go svr.pipe(ctx) + go svr.pipe(context.Background()) // XXX (reed): real context for adding consuela return svr } @@ -199,10 +181,18 @@ func (svr *htfnsvr) pipe(ctx context.Context) { select { case t := <-svr.tasksin: svr.tasksout <- t - if len(svr.tasksout) > 0 { - if err := svr.launch(ctx); err != nil { - logrus.WithError(err).Error("cannot start more hot functions") + + // TODO move checking for ram up here? then we can wait for hot functions to open up instead of always + // trying to make new ones if all hot functions are busy (and if machine is full and all functions are + // hot then most new hot functions are going to time out waiting to get available ram) + // TODO need to add some kind of metering here, we could track average run time and # of runners + select { + case _, ok := <-svr.first: // wait for >= 1 to be up to avoid herd + if ok || len(svr.tasksout) > 0 { + svr.launch(ctx) } + case <-ctx.Done(): // TODO we should prob watch the task timeout not just the pipe... + return } case <-ctx.Done(): return @@ -210,41 +200,25 @@ func (svr *htfnsvr) pipe(ctx context.Context) { } } -func (svr *htfnsvr) launch(ctx context.Context) error { - select { - case svr.maxc <- struct{}{}: - hc, err := newhtfn( - svr.cfg, - protocol.Protocol(svr.cfg.Format), - svr.tasksout, - svr.rnr, - ) - if err != nil { - return err - } - go func() { - hc.serve(ctx) - <-svr.maxc - }() - default: - } - - return nil +func (svr *htfnsvr) launch(ctx context.Context) { + hc := newhtfn( + svr.cfg, + svr.tasksout, + svr.rnr, + func() { svr.once.Do(func() { close(svr.first) }) }, + ) + go hc.serve(ctx) } -// htfn actually interfaces an incoming task from the common concurrency -// stream into a long lived container. If idle long enough, it will stop. It -// uses route configuration to determine which protocol to use. +// htfn is one instance of a hot container, which may or may not be running a +// task. If idle long enough, it will stop. It uses route configuration to +// determine which protocol to use. type htfn struct { id string cfg *task.Config proto protocol.ContainerIO tasks <-chan task.Request - - // Side of the pipe that takes information from outer world - // and injects into the container. - in io.Writer - out io.Reader + once func() // Receiving side of the container. containerIn io.Reader @@ -253,76 +227,50 @@ type htfn struct { rnr *Runner } -func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htfn, error) { +func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func()) *htfn { stdinr, stdinw := io.Pipe() stdoutr, stdoutw := io.Pipe() - p, err := protocol.New(proto, stdinw, stdoutr) - if err != nil { - return nil, err - } - - hc := &htfn{ + return &htfn{ id: uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", cfg.AppName, cfg.Path, time.Now().Unix())).String(), cfg: cfg, - proto: p, + proto: protocol.New(protocol.Protocol(cfg.Format), stdinw, stdoutr), tasks: tasks, - - in: stdinw, - out: stdoutr, + once: once, containerIn: stdinr, containerOut: stdoutw, rnr: rnr, } - - return hc, nil } func (hc *htfn) serve(ctx context.Context) { lctx, cancel := context.WithCancel(ctx) - var wg sync.WaitGroup + defer cancel() cfg := *hc.cfg - logFields := logrus.Fields{ - "hot_id": hc.id, - "app": cfg.AppName, - "route": cfg.Path, - "image": cfg.Image, - "memory": cfg.Memory, - "format": cfg.Format, - "max_concurrency": cfg.MaxConcurrency, - "idle_timeout": cfg.IdleTimeout, - } - logger := logrus.WithFields(logFields) + logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout}) - wg.Add(1) go func() { - defer wg.Done() for { - inactivity := time.After(cfg.IdleTimeout) - select { case <-lctx.Done(): return - - case <-inactivity: + case <-time.After(cfg.IdleTimeout): logger.Info("Canceling inactive hot function") cancel() - case t := <-hc.tasks: - if err := hc.proto.Dispatch(lctx, t); err != nil { + err := hc.proto.Dispatch(lctx, t) + status := "success" + if err != nil { + status = "error" logrus.WithField("ctx", lctx).Info("task failed") - t.Response <- task.Response{ - &runResult{StatusValue: "error", error: err}, - err, - } - continue } + hc.once() t.Response <- task.Response{ - &runResult{StatusValue: "success"}, - nil, + &runResult{StatusValue: status, error: err}, + err, } } } @@ -332,48 +280,18 @@ func (hc *htfn) serve(ctx context.Context) { cfg.Timeout = 0 // add a timeout to simulate ab.end. failure. cfg.Stdin = hc.containerIn cfg.Stdout = hc.containerOut + // NOTE: cfg.Stderr is overwritten in rnr.Run() - // Why can we not attach stderr to the task like we do for stdin and - // stdout? - // - // Stdin/Stdout are completely known to the scope of the task. You must - // have a task stdin to feed containers stdin, and also the other way - // around when reading from stdout. So both are directly related to the - // life cycle of the request. - // - // Stderr, on the other hand, can be written by anything any time: - // failure between requests, failures inside requests and messages send - // right after stdout has been finished being transmitted. Thus, with - // hot functions, there is not a 1:1 relation between stderr and tasks. - // - // Still, we do pass - at protocol level - a Task-ID header, from which - // the application running inside the hot function can use to identify - // its own stderr output. - errr, errw := io.Pipe() - cfg.Stderr = errw - wg.Add(1) - go func() { - defer wg.Done() - scanner := bufio.NewScanner(errr) - for scanner.Scan() { - logger.Info(scanner.Text()) - } - }() - - result, err := hc.rnr.Run(lctx, &cfg) + result, err := hc.rnr.run(lctx, &cfg) if err != nil { logger.WithError(err).Error("hot function failure detected") } - errw.Close() - wg.Wait() logger.WithField("result", result).Info("hot function terminated") } -func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) { - defer wg.Done() - rnr.Start() - defer rnr.Complete() - result, err := rnr.Run(t.Ctx, t.Config) +// TODO make Default protocol a real thing and get rid of this in favor of Dispatch +func runTaskReq(rnr *Runner, t task.Request) { + result, err := rnr.run(t.Ctx, t.Config) select { case t.Response <- task.Response{result, err}: close(t.Response) diff --git a/api/server/runner.go b/api/server/runner.go index fbfe0f321..a305cb2f5 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -152,7 +152,7 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]* } // TODO: Should remove *gin.Context from these functions, should use only context.Context -func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue, ) (ok bool) { +func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) params, match := matchRoute(found.Path, route) @@ -193,18 +193,17 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun } cfg := &task.Config{ - AppName: appName, - Path: found.Path, - Env: envVars, - Format: found.Format, - ID: reqID, - Image: found.Image, - MaxConcurrency: found.MaxConcurrency, - Memory: found.Memory, - Stdin: payload, - Stdout: &stdout, - Timeout: time.Duration(found.Timeout) * time.Second, - IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, + AppName: appName, + Path: found.Path, + Env: envVars, + Format: found.Format, + ID: reqID, + Image: found.Image, + Memory: found.Memory, + Stdin: payload, + Stdout: &stdout, + Timeout: time.Duration(found.Timeout) * time.Second, + IdleTimeout: time.Duration(found.IdleTimeout) * time.Second, } // ensure valid values @@ -244,8 +243,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID}) default: - result, err := runner.RunTrackedTask(newTask, s.tasks, ctx, cfg, s.Datastore) - + result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg, s.Datastore) if err != nil { c.JSON(http.StatusInternalServerError, runnerResponse{ RequestID: cfg.ID, diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 37f4409a1..3bb923c30 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -121,8 +121,6 @@ func TestRouteRunnerExecution(t *testing.T) { rnr, cancelrnr := testRunner(t) defer cancelrnr() - go runner.StartWorkers(ctx, rnr, tasks) - srv := testServer(datastore.NewMockInit( []*models.App{ {Name: "myapp", Config: models.Config{}}, @@ -179,7 +177,6 @@ func TestRouteRunnerTimeout(t *testing.T) { rnr, cancelrnr := testRunner(t) defer cancelrnr() - go runner.StartWorkers(ctx, rnr, tasks) srv := testServer(datastore.NewMockInit( []*models.App{ diff --git a/api/server/server.go b/api/server/server.go index bf1de836e..0ce248442 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -21,7 +21,6 @@ import ( "gitlab-odx.oracle.com/odx/functions/api/mqs" "gitlab-odx.oracle.com/odx/functions/api/runner" "gitlab-odx.oracle.com/odx/functions/api/runner/common" - "gitlab-odx.oracle.com/odx/functions/api/runner/task" "gitlab-odx.oracle.com/odx/functions/api/server/internal/routecache" ) @@ -49,7 +48,6 @@ type Server struct { mu sync.Mutex // protects hotroutes hotroutes *routecache.Cache - tasks chan task.Request singleflight singleflight // singleflight assists Datastore } @@ -83,14 +81,12 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, apiUR return nil } - tasks := make(chan task.Request) s := &Server{ Runner: rnr, Router: gin.New(), Datastore: ds, MQ: mq, hotroutes: routecache.New(cacheSize), - tasks: tasks, Enqueue: DefaultEnqueue, apiURL: apiURL, } @@ -200,7 +196,6 @@ func extractFields(c *gin.Context) logrus.Fields { func (s *Server) Start(ctx context.Context) { ctx = contextWithSignal(ctx, os.Interrupt) s.startGears(ctx) - close(s.tasks) } func (s *Server) startGears(ctx context.Context) { @@ -245,14 +240,11 @@ func (s *Server) startGears(ctx context.Context) { }) svr.AddFunc(func(ctx context.Context) { - runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner, s.Datastore) - }) - - svr.AddFunc(func(ctx context.Context) { - runner.StartWorkers(ctx, s.Runner, s.tasks) + runner.RunAsyncRunner(ctx, s.apiURL, s.Runner, s.Datastore) }) svr.Serve(ctx) + s.Runner.Wait() // wait for tasks to finish (safe shutdown) } func (s *Server) bindHandlers(ctx context.Context) { @@ -321,11 +313,11 @@ type tasksResponse struct { } type fnCallResponse struct { - Message string `json:"message"` + Message string `json:"message"` Call *models.FnCall `json:"call"` } type fnCallsResponse struct { - Message string `json:"message"` - Calls models.FnCalls `json:"calls"` + Message string `json:"message"` + Calls models.FnCalls `json:"calls"` } diff --git a/api/server/server_test.go b/api/server/server_test.go index b4746a50a..9260e8b69 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -104,8 +104,6 @@ func TestFullStack(t *testing.T) { rnr, rnrcancel := testRunner(t) defer rnrcancel() - go runner.StartWorkers(ctx, rnr, tasks) - srv := testServer(ds, &mqs.Mock{}, rnr, tasks) srv.hotroutes = routecache.New(2) diff --git a/api/server/special_handler_test.go b/api/server/special_handler_test.go index c424b4b43..85ddf23f8 100644 --- a/api/server/special_handler_test.go +++ b/api/server/special_handler_test.go @@ -20,8 +20,6 @@ func TestSpecialHandlerSet(t *testing.T) { // rnr, cancelrnr := testRunner(t) // defer cancelrnr() - // go runner.StartWorkers(ctx, rnr, tasks) - // s := &Server{ // Runner: rnr, // Router: gin.New(), diff --git a/docs/function-file.md b/docs/function-file.md index 6ae65cc31..7a53e9a54 100644 --- a/docs/function-file.md +++ b/docs/function-file.md @@ -66,12 +66,6 @@ hot functions support also adds two extra options to this configuration file. `idle_timeout` (optional) is the time in seconds a container will remain alive without receiving any new requests; hot functions will stay alive as long as they receive a request in this interval. Default: `30`. -`max_concurrency` (optional) is the maximum of hot functions per node to be -started for a certain function. It defaults to one per function. If you -understand you need more processing power, make sure to raise this number. -Keep in mind that if there is not available memory to execute the configured -workload, it will fail to start new hot functions. - ## Testing functions `tests` (optional) is an array of tests that can be used to valid functions both diff --git a/docs/hot-functions.md b/docs/hot-functions.md index 01842f1eb..5300c6184 100644 --- a/docs/hot-functions.md +++ b/docs/hot-functions.md @@ -97,7 +97,6 @@ requests: "type": "sync", "config": null, "format": "http", - "max_concurrency": "1", "idle_timeout": 30 } } @@ -108,8 +107,5 @@ requests: `format` (mandatory) either "default" or "http". If "http", then it is a hot container. -`max_concurrency` (optional) - the number of simultaneous hot functions for -this functions. This is a per-node configuration option. Default: 1 - `idle_timeout` (optional) - idle timeout (in seconds) before function termination. diff --git a/docs/swagger.yml b/docs/swagger.yml index f61c629ce..4b0926621 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -416,10 +416,6 @@ definitions: - http - json description: Payload format sent into function. - max_concurrency: - type: integer - format: int32 - description: Maximum number of hot functions concurrency config: type: object description: Route configuration - overrides application configuration @@ -653,4 +649,4 @@ definitions: - task properties: task: - $ref: '#/definitions/Task' \ No newline at end of file + $ref: '#/definitions/Task' diff --git a/examples/tutorial/hotfunctions/http/hotroute.json b/examples/tutorial/hotfunctions/http/hotroute.json index af7e65172..58438a235 100644 --- a/examples/tutorial/hotfunctions/http/hotroute.json +++ b/examples/tutorial/hotfunctions/http/hotroute.json @@ -1,7 +1,4 @@ {"route":{ - "app_name": "myapp", - "path": "/hot", - "image": "USERNAME/hchttp", "memory": 64, "type": "sync", "config": null, diff --git a/examples/tutorial/params/func.yaml b/examples/tutorial/params/func.yaml index f26e7a126..2f147a155 100644 --- a/examples/tutorial/params/func.yaml +++ b/examples/tutorial/params/func.yaml @@ -3,4 +3,3 @@ version: 0.0.6 runtime: go entrypoint: ./func path: /fn3 -max_concurrency: 1 diff --git a/fn/funcfile.go b/fn/funcfile.go index 1545eb1f2..a5a8204de 100644 --- a/fn/funcfile.go +++ b/fn/funcfile.go @@ -32,21 +32,20 @@ type fftest struct { } type funcfile struct { - Name string `yaml:"name,omitempty" json:"name,omitempty"` - Version string `yaml:"version,omitempty" json:"version,omitempty"` - Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"` - Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"` - Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"` - Type *string `yaml:"type,omitempty" json:"type,omitempty"` - Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"` - Format *string `yaml:"format,omitempty" json:"format,omitempty"` - Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` - Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"` - Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"` - Build []string `yaml:"build,omitempty" json:"build,omitempty"` - Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"` - Path *string `yaml:"path,omitempty" json:"path,omitempty"` - MaxConcurrency *int `yaml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"` + Name string `yaml:"name,omitempty" json:"name,omitempty"` + Version string `yaml:"version,omitempty" json:"version,omitempty"` + Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"` + Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"` + Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"` + Type *string `yaml:"type,omitempty" json:"type,omitempty"` + Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"` + Format *string `yaml:"format,omitempty" json:"format,omitempty"` + Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` + Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"` + Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"` + Build []string `yaml:"build,omitempty" json:"build,omitempty"` + Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"` + Path *string `yaml:"path,omitempty" json:"path,omitempty"` } func (ff *funcfile) FullName() string { diff --git a/fn/init.go b/fn/init.go index 06ac61e01..903b71bde 100644 --- a/fn/init.go +++ b/fn/init.go @@ -44,13 +44,12 @@ func init() { } type initFnCmd struct { - name string - force bool - runtime string - entrypoint string - cmd string - format string - maxConcurrency int + name string + force bool + runtime string + entrypoint string + cmd string + format string } func initFn() cli.Command { @@ -84,12 +83,6 @@ func initFn() cli.Command { Destination: &a.format, Value: "", }, - cli.IntFlag{ - Name: "max-concurrency", - Usage: "maximum concurrency for hot function", - Destination: &a.maxConcurrency, - Value: 1, - }, }, } } @@ -125,13 +118,12 @@ func (a *initFnCmd) init(c *cli.Context) error { } ff := &funcfile{ - Name: a.name, - Runtime: &a.runtime, - Version: initialVersion, - Entrypoint: a.entrypoint, - Cmd: a.cmd, - Format: ffmt, - MaxConcurrency: &a.maxConcurrency, + Name: a.name, + Runtime: &a.runtime, + Version: initialVersion, + Entrypoint: a.entrypoint, + Cmd: a.cmd, + Format: ffmt, } _, path := appNamePath(ff.FullName()) diff --git a/fn/routes.go b/fn/routes.go index 6662b7feb..7c18af29e 100644 --- a/fn/routes.go +++ b/fn/routes.go @@ -49,10 +49,6 @@ var routeFlags = []cli.Flag{ Name: "format,f", Usage: "hot container IO format - json or http", }, - cli.IntFlag{ - Name: "max-concurrency,mc", - Usage: "maximum concurrency for hot container", - }, cli.DurationFlag{ Name: "timeout", Usage: "route timeout (eg. 30s)", @@ -259,10 +255,6 @@ func routeWithFlags(c *cli.Context, rt *fnmodels.Route) { rt.Type = t } - if m := c.Int("max-concurrency"); m > 0 { - rt.MaxConcurrency = int32(m) - } - if m := c.Int64("memory"); m > 0 { rt.Memory = m } @@ -300,9 +292,6 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error { if ff.Format != nil { rt.Format = *ff.Format } - if ff.MaxConcurrency != nil { - rt.MaxConcurrency = int32(*ff.MaxConcurrency) - } if ff.Timeout != nil { to := int64(ff.Timeout.Seconds()) rt.Timeout = &to @@ -422,9 +411,6 @@ func (a *routesCmd) patchRoute(c *cli.Context, appName, routePath string, r *fnm if r.Type != "" { resp.Payload.Route.Type = r.Type } - if r.MaxConcurrency > 0 { - resp.Payload.Route.MaxConcurrency = r.MaxConcurrency - } if r.Memory > 0 { resp.Payload.Route.Memory = r.Memory } diff --git a/test/fnlb-test-harness/primes-func/func.yaml b/test/fnlb-test-harness/primes-func/func.yaml index 71aa5f03f..32b3ff10d 100644 --- a/test/fnlb-test-harness/primes-func/func.yaml +++ b/test/fnlb-test-harness/primes-func/func.yaml @@ -3,4 +3,3 @@ version: 0.0.1 runtime: go entrypoint: ./func path: /primes -max_concurrency: 1