mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'rm-max-c-2' into 'master'
clean up hotf(x) concurrency, rm max c See merge request !40
This commit is contained in:
@@ -4,8 +4,8 @@ import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"fmt"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"strings"
|
||||
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
@@ -15,7 +15,6 @@ type RowScanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
|
||||
func ScanRoute(scanner RowScanner, route *models.Route) error {
|
||||
var headerStr string
|
||||
var configStr string
|
||||
@@ -25,7 +24,6 @@ func ScanRoute(scanner RowScanner, route *models.Route) error {
|
||||
&route.Path,
|
||||
&route.Image,
|
||||
&route.Format,
|
||||
&route.MaxConcurrency,
|
||||
&route.Memory,
|
||||
&route.Type,
|
||||
&route.Timeout,
|
||||
@@ -107,7 +105,6 @@ func BuildFilterRouteQuery(filter *models.RouteFilter, whereStm, andStm string)
|
||||
return b.String(), args
|
||||
}
|
||||
|
||||
|
||||
func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []interface{}) {
|
||||
if filter == nil {
|
||||
return "", nil
|
||||
@@ -120,7 +117,6 @@ func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []i
|
||||
return "", nil
|
||||
}
|
||||
|
||||
|
||||
func BuildFilterCallQuery(filter *models.CallFilter, whereStm, andStm string) (string, []interface{}) {
|
||||
if filter == nil {
|
||||
return "", nil
|
||||
@@ -269,7 +265,6 @@ func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, err
|
||||
logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info(
|
||||
fmt.Sprintf("%v datastore dialed", dialect))
|
||||
|
||||
|
||||
for _, v := range tables {
|
||||
_, err = db.Exec(v)
|
||||
if err != nil {
|
||||
@@ -372,4 +367,4 @@ func SQLRemoveRoute(db *sql.DB, appName, routePath, deleteStm string) error {
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes (
|
||||
path varchar(256) NOT NULL,
|
||||
image varchar(256) NOT NULL,
|
||||
format varchar(16) NOT NULL,
|
||||
maxc int NOT NULL,
|
||||
memory int NOT NULL,
|
||||
timeout int NOT NULL,
|
||||
idle_timeout int NOT NULL,
|
||||
@@ -38,7 +37,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
|
||||
value varchar(256) NOT NULL
|
||||
);`
|
||||
|
||||
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
|
||||
const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
|
||||
|
||||
const callTableCreate = `CREATE TABLE IF NOT EXISTS calls (
|
||||
created_at varchar(256) NOT NULL,
|
||||
@@ -53,7 +52,6 @@ const callTableCreate = `CREATE TABLE IF NOT EXISTS calls (
|
||||
|
||||
const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls`
|
||||
|
||||
|
||||
/*
|
||||
MySQLDatastore defines a basic MySQL Datastore struct.
|
||||
*/
|
||||
@@ -237,7 +235,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
|
||||
path,
|
||||
image,
|
||||
format,
|
||||
maxc,
|
||||
memory,
|
||||
type,
|
||||
timeout,
|
||||
@@ -250,7 +247,6 @@ func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route)
|
||||
route.Path,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.MaxConcurrency,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
@@ -296,7 +292,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
||||
UPDATE routes SET
|
||||
image = ?,
|
||||
format = ?,
|
||||
maxc = ?,
|
||||
memory = ?,
|
||||
type = ?,
|
||||
timeout = ?,
|
||||
@@ -306,7 +301,6 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
||||
WHERE app_name = ? AND path = ?;`,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.MaxConcurrency,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
@@ -341,7 +335,6 @@ RemoveRoute removes an existing route on MySQL.
|
||||
*/
|
||||
func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error {
|
||||
deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?`
|
||||
|
||||
return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm)
|
||||
}
|
||||
|
||||
@@ -429,7 +422,7 @@ func (ds *MySQLDatastore) Tx(f func(*sql.Tx) error) error {
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error {
|
||||
func (ds *MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error {
|
||||
stmt, err := ds.db.Prepare("INSERT calls SET id=?,created_at=?,started_at=?,completed_at=?,status=?,app_name=?,path=?")
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -445,13 +438,13 @@ func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds * MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) {
|
||||
func (ds *MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) {
|
||||
whereStm := "%s WHERE id=?"
|
||||
|
||||
return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm)
|
||||
}
|
||||
|
||||
func (ds * MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) {
|
||||
func (ds *MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) {
|
||||
whereStm := "WHERE %s ?"
|
||||
andStm := " AND %s ?"
|
||||
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"context"
|
||||
|
||||
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"github.com/lib/pq"
|
||||
_ "github.com/lib/pq"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
)
|
||||
|
||||
const routesTableCreate = `
|
||||
@@ -20,7 +19,6 @@ CREATE TABLE IF NOT EXISTS routes (
|
||||
path text NOT NULL,
|
||||
image character varying(256) NOT NULL,
|
||||
format character varying(16) NOT NULL,
|
||||
maxc integer NOT NULL,
|
||||
memory integer NOT NULL,
|
||||
timeout integer NOT NULL,
|
||||
idle_timeout integer NOT NULL,
|
||||
@@ -40,7 +38,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
|
||||
value character varying(256) NOT NULL
|
||||
);`
|
||||
|
||||
const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes`
|
||||
const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
|
||||
|
||||
const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls (
|
||||
created_at character varying(256) NOT NULL,
|
||||
@@ -203,7 +201,6 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
|
||||
path,
|
||||
image,
|
||||
format,
|
||||
maxc,
|
||||
memory,
|
||||
type,
|
||||
timeout,
|
||||
@@ -211,12 +208,11 @@ func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Rout
|
||||
headers,
|
||||
config
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`,
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
|
||||
route.AppName,
|
||||
route.Path,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.MaxConcurrency,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
@@ -259,19 +255,17 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
|
||||
UPDATE routes SET
|
||||
image = $3,
|
||||
format = $4,
|
||||
maxc = $5,
|
||||
memory = $6,
|
||||
type = $7,
|
||||
timeout = $8,
|
||||
idle_timeout = $9,
|
||||
headers = $10,
|
||||
config = $11
|
||||
memory = $5,
|
||||
type = $6,
|
||||
timeout = $7,
|
||||
idle_timeout = $8,
|
||||
headers = $9,
|
||||
config = $10
|
||||
WHERE app_name = $1 AND path = $2;`,
|
||||
route.AppName,
|
||||
route.Path,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.MaxConcurrency,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
@@ -301,7 +295,6 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R
|
||||
|
||||
func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error {
|
||||
deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2`
|
||||
|
||||
return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm)
|
||||
}
|
||||
|
||||
|
||||
@@ -31,33 +31,31 @@ var (
|
||||
type Routes []*Route
|
||||
|
||||
type Route struct {
|
||||
AppName string `json:"app_name"`
|
||||
Path string `json:"path"`
|
||||
Image string `json:"image"`
|
||||
Memory uint64 `json:"memory"`
|
||||
Headers http.Header `json:"headers"`
|
||||
Type string `json:"type"`
|
||||
Format string `json:"format"`
|
||||
MaxConcurrency int `json:"max_concurrency"`
|
||||
Timeout int32 `json:"timeout"`
|
||||
IdleTimeout int32 `json:"idle_timeout"`
|
||||
Config `json:"config"`
|
||||
AppName string `json:"app_name"`
|
||||
Path string `json:"path"`
|
||||
Image string `json:"image"`
|
||||
Memory uint64 `json:"memory"`
|
||||
Headers http.Header `json:"headers"`
|
||||
Type string `json:"type"`
|
||||
Format string `json:"format"`
|
||||
Timeout int32 `json:"timeout"`
|
||||
IdleTimeout int32 `json:"idle_timeout"`
|
||||
Config `json:"config"`
|
||||
}
|
||||
|
||||
var (
|
||||
ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed")
|
||||
ErrRoutesValidationInvalidPath = errors.New("Invalid Path format")
|
||||
ErrRoutesValidationInvalidType = errors.New("Invalid route Type")
|
||||
ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format")
|
||||
ErrRoutesValidationMissingAppName = errors.New("Missing route AppName")
|
||||
ErrRoutesValidationMissingImage = errors.New("Missing route Image")
|
||||
ErrRoutesValidationMissingName = errors.New("Missing route Name")
|
||||
ErrRoutesValidationMissingPath = errors.New("Missing route Path")
|
||||
ErrRoutesValidationMissingType = errors.New("Missing route Type")
|
||||
ErrRoutesValidationPathMalformed = errors.New("Path malformed")
|
||||
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
|
||||
ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout")
|
||||
ErrRoutesValidationNegativeMaxConcurrency = errors.New("Negative MaxConcurrency")
|
||||
ErrRoutesValidationFoundDynamicURL = errors.New("Dynamic URL is not allowed")
|
||||
ErrRoutesValidationInvalidPath = errors.New("Invalid Path format")
|
||||
ErrRoutesValidationInvalidType = errors.New("Invalid route Type")
|
||||
ErrRoutesValidationInvalidFormat = errors.New("Invalid route Format")
|
||||
ErrRoutesValidationMissingAppName = errors.New("Missing route AppName")
|
||||
ErrRoutesValidationMissingImage = errors.New("Missing route Image")
|
||||
ErrRoutesValidationMissingName = errors.New("Missing route Name")
|
||||
ErrRoutesValidationMissingPath = errors.New("Missing route Path")
|
||||
ErrRoutesValidationMissingType = errors.New("Missing route Type")
|
||||
ErrRoutesValidationPathMalformed = errors.New("Path malformed")
|
||||
ErrRoutesValidationNegativeTimeout = errors.New("Negative timeout")
|
||||
ErrRoutesValidationNegativeIdleTimeout = errors.New("Negative idle timeout")
|
||||
)
|
||||
|
||||
// SetDefaults sets zeroed field to defaults.
|
||||
@@ -74,10 +72,6 @@ func (r *Route) SetDefaults() {
|
||||
r.Format = FormatDefault
|
||||
}
|
||||
|
||||
if r.MaxConcurrency == 0 {
|
||||
r.MaxConcurrency = 1
|
||||
}
|
||||
|
||||
if r.Headers == nil {
|
||||
r.Headers = http.Header{}
|
||||
}
|
||||
@@ -140,10 +134,6 @@ func (r *Route) Validate(skipZero bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
if r.MaxConcurrency < 0 {
|
||||
res = append(res, ErrRoutesValidationNegativeMaxConcurrency)
|
||||
}
|
||||
|
||||
if r.Timeout < 0 {
|
||||
res = append(res, ErrRoutesValidationNegativeTimeout)
|
||||
}
|
||||
@@ -188,9 +178,6 @@ func (r *Route) Update(new *Route) {
|
||||
if new.Format != "" {
|
||||
r.Format = new.Format
|
||||
}
|
||||
if new.MaxConcurrency != 0 {
|
||||
r.MaxConcurrency = new.MaxConcurrency
|
||||
}
|
||||
if new.Headers != nil {
|
||||
if r.Headers == nil {
|
||||
r.Headers = make(http.Header)
|
||||
|
||||
@@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error {
|
||||
}
|
||||
|
||||
// RunAsyncRunner pulls tasks off a queue and processes them
|
||||
func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
|
||||
func RunAsyncRunner(ctx context.Context, tasksrv string, rnr *Runner, ds models.Datastore) {
|
||||
u := tasksrvURL(tasksrv)
|
||||
|
||||
startAsyncRunners(ctx, u, tasks, rnr, ds)
|
||||
startAsyncRunners(ctx, u, rnr, ds)
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) {
|
||||
func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.Datastore) {
|
||||
var wg sync.WaitGroup
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"})
|
||||
for {
|
||||
@@ -133,7 +133,7 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request,
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Process Task
|
||||
_, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds)
|
||||
_, err := rnr.RunTrackedTask(task, ctx, getCfg(task), ds)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Cannot run task")
|
||||
}
|
||||
|
||||
@@ -29,25 +29,49 @@ const (
|
||||
Empty Protocol = ""
|
||||
)
|
||||
|
||||
func (p *Protocol) UnmarshalJSON(b []byte) error {
|
||||
switch Protocol(b) {
|
||||
case Empty, Default:
|
||||
*p = Default
|
||||
case HTTP:
|
||||
*p = HTTP
|
||||
default:
|
||||
return errInvalidProtocol
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p Protocol) MarshalJSON() ([]byte, error) {
|
||||
switch p {
|
||||
case Empty, Default:
|
||||
return []byte(Default), nil
|
||||
case HTTP:
|
||||
return []byte(HTTP), nil
|
||||
}
|
||||
return nil, errInvalidProtocol
|
||||
}
|
||||
|
||||
// implements ContainerIO
|
||||
type errorProto struct{}
|
||||
|
||||
func (e *errorProto) IsStreamable() bool { return false }
|
||||
func (e *errorProto) Dispatch(ctx context.Context, t task.Request) error { return errInvalidProtocol }
|
||||
|
||||
// New creates a valid protocol handler from a I/O pipe representing containers
|
||||
// stdin/stdout.
|
||||
func New(p Protocol, in io.Writer, out io.Reader) (ContainerIO, error) {
|
||||
func New(p Protocol, in io.Writer, out io.Reader) ContainerIO {
|
||||
switch p {
|
||||
case HTTP:
|
||||
return &HTTPProtocol{in, out}, nil
|
||||
return &HTTPProtocol{in, out}
|
||||
case Default, Empty:
|
||||
return &DefaultProtocol{}, nil
|
||||
default:
|
||||
return nil, errInvalidProtocol
|
||||
return &DefaultProtocol{}
|
||||
}
|
||||
return &errorProto{} // shouldn't make it past testing...
|
||||
}
|
||||
|
||||
// IsStreamable says whether the given protocol can be used for streaming into
|
||||
// hot functions.
|
||||
func IsStreamable(p string) (bool, error) {
|
||||
proto, err := New(Protocol(p), nil, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return proto.IsStreamable(), nil
|
||||
// TODO get rid of ContainerIO and just use Protocol
|
||||
func IsStreamable(p Protocol) bool {
|
||||
return New(p, nil, nil).IsStreamable()
|
||||
}
|
||||
|
||||
@@ -16,12 +16,14 @@ import (
|
||||
"github.com/Sirupsen/logrus"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
|
||||
driverscommon "gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers/docker"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers/mock"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
)
|
||||
|
||||
// TODO clean all of this up, the exposed API is huge and incohesive,
|
||||
// we need 1 thing that runs 1 thing and 1 thing that runs those things;
|
||||
// right now this is all the things.
|
||||
type Runner struct {
|
||||
driver drivers.Driver
|
||||
taskQueue chan *containerTask
|
||||
@@ -30,6 +32,7 @@ type Runner struct {
|
||||
availableMem int64
|
||||
usedMem int64
|
||||
usedMemMutex sync.RWMutex
|
||||
hcmgr htfnmgr
|
||||
|
||||
stats
|
||||
}
|
||||
@@ -50,7 +53,7 @@ func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, erro
|
||||
env := common.NewEnvironment(func(e *common.Environment) {})
|
||||
|
||||
// TODO: Create a drivers.New(runnerConfig) in Titan
|
||||
driver, err := selectDriver("docker", env, &driverscommon.Config{})
|
||||
driver, err := selectDriver("docker", env, &drivers.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -158,7 +161,8 @@ func (r *Runner) checkMemAndUse(req uint64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
|
||||
// run is responsible for running 1 instance of a docker container
|
||||
func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
|
||||
var err error
|
||||
|
||||
if cfg.Memory == 0 {
|
||||
@@ -241,7 +245,7 @@ func (r Runner) EnsureImageExists(ctx context.Context, cfg *task.Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func selectDriver(driver string, env *common.Environment, conf *driverscommon.Config) (drivers.Driver, error) {
|
||||
func selectDriver(driver string, env *common.Environment, conf *drivers.Config) (drivers.Driver, error) {
|
||||
switch driver {
|
||||
case "docker":
|
||||
docker := docker.NewDocker(env, *conf)
|
||||
|
||||
@@ -7,6 +7,8 @@ type stats struct {
|
||||
queue uint64
|
||||
running uint64
|
||||
complete uint64
|
||||
|
||||
wait sync.WaitGroup
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
@@ -22,6 +24,7 @@ func (s *stats) Enqueue() {
|
||||
}
|
||||
|
||||
func (s *stats) Start() {
|
||||
s.wait.Add(1)
|
||||
s.mu.Lock()
|
||||
s.queue--
|
||||
s.running++
|
||||
@@ -29,6 +32,7 @@ func (s *stats) Start() {
|
||||
}
|
||||
|
||||
func (s *stats) Complete() {
|
||||
s.wait.Done()
|
||||
s.mu.Lock()
|
||||
s.running--
|
||||
s.complete++
|
||||
@@ -44,3 +48,5 @@ func (s *stats) Stats() Stats {
|
||||
s.mu.Unlock()
|
||||
return stats
|
||||
}
|
||||
|
||||
func (s *stats) Wait() { s.wait.Wait() }
|
||||
|
||||
@@ -9,25 +9,23 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ID string
|
||||
Path string
|
||||
Image string
|
||||
Timeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
AppName string
|
||||
Memory uint64
|
||||
Env map[string]string
|
||||
Format string
|
||||
MaxConcurrency int
|
||||
ID string
|
||||
Path string
|
||||
Image string
|
||||
Timeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
AppName string
|
||||
Memory uint64
|
||||
Env map[string]string
|
||||
Format string
|
||||
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
}
|
||||
|
||||
// Request stores the task to be executed by the common concurrency stream,
|
||||
// whatever type the ask actually is, either sync or async. It holds in itself
|
||||
// the channel to return its response to its caller.
|
||||
// Request stores the task to be executed, It holds in itself the channel to
|
||||
// return its response to its caller.
|
||||
type Request struct {
|
||||
Ctx context.Context
|
||||
Config *Config
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -9,12 +8,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/go-openapi/strfmt"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/protocol"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
"github.com/go-openapi/strfmt"
|
||||
)
|
||||
|
||||
// hot functions - theory of operation
|
||||
@@ -42,12 +41,6 @@ import (
|
||||
// Incoming
|
||||
// Task
|
||||
// │
|
||||
// ▼
|
||||
// ┌───────────────┐
|
||||
// │ Task Request │
|
||||
// │ Main Loop │
|
||||
// └───────────────┘
|
||||
// │
|
||||
// ┌──────▼────────┐
|
||||
// ┌┴──────────────┐│
|
||||
// │ Per Function ││ non-streamable f()
|
||||
@@ -64,13 +57,12 @@ import (
|
||||
// Terminate
|
||||
// (internal clock)
|
||||
|
||||
|
||||
// RunTrackedTask is just a wrapper for shared logic for async/sync runners
|
||||
func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
|
||||
func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
|
||||
startedAt := strfmt.DateTime(time.Now())
|
||||
newTask.StartedAt = startedAt
|
||||
|
||||
result, err := RunTask(tasks, ctx, cfg)
|
||||
result, err := rnr.RunTask(ctx, cfg)
|
||||
|
||||
completedAt := strfmt.DateTime(time.Now())
|
||||
status := result.Status()
|
||||
@@ -78,89 +70,72 @@ func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.C
|
||||
newTask.Status = status
|
||||
|
||||
err = ds.InsertTask(ctx, newTask)
|
||||
// TODO we should just log this error not return it to user? just issue storing task status but task is run
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// RunTask will dispatch a task specified by cfg to a hot container, if possible,
|
||||
// that already exists or will create a new container to run a task and then run it.
|
||||
// TODO XXX (reed): merge this and RunTrackedTask to reduce surface area...
|
||||
func (rnr *Runner) RunTask(ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
|
||||
rnr.Start() // TODO layering issue ???
|
||||
defer rnr.Complete()
|
||||
|
||||
// RunTask helps sending a task.Request into the common concurrency stream.
|
||||
// Refer to StartWorkers() to understand what this is about.
|
||||
func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
|
||||
tresp := make(chan task.Response)
|
||||
treq := task.Request{Ctx: ctx, Config: cfg, Response: tresp}
|
||||
tasks <- treq
|
||||
tasks := rnr.hcmgr.getPipe(ctx, rnr, cfg)
|
||||
if tasks == nil {
|
||||
// TODO get rid of this to use herd stuff
|
||||
go runTaskReq(rnr, treq)
|
||||
} else {
|
||||
tasks <- treq
|
||||
}
|
||||
resp := <-treq.Response
|
||||
return resp.Result, resp.Err
|
||||
}
|
||||
|
||||
// StartWorkers operates the common concurrency stream, ie, it will process all
|
||||
// functions tasks, either sync or async. In the process, it also dispatches
|
||||
// the workload to either regular or hot functions.
|
||||
func StartWorkers(ctx context.Context, rnr *Runner, tasks <-chan task.Request) {
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
var hcmgr htfnmgr
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case task := <-tasks:
|
||||
p := hcmgr.getPipe(ctx, rnr, task.Config)
|
||||
if p == nil {
|
||||
wg.Add(1)
|
||||
go runTaskReq(rnr, &wg, task)
|
||||
continue
|
||||
}
|
||||
|
||||
rnr.Start()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case p <- task:
|
||||
rnr.Complete()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// htfnmgr is the intermediate between the common concurrency stream and
|
||||
// hot functions. All hot functions share a single task.Request stream per
|
||||
// function (chn), but each function may have more than one hot function (hc).
|
||||
// htfnmgr tracks all hot functions, used to funnel kittens into existing tubes
|
||||
// XXX (reed): this map grows unbounded, need to add LRU but need to make
|
||||
// sure that no functions are running when we evict
|
||||
type htfnmgr struct {
|
||||
chn map[string]chan task.Request
|
||||
hc map[string]*htfnsvr
|
||||
sync.RWMutex
|
||||
hc map[string]*htfnsvr
|
||||
}
|
||||
|
||||
func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan task.Request {
|
||||
isStream, err := protocol.IsStreamable(cfg.Format)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Info("could not detect container IO protocol")
|
||||
return nil
|
||||
} else if !isStream {
|
||||
func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) chan<- task.Request {
|
||||
isStream := protocol.IsStreamable(protocol.Protocol(cfg.Format))
|
||||
if !isStream {
|
||||
// TODO stop doing this, to prevent herds
|
||||
return nil
|
||||
}
|
||||
|
||||
if h.chn == nil {
|
||||
h.chn = make(map[string]chan task.Request)
|
||||
h.hc = make(map[string]*htfnsvr)
|
||||
h.RLock()
|
||||
if h.hc == nil {
|
||||
h.RUnlock()
|
||||
h.Lock()
|
||||
if h.hc == nil {
|
||||
h.hc = make(map[string]*htfnsvr)
|
||||
}
|
||||
h.Unlock()
|
||||
h.RLock()
|
||||
}
|
||||
|
||||
// TODO(ccirello): re-implement this without memory allocation (fmt.Sprint)
|
||||
fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format, cfg.MaxConcurrency)
|
||||
tasks, ok := h.chn[fn]
|
||||
fn := fmt.Sprint(cfg.AppName, ",", cfg.Path, cfg.Image, cfg.Timeout, cfg.Memory, cfg.Format)
|
||||
svr, ok := h.hc[fn]
|
||||
h.RUnlock()
|
||||
if !ok {
|
||||
h.chn[fn] = make(chan task.Request)
|
||||
tasks = h.chn[fn]
|
||||
svr := newhtfnsvr(ctx, cfg, rnr, tasks)
|
||||
if err := svr.launch(ctx); err != nil {
|
||||
logrus.WithError(err).Error("cannot start hot function supervisor")
|
||||
return nil
|
||||
h.Lock()
|
||||
svr, ok = h.hc[fn]
|
||||
if !ok {
|
||||
svr = newhtfnsvr(ctx, cfg, rnr)
|
||||
h.hc[fn] = svr
|
||||
}
|
||||
h.hc[fn] = svr
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
return tasks
|
||||
return svr.tasksin
|
||||
}
|
||||
|
||||
// htfnsvr is part of htfnmgr, abstracted apart for simplicity, its only
|
||||
@@ -168,21 +143,28 @@ func (h *htfnmgr) getPipe(ctx context.Context, rnr *Runner, cfg *task.Config) ch
|
||||
// needed. In case of absence of workload, it will stop trying to start new hot
|
||||
// containers.
|
||||
type htfnsvr struct {
|
||||
cfg *task.Config
|
||||
rnr *Runner
|
||||
tasksin <-chan task.Request
|
||||
cfg *task.Config
|
||||
rnr *Runner
|
||||
// TODO sharing with only a channel among hot containers will result in
|
||||
// inefficient recycling of containers, we need a stack not a queue, so that
|
||||
// when a lot of hot containers are up and throughput drops they don't all
|
||||
// find a task every few seconds and stay up for a lot longer than we really
|
||||
// need them.
|
||||
tasksin chan task.Request
|
||||
tasksout chan task.Request
|
||||
maxc chan struct{}
|
||||
first chan struct{}
|
||||
once sync.Once // TODO this really needs to happen any time runner count goes to 0
|
||||
}
|
||||
|
||||
func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan task.Request) *htfnsvr {
|
||||
func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner) *htfnsvr {
|
||||
svr := &htfnsvr{
|
||||
cfg: cfg,
|
||||
rnr: rnr,
|
||||
tasksin: tasks,
|
||||
tasksin: make(chan task.Request),
|
||||
tasksout: make(chan task.Request, 1),
|
||||
maxc: make(chan struct{}, cfg.MaxConcurrency),
|
||||
first: make(chan struct{}, 1),
|
||||
}
|
||||
svr.first <- struct{}{} // prime so that 1 thread will start the first container, others will wait
|
||||
|
||||
// This pipe will take all incoming tasks and just forward them to the
|
||||
// started hot functions. The catch here is that it feeds a buffered
|
||||
@@ -190,7 +172,7 @@ func newhtfnsvr(ctx context.Context, cfg *task.Config, rnr *Runner, tasks <-chan
|
||||
// then used to determine the presence of running hot functions.
|
||||
// If no hot function is available, tasksout will fill up to its
|
||||
// capacity and pipe() will start them.
|
||||
go svr.pipe(ctx)
|
||||
go svr.pipe(context.Background()) // XXX (reed): real context for adding consuela
|
||||
return svr
|
||||
}
|
||||
|
||||
@@ -199,10 +181,18 @@ func (svr *htfnsvr) pipe(ctx context.Context) {
|
||||
select {
|
||||
case t := <-svr.tasksin:
|
||||
svr.tasksout <- t
|
||||
if len(svr.tasksout) > 0 {
|
||||
if err := svr.launch(ctx); err != nil {
|
||||
logrus.WithError(err).Error("cannot start more hot functions")
|
||||
|
||||
// TODO move checking for ram up here? then we can wait for hot functions to open up instead of always
|
||||
// trying to make new ones if all hot functions are busy (and if machine is full and all functions are
|
||||
// hot then most new hot functions are going to time out waiting to get available ram)
|
||||
// TODO need to add some kind of metering here, we could track average run time and # of runners
|
||||
select {
|
||||
case _, ok := <-svr.first: // wait for >= 1 to be up to avoid herd
|
||||
if ok || len(svr.tasksout) > 0 {
|
||||
svr.launch(ctx)
|
||||
}
|
||||
case <-ctx.Done(): // TODO we should prob watch the task timeout not just the pipe...
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -210,41 +200,25 @@ func (svr *htfnsvr) pipe(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (svr *htfnsvr) launch(ctx context.Context) error {
|
||||
select {
|
||||
case svr.maxc <- struct{}{}:
|
||||
hc, err := newhtfn(
|
||||
svr.cfg,
|
||||
protocol.Protocol(svr.cfg.Format),
|
||||
svr.tasksout,
|
||||
svr.rnr,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
hc.serve(ctx)
|
||||
<-svr.maxc
|
||||
}()
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
func (svr *htfnsvr) launch(ctx context.Context) {
|
||||
hc := newhtfn(
|
||||
svr.cfg,
|
||||
svr.tasksout,
|
||||
svr.rnr,
|
||||
func() { svr.once.Do(func() { close(svr.first) }) },
|
||||
)
|
||||
go hc.serve(ctx)
|
||||
}
|
||||
|
||||
// htfn actually interfaces an incoming task from the common concurrency
|
||||
// stream into a long lived container. If idle long enough, it will stop. It
|
||||
// uses route configuration to determine which protocol to use.
|
||||
// htfn is one instance of a hot container, which may or may not be running a
|
||||
// task. If idle long enough, it will stop. It uses route configuration to
|
||||
// determine which protocol to use.
|
||||
type htfn struct {
|
||||
id string
|
||||
cfg *task.Config
|
||||
proto protocol.ContainerIO
|
||||
tasks <-chan task.Request
|
||||
|
||||
// Side of the pipe that takes information from outer world
|
||||
// and injects into the container.
|
||||
in io.Writer
|
||||
out io.Reader
|
||||
once func()
|
||||
|
||||
// Receiving side of the container.
|
||||
containerIn io.Reader
|
||||
@@ -253,76 +227,50 @@ type htfn struct {
|
||||
rnr *Runner
|
||||
}
|
||||
|
||||
func newhtfn(cfg *task.Config, proto protocol.Protocol, tasks <-chan task.Request, rnr *Runner) (*htfn, error) {
|
||||
func newhtfn(cfg *task.Config, tasks <-chan task.Request, rnr *Runner, once func()) *htfn {
|
||||
stdinr, stdinw := io.Pipe()
|
||||
stdoutr, stdoutw := io.Pipe()
|
||||
|
||||
p, err := protocol.New(proto, stdinw, stdoutr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hc := &htfn{
|
||||
return &htfn{
|
||||
id: uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", cfg.AppName, cfg.Path, time.Now().Unix())).String(),
|
||||
cfg: cfg,
|
||||
proto: p,
|
||||
proto: protocol.New(protocol.Protocol(cfg.Format), stdinw, stdoutr),
|
||||
tasks: tasks,
|
||||
|
||||
in: stdinw,
|
||||
out: stdoutr,
|
||||
once: once,
|
||||
|
||||
containerIn: stdinr,
|
||||
containerOut: stdoutw,
|
||||
|
||||
rnr: rnr,
|
||||
}
|
||||
|
||||
return hc, nil
|
||||
}
|
||||
|
||||
func (hc *htfn) serve(ctx context.Context) {
|
||||
lctx, cancel := context.WithCancel(ctx)
|
||||
var wg sync.WaitGroup
|
||||
defer cancel()
|
||||
cfg := *hc.cfg
|
||||
logFields := logrus.Fields{
|
||||
"hot_id": hc.id,
|
||||
"app": cfg.AppName,
|
||||
"route": cfg.Path,
|
||||
"image": cfg.Image,
|
||||
"memory": cfg.Memory,
|
||||
"format": cfg.Format,
|
||||
"max_concurrency": cfg.MaxConcurrency,
|
||||
"idle_timeout": cfg.IdleTimeout,
|
||||
}
|
||||
logger := logrus.WithFields(logFields)
|
||||
logger := logrus.WithFields(logrus.Fields{"hot_id": hc.id, "app": cfg.AppName, "route": cfg.Path, "image": cfg.Image, "memory": cfg.Memory, "format": cfg.Format, "idle_timeout": cfg.IdleTimeout})
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
inactivity := time.After(cfg.IdleTimeout)
|
||||
|
||||
select {
|
||||
case <-lctx.Done():
|
||||
return
|
||||
|
||||
case <-inactivity:
|
||||
case <-time.After(cfg.IdleTimeout):
|
||||
logger.Info("Canceling inactive hot function")
|
||||
cancel()
|
||||
|
||||
case t := <-hc.tasks:
|
||||
if err := hc.proto.Dispatch(lctx, t); err != nil {
|
||||
err := hc.proto.Dispatch(lctx, t)
|
||||
status := "success"
|
||||
if err != nil {
|
||||
status = "error"
|
||||
logrus.WithField("ctx", lctx).Info("task failed")
|
||||
t.Response <- task.Response{
|
||||
&runResult{StatusValue: "error", error: err},
|
||||
err,
|
||||
}
|
||||
continue
|
||||
}
|
||||
hc.once()
|
||||
|
||||
t.Response <- task.Response{
|
||||
&runResult{StatusValue: "success"},
|
||||
nil,
|
||||
&runResult{StatusValue: status, error: err},
|
||||
err,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -332,48 +280,18 @@ func (hc *htfn) serve(ctx context.Context) {
|
||||
cfg.Timeout = 0 // add a timeout to simulate ab.end. failure.
|
||||
cfg.Stdin = hc.containerIn
|
||||
cfg.Stdout = hc.containerOut
|
||||
// NOTE: cfg.Stderr is overwritten in rnr.Run()
|
||||
|
||||
// Why can we not attach stderr to the task like we do for stdin and
|
||||
// stdout?
|
||||
//
|
||||
// Stdin/Stdout are completely known to the scope of the task. You must
|
||||
// have a task stdin to feed containers stdin, and also the other way
|
||||
// around when reading from stdout. So both are directly related to the
|
||||
// life cycle of the request.
|
||||
//
|
||||
// Stderr, on the other hand, can be written by anything any time:
|
||||
// failure between requests, failures inside requests and messages send
|
||||
// right after stdout has been finished being transmitted. Thus, with
|
||||
// hot functions, there is not a 1:1 relation between stderr and tasks.
|
||||
//
|
||||
// Still, we do pass - at protocol level - a Task-ID header, from which
|
||||
// the application running inside the hot function can use to identify
|
||||
// its own stderr output.
|
||||
errr, errw := io.Pipe()
|
||||
cfg.Stderr = errw
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
scanner := bufio.NewScanner(errr)
|
||||
for scanner.Scan() {
|
||||
logger.Info(scanner.Text())
|
||||
}
|
||||
}()
|
||||
|
||||
result, err := hc.rnr.Run(lctx, &cfg)
|
||||
result, err := hc.rnr.run(lctx, &cfg)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("hot function failure detected")
|
||||
}
|
||||
errw.Close()
|
||||
wg.Wait()
|
||||
logger.WithField("result", result).Info("hot function terminated")
|
||||
}
|
||||
|
||||
func runTaskReq(rnr *Runner, wg *sync.WaitGroup, t task.Request) {
|
||||
defer wg.Done()
|
||||
rnr.Start()
|
||||
defer rnr.Complete()
|
||||
result, err := rnr.Run(t.Ctx, t.Config)
|
||||
// TODO make Default protocol a real thing and get rid of this in favor of Dispatch
|
||||
func runTaskReq(rnr *Runner, t task.Request) {
|
||||
result, err := rnr.run(t.Ctx, t.Config)
|
||||
select {
|
||||
case t.Response <- task.Response{result, err}:
|
||||
close(t.Response)
|
||||
|
||||
@@ -152,7 +152,7 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]*
|
||||
}
|
||||
|
||||
// TODO: Should remove *gin.Context from these functions, should use only context.Context
|
||||
func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue, ) (ok bool) {
|
||||
func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image})
|
||||
|
||||
params, match := matchRoute(found.Path, route)
|
||||
@@ -193,18 +193,17 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
|
||||
}
|
||||
|
||||
cfg := &task.Config{
|
||||
AppName: appName,
|
||||
Path: found.Path,
|
||||
Env: envVars,
|
||||
Format: found.Format,
|
||||
ID: reqID,
|
||||
Image: found.Image,
|
||||
MaxConcurrency: found.MaxConcurrency,
|
||||
Memory: found.Memory,
|
||||
Stdin: payload,
|
||||
Stdout: &stdout,
|
||||
Timeout: time.Duration(found.Timeout) * time.Second,
|
||||
IdleTimeout: time.Duration(found.IdleTimeout) * time.Second,
|
||||
AppName: appName,
|
||||
Path: found.Path,
|
||||
Env: envVars,
|
||||
Format: found.Format,
|
||||
ID: reqID,
|
||||
Image: found.Image,
|
||||
Memory: found.Memory,
|
||||
Stdin: payload,
|
||||
Stdout: &stdout,
|
||||
Timeout: time.Duration(found.Timeout) * time.Second,
|
||||
IdleTimeout: time.Duration(found.IdleTimeout) * time.Second,
|
||||
}
|
||||
|
||||
// ensure valid values
|
||||
@@ -244,8 +243,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
|
||||
c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID})
|
||||
|
||||
default:
|
||||
result, err := runner.RunTrackedTask(newTask, s.tasks, ctx, cfg, s.Datastore)
|
||||
|
||||
result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg, s.Datastore)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, runnerResponse{
|
||||
RequestID: cfg.ID,
|
||||
|
||||
@@ -121,8 +121,6 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
rnr, cancelrnr := testRunner(t)
|
||||
defer cancelrnr()
|
||||
|
||||
go runner.StartWorkers(ctx, rnr, tasks)
|
||||
|
||||
srv := testServer(datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
@@ -179,7 +177,6 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
||||
|
||||
rnr, cancelrnr := testRunner(t)
|
||||
defer cancelrnr()
|
||||
go runner.StartWorkers(ctx, rnr, tasks)
|
||||
|
||||
srv := testServer(datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"gitlab-odx.oracle.com/odx/functions/api/mqs"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/server/internal/routecache"
|
||||
)
|
||||
|
||||
@@ -49,7 +48,6 @@ type Server struct {
|
||||
|
||||
mu sync.Mutex // protects hotroutes
|
||||
hotroutes *routecache.Cache
|
||||
tasks chan task.Request
|
||||
singleflight singleflight // singleflight assists Datastore
|
||||
}
|
||||
|
||||
@@ -83,14 +81,12 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, apiUR
|
||||
return nil
|
||||
}
|
||||
|
||||
tasks := make(chan task.Request)
|
||||
s := &Server{
|
||||
Runner: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
MQ: mq,
|
||||
hotroutes: routecache.New(cacheSize),
|
||||
tasks: tasks,
|
||||
Enqueue: DefaultEnqueue,
|
||||
apiURL: apiURL,
|
||||
}
|
||||
@@ -200,7 +196,6 @@ func extractFields(c *gin.Context) logrus.Fields {
|
||||
func (s *Server) Start(ctx context.Context) {
|
||||
ctx = contextWithSignal(ctx, os.Interrupt)
|
||||
s.startGears(ctx)
|
||||
close(s.tasks)
|
||||
}
|
||||
|
||||
func (s *Server) startGears(ctx context.Context) {
|
||||
@@ -245,14 +240,11 @@ func (s *Server) startGears(ctx context.Context) {
|
||||
})
|
||||
|
||||
svr.AddFunc(func(ctx context.Context) {
|
||||
runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner, s.Datastore)
|
||||
})
|
||||
|
||||
svr.AddFunc(func(ctx context.Context) {
|
||||
runner.StartWorkers(ctx, s.Runner, s.tasks)
|
||||
runner.RunAsyncRunner(ctx, s.apiURL, s.Runner, s.Datastore)
|
||||
})
|
||||
|
||||
svr.Serve(ctx)
|
||||
s.Runner.Wait() // wait for tasks to finish (safe shutdown)
|
||||
}
|
||||
|
||||
func (s *Server) bindHandlers(ctx context.Context) {
|
||||
@@ -321,11 +313,11 @@ type tasksResponse struct {
|
||||
}
|
||||
|
||||
type fnCallResponse struct {
|
||||
Message string `json:"message"`
|
||||
Message string `json:"message"`
|
||||
Call *models.FnCall `json:"call"`
|
||||
}
|
||||
|
||||
type fnCallsResponse struct {
|
||||
Message string `json:"message"`
|
||||
Calls models.FnCalls `json:"calls"`
|
||||
Message string `json:"message"`
|
||||
Calls models.FnCalls `json:"calls"`
|
||||
}
|
||||
|
||||
@@ -104,8 +104,6 @@ func TestFullStack(t *testing.T) {
|
||||
rnr, rnrcancel := testRunner(t)
|
||||
defer rnrcancel()
|
||||
|
||||
go runner.StartWorkers(ctx, rnr, tasks)
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, rnr, tasks)
|
||||
srv.hotroutes = routecache.New(2)
|
||||
|
||||
|
||||
@@ -20,8 +20,6 @@ func TestSpecialHandlerSet(t *testing.T) {
|
||||
// rnr, cancelrnr := testRunner(t)
|
||||
// defer cancelrnr()
|
||||
|
||||
// go runner.StartWorkers(ctx, rnr, tasks)
|
||||
|
||||
// s := &Server{
|
||||
// Runner: rnr,
|
||||
// Router: gin.New(),
|
||||
|
||||
@@ -66,12 +66,6 @@ hot functions support also adds two extra options to this configuration file.
|
||||
`idle_timeout` (optional) is the time in seconds a container will remain alive without receiving any new requests;
|
||||
hot functions will stay alive as long as they receive a request in this interval. Default: `30`.
|
||||
|
||||
`max_concurrency` (optional) is the maximum of hot functions per node to be
|
||||
started for a certain function. It defaults to one per function. If you
|
||||
understand you need more processing power, make sure to raise this number.
|
||||
Keep in mind that if there is not available memory to execute the configured
|
||||
workload, it will fail to start new hot functions.
|
||||
|
||||
## Testing functions
|
||||
|
||||
`tests` (optional) is an array of tests that can be used to valid functions both
|
||||
|
||||
@@ -97,7 +97,6 @@ requests:
|
||||
"type": "sync",
|
||||
"config": null,
|
||||
"format": "http",
|
||||
"max_concurrency": "1",
|
||||
"idle_timeout": 30
|
||||
}
|
||||
}
|
||||
@@ -108,8 +107,5 @@ requests:
|
||||
`format` (mandatory) either "default" or "http". If "http", then it is a hot
|
||||
container.
|
||||
|
||||
`max_concurrency` (optional) - the number of simultaneous hot functions for
|
||||
this functions. This is a per-node configuration option. Default: 1
|
||||
|
||||
`idle_timeout` (optional) - idle timeout (in seconds) before function termination.
|
||||
|
||||
|
||||
@@ -416,10 +416,6 @@ definitions:
|
||||
- http
|
||||
- json
|
||||
description: Payload format sent into function.
|
||||
max_concurrency:
|
||||
type: integer
|
||||
format: int32
|
||||
description: Maximum number of hot functions concurrency
|
||||
config:
|
||||
type: object
|
||||
description: Route configuration - overrides application configuration
|
||||
@@ -653,4 +649,4 @@ definitions:
|
||||
- task
|
||||
properties:
|
||||
task:
|
||||
$ref: '#/definitions/Task'
|
||||
$ref: '#/definitions/Task'
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
{"route":{
|
||||
"app_name": "myapp",
|
||||
"path": "/hot",
|
||||
"image": "USERNAME/hchttp",
|
||||
"memory": 64,
|
||||
"type": "sync",
|
||||
"config": null,
|
||||
|
||||
@@ -3,4 +3,3 @@ version: 0.0.6
|
||||
runtime: go
|
||||
entrypoint: ./func
|
||||
path: /fn3
|
||||
max_concurrency: 1
|
||||
|
||||
@@ -32,21 +32,20 @@ type fftest struct {
|
||||
}
|
||||
|
||||
type funcfile struct {
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
Version string `yaml:"version,omitempty" json:"version,omitempty"`
|
||||
Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"`
|
||||
Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"`
|
||||
Type *string `yaml:"type,omitempty" json:"type,omitempty"`
|
||||
Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"`
|
||||
Format *string `yaml:"format,omitempty" json:"format,omitempty"`
|
||||
Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
|
||||
Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"`
|
||||
Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"`
|
||||
Build []string `yaml:"build,omitempty" json:"build,omitempty"`
|
||||
Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"`
|
||||
Path *string `yaml:"path,omitempty" json:"path,omitempty"`
|
||||
MaxConcurrency *int `yaml:"max_concurrency,omitempty" json:"max_concurrency,omitempty"`
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
Version string `yaml:"version,omitempty" json:"version,omitempty"`
|
||||
Runtime *string `yaml:"runtime,omitempty" json:"runtime,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint,omitempty" json:"entrypoint,omitempty"`
|
||||
Cmd string `yaml:"cmd,omitempty" json:"cmd,omitempty"`
|
||||
Type *string `yaml:"type,omitempty" json:"type,omitempty"`
|
||||
Memory *int64 `yaml:"memory,omitempty" json:"memory,omitempty"`
|
||||
Format *string `yaml:"format,omitempty" json:"format,omitempty"`
|
||||
Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
|
||||
Headers map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"`
|
||||
Config map[string]string `yaml:"config,omitempty" json:"config,omitempty"`
|
||||
Build []string `yaml:"build,omitempty" json:"build,omitempty"`
|
||||
Tests []fftest `yaml:"tests,omitempty" json:"tests,omitempty"`
|
||||
Path *string `yaml:"path,omitempty" json:"path,omitempty"`
|
||||
}
|
||||
|
||||
func (ff *funcfile) FullName() string {
|
||||
|
||||
32
fn/init.go
32
fn/init.go
@@ -44,13 +44,12 @@ func init() {
|
||||
}
|
||||
|
||||
type initFnCmd struct {
|
||||
name string
|
||||
force bool
|
||||
runtime string
|
||||
entrypoint string
|
||||
cmd string
|
||||
format string
|
||||
maxConcurrency int
|
||||
name string
|
||||
force bool
|
||||
runtime string
|
||||
entrypoint string
|
||||
cmd string
|
||||
format string
|
||||
}
|
||||
|
||||
func initFn() cli.Command {
|
||||
@@ -84,12 +83,6 @@ func initFn() cli.Command {
|
||||
Destination: &a.format,
|
||||
Value: "",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "max-concurrency",
|
||||
Usage: "maximum concurrency for hot function",
|
||||
Destination: &a.maxConcurrency,
|
||||
Value: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -125,13 +118,12 @@ func (a *initFnCmd) init(c *cli.Context) error {
|
||||
}
|
||||
|
||||
ff := &funcfile{
|
||||
Name: a.name,
|
||||
Runtime: &a.runtime,
|
||||
Version: initialVersion,
|
||||
Entrypoint: a.entrypoint,
|
||||
Cmd: a.cmd,
|
||||
Format: ffmt,
|
||||
MaxConcurrency: &a.maxConcurrency,
|
||||
Name: a.name,
|
||||
Runtime: &a.runtime,
|
||||
Version: initialVersion,
|
||||
Entrypoint: a.entrypoint,
|
||||
Cmd: a.cmd,
|
||||
Format: ffmt,
|
||||
}
|
||||
|
||||
_, path := appNamePath(ff.FullName())
|
||||
|
||||
14
fn/routes.go
14
fn/routes.go
@@ -49,10 +49,6 @@ var routeFlags = []cli.Flag{
|
||||
Name: "format,f",
|
||||
Usage: "hot container IO format - json or http",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "max-concurrency,mc",
|
||||
Usage: "maximum concurrency for hot container",
|
||||
},
|
||||
cli.DurationFlag{
|
||||
Name: "timeout",
|
||||
Usage: "route timeout (eg. 30s)",
|
||||
@@ -259,10 +255,6 @@ func routeWithFlags(c *cli.Context, rt *fnmodels.Route) {
|
||||
rt.Type = t
|
||||
}
|
||||
|
||||
if m := c.Int("max-concurrency"); m > 0 {
|
||||
rt.MaxConcurrency = int32(m)
|
||||
}
|
||||
|
||||
if m := c.Int64("memory"); m > 0 {
|
||||
rt.Memory = m
|
||||
}
|
||||
@@ -300,9 +292,6 @@ func routeWithFuncFile(c *cli.Context, ff *funcfile, rt *fnmodels.Route) error {
|
||||
if ff.Format != nil {
|
||||
rt.Format = *ff.Format
|
||||
}
|
||||
if ff.MaxConcurrency != nil {
|
||||
rt.MaxConcurrency = int32(*ff.MaxConcurrency)
|
||||
}
|
||||
if ff.Timeout != nil {
|
||||
to := int64(ff.Timeout.Seconds())
|
||||
rt.Timeout = &to
|
||||
@@ -422,9 +411,6 @@ func (a *routesCmd) patchRoute(c *cli.Context, appName, routePath string, r *fnm
|
||||
if r.Type != "" {
|
||||
resp.Payload.Route.Type = r.Type
|
||||
}
|
||||
if r.MaxConcurrency > 0 {
|
||||
resp.Payload.Route.MaxConcurrency = r.MaxConcurrency
|
||||
}
|
||||
if r.Memory > 0 {
|
||||
resp.Payload.Route.Memory = r.Memory
|
||||
}
|
||||
|
||||
@@ -3,4 +3,3 @@ version: 0.0.1
|
||||
runtime: go
|
||||
entrypoint: ./func
|
||||
path: /primes
|
||||
max_concurrency: 1
|
||||
|
||||
Reference in New Issue
Block a user