clean up hot f(x) concurrency, rm max concurrency

this patch gets rid of max concurrency for functions altogether, as discussed,
since it will be challenging to support across functions nodes. removing it
exposed that the previous version of functions would fall over when offered
1000 functions, so some rework was needed to push this through. further work
is necessary, as docker basically falls over when asked to start that many
containers at the same time, and with this patch essentially every function
can scale without bound. we could add some kind of adaptive restriction based
on observed task run length and the configured wait time, so that fast-running
functions line up for an existing hot container instead of each one creating a
new hot container (rough sketch of the idea below).
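
something like this, maybe -- a sketch of that idea only, nothing here is in
this patch, and hotStats / shouldLaunch / maxWait are made-up names:

    // sketch of an adaptive launch restriction: queue behind existing hot
    // containers when the expected wait fits in the configured budget,
    // otherwise pay the cold start and launch another container.
    package runner

    import "time"

    // hotStats would track a moving average of recent task durations and the
    // current load for a single function.
    type hotStats struct {
        avgRunTime time.Duration // moving average of recent task run times
        busy       int           // hot containers currently running a task
        queued     int           // tasks already waiting for a hot container
    }

    // shouldLaunch decides whether to start another hot container or let the
    // task line up behind an existing one.
    func shouldLaunch(s hotStats, maxWait time.Duration) bool {
        if s.busy == 0 {
            return true // nothing running, must launch
        }
        // expected wait: everything queued ahead of us (plus us), spread
        // across the containers already serving this function.
        expectedWait := time.Duration(s.queued+1) * s.avgRunTime / time.Duration(s.busy)
        return expectedWait > maxWait
    }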

this patch takes a first cut at whacking out some of the insanity that was the
previous concurrency model. that model limited concurrency significantly
across all functions, since every task went through the same unbuffered
channel; if that channel wasn't picked off fast enough it could block every
function behind it (it's not apparent that this was impossible in the previous
implementation). in any event, each request already has its own goroutine,
there's no reason not to use it. wrapping a map in a lock isn't hard, and it's
not clear what the channel bought us (added insanity?). in effect this is
marginally easier to understand and marginally less insane.

after getting rid of max concurrency, this adds a blocking mechanism for the
first invocation of any function, so that all other invocations wait for the
first hot container to come up before launching more -- this avoids a herd
problem that was making docker die. it could be slightly improved, but it
works in a pinch. memory usage is reduced by dropping the redundant maps of
htfnsvr's and task.Requests (by a factor of 2!), and some of the protocol
stuff is cleaned up, though it needs more. a condensed sketch of the new shape
is below. anyway, it's a first cut. i have another patch that rewrites all of
it, but that was getting into rabbit hole territory; happy to oblige if
anybody else has trouble understanding this rat's nest of channels. there is a
good bit of work left to make this prod ready (regardless of removing max
concurrency).
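
for reference, a condensed sketch of the shape the runner code takes after
this patch -- heavily simplified, no error handling, idle timeouts, or ram
checks; see the runner diff below for the real thing:

    package runner

    import "sync"

    type taskRequest struct{} // ctx, config and response channel elided

    // htfnmgr: a lock-wrapped map of per-function hot function servers,
    // replacing the single unbuffered channel every function used to share.
    type htfnmgr struct {
        sync.RWMutex
        hc map[string]*htfnsvr
    }

    func (h *htfnmgr) get(key string) *htfnsvr {
        h.RLock()
        svr, ok := h.hc[key]
        h.RUnlock()
        if ok {
            return svr
        }
        h.Lock()
        defer h.Unlock()
        if h.hc == nil {
            h.hc = make(map[string]*htfnsvr)
        }
        if svr, ok = h.hc[key]; !ok { // re-check under the write lock
            svr = newhtfnsvr()
            h.hc[key] = svr
        }
        return svr
    }

    // htfnsvr: one per function. first is primed with a single token so that
    // exactly one caller launches the first hot container; further launches
    // block until it is closed, which avoids the herd that was killing docker.
    type htfnsvr struct {
        tasksin  chan taskRequest
        tasksout chan taskRequest
        first    chan struct{}
        once     sync.Once
    }

    func newhtfnsvr() *htfnsvr {
        svr := &htfnsvr{
            tasksin:  make(chan taskRequest),
            tasksout: make(chan taskRequest, 1),
            first:    make(chan struct{}, 1),
        }
        svr.first <- struct{}{} // the one launch token
        return svr
    }

    // markFirstUp runs once the first container has served a task; closing
    // first releases everything waiting on the herd gate.
    func (svr *htfnsvr) markFirstUp() {
        svr.once.Do(func() { close(svr.first) })
    }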

a warning that this will break the db schemas (the maxc column is dropped from
routes); didn't put the effort in to add migration stuff since this isn't
deployed anywhere in prod...

TODO need to clean out the htfnmgr bucket with LRU (possible shape sketched below)
TODO need to clean up the runner interface
TODO need to unify the task running paths across protocols
TODO need to move the ram checking stuff into the worker, for the reasons noted in the code
TODO need better elasticity of hot f(x) containers
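
re the LRU TODO, a possible shape for discussion only -- a dumb scan rather
than a real LRU list, and lastUsed / active don't exist anywhere yet; the key
point is that eviction has to skip anything with work in flight:

    package runner

    import (
        "sync"
        "time"
    )

    // htfnsvrEntry is hypothetical bookkeeping per hot function server.
    type htfnsvrEntry struct {
        lastUsed time.Time
        active   int // tasks currently running in this function's containers
    }

    type htfnmgrLRU struct {
        sync.Mutex
        hc  map[string]*htfnsvrEntry
        max int
    }

    // evict removes idle, least-recently-used entries until under max.
    func (h *htfnmgrLRU) evict() {
        h.Lock()
        defer h.Unlock()
        for len(h.hc) > h.max {
            var oldestKey string
            var oldest time.Time
            for k, e := range h.hc {
                if e.active > 0 {
                    continue // never evict a function with work in flight
                }
                if oldestKey == "" || e.lastUsed.Before(oldest) {
                    oldestKey, oldest = k, e.lastUsed
                }
            }
            if oldestKey == "" {
                return // everything is busy; try again later
            }
            delete(h.hc, oldestKey)
        }
    }
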
Reed Allman
2017-05-31 00:05:25 -07:00
parent 17bbfa0cdf
commit 9edacae928
24 changed files with 250 additions and 391 deletions

View File

@@ -4,8 +4,8 @@ import (
"bytes"
"database/sql"
"encoding/json"
"github.com/Sirupsen/logrus"
"fmt"
"github.com/Sirupsen/logrus"
"strings"
"gitlab-odx.oracle.com/odx/functions/api/models"
@@ -15,7 +15,6 @@ type RowScanner interface {
Scan(dest ...interface{}) error
}
func ScanRoute(scanner RowScanner, route *models.Route) error {
var headerStr string
var configStr string
@@ -25,7 +24,6 @@ func ScanRoute(scanner RowScanner, route *models.Route) error {
&route.Path,
&route.Image,
&route.Format,
&route.MaxConcurrency,
&route.Memory,
&route.Type,
&route.Timeout,
@@ -107,7 +105,6 @@ func BuildFilterRouteQuery(filter *models.RouteFilter, whereStm, andStm string)
return b.String(), args
}
func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []interface{}) {
if filter == nil {
return "", nil
@@ -120,7 +117,6 @@ func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []i
return "", nil
}
func BuildFilterCallQuery(filter *models.CallFilter, whereStm, andStm string) (string, []interface{}) {
if filter == nil {
return "", nil
@@ -269,7 +265,6 @@ func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, err
logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info(
fmt.Sprintf("%v datastore dialed", dialect))
for _, v := range tables {
_, err = db.Exec(v)
if err != nil {

View File

@@ -18,7 +18,6 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes (
path varchar(256) NOT NULL,
image varchar(256) NOT NULL,
format varchar(16) NOT NULL,
maxc int NOT NULL,
memory int NOT NULL,
timeout int NOT NULL,
idle_timeout int NOT NULL,
@@ -38,7 +37,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value varchar(256) NOT NULL
);`
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
const callTableCreate = `CREATE TABLE IF NOT EXISTS calls (
created_at varchar(256) NOT NULL,
@@ -53,7 +52,6 @@ const callTableCreate = `CREATE TABLE IF NOT EXISTS calls (
const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls`
/*
MySQLDatastore defines a basic MySQL Datastore struct.
*/
@@ -237,7 +235,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
path,
image,
format,
maxc,
memory,
type,
timeout,
@@ -250,7 +247,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
route.Path,
route.Image,
route.Format,
route.MaxConcurrency,
route.Memory,
route.Type,
route.Timeout,
@@ -296,7 +292,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
UPDATE routes SET
image = ?,
format = ?,
maxc = ?,
memory = ?,
type = ?,
timeout = ?,
@@ -306,7 +301,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
WHERE app_name = ? AND path = ?;`,
route.Image,
route.Format,
route.MaxConcurrency,
route.Memory,
route.Type,
route.Timeout,
@@ -341,7 +335,6 @@ RemoveRoute removes an existing route on MySQL.
*/
func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error {
deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?`
return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm)
}

View File

@@ -1,17 +1,16 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"
"context"
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
"gitlab-odx.oracle.com/odx/functions/api/models"
"github.com/lib/pq"
_ "github.com/lib/pq"
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
"gitlab-odx.oracle.com/odx/functions/api/models"
)
const routesTableCreate = `
@@ -20,7 +19,6 @@ CREATE TABLE IF NOT EXISTS routes (
path text NOT NULL,
image character varying(256) NOT NULL,
format character varying(16) NOT NULL,
maxc integer NOT NULL,
memory integer NOT NULL,
timeout integer NOT NULL,
idle_timeout integer NOT NULL,
@@ -40,7 +38,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value character varying(256) NOT NULL
);`
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls (
created_at character varying(256) NOT NULL,
@@ -203,7 +201,6 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
path,
image,
format,
maxc,
memory,
type,
timeout,
@@ -211,12 +208,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
headers,
config
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
route.AppName,
route.Path,
route.Image,
route.Format,
route.MaxConcurrency,
route.Memory,
route.Type,
route.Timeout,
@@ -259,19 +255,17 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
UPDATE routes SET
image = $3,
format = $4,
maxc = $5,
memory = $6,
type = $7,
timeout = $8,
idle_timeout = $9,
headers = $10,
config = $11
memory = $5,
type = $6,
timeout = $7,
idle_timeout = $8,
headers = $9,
config = $10
WHERE app_name = $1 AND path = $2;`,
route.AppName,
route.Path,
route.Image,
route.Format,
route.MaxConcurrency,
route.Memory,
route.Type,
route.Timeout,
@@ -301,7 +295,6 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error {
deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2`
return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm)
}

View File

@@ -38,7 +38,6 @@ type Route struct {
Headers http.Header `json:"headers"`
Type string `json:"type"`
Format string `json:"format"`
MaxConcurrency int `json:"max_concurrency"`
Timeout int32 `json:"timeout"`
IdleTimeout int32 `json:"idle_timeout"`
Config `json:"config"`
@@ -57,7 +56,6 @@ var (
ErrRoutesValidationPathMalformed = errors.New("Path malformed")
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout")
ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency")
)
// SetDefaults sets zeroed field to defaults.
@@ -74,10 +72,6 @@ func (r *Route) SetDefaults() {
r.Format = FormatDefault
}
if r.MaxConcurrency == 0 {
r.MaxConcurrency = 1
}
if r.Headers == nil {
r.Headers = http.Header{}
}
@@ -140,10 +134,6 @@ func (r *Route) Validate(skipZero bool) error {
}
}
if r.MaxConcurrency < 0 {
res = append(res, ErrRoutesValidationNegativeMaxConcurrency)
}
if r.Timeout < 0 {
res = append(res, ErrRoutesValidationNegativeTimeout)
}
@@ -188,9 +178,6 @@ func (r *Route) Update(new *Route) {
if new.Format != "" {
r.Format = new.Format
}
if new.MaxConcurrency != 0 {
r.MaxConcurrency = new.MaxConcurrency
}
if new.Headers != nil {
if r.Headers == nil {
r.Headers = make(http.Header)

View File

@@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error {
}
// RunAsyncRunner pulls tasks off a queue and processes them
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
func RunAsyncRunner(ctx context.Context, tasksrv string, rnr *Runner, ds models.Datastore) {
u := tasksrvURL(tasksrv)
startAsyncRunners(ctx, u, tasks, rnr, ds)
startAsyncRunners(ctx, u, rnr, ds)
<-ctx.Done()
}
func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.Datastore) {
var wg sync.WaitGroup
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"})
for {
@@ -133,7 +133,7 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request,
go func() {
defer wg.Done()
// Process Task
_, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds)
_, err := rnr.RunTrackedTask(task, ctx, getCfg(task), ds)
if err != nil {
log.WithError(err).Error("Cannot run task")
}

View File

@@ -29,25 +29,49 @@ const (
Empty Protocol = ""
)
// New creates a valid protocol handler from a I/O pipe representing containers
// stdin/stdout.
func New(p Protocol, in io.Writer, out io.Reader) (ContainerIO, error) {
switch p {
func (p *Protocol) UnmarshalJSON(b []byte) error {
switch Protocol(b) {
case Empty, Default:
*p = Default
case HTTP:
return &HTTPProtocol{in, out}, nil
case Default, Empty:
return &DefaultProtocol{}, nil
*p = HTTP
default:
return errInvalidProtocol
}
return nil
}
func (p Protocol) MarshalJSON() ([]byte, error) {
switch p {
case Empty, Default:
return []byte(Default), nil
case HTTP:
return []byte(HTTP), nil
}
return nil, errInvalidProtocol
}
// implements ContainerIO
type errorProto struct{}
func (e *errorProto) IsStreamable() bool { return false }
func (e *errorProto) Dispatch(ctx context.Context, t task.Request) error { return errInvalidProtocol }
// New creates a valid protocol handler from a I/O pipe representing containers
// stdin/stdout.
func New(p Protocol, in io.Writer, out io.Reader) ContainerIO {
switch p {
case HTTP:
return &HTTPProtocol{in, out}
case Default, Empty:
return &DefaultProtocol{}
}
return &errorProto{} // shouldn't make it past testing...
}
// IsStreamable says whether the given protocol can be used for streaming into
// hot functions.
func IsStreamable(p string) (bool, error) {
proto, err := New(Protocol(p), nil, nil)
if err != nil {
return false, err
}
return proto.IsStreamable(), nil
// TODO get rid of ContainerIO and just use Protocol
func IsStreamable(p Protocol) bool {
return New(p, nil, nil).IsStreamable()
}

View File

@@ -16,12 +16,14 @@ import (
"github.com/Sirupsen/logrus"
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
driverscommon "gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers/docker"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers/mock"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
)
// TODO clean all of this up, the exposed API is huge and incohesive,
// we need 1 thing that runs 1 thing and 1 thing that runs those things;
// right now this is all the things.
type Runner struct {
driver drivers.Driver
taskQueue chan *containerTask
@@ -30,6 +32,7 @@ type Runner struct {
availableMem int64
usedMem int64
usedMemMutex sync.RWMutex
hcmgr htfnmgr
stats
}
@@ -50,7 +53,7 @@ func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, erro
env := common.NewEnvironment(func(e *common.Environment) {})
// TODO: Create a drivers.New(runnerConfig) in Titan
driver, err := selectDriver("docker", env, &driverscommon.Config{})
driver, err := selectDriver("docker", env, &drivers.Config{})
if err != nil {
return nil, err
}
@@ -158,7 +161,8 @@ func (r *Runner) checkMemAndUse(req uint64) bool {
return true
}
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
// run is responsible for running 1 instance of a docker container
func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
var err error
if cfg.Memory == 0 {
@@ -241,7 +245,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error {
return err
}
func selectDriver(driver string, env *common.Environment, conf *driverscommon.Config) (drivers.Driver, error) {
func selectDriver(driver string, env *common.Environment, conf *drivers.Config) (drivers.Driver, error) {
switch driver {
case "docker":
docker := docker.NewDocker(env, *conf)

View File

@@ -7,6 +7,8 @@ type stats struct {
queue uint64
running uint64
complete uint64
wait sync.WaitGroup
}
type Stats struct {
@@ -22,6 +24,7 @@ func (s *stats) Enqueue() {
}
func (s *stats) Start() {
s.wait.Add(1)
s.mu.Lock()
s.queue--
s.running++
@@ -29,6 +32,7 @@ func (s *stats) Start() {
}
func (s *stats) Complete() {
s.wait.Done()
s.mu.Lock()
s.running--
s.complete++
@@ -44,3 +48,5 @@ func (s *stats) Stats() Stats {
s.mu.Unlock()
return stats
}
func (s *stats) Wait() { s.wait.Wait() }

View File

@@ -18,16 +18,14 @@ type Config struct {
Memory uint64
Env map[string]string
Format string
MaxConcurrency int
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
// Request stores the task to be executed by the common concurrency stream,
// whatever type the ask actually is, either sync or async. It holds in itself
// the channel to return its response to its caller.
// Request stores the task to be executed. It holds in itself the channel to
// return its response to its caller.
type Request struct {
Ctx context.Context
Config *Config

View File

@@ -1,7 +1,6 @@
package runner
import (
"bufio"
"context"
"fmt"
"io"
@@ -9,12 +8,12 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/go-openapi/strfmt"
uuid "github.com/satori/go.uuid"
"gitlab-odx.oracle.com/odx/functions/api/models"
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
"gitlab-odx.oracle.com/odx/functions/api/runner/protocol"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
"github.com/go-openapi/strfmt"
)
// hot functions - theory of operation
@@ -42,12 +41,6 @@ import (
// Incoming
// Task
// │
// ▼
// ┌───────────────┐
// │ Task Request │
// │ Main Loop │
// └───────────────┘
// │
// ┌──────▼────────┐
// ┌┴──────────────┐│
// │ Per Function ││ non-streamable f()
@@ -64,13 +57,12 @@ import (
// Terminate
// (internal clock)
// RunTrackedTask is just a wrapper for shared logic for async/sync runners
func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
startedAt := strfmt.DateTime(time.Now())
newTask.StartedAt = startedAt
result, err := RunTask(tasks, ctx, cfg)
result, err := rnr.RunTask(ctx, cfg)
completedAt := strfmt.DateTime(time.Now())
status := result.Status()
@@ -78,89 +70,72 @@ func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.C
newTask.Status = status
err = ds.InsertTask(ctx, newTask)
// TODO we should just log this error not return it to user? just issue storing task status but task is run
return result, err
}
// RunTask will dispatch a task specified by cfg to a hot container, if possible,
// that already exists or will create a new container to run a task and then run it.
// TODO XXX (reed): merge this and RunTrackedTask to reduce surface area...
func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
rnr.Start() // TODO layering issue ???
defer rnr.Complete()
// RunTask helps sending a task.Request into the common concurrency stream.
// Refer to StartWorkers() to understand what this is about.
func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
tresp := make(chan task.Response)
treq := task.Request{Ctx: ctx, Config: cfg, Response: tresp}
tasks := rnr.hcmgr.getPipe(ctx, rnr, cfg)
if tasks == nil {
// TODO get rid of this to use herd stuff
go runTaskReq(rnr, treq)
} else {
tasks <- treq
}
resp := <-treq.Response
return resp.Result, resp.Err
}
// StartWorkers operates the common concurrency stream, ie, it will process all
// functions tasks, either sync or async. In the process, it also dispatches
// the workload to either regular or hot functions.
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) {
var wg sync.WaitGroup
defer wg.Wait()
var hcmgr htfnmgr
for {
select {
case <-ctx.Done():
return
case task := <-tasks:
p := hcmgr.getPipe(ctx, rnr, task.Config)
if p == nil {
wg.Add(1)
go runTaskReq(rnr, &wg, task)
continue
}
rnr.Start()
select {
case <-ctx.Done():
return
case p <- task:
rnr.Complete()
}
}
}
}
// htfnmgr is the intermediate between the common concurrency stream and
// hot functions. All hot functions share a single task.Request stream per
// function (chn), but each function may have more than one hot function (hc).
// htfnmgr tracks all hot functions, used to funnel kittens into existing tubes
// XXX (reed): this map grows unbounded, need to add LRU but need to make
// sure that no functions are running when we evict
type htfnmgr struct {
chn map[string]chan task.Request
sync.RWMutex
hc map[string]*htfnsvr
}
func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request {
isStream, err := protocol.IsStreamable(cfg.Format)
if err != nil {
logrus.WithError(err).Info("could not detect container IO protocol")
return nil
} else if !isStream {
func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan<- task.Request {
isStream := protocol.IsStreamable(protocol.Protocol(cfg.Format))
if !isStream {
// TODO stop doing this, to prevent herds
return nil
}
if h.chn == nil {
h.chn = make(map[string]chan task.Request)
h.RLock()
if h.hc == nil {
h.RUnlock()
h.Lock()
if h.hc == nil {
h.hc = make(map[string]*htfnsvr)
}
h.Unlock()
h.RLock()
}
// TODO(ccirello): re-implement this without memory allocation (fmt.Sprint)
fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format, cfg.MaxConcurrency)
tasks, ok := h.chn[fn]
fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format)
svr, ok := h.hc[fn]
h.RUnlock()
if !ok {
h.chn[fn] = make(chan task.Request)
tasks = h.chn[fn]
svr := newhtfnsvr(ctx, cfg, rnr, tasks)
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start hot function supervisor")
return nil
}
h.Lock()
svr, ok = h.hc[fn]
if !ok {
svr = newhtfnsvr(ctx, cfg, rnr)
h.hc[fn] = svr
}
h.Unlock()
}
return tasks
return svr.tasksin
}
// htfnsvr is part of htfnmgr, abstracted apart for simplicity, its only
@@ -170,19 +145,26 @@ func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) ch
type htfnsvr struct {
cfg *task.Config
rnr *Runner
tasksin <-chan task.Request
// TODO sharing with only a channel among hot containers will result in
// inefficient recycling of containers, we need a stack not a queue, so that
// when a lot of hot containers are up and throughput drops they don't all
// find a task every few seconds and stay up for a lot longer than we really
// need them.
tasksin chan task.Request
tasksout chan task.Request
maxc chan struct{}
first chan struct{}
once sync.Once // TODO this really needs to happen any time runner count goes to 0
}
func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htfnsvr {
func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner) *htfnsvr {
svr := &htfnsvr{
cfg: cfg,
rnr: rnr,
tasksin: tasks,
tasksin: make(chan task.Request),
tasksout: make(chan task.Request, 1),
maxc: make(chan struct{}, cfg.MaxConcurrency),
first: make(chan struct{}, 1),
}
svr.first <- struct{}{} // prime so that 1 thread will start the first container, others will wait
// This pipe will take all incoming tasks and just forward them to the
// started hot functions. The catch here is that it feeds a buffered
@@ -190,7 +172,7 @@ func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan
// then used to determine the presence of running hot functions.
// If no hot function is available, tasksout will fill up to its
// capacity and pipe() will start them.
go svr.pipe(ctx)
go svr.pipe(context.Background()) // XXX (reed): real context for adding consuela
return svr
}
@@ -199,10 +181,18 @@ func (svr *htfnsvr) pipe(ctx context.Context) {
select {
case t := <-svr.tasksin:
svr.tasksout <- t
if len(svr.tasksout) > 0 {
if err := svr.launch(ctx); err != nil {
logrus.WithError(err).Error("cannot start more hot functions")
// TODO move checking for ram up here? then we can wait for hot functions to open up instead of always
// trying to make new ones if all hot functions are busy (and if machine is full and all functions are
// hot then most new hot functions are going to time out waiting to get available ram)
// TODO need to add some kind of metering here, we could track average run time and # of runners
select {
case _, ok := <-svr.first: // wait for >= 1 to be up to avoid herd
if ok || len(svr.tasksout) > 0 {
svr.launch(ctx)
}
case <-ctx.Done(): // TODO we should prob watch the task timeout not just the pipe...
return
}
case <-ctx.Done():
return
@@ -210,41 +200,25 @@ func (svr *htfnsvr) pipe(ctx context.Context) {
}
}
func (svr *htfnsvr) launch(ctx context.Context) error {
select {
case svr.maxc <- struct{}{}:
hc, err := newhtfn(
func (svr *htfnsvr) launch(ctx context.Context) {
hc := newhtfn(
svr.cfg,
protocol.Protocol(svr.cfg.Format),
svr.tasksout,
svr.rnr,
func() { svr.once.Do(func() { close(svr.first) }) },
)
if err != nil {
return err
}
go func() {
hc.serve(ctx)
<-svr.maxc
}()
default:
go hc.serve(ctx)
}
return nil
}
// htfn actually interfaces an incoming task from the common concurrency
// stream into a long lived container. If idle long enough, it will stop. It
// uses route configuration to determine which protocol to use.
// htfn is one instance of a hot container, which may or may not be running a
// task. If idle long enough, it will stop. It uses route configuration to
// determine which protocol to use.
type htfn struct {
id string
cfg *task.Config
proto protocol.ContainerIO
tasks <-chan task.Request
// Side of the pipe that takes information from outer world
// and injects into the container.
in io.Writer
out io.Reader
once func()
// Receiving side of the container.
containerIn io.Reader
@@ -253,76 +227,50 @@ type htfn struct {
rnr *Runner
}
func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htfn, error) {
func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func()) *htfn {
stdinr, stdinw := io.Pipe()
stdoutr, stdoutw := io.Pipe()
p, err := protocol.New(proto, stdinw, stdoutr)
if err != nil {
return nil, err
}
hc := &htfn{
return &htfn{
id: uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", cfg.AppName, cfg.Path, time.Now().Unix())).String(),
cfg: cfg,
proto: p,
proto: protocol.New(protocol.Protocol(cfg.Format), stdinw, stdoutr),
tasks: tasks,
in: stdinw,
out: stdoutr,
once: once,
containerIn: stdinr,
containerOut: stdoutw,
rnr: rnr,
}
return hc, nil
}
func (hc *htfn) serve(ctx context.Context) {
lctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
defer cancel()
cfg := *hc.cfg
logFields := logrus.Fields{
"hot_id": hc.id,
"app": cfg.AppName,
"route": cfg.Path,
"image": cfg.Image,
"memory": cfg.Memory,
"format": cfg.Format,
"max_concurrency": cfg.MaxConcurrency,
"idle_timeout": cfg.IdleTimeout,
}
logger := logrus.WithFields(logFields)
logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout})
wg.Add(1)
go func() {
defer wg.Done()
for {
inactivity := time.After(cfg.IdleTimeout)
select {
case <-lctx.Done():
return
case <-inactivity:
case <-time.After(cfg.IdleTimeout):
logger.Info("Canceling inactive hot function")
cancel()
case t := <-hc.tasks:
if err := hc.proto.Dispatch(lctx, t); err != nil {
err := hc.proto.Dispatch(lctx, t)
status := "success"
if err != nil {
status = "error"
logrus.WithField("ctx", lctx).Info("task failed")
t.Response <- task.Response{
&runResult{StatusValue: "error", error: err},
err,
}
continue
}
hc.once()
t.Response <- task.Response{
&runResult{StatusValue: "success"},
nil,
&runResult{StatusValue: status, error: err},
err,
}
}
}
@@ -332,48 +280,18 @@ func (hc *htfn) serve(ctx context.Context) {
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
cfg.Stdin = hc.containerIn
cfg.Stdout = hc.containerOut
// NOTE: cfg.Stderr is overwritten in rnr.Run()
// Why can we not attach stderr to the task like we do for stdin and
// stdout?
//
// Stdin/Stdout are completely known to the scope of the task. You must
// have a task stdin to feed containers stdin, and also the other way
// around when reading from stdout. So both are directly related to the
// life cycle of the request.
//
// Stderr, on the other hand, can be written by anything any time:
// failure between requests, failures inside requests and messages send
// right after stdout has been finished being transmitted. Thus, with
// hot functions, there is not a 1:1 relation between stderr and tasks.
//
// Still, we do pass - at protocol level - a Task-ID header, from which
// the application running inside the hot function can use to identify
// its own stderr output.
errr, errw := io.Pipe()
cfg.Stderr = errw
wg.Add(1)
go func() {
defer wg.Done()
scanner := bufio.NewScanner(errr)
for scanner.Scan() {
logger.Info(scanner.Text())
}
}()
result, err := hc.rnr.Run(lctx, &cfg)
result, err := hc.rnr.run(lctx, &cfg)
if err != nil {
logger.WithError(err).Error("hot function failure detected")
}
errw.Close()
wg.Wait()
logger.WithField("result", result).Info("hot function terminated")
}
func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) {
defer wg.Done()
rnr.Start()
defer rnr.Complete()
result, err := rnr.Run(t.Ctx, t.Config)
// TODO make Default protocol a real thing and get rid of this in favor of Dispatch
func runTaskReq(rnr *Runner, t task.Request) {
result, err := rnr.run(t.Ctx, t.Config)
select {
case t.Response <- task.Response{result, err}:
close(t.Response)

View File

@@ -152,7 +152,7 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]*
}
// TODO: Should remove *gin.Context from these functions, should use only context.Context
func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue, ) (ok bool) {
func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image})
params, match := matchRoute(found.Path, route)
@@ -199,7 +199,6 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
Format: found.Format,
ID: reqID,
Image: found.Image,
MaxConcurrency: found.MaxConcurrency,
Memory: found.Memory,
Stdin: payload,
Stdout: &stdout,
@@ -244,8 +243,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID})
default:
result, err := runner.RunTrackedTask(newTask, s.tasks, ctx, cfg, s.Datastore)
result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg, s.Datastore)
if err != nil {
c.JSON(http.StatusInternalServerError, runnerResponse{
RequestID: cfg.ID,

View File

@@ -121,8 +121,6 @@ func TestRouteRunnerExecution(t *testing.T) {
rnr, cancelrnr := testRunner(t)
defer cancelrnr()
go runner.StartWorkers(ctx, rnr, tasks)
srv := testServer(datastore.NewMockInit(
[]*models.App{
{Name: "myapp", Config: models.Config{}},
@@ -179,7 +177,6 @@ func TestRouteRunnerTimeout(t *testing.T) {
rnr, cancelrnr := testRunner(t)
defer cancelrnr()
go runner.StartWorkers(ctx, rnr, tasks)
srv := testServer(datastore.NewMockInit(
[]*models.App{

View File

@@ -21,7 +21,6 @@ import (
"gitlab-odx.oracle.com/odx/functions/api/mqs"
"gitlab-odx.oracle.com/odx/functions/api/runner"
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
"gitlab-odx.oracle.com/odx/functions/api/server/internal/routecache"
)
@@ -49,7 +48,6 @@ type Server struct {
mu sync.Mutex // protects hotroutes
hotroutes *routecache.Cache
tasks chan task.Request
singleflight singleflight // singleflight assists Datastore
}
@@ -83,14 +81,12 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, apiUR
return nil
}
tasks := make(chan task.Request)
s := &Server{
Runner: rnr,
Router: gin.New(),
Datastore: ds,
MQ: mq,
hotroutes: routecache.New(cacheSize),
tasks: tasks,
Enqueue: DefaultEnqueue,
apiURL: apiURL,
}
@@ -200,7 +196,6 @@ func extractFields(c *gin.Context) logrus.Fields {
func (s *Server) Start(ctx context.Context) {
ctx = contextWithSignal(ctx, os.Interrupt)
s.startGears(ctx)
close(s.tasks)
}
func (s *Server) startGears(ctx context.Context) {
@@ -245,14 +240,11 @@ func (s *Server) startGears(ctx context.Context) {
})
svr.AddFunc(func(ctx context.Context) {
runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner, s.Datastore)
})
svr.AddFunc(func(ctx context.Context) {
runner.StartWorkers(ctx, s.Runner, s.tasks)
runner.RunAsyncRunner(ctx, s.apiURL, s.Runner, s.Datastore)
})
svr.Serve(ctx)
s.Runner.Wait() // wait for tasks to finish (safe shutdown)
}
func (s *Server) bindHandlers(ctx context.Context) {

View File

@@ -104,8 +104,6 @@ func TestFullStack(t *testing.T) {
rnr, rnrcancel := testRunner(t)
defer rnrcancel()
go runner.StartWorkers(ctx, rnr, tasks)
srv := testServer(ds, &mqs.Mock{}, rnr, tasks)
srv.hotroutes = routecache.New(2)

View File

@@ -20,8 +20,6 @@ func TestSpecialHandlerSet(t *testing.T) {
// rnr, cancelrnr := testRunner(t)
// defer cancelrnr()
// go runner.StartWorkers(ctx, rnr, tasks)
// s := &Server{
// Runner: rnr,
// Router: gin.New(),

View File

@@ -66,12 +66,6 @@ hot functions support also adds two extra options to this configuration file.
`idle_timeout` (optional) is the time in seconds a container will remain alive without receiving any new requests;
hot functions will stay alive as long as they receive a request in this interval. Default: `30`.
`max_concurrency` (optional) is the maximum of hot functions per node to be
started for a certain function. It defaults to one per function. If you
understand you need more processing power, make sure to raise this number.
Keep in mind that if there is not available memory to execute the configured
workload, it will fail to start new hot functions.
## Testing functions
`tests` (optional) is an array of tests that can be used to valid functions both

View File

@@ -97,7 +97,6 @@ requests:
"type": "sync",
"config": null,
"format": "http",
"max_concurrency": "1",
"idle_timeout": 30
}
}
@@ -108,8 +107,5 @@ requests:
`format` (mandatory) either "default" or "http". If "http", then it is a hot
container.
`max_concurrency` (optional) - the number of simultaneous hot functions for
this functions. This is a per-node configuration option. Default: 1
`idle_timeout` (optional) - idle timeout (in seconds) before function termination.

View File

@@ -416,10 +416,6 @@ definitions:
- http
- json
description: Payload format sent into function.
max_concurrency:
type: integer
format: int32
description: Maximum number of hot functions concurrency
config:
type: object
description: Route configuration - overrides application configuration

View File

@@ -1,7 +1,4 @@
{"route":{
"app_name": "myapp",
"path": "/hot",
"image": "USERNAME/hchttp",
"memory": 64,
"type": "sync",
"config": null,

View File

@@ -3,4 +3,3 @@ version: 0.0.6
runtime: go
entrypoint: ./func
path: /fn3
max_concurrency: 1

View File

@@ -46,7 +46,6 @@ type funcfile struct {
Build []string `yaml:"build,omitempty" json:"build,omitempty"`
Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"`
Path *string `yaml:"path,omitempty" json:"path,omitempty"`
MaxConcurrency *int `yaml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
}
func (ff *funcfile) FullName() string {

View File

@@ -50,7 +50,6 @@ type initFnCmd struct {
entrypoint string
cmd string
format string
maxConcurrency int
}
func initFn() cli.Command {
@@ -84,12 +83,6 @@ func initFn() cli.Command {
Destination: &a.format,
Value: "",
},
cli.IntFlag{
Name: "max-concurrency",
Usage: "maximum concurrency for hot function",
Destination: &a.maxConcurrency,
Value: 1,
},
},
}
}
@@ -131,7 +124,6 @@ func (a *initFnCmd) init(c *cli.Context) error {
Entrypoint: a.entrypoint,
Cmd: a.cmd,
Format: ffmt,
MaxConcurrency: &a.maxConcurrency,
}
_, path := appNamePath(ff.FullName())

View File

@@ -49,10 +49,6 @@ var routeFlags = []cli.Flag{
Name: "format,f",
Usage: "hot container IO format - json or http",
},
cli.IntFlag{
Name: "max-concurrency,mc",
Usage: "maximum concurrency for hot container",
},
cli.DurationFlag{
Name: "timeout",
Usage: "route timeout (eg. 30s)",
@@ -259,10 +255,6 @@ func routeWithFlags(c *cli.Context, rt *fnmodels.Route) {
rt.Type = t
}
if m := c.Int("max-concurrency"); m > 0 {
rt.MaxConcurrency = int32(m)
}
if m := c.Int64("memory"); m > 0 {
rt.Memory = m
}
@@ -300,9 +292,6 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error {
if ff.Format != nil {
rt.Format = *ff.Format
}
if ff.MaxConcurrency != nil {
rt.MaxConcurrency = int32(*ff.MaxConcurrency)
}
if ff.Timeout != nil {
to := int64(ff.Timeout.Seconds())
rt.Timeout = &to
@@ -422,9 +411,6 @@ func (a *routesCmd) patchRoute(c *cli.Context, appName, routePath string, r *fnm
if r.Type != "" {
resp.Payload.Route.Type = r.Type
}
if r.MaxConcurrency > 0 {
resp.Payload.Route.MaxConcurrency = r.MaxConcurrency
}
if r.Memory > 0 {
resp.Payload.Route.Memory = r.Memory
}

View File

@@ -3,4 +3,3 @@ version: 0.0.1
runtime: go
entrypoint: ./func
path: /primes
max_concurrency: 1