mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: Call extensions/overriding and more customization friendly docker driver (#1065)
In pure-runner and LB agent, service providers might want to set specific driver options. For example, to add cpu-shares to functions, LB can add the information as extensions to the Call and pass this via gRPC to runners. Runners then pick these extensions from gRPC call and pass it to driver. Using a custom driver implementation, pure-runners can process these extensions to modify docker.CreateContainerOptions. To achieve this, LB agents can now be configured using a call overrider. Pure-runners can be configured using a custom docker driver. RunnerCall and Call interfaces both expose call extensions. An example to demonstrate this is implemented in test/fn-system-tests/system_test.go which registers a call overrider for LB agent as well as a simple custom docker driver. In this example, LB agent adds a key-value to extensions and runners add this key-value as an environment variable to the container.
This commit is contained in:
@@ -3,6 +3,7 @@ package agent
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"log/syslog"
|
||||
"strings"
|
||||
@@ -115,31 +116,17 @@ type agent struct {
|
||||
resources ResourceTracker
|
||||
|
||||
// used to track running calls / safe shutdown
|
||||
shutWg *common.WaitGroup
|
||||
shutonce sync.Once
|
||||
callEndCount int64
|
||||
shutWg *common.WaitGroup
|
||||
shutonce sync.Once
|
||||
callEndCount int64
|
||||
disableAsyncDequeue bool
|
||||
}
|
||||
|
||||
type AgentOption func(*agent) error
|
||||
|
||||
// New creates an Agent that executes functions locally as Docker containers.
|
||||
func New(da DataAccess, options ...AgentOption) Agent {
|
||||
a := createAgent(da, options...).(*agent)
|
||||
if !a.shutWg.AddSession(1) {
|
||||
logrus.Fatalf("cannot start agent, unable to add session")
|
||||
}
|
||||
go a.asyncDequeue() // safe shutdown can nanny this fine
|
||||
return a
|
||||
}
|
||||
|
||||
func WithConfig(cfg *AgentConfig) AgentOption {
|
||||
return func(a *agent) error {
|
||||
a.cfg = *cfg
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func createAgent(da DataAccess, options ...AgentOption) Agent {
|
||||
cfg, err := NewAgentConfig()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg)
|
||||
@@ -159,18 +146,9 @@ func createAgent(da DataAccess, options ...AgentOption) Agent {
|
||||
|
||||
logrus.Infof("agent starting cfg=%+v", a.cfg)
|
||||
|
||||
// TODO: Create drivers.New(runnerConfig)
|
||||
a.driver = docker.NewDocker(drivers.Config{
|
||||
DockerNetworks: a.cfg.DockerNetworks,
|
||||
ServerVersion: a.cfg.MinDockerVersion,
|
||||
PreForkPoolSize: a.cfg.PreForkPoolSize,
|
||||
PreForkImage: a.cfg.PreForkImage,
|
||||
PreForkCmd: a.cfg.PreForkCmd,
|
||||
PreForkUseOnce: a.cfg.PreForkUseOnce,
|
||||
PreForkNetworks: a.cfg.PreForkNetworks,
|
||||
MaxTmpFsInodes: a.cfg.MaxTmpFsInodes,
|
||||
EnableReadOnlyRootFs: !a.cfg.DisableReadOnlyRootFs,
|
||||
})
|
||||
if a.driver == nil {
|
||||
a.driver = NewDockerDriver(&a.cfg)
|
||||
}
|
||||
|
||||
a.da = da
|
||||
a.slotMgr = NewSlotQueueMgr()
|
||||
@@ -178,9 +156,59 @@ func createAgent(da DataAccess, options ...AgentOption) Agent {
|
||||
a.shutWg = common.NewWaitGroup()
|
||||
|
||||
// TODO assert that agent doesn't get started for API nodes up above ?
|
||||
if a.disableAsyncDequeue {
|
||||
return a
|
||||
}
|
||||
|
||||
if !a.shutWg.AddSession(1) {
|
||||
logrus.Fatalf("cannot start agent, unable to add session")
|
||||
}
|
||||
go a.asyncDequeue() // safe shutdown can nanny this fine
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func WithConfig(cfg *AgentConfig) AgentOption {
|
||||
return func(a *agent) error {
|
||||
a.cfg = *cfg
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Provide a customer driver to agent
|
||||
func WithDockerDriver(drv drivers.Driver) AgentOption {
|
||||
return func(a *agent) error {
|
||||
if a.driver != nil {
|
||||
return errors.New("cannot add driver to agent, driver already exists")
|
||||
}
|
||||
|
||||
a.driver = drv
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithoutAsyncDequeue() AgentOption {
|
||||
return func(a *agent) error {
|
||||
a.disableAsyncDequeue = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Create a default docker driver from agent config
|
||||
func NewDockerDriver(cfg *AgentConfig) *docker.DockerDriver {
|
||||
return docker.NewDocker(drivers.Config{
|
||||
DockerNetworks: cfg.DockerNetworks,
|
||||
ServerVersion: cfg.MinDockerVersion,
|
||||
PreForkPoolSize: cfg.PreForkPoolSize,
|
||||
PreForkImage: cfg.PreForkImage,
|
||||
PreForkCmd: cfg.PreForkCmd,
|
||||
PreForkUseOnce: cfg.PreForkUseOnce,
|
||||
PreForkNetworks: cfg.PreForkNetworks,
|
||||
MaxTmpFsInodes: cfg.MaxTmpFsInodes,
|
||||
EnableReadOnlyRootFs: !cfg.DisableReadOnlyRootFs,
|
||||
})
|
||||
}
|
||||
|
||||
func (a *agent) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
||||
return a.da.GetAppByID(ctx, appID)
|
||||
}
|
||||
@@ -788,9 +816,12 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
||||
stats: &call.Stats,
|
||||
}
|
||||
|
||||
// pull & create container before we return a slot, so as to be friendly
|
||||
// about timing out if this takes a while...
|
||||
cookie, err := a.driver.Prepare(ctx, container)
|
||||
cookie, err := a.driver.CreateCookie(ctx, container)
|
||||
if err == nil {
|
||||
// pull & create container before we return a slot, so as to be friendly
|
||||
// about timing out if this takes a while...
|
||||
err = a.driver.PrepareCookie(ctx, cookie)
|
||||
}
|
||||
|
||||
call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots)
|
||||
|
||||
@@ -819,13 +850,20 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout})
|
||||
ctx = common.WithLogger(ctx, logger)
|
||||
|
||||
cookie, err := a.driver.Prepare(ctx, container)
|
||||
cookie, err := a.driver.CreateCookie(ctx, container)
|
||||
if err != nil {
|
||||
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
|
||||
return
|
||||
}
|
||||
|
||||
defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out
|
||||
|
||||
err = a.driver.PrepareCookie(ctx, cookie)
|
||||
if err != nil {
|
||||
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
|
||||
return
|
||||
}
|
||||
|
||||
waiter, err := cookie.Run(ctx)
|
||||
if err != nil {
|
||||
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
|
||||
@@ -971,14 +1009,15 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
// and stderr can be swapped out by new calls in the container. input and
|
||||
// output must be copied in and out.
|
||||
type container struct {
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
extensions map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
@@ -1053,6 +1092,7 @@ func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*contai
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
@@ -1107,6 +1147,7 @@ func (c *container) Memory() uint64 { return c.memory * 1024 * 1
|
||||
func (c *container) CPUs() uint64 { return c.cpus }
|
||||
func (c *container) FsSize() uint64 { return c.fsSize }
|
||||
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
|
||||
func (c *container) Extensions() map[string]string { return c.extensions }
|
||||
|
||||
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
|
||||
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
|
||||
@@ -235,6 +235,14 @@ func WithContext(ctx context.Context) CallOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// Pure runner can use this to pass an extension to the call
|
||||
func WithExtensions(extensions map[string]string) CallOpt {
|
||||
return func(c *call) error {
|
||||
c.extensions = extensions
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// GetCall builds a Call that can be used to submit jobs to the agent.
|
||||
//
|
||||
// TODO where to put this? async and sync both call this
|
||||
@@ -310,12 +318,19 @@ type call struct {
|
||||
containerState ContainerState
|
||||
slotHashId string
|
||||
isLB bool
|
||||
|
||||
// LB & Pure Runner Extra Config
|
||||
extensions map[string]string
|
||||
}
|
||||
|
||||
func (c *call) SlotHashId() string {
|
||||
return c.slotHashId
|
||||
}
|
||||
|
||||
func (c *call) Extensions() map[string]string {
|
||||
return c.extensions
|
||||
}
|
||||
|
||||
func (c *call) RequestBody() io.ReadCloser {
|
||||
if c.req.Body != nil && c.req.GetBody != nil {
|
||||
rdr, err := c.req.GetBody()
|
||||
|
||||
189
api/agent/drivers/docker/cookie.go
Normal file
189
api/agent/drivers/docker/cookie.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/fnproject/fn/api/agent/drivers"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// A cookie identifies a unique request to run a task.
|
||||
type cookie struct {
|
||||
// namespace id used from prefork pool if applicable
|
||||
poolId string
|
||||
// network name from docker networks if applicable
|
||||
netId string
|
||||
// docker container create options created by Driver.CreateCookie, required for Driver.Prepare()
|
||||
opts docker.CreateContainerOptions
|
||||
// task associated with this cookie
|
||||
task drivers.ContainerTask
|
||||
// pointer to docker driver
|
||||
drv *DockerDriver
|
||||
}
|
||||
|
||||
func (c *cookie) configureFsSize(log logrus.FieldLogger) {
|
||||
if c.task.FsSize() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// If defined, impose file system size limit. In MB units.
|
||||
if c.opts.HostConfig.StorageOpt == nil {
|
||||
c.opts.HostConfig.StorageOpt = make(map[string]string)
|
||||
}
|
||||
|
||||
opt := fmt.Sprintf("%vM", c.task.FsSize())
|
||||
log.WithFields(logrus.Fields{"size": opt, "call_id": c.task.Id()}).Debug("setting storage option")
|
||||
c.opts.HostConfig.StorageOpt["size"] = opt
|
||||
}
|
||||
|
||||
func (c *cookie) configureTmpFs(log logrus.FieldLogger) {
|
||||
// if RO Root is NOT enabled and TmpFsSize does not have any limit, then we do not need
|
||||
// any tmpfs in the container since function can freely write whereever it wants.
|
||||
if c.task.TmpFsSize() == 0 && !c.drv.conf.EnableReadOnlyRootFs {
|
||||
return
|
||||
}
|
||||
|
||||
if c.opts.HostConfig.Tmpfs == nil {
|
||||
c.opts.HostConfig.Tmpfs = make(map[string]string)
|
||||
}
|
||||
|
||||
var tmpFsOption string
|
||||
if c.task.TmpFsSize() != 0 {
|
||||
if c.drv.conf.MaxTmpFsInodes != 0 {
|
||||
tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", c.task.TmpFsSize(), c.drv.conf.MaxTmpFsInodes)
|
||||
} else {
|
||||
tmpFsOption = fmt.Sprintf("size=%dm", c.task.TmpFsSize())
|
||||
}
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{"target": "/tmp", "options": tmpFsOption, "call_id": c.task.Id()}).Debug("setting tmpfs")
|
||||
c.opts.HostConfig.Tmpfs["/tmp"] = tmpFsOption
|
||||
}
|
||||
|
||||
func (c *cookie) configureVolumes(log logrus.FieldLogger) {
|
||||
if len(c.task.Volumes()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if c.opts.Config.Volumes == nil {
|
||||
c.opts.Config.Volumes = map[string]struct{}{}
|
||||
}
|
||||
|
||||
for _, mapping := range c.task.Volumes() {
|
||||
hostDir := mapping[0]
|
||||
containerDir := mapping[1]
|
||||
c.opts.Config.Volumes[containerDir] = struct{}{}
|
||||
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
|
||||
c.opts.HostConfig.Binds = append(c.opts.HostConfig.Binds, mapn)
|
||||
log.WithFields(logrus.Fields{"volumes": mapn, "call_id": c.task.Id()}).Debug("setting volumes")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cookie) configureCPU(log logrus.FieldLogger) {
|
||||
// Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
|
||||
// eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,
|
||||
// which is approx 8 CPUS in CFS world.
|
||||
// Also see docker run options --cpu-quota and --cpu-period
|
||||
if c.task.CPUs() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
quota := int64(c.task.CPUs() * 100)
|
||||
period := int64(100000)
|
||||
|
||||
log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": c.task.Id()}).Debug("setting CPU")
|
||||
c.opts.HostConfig.CPUQuota = quota
|
||||
c.opts.HostConfig.CPUPeriod = period
|
||||
}
|
||||
|
||||
func (c *cookie) configureWorkDir(log logrus.FieldLogger) {
|
||||
wd := c.task.WorkDir()
|
||||
if wd == "" {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{"wd": wd, "call_id": c.task.Id()}).Debug("setting work dir")
|
||||
c.opts.Config.WorkingDir = wd
|
||||
}
|
||||
|
||||
func (c *cookie) configureHostname(log logrus.FieldLogger) {
|
||||
// hostname and container NetworkMode is not compatible.
|
||||
if c.opts.HostConfig.NetworkMode != "" {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{"hostname": c.drv.hostname, "call_id": c.task.Id()}).Debug("setting hostname")
|
||||
c.opts.Config.Hostname = c.drv.hostname
|
||||
}
|
||||
|
||||
func (c *cookie) configureCmd(log logrus.FieldLogger) {
|
||||
if c.task.Command() == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
|
||||
cmd := strings.Fields(c.task.Command())
|
||||
log.WithFields(logrus.Fields{"call_id": c.task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
||||
c.opts.Config.Cmd = cmd
|
||||
}
|
||||
|
||||
func (c *cookie) configureEnv(log logrus.FieldLogger) {
|
||||
if len(c.task.EnvVars()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
envvars := make([]string, 0, len(c.task.EnvVars()))
|
||||
for name, val := range c.task.EnvVars() {
|
||||
envvars = append(envvars, name+"="+val)
|
||||
}
|
||||
|
||||
c.opts.Config.Env = envvars
|
||||
}
|
||||
|
||||
// implements Cookie
|
||||
func (c *cookie) Close(ctx context.Context) error {
|
||||
err := c.drv.removeContainer(ctx, c.task.Id())
|
||||
c.drv.unpickPool(c)
|
||||
c.drv.unpickNetwork(c)
|
||||
return err
|
||||
}
|
||||
|
||||
// implements Cookie
|
||||
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
|
||||
return c.drv.run(ctx, c.task.Id(), c.task)
|
||||
}
|
||||
|
||||
// implements Cookie
|
||||
func (c *cookie) ContainerOptions() interface{} {
|
||||
return c.opts
|
||||
}
|
||||
|
||||
// implements Cookie
|
||||
func (c *cookie) Freeze(ctx context.Context) error {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"})
|
||||
log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker pause")
|
||||
|
||||
err := c.drv.docker.PauseContainer(c.task.Id(), ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error pausing container")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// implements Cookie
|
||||
func (c *cookie) Unfreeze(ctx context.Context) error {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"})
|
||||
log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker unpause")
|
||||
|
||||
err := c.drv.docker.UnpauseContainer(c.task.Id(), ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error unpausing container")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var _ drivers.Cookie = &cookie{}
|
||||
@@ -144,37 +144,37 @@ func (drv *DockerDriver) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) pickPool(ctx context.Context, container *docker.CreateContainerOptions) string {
|
||||
func (drv *DockerDriver) pickPool(ctx context.Context, c *cookie) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"})
|
||||
|
||||
if drv.pool == nil || container.HostConfig.NetworkMode != "" {
|
||||
return ""
|
||||
if drv.pool == nil || c.opts.HostConfig.NetworkMode != "" {
|
||||
return
|
||||
}
|
||||
|
||||
id, err := drv.pool.AllocPoolId()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not fetch pre fork pool container")
|
||||
return ""
|
||||
return
|
||||
}
|
||||
|
||||
// We are able to fetch a container from pool. Now, use its
|
||||
// network, ipc and pid namespaces.
|
||||
container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id)
|
||||
//container.HostConfig.IpcMode = linker
|
||||
//container.HostConfig.PidMode = linker
|
||||
return id
|
||||
c.opts.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id)
|
||||
//c.opts.HostConfig.IpcMode = linker
|
||||
//c.opts.HostConfig.PidMode = linker
|
||||
c.poolId = id
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) unpickPool(poolId string) {
|
||||
if poolId != "" && drv.pool != nil {
|
||||
drv.pool.FreePoolId(poolId)
|
||||
func (drv *DockerDriver) unpickPool(c *cookie) {
|
||||
if c.poolId != "" && drv.pool != nil {
|
||||
drv.pool.FreePoolId(c.poolId)
|
||||
}
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) pickNetwork(container *docker.CreateContainerOptions) string {
|
||||
func (drv *DockerDriver) pickNetwork(c *cookie) {
|
||||
|
||||
if len(drv.networks) == 0 || container.HostConfig.NetworkMode != "" {
|
||||
return ""
|
||||
if len(drv.networks) == 0 || c.opts.HostConfig.NetworkMode != "" {
|
||||
return
|
||||
}
|
||||
|
||||
var id string
|
||||
@@ -190,132 +190,23 @@ func (drv *DockerDriver) pickNetwork(container *docker.CreateContainerOptions) s
|
||||
drv.networks[id]++
|
||||
drv.networksLock.Unlock()
|
||||
|
||||
container.HostConfig.NetworkMode = id
|
||||
return id
|
||||
c.opts.HostConfig.NetworkMode = id
|
||||
c.netId = id
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) unpickNetwork(netId string) {
|
||||
if netId != "" {
|
||||
drv.networksLock.Lock()
|
||||
drv.networks[netId]--
|
||||
drv.networksLock.Unlock()
|
||||
func (drv *DockerDriver) unpickNetwork(c *cookie) {
|
||||
if c.netId != "" {
|
||||
c.drv.networksLock.Lock()
|
||||
c.drv.networks[c.netId]--
|
||||
c.drv.networksLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if task.FsSize() != 0 {
|
||||
// If defined, impose file system size limit. In MB units.
|
||||
if container.HostConfig.StorageOpt == nil {
|
||||
container.HostConfig.StorageOpt = make(map[string]string)
|
||||
}
|
||||
func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
|
||||
|
||||
opt := fmt.Sprintf("%vM", task.FsSize())
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CreateCookie"})
|
||||
|
||||
log.WithFields(logrus.Fields{"size": opt, "call_id": task.Id()}).Debug("setting storage option")
|
||||
container.HostConfig.StorageOpt["size"] = opt
|
||||
}
|
||||
|
||||
container.HostConfig.ReadonlyRootfs = drv.conf.EnableReadOnlyRootFs
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureTmpFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if task.TmpFsSize() == 0 && !drv.conf.EnableReadOnlyRootFs {
|
||||
return
|
||||
}
|
||||
|
||||
if container.HostConfig.Tmpfs == nil {
|
||||
container.HostConfig.Tmpfs = make(map[string]string)
|
||||
}
|
||||
|
||||
var tmpFsOption string
|
||||
if task.TmpFsSize() != 0 {
|
||||
if drv.conf.MaxTmpFsInodes != 0 {
|
||||
tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", task.TmpFsSize(), drv.conf.MaxTmpFsInodes)
|
||||
} else {
|
||||
tmpFsOption = fmt.Sprintf("size=%dm", task.TmpFsSize())
|
||||
}
|
||||
}
|
||||
target := "/tmp"
|
||||
|
||||
log.WithFields(logrus.Fields{"target": target, "options": tmpFsOption, "call_id": task.Id()}).Debug("setting tmpfs")
|
||||
container.HostConfig.Tmpfs[target] = tmpFsOption
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureVolumes(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if len(task.Volumes()) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
if container.Config.Volumes == nil {
|
||||
container.Config.Volumes = map[string]struct{}{}
|
||||
}
|
||||
|
||||
for _, mapping := range task.Volumes() {
|
||||
hostDir := mapping[0]
|
||||
containerDir := mapping[1]
|
||||
container.Config.Volumes[containerDir] = struct{}{}
|
||||
mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
|
||||
container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
|
||||
log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
|
||||
}
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureCPU(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
// Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
|
||||
// eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,
|
||||
// which is approx 8 CPUS in CFS world.
|
||||
// Also see docker run options --cpu-quota and --cpu-period
|
||||
if task.CPUs() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
quota := int64(task.CPUs() * 100)
|
||||
period := int64(100000)
|
||||
|
||||
log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": task.Id()}).Debug("setting CPU")
|
||||
container.HostConfig.CPUQuota = quota
|
||||
container.HostConfig.CPUPeriod = period
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureWorkDir(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if wd := task.WorkDir(); wd != "" {
|
||||
log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
|
||||
container.Config.WorkingDir = wd
|
||||
}
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureHostname(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if container.HostConfig.NetworkMode == "" {
|
||||
// hostname and container NetworkMode is not compatible.
|
||||
log.WithFields(logrus.Fields{"hostname": drv.hostname, "call_id": task.Id()}).Debug("setting hostname")
|
||||
container.Config.Hostname = drv.hostname
|
||||
}
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureCmd(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
if task.Command() == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
|
||||
cmd := strings.Fields(task.Command())
|
||||
log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
||||
container.Config.Cmd = cmd
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) configureEnv(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
|
||||
envvars := make([]string, 0, len(task.EnvVars()))
|
||||
for name, val := range task.EnvVars() {
|
||||
envvars = append(envvars, name+"="+val)
|
||||
}
|
||||
|
||||
container.Config.Env = envvars
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
|
||||
|
||||
container := docker.CreateContainerOptions{
|
||||
opts := docker.CreateContainerOptions{
|
||||
Name: task.Id(),
|
||||
Config: &docker.Config{
|
||||
Memory: int64(task.Memory()),
|
||||
@@ -333,91 +224,63 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
|
||||
LogConfig: docker.LogConfig{
|
||||
Type: "none",
|
||||
},
|
||||
ReadonlyRootfs: drv.conf.EnableReadOnlyRootFs,
|
||||
},
|
||||
Context: ctx,
|
||||
}
|
||||
|
||||
drv.configureCmd(log, &container, task)
|
||||
drv.configureEnv(log, &container, task)
|
||||
drv.configureCPU(log, &container, task)
|
||||
drv.configureFs(log, &container, task)
|
||||
drv.configureTmpFs(log, &container, task)
|
||||
drv.configureVolumes(log, &container, task)
|
||||
drv.configureWorkDir(log, &container, task)
|
||||
|
||||
poolId := drv.pickPool(ctx, &container)
|
||||
netId := drv.pickNetwork(&container)
|
||||
|
||||
drv.configureHostname(log, &container, task)
|
||||
|
||||
err := drv.ensureImage(ctx, task)
|
||||
if err != nil {
|
||||
drv.unpickPool(poolId)
|
||||
drv.unpickNetwork(netId)
|
||||
return nil, err
|
||||
cookie := &cookie{
|
||||
opts: opts,
|
||||
task: task,
|
||||
drv: drv,
|
||||
}
|
||||
|
||||
_, err = drv.docker.CreateContainer(container)
|
||||
cookie.configureCmd(log)
|
||||
cookie.configureEnv(log)
|
||||
cookie.configureCPU(log)
|
||||
cookie.configureFsSize(log)
|
||||
cookie.configureTmpFs(log)
|
||||
cookie.configureVolumes(log)
|
||||
cookie.configureWorkDir(log)
|
||||
|
||||
// Order is important, if pool is enabled, it overrides pick network
|
||||
drv.pickPool(ctx, cookie)
|
||||
drv.pickNetwork(cookie)
|
||||
|
||||
// Order is important, Hostname doesn't play well with Network config
|
||||
cookie.configureHostname(log)
|
||||
|
||||
return cookie, nil
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) PrepareCookie(ctx context.Context, c drivers.Cookie) error {
|
||||
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "PrepareCookie"})
|
||||
cookie, ok := c.(*cookie)
|
||||
if !ok {
|
||||
return errors.New("unknown cookie implementation")
|
||||
}
|
||||
|
||||
err := drv.ensureImage(ctx, cookie.task)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = drv.docker.CreateContainer(cookie.opts)
|
||||
if err != nil {
|
||||
// since we retry under the hood, if the container gets created and retry fails, we can just ignore error
|
||||
if err != docker.ErrContainerAlreadyExists {
|
||||
log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
|
||||
"cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name,
|
||||
"image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name,
|
||||
log.WithFields(logrus.Fields{"call_id": cookie.task.Id(), "command": cookie.opts.Config.Cmd, "memory": cookie.opts.Config.Memory,
|
||||
"cpu_quota": cookie.task.CPUs(), "hostname": cookie.opts.Config.Hostname, "name": cookie.opts.Name,
|
||||
"image": cookie.opts.Config.Image, "volumes": cookie.opts.Config.Volumes, "binds": cookie.opts.HostConfig.Binds, "container": cookie.opts.Name,
|
||||
}).WithError(err).Error("Could not create container")
|
||||
|
||||
// NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container
|
||||
drv.unpickPool(poolId)
|
||||
drv.unpickNetwork(netId)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// discard removal error
|
||||
return &cookie{id: task.Id(), task: task, drv: drv, poolId: poolId, netId: netId}, nil
|
||||
}
|
||||
|
||||
type cookie struct {
|
||||
id string
|
||||
poolId string
|
||||
netId string
|
||||
task drivers.ContainerTask
|
||||
drv *DockerDriver
|
||||
}
|
||||
|
||||
func (c *cookie) Close(ctx context.Context) error {
|
||||
defer func() {
|
||||
c.drv.unpickPool(c.poolId)
|
||||
c.drv.unpickNetwork(c.netId)
|
||||
}()
|
||||
|
||||
return c.drv.removeContainer(ctx, c.id)
|
||||
}
|
||||
|
||||
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
|
||||
return c.drv.run(ctx, c.id, c.task)
|
||||
}
|
||||
|
||||
func (c *cookie) Freeze(ctx context.Context) error {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"})
|
||||
log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker pause")
|
||||
|
||||
err := c.drv.docker.PauseContainer(c.id, ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error pausing container")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *cookie) Unfreeze(ctx context.Context) error {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"})
|
||||
log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker unpause")
|
||||
|
||||
err := c.drv.docker.UnpauseContainer(c.id, ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error unpausing container")
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error {
|
||||
@@ -782,3 +645,5 @@ func (w *waitResult) wait(ctx context.Context) (status string, err error) {
|
||||
return drivers.StatusKilled, models.NewAPIError(http.StatusBadGateway, err)
|
||||
}
|
||||
}
|
||||
|
||||
var _ drivers.Driver = &DockerDriver{}
|
||||
|
||||
@@ -70,6 +70,7 @@ func (c *poolTask) Memory() uint64 { return 0
|
||||
func (c *poolTask) CPUs() uint64 { return 0 }
|
||||
func (c *poolTask) FsSize() uint64 { return 0 }
|
||||
func (c *poolTask) TmpFsSize() uint64 { return 0 }
|
||||
func (c *poolTask) Extensions() map[string]string { return nil }
|
||||
func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {}
|
||||
|
||||
type dockerPoolItem struct {
|
||||
|
||||
@@ -22,7 +22,6 @@ func (f *taskDockerTest) Command() string { return "" }
|
||||
func (f *taskDockerTest) EnvVars() map[string]string {
|
||||
return map[string]string{"FN_FORMAT": "default"}
|
||||
}
|
||||
func (f *taskDockerTest) Labels() map[string]string { return nil }
|
||||
func (f *taskDockerTest) Id() string { return f.id }
|
||||
func (f *taskDockerTest) Group() string { return "" }
|
||||
func (f *taskDockerTest) Image() string { return "fnproject/fn-test-utils" }
|
||||
@@ -37,6 +36,7 @@ func (f *taskDockerTest) TmpFsSize() uint64 { return 0 }
|
||||
func (f *taskDockerTest) WorkDir() string { return "" }
|
||||
func (f *taskDockerTest) Close() {}
|
||||
func (f *taskDockerTest) Input() io.Reader { return f.input }
|
||||
func (f *taskDockerTest) Extensions() map[string]string { return nil }
|
||||
|
||||
func TestRunnerDocker(t *testing.T) {
|
||||
dkr := NewDocker(drivers.Config{})
|
||||
@@ -46,11 +46,17 @@ func TestRunnerDocker(t *testing.T) {
|
||||
|
||||
task := &taskDockerTest{"test-docker", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors}
|
||||
|
||||
cookie, err := dkr.Prepare(ctx, task)
|
||||
cookie, err := dkr.CreateCookie(ctx, task)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't create task cookie")
|
||||
}
|
||||
|
||||
defer cookie.Close(ctx)
|
||||
|
||||
err = dkr.PrepareCookie(ctx, cookie)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't prepare task test")
|
||||
}
|
||||
defer cookie.Close(ctx)
|
||||
|
||||
waiter, err := cookie.Run(ctx)
|
||||
if err != nil {
|
||||
@@ -80,13 +86,26 @@ func TestRunnerDockerNetworks(t *testing.T) {
|
||||
task1 := &taskDockerTest{"test-docker1", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors}
|
||||
task2 := &taskDockerTest{"test-docker2", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors}
|
||||
|
||||
cookie1, err := dkr.Prepare(ctx, task1)
|
||||
cookie1, err := dkr.CreateCookie(ctx, task1)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't create task1 cookie")
|
||||
}
|
||||
|
||||
defer cookie1.Close(ctx)
|
||||
|
||||
err = dkr.PrepareCookie(ctx, cookie1)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't prepare task1 test")
|
||||
}
|
||||
defer cookie1.Close(ctx)
|
||||
|
||||
cookie2, err := dkr.Prepare(ctx, task2)
|
||||
cookie2, err := dkr.CreateCookie(ctx, task2)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't create task2 cookie")
|
||||
}
|
||||
|
||||
defer cookie2.Close(ctx)
|
||||
|
||||
err = dkr.PrepareCookie(ctx, cookie2)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't prepare task2 test")
|
||||
}
|
||||
@@ -141,11 +160,17 @@ func TestRunnerDockerStdin(t *testing.T) {
|
||||
|
||||
task := &taskDockerTest{"test-docker-stdin", bytes.NewBufferString(input), &output, &errors}
|
||||
|
||||
cookie, err := dkr.Prepare(ctx, task)
|
||||
cookie, err := dkr.CreateCookie(ctx, task)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't create task cookie")
|
||||
}
|
||||
|
||||
defer cookie.Close(ctx)
|
||||
|
||||
err = dkr.PrepareCookie(ctx, cookie)
|
||||
if err != nil {
|
||||
t.Fatal("Couldn't prepare task test")
|
||||
}
|
||||
defer cookie.Close(ctx)
|
||||
|
||||
waiter, err := cookie.Run(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -40,6 +40,12 @@ type Cookie interface {
|
||||
|
||||
// Unfreeze a frozen container to unpause frozen processes
|
||||
Unfreeze(ctx context.Context) error
|
||||
|
||||
// Fetch driver specific container configuration. Use this to
|
||||
// access the container create options. If Driver.Prepare() is not
|
||||
// yet called with the cookie, then this can be used to modify container
|
||||
// create options.
|
||||
ContainerOptions() interface{}
|
||||
}
|
||||
|
||||
type WaitResult interface {
|
||||
@@ -51,7 +57,11 @@ type WaitResult interface {
|
||||
}
|
||||
|
||||
type Driver interface {
|
||||
// Prepare can be used in order to do any preparation that a specific driver
|
||||
// Create a new cookie with defaults and/or settings from container task.
|
||||
// Callers should Close the cookie regardless of whether they prepare or run it.
|
||||
CreateCookie(ctx context.Context, task ContainerTask) (Cookie, error)
|
||||
|
||||
// PrepareCookie can be used in order to do any preparation that a specific driver
|
||||
// may need to do before running the task, and can be useful to put
|
||||
// preparation that the task can recover from into (i.e. if pulling an image
|
||||
// fails because a registry is down, the task doesn't need to be failed). It
|
||||
@@ -59,7 +69,7 @@ type Driver interface {
|
||||
// Callers should Close the cookie regardless of whether they run it.
|
||||
//
|
||||
// The returned cookie should respect the task's timeout when it is run.
|
||||
Prepare(ctx context.Context, task ContainerTask) (Cookie, error)
|
||||
PrepareCookie(ctx context.Context, cookie Cookie) error
|
||||
|
||||
// close & shutdown the driver
|
||||
Close() error
|
||||
@@ -129,6 +139,9 @@ type ContainerTask interface {
|
||||
// Close is used to perform cleanup after task execution.
|
||||
// Close should be safe to call multiple times.
|
||||
Close()
|
||||
|
||||
// Extra Configuration Options
|
||||
Extensions() map[string]string
|
||||
}
|
||||
|
||||
// Stat is a bucket of stats from a driver at a point in time for a certain task.
|
||||
|
||||
@@ -17,14 +17,20 @@ type Mocker struct {
|
||||
count int
|
||||
}
|
||||
|
||||
func (m *Mocker) Prepare(context.Context, drivers.ContainerTask) (drivers.Cookie, error) {
|
||||
func (m *Mocker) CreateCookie(context.Context, drivers.ContainerTask) (drivers.Cookie, error) {
|
||||
return &cookie{m}, nil
|
||||
}
|
||||
|
||||
func (m *Mocker) PrepareCookie(context.Context, drivers.Cookie) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Mocker) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ drivers.Driver = &Mocker{}
|
||||
|
||||
type cookie struct {
|
||||
m *Mocker
|
||||
}
|
||||
@@ -39,6 +45,8 @@ func (c *cookie) Unfreeze(context.Context) error {
|
||||
|
||||
func (c *cookie) Close(context.Context) error { return nil }
|
||||
|
||||
func (c *cookie) ContainerOptions() interface{} { return nil }
|
||||
|
||||
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
|
||||
c.m.count++
|
||||
if c.m.count%100 == 0 {
|
||||
@@ -51,6 +59,8 @@ func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
var _ drivers.Cookie = &cookie{}
|
||||
|
||||
type runResult struct {
|
||||
err error
|
||||
status string
|
||||
|
||||
@@ -43,8 +43,9 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
// Request to allocate a slot for a call
|
||||
type TryCall struct {
|
||||
ModelsCallJson string `protobuf:"bytes,1,opt,name=models_call_json,json=modelsCallJson" json:"models_call_json,omitempty"`
|
||||
SlotHashId string `protobuf:"bytes,2,opt,name=slot_hash_id,json=slotHashId" json:"slot_hash_id,omitempty"`
|
||||
ModelsCallJson string `protobuf:"bytes,1,opt,name=models_call_json,json=modelsCallJson" json:"models_call_json,omitempty"`
|
||||
SlotHashId string `protobuf:"bytes,2,opt,name=slot_hash_id,json=slotHashId" json:"slot_hash_id,omitempty"`
|
||||
Extensions map[string]string `protobuf:"bytes,3,rep,name=extensions" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
|
||||
}
|
||||
|
||||
func (m *TryCall) Reset() { *m = TryCall{} }
|
||||
@@ -66,6 +67,13 @@ func (m *TryCall) GetSlotHashId() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *TryCall) GetExtensions() map[string]string {
|
||||
if m != nil {
|
||||
return m.Extensions
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Data sent C2S and S2C - as soon as the runner sees the first of these it
|
||||
// will start running. If empty content, there must be one of these with eof.
|
||||
// The runner will send these for the body of the response, AFTER it has sent
|
||||
@@ -713,39 +721,41 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{
|
||||
func init() { proto.RegisterFile("runner.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 529 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x5d, 0x8b, 0x13, 0x31,
|
||||
0x14, 0xed, 0xf4, 0xbb, 0x77, 0x66, 0xd7, 0x12, 0x44, 0x86, 0xba, 0x60, 0x19, 0x3f, 0x28, 0x08,
|
||||
0xa9, 0x56, 0x7d, 0xf5, 0xc1, 0xba, 0xcb, 0x28, 0x2c, 0x48, 0xaa, 0xbe, 0x0e, 0xe9, 0x24, 0x9d,
|
||||
0x19, 0x4d, 0x27, 0x25, 0xc9, 0x2c, 0x14, 0xfc, 0x23, 0xfe, 0x5b, 0x49, 0x66, 0x3a, 0x5b, 0x7c,
|
||||
0xd9, 0xb7, 0x9c, 0x7b, 0x72, 0x3f, 0xce, 0xb9, 0x09, 0x04, 0xaa, 0x2a, 0x4b, 0xae, 0xf0, 0x41,
|
||||
0x49, 0x23, 0x67, 0x4f, 0x33, 0x29, 0x33, 0xc1, 0x97, 0x0e, 0x6d, 0xab, 0xdd, 0x92, 0xef, 0x0f,
|
||||
0xe6, 0x58, 0x93, 0xd1, 0x0f, 0x18, 0x7d, 0x57, 0xc7, 0x35, 0x15, 0x02, 0x2d, 0x60, 0xba, 0x97,
|
||||
0x8c, 0x0b, 0x9d, 0xa4, 0x54, 0x88, 0xe4, 0x97, 0x96, 0x65, 0xe8, 0xcd, 0xbd, 0xc5, 0x84, 0x5c,
|
||||
0xd6, 0x71, 0x7b, 0xeb, 0xab, 0x96, 0x25, 0x9a, 0x43, 0xa0, 0x85, 0x34, 0x49, 0x4e, 0x75, 0x9e,
|
||||
0x14, 0x2c, 0xec, 0xba, 0x5b, 0x60, 0x63, 0x31, 0xd5, 0xf9, 0x17, 0x16, 0xbd, 0x85, 0xc9, 0x67,
|
||||
0x6a, 0xe8, 0x8d, 0xa2, 0x7b, 0x8e, 0x10, 0xf4, 0x19, 0x35, 0xd4, 0x15, 0x0b, 0x88, 0x3b, 0xa3,
|
||||
0x29, 0xf4, 0xb8, 0xdc, 0xb9, 0xcc, 0x31, 0xb1, 0xc7, 0xe8, 0x3d, 0x40, 0x6c, 0xcc, 0x21, 0xe6,
|
||||
0x94, 0x71, 0x65, 0xf9, 0xdf, 0xfc, 0xd8, 0xf4, 0xb7, 0x47, 0xf4, 0x18, 0x06, 0x77, 0x54, 0x54,
|
||||
0xbc, 0xe9, 0x56, 0x83, 0xe8, 0x27, 0x04, 0x36, 0x8b, 0x70, 0x7d, 0xb8, 0xe5, 0x86, 0xa2, 0x67,
|
||||
0xe0, 0x6b, 0x43, 0x4d, 0xa5, 0x93, 0x54, 0x32, 0xee, 0xf2, 0x07, 0x04, 0xea, 0xd0, 0x5a, 0x32,
|
||||
0x8e, 0x5e, 0xc2, 0x28, 0x77, 0x2d, 0x74, 0xd8, 0x9d, 0xf7, 0x16, 0xfe, 0xca, 0xc7, 0xf7, 0x6d,
|
||||
0xc9, 0x89, 0x8b, 0x3e, 0xc2, 0x23, 0x2b, 0x97, 0x70, 0x5d, 0x09, 0xb3, 0x31, 0x54, 0x19, 0xf4,
|
||||
0x1c, 0xfa, 0xb9, 0x31, 0x87, 0x90, 0xcd, 0xbd, 0x85, 0xbf, 0xba, 0xc0, 0xe7, 0x7d, 0xe3, 0x0e,
|
||||
0x71, 0xe4, 0xa7, 0x21, 0xf4, 0xf7, 0xdc, 0xd0, 0xe8, 0x0f, 0x04, 0x36, 0xff, 0xa6, 0x28, 0x0b,
|
||||
0x9d, 0x73, 0x86, 0x42, 0x18, 0xe9, 0x2a, 0x4d, 0xb9, 0xd6, 0x6e, 0xa6, 0x31, 0x39, 0x41, 0xcb,
|
||||
0x30, 0x6e, 0x68, 0x21, 0x74, 0xa3, 0xec, 0x04, 0xd1, 0x15, 0x4c, 0xb8, 0x52, 0x52, 0xd9, 0xb9,
|
||||
0xc3, 0x9e, 0x53, 0x72, 0x1f, 0x40, 0x33, 0x18, 0x3b, 0xb0, 0x31, 0x2a, 0xec, 0xbb, 0xc4, 0x16,
|
||||
0x47, 0x1b, 0x98, 0xac, 0x45, 0xc1, 0x4b, 0x73, 0xab, 0x33, 0x74, 0x05, 0x3d, 0xa3, 0x6a, 0x2b,
|
||||
0xfd, 0xd5, 0x18, 0x37, 0xeb, 0x8e, 0x3b, 0xc4, 0x86, 0xd1, 0xbc, 0x59, 0x4e, 0xd7, 0xd1, 0x80,
|
||||
0xdb, 0xb5, 0x59, 0x49, 0x96, 0xb1, 0x92, 0xb6, 0x92, 0x1d, 0xa3, 0xbf, 0x1e, 0x4c, 0x88, 0x7b,
|
||||
0x58, 0xb6, 0xea, 0x07, 0x08, 0x94, 0x33, 0x27, 0xd1, 0xd6, 0x9d, 0xa6, 0xfc, 0x14, 0xff, 0xe7,
|
||||
0x5a, 0xdc, 0x21, 0xbe, 0x3a, 0x33, 0xf1, 0xc1, 0x76, 0xe8, 0x35, 0x8c, 0x77, 0x8d, 0x6b, 0x4e,
|
||||
0xb4, 0xb5, 0xfa, 0xdc, 0xca, 0xb8, 0x43, 0xda, 0x0b, 0xed, 0x6c, 0xaf, 0x20, 0xa8, 0x47, 0xdb,
|
||||
0xb8, 0x4d, 0xa3, 0x27, 0x30, 0xa4, 0xa9, 0x29, 0xee, 0xea, 0xd7, 0x32, 0x20, 0x0d, 0x5a, 0x65,
|
||||
0x70, 0x59, 0xdf, 0xfb, 0x66, 0x5f, 0x7f, 0x2a, 0x05, 0x7a, 0x01, 0xc3, 0xeb, 0x32, 0xa3, 0x19,
|
||||
0x47, 0x80, 0x5b, 0xcf, 0x66, 0x80, 0x5b, 0xa5, 0x0b, 0xef, 0x8d, 0x87, 0x96, 0x30, 0x3c, 0x55,
|
||||
0xc6, 0xf5, 0x77, 0xc2, 0xa7, 0xef, 0x84, 0xaf, 0xed, 0x77, 0x9a, 0x5d, 0xe0, 0xf3, 0x01, 0xb6,
|
||||
0x43, 0x47, 0xbf, 0xfb, 0x17, 0x00, 0x00, 0xff, 0xff, 0x0a, 0xb4, 0xdf, 0x3c, 0x8b, 0x03, 0x00,
|
||||
0x00,
|
||||
// 576 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xdb, 0x8a, 0x13, 0x4d,
|
||||
0x10, 0xce, 0x24, 0xd9, 0x6c, 0x52, 0x99, 0x3d, 0xd0, 0xfc, 0xfc, 0x0c, 0x71, 0xc1, 0x30, 0x1e,
|
||||
0x08, 0x08, 0xbd, 0x1a, 0x15, 0x16, 0x41, 0x2f, 0x5c, 0xb3, 0x8c, 0xc2, 0x82, 0x74, 0xc4, 0xdb,
|
||||
0xd0, 0x3b, 0x53, 0x99, 0x19, 0xed, 0x4c, 0x87, 0xee, 0x9e, 0xc5, 0x80, 0x2f, 0xe2, 0x4b, 0xf9,
|
||||
0x4c, 0xd2, 0x3d, 0x87, 0x0d, 0x7b, 0xa3, 0x77, 0x5d, 0xf5, 0x55, 0xd5, 0x57, 0xf5, 0x55, 0x17,
|
||||
0xf8, 0xaa, 0x2c, 0x0a, 0x54, 0x74, 0xab, 0xa4, 0x91, 0x93, 0x07, 0xa9, 0x94, 0xa9, 0xc0, 0x73,
|
||||
0x67, 0xdd, 0x94, 0xeb, 0x73, 0xdc, 0x6c, 0xcd, 0xae, 0x02, 0xc3, 0xdf, 0x1e, 0x1c, 0x7e, 0x51,
|
||||
0xbb, 0x4b, 0x2e, 0x04, 0x99, 0xc1, 0xe9, 0x46, 0x26, 0x28, 0xf4, 0x2a, 0xe6, 0x42, 0xac, 0xbe,
|
||||
0x69, 0x59, 0x04, 0xde, 0xd4, 0x9b, 0x8d, 0xd8, 0x71, 0xe5, 0xb7, 0x51, 0x9f, 0xb4, 0x2c, 0xc8,
|
||||
0x14, 0x7c, 0x2d, 0xa4, 0x59, 0x65, 0x5c, 0x67, 0xab, 0x3c, 0x09, 0xba, 0x2e, 0x0a, 0xac, 0x2f,
|
||||
0xe2, 0x3a, 0xfb, 0x98, 0x90, 0x0b, 0x00, 0xfc, 0x61, 0xb0, 0xd0, 0xb9, 0x2c, 0x74, 0xd0, 0x9b,
|
||||
0xf6, 0x66, 0xe3, 0x79, 0x40, 0x6b, 0x26, 0xba, 0x68, 0xa1, 0x45, 0x61, 0xd4, 0x8e, 0xed, 0xc5,
|
||||
0x4e, 0xde, 0xc2, 0xc9, 0x3d, 0x98, 0x9c, 0x42, 0xef, 0x3b, 0xee, 0xea, 0x5e, 0xec, 0x93, 0xfc,
|
||||
0x07, 0x07, 0xb7, 0x5c, 0x94, 0x58, 0x33, 0x57, 0xc6, 0x9b, 0xee, 0x85, 0x17, 0xbe, 0x80, 0xd1,
|
||||
0x07, 0x6e, 0xf8, 0x95, 0xe2, 0x1b, 0x24, 0x04, 0xfa, 0x09, 0x37, 0xdc, 0x65, 0xfa, 0xcc, 0xbd,
|
||||
0x6d, 0x31, 0x94, 0x6b, 0x97, 0x38, 0x64, 0xf6, 0x19, 0xbe, 0x02, 0x88, 0x8c, 0xd9, 0x46, 0xc8,
|
||||
0x13, 0x54, 0xff, 0x4a, 0x16, 0x7e, 0x05, 0xdf, 0x66, 0x31, 0xd4, 0xdb, 0x6b, 0x34, 0x9c, 0x3c,
|
||||
0x84, 0xb1, 0x36, 0xdc, 0x94, 0x7a, 0x15, 0xcb, 0x04, 0x5d, 0xfe, 0x01, 0x83, 0xca, 0x75, 0x29,
|
||||
0x13, 0x24, 0x4f, 0xe0, 0x30, 0x73, 0x14, 0x3a, 0xe8, 0x3a, 0x3d, 0xc6, 0xf4, 0x8e, 0x96, 0x35,
|
||||
0x58, 0xf8, 0x0e, 0x4e, 0xac, 0x46, 0x0c, 0x75, 0x29, 0xcc, 0xd2, 0x70, 0x65, 0xc8, 0x23, 0xe8,
|
||||
0x67, 0xc6, 0x6c, 0x83, 0x64, 0xea, 0xcd, 0xc6, 0xf3, 0x23, 0xba, 0xcf, 0x1b, 0x75, 0x98, 0x03,
|
||||
0xdf, 0x0f, 0xa0, 0xbf, 0x41, 0xc3, 0xc3, 0x9f, 0xe0, 0xdb, 0xfc, 0xab, 0xbc, 0xc8, 0x75, 0x86,
|
||||
0x09, 0x09, 0xe0, 0x50, 0x97, 0x71, 0x8c, 0x5a, 0xbb, 0x9e, 0x86, 0xac, 0x31, 0x2d, 0x92, 0xa0,
|
||||
0xe1, 0xb9, 0xd0, 0xf5, 0x64, 0x8d, 0x49, 0xce, 0x60, 0x84, 0x4a, 0x49, 0x65, 0xfb, 0x0e, 0x7a,
|
||||
0x6e, 0x92, 0x3b, 0x07, 0x99, 0xc0, 0xd0, 0x19, 0x4b, 0xa3, 0x82, 0xbe, 0x4b, 0x6c, 0xed, 0x70,
|
||||
0x09, 0xa3, 0x4b, 0x91, 0x63, 0x61, 0xae, 0x75, 0x4a, 0xce, 0xa0, 0x67, 0x54, 0x25, 0xe5, 0x78,
|
||||
0x3e, 0x6c, 0xb6, 0x1f, 0x75, 0x98, 0x75, 0x93, 0x69, 0xbd, 0x9c, 0xae, 0x83, 0x81, 0xb6, 0x6b,
|
||||
0xb3, 0x23, 0x59, 0xc4, 0x8e, 0x74, 0x23, 0x93, 0x5d, 0xf8, 0xcb, 0x83, 0x11, 0x73, 0x5f, 0xda,
|
||||
0x56, 0x7d, 0x0d, 0xbe, 0x72, 0xe2, 0xac, 0xb4, 0x55, 0xa7, 0x2e, 0x7f, 0x4a, 0xef, 0xa9, 0x16,
|
||||
0x75, 0xd8, 0x58, 0xed, 0x89, 0xf8, 0x57, 0x3a, 0xf2, 0x0c, 0x86, 0xeb, 0x5a, 0x35, 0x37, 0xb4,
|
||||
0x95, 0x7a, 0x5f, 0xca, 0xa8, 0xc3, 0xda, 0x80, 0xb6, 0xb7, 0xa7, 0xe0, 0x57, 0xad, 0x2d, 0xdd,
|
||||
0xa6, 0xc9, 0xff, 0x30, 0xe0, 0xb1, 0xc9, 0x6f, 0xab, 0xdf, 0x72, 0xc0, 0x6a, 0x6b, 0x9e, 0xc2,
|
||||
0x71, 0x15, 0xf7, 0xd9, 0xde, 0x5d, 0x2c, 0x05, 0x79, 0x0c, 0x83, 0x45, 0x91, 0xf2, 0x14, 0x09,
|
||||
0xd0, 0x56, 0xb3, 0x09, 0xd0, 0x76, 0xd2, 0x99, 0xf7, 0xdc, 0x23, 0xe7, 0x30, 0x68, 0x2a, 0xd3,
|
||||
0xea, 0x90, 0x69, 0x73, 0xc8, 0x74, 0x61, 0x0f, 0x79, 0x72, 0x44, 0xf7, 0x1b, 0xb8, 0x19, 0x38,
|
||||
0xf8, 0xe5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x6e, 0xb0, 0x89, 0x05, 0x04, 0x00, 0x00,
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import "google/protobuf/empty.proto";
|
||||
message TryCall {
|
||||
string models_call_json = 1;
|
||||
string slot_hash_id = 2;
|
||||
map<string,string> extensions = 3;
|
||||
}
|
||||
|
||||
// Data sent C2S and S2C - as soon as the runner sees the first of these it
|
||||
|
||||
@@ -17,34 +17,49 @@ import (
|
||||
"github.com/fnproject/fn/fnext"
|
||||
)
|
||||
|
||||
type CallOverrider func(*models.Call, map[string]string) (map[string]string, error)
|
||||
|
||||
type lbAgent struct {
|
||||
cfg AgentConfig
|
||||
da DataAccess
|
||||
callListeners []fnext.CallListener
|
||||
rp pool.RunnerPool
|
||||
placer pool.Placer
|
||||
|
||||
shutWg *common.WaitGroup
|
||||
callEndCount int64
|
||||
callOverrider CallOverrider
|
||||
shutWg *common.WaitGroup
|
||||
callEndCount int64
|
||||
}
|
||||
|
||||
func NewLBAgentConfig() (*AgentConfig, error) {
|
||||
type LBAgentOption func(*lbAgent) error
|
||||
|
||||
func WithLBAgentConfig(cfg *AgentConfig) LBAgentOption {
|
||||
return func(a *lbAgent) error {
|
||||
a.cfg = *cfg
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// LB agents can use this to register a CallOverrider to modify a Call and extensions
|
||||
func WithCallOverrider(fn CallOverrider) LBAgentOption {
|
||||
return func(a *lbAgent) error {
|
||||
if a.callOverrider != nil {
|
||||
return errors.New("lb-agent call overriders already exists")
|
||||
}
|
||||
a.callOverrider = fn
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewLBAgent creates an Agent that knows how to load-balance function calls
|
||||
// across a group of runner nodes.
|
||||
func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error) {
|
||||
|
||||
// Yes, LBAgent and Agent both use an AgentConfig.
|
||||
cfg, err := NewAgentConfig()
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg)
|
||||
}
|
||||
if cfg.MaxRequestSize == 0 {
|
||||
return cfg, errors.New("lb-agent requires MaxRequestSize limit")
|
||||
}
|
||||
if cfg.MaxResponseSize == 0 {
|
||||
return cfg, errors.New("lb-agent requires MaxResponseSize limit")
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// NewLBAgentWithConfig creates an Agent configured with a supplied AgentConfig
|
||||
func NewLBAgentWithConfig(da DataAccess, rp pool.RunnerPool, p pool.Placer, cfg *AgentConfig) (Agent, error) {
|
||||
logrus.Infof("lb-agent starting cfg=%+v", cfg)
|
||||
a := &lbAgent{
|
||||
cfg: *cfg,
|
||||
da: da,
|
||||
@@ -52,47 +67,52 @@ func NewLBAgentWithConfig(da DataAccess, rp pool.RunnerPool, p pool.Placer, cfg
|
||||
placer: p,
|
||||
shutWg: common.NewWaitGroup(),
|
||||
}
|
||||
|
||||
// Allow overriding config
|
||||
for _, option := range options {
|
||||
err = option(a)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalf("error in lb-agent options")
|
||||
}
|
||||
}
|
||||
|
||||
logrus.Infof("lb-agent starting cfg=%+v", a.cfg)
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// NewLBAgent creates an Agent that knows how to load-balance function calls
|
||||
// across a group of runner nodes.
|
||||
func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) {
|
||||
|
||||
// TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig
|
||||
cfg, err := NewLBAgentConfig()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg)
|
||||
}
|
||||
return NewLBAgentWithConfig(da, rp, p, cfg)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) AddCallListener(listener fnext.CallListener) {
|
||||
a.callListeners = append(a.callListeners, listener)
|
||||
}
|
||||
|
||||
// implements callTrigger
|
||||
func (a *lbAgent) fireBeforeCall(ctx context.Context, call *models.Call) error {
|
||||
return fireBeforeCallFun(a.callListeners, ctx, call)
|
||||
}
|
||||
|
||||
// implements callTrigger
|
||||
func (a *lbAgent) fireAfterCall(ctx context.Context, call *models.Call) error {
|
||||
return fireAfterCallFun(a.callListeners, ctx, call)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
// GetAppID is to get the match of an app name to its ID
|
||||
func (a *lbAgent) GetAppID(ctx context.Context, appName string) (string, error) {
|
||||
return a.da.GetAppID(ctx, appName)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
// GetAppByID is to get the app by ID
|
||||
func (a *lbAgent) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
||||
return a.da.GetAppByID(ctx, appID)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) {
|
||||
return a.da.GetRoute(ctx, appID, path)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
|
||||
var c call
|
||||
|
||||
@@ -108,6 +128,16 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
|
||||
return nil, errors.New("no model or request provided for call")
|
||||
}
|
||||
|
||||
// If overrider is present, let's allow it to modify models.Call
|
||||
// and call extensions
|
||||
if a.callOverrider != nil {
|
||||
ext, err := a.callOverrider(c.Call, c.extensions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.extensions = ext
|
||||
}
|
||||
|
||||
err := setMaxBodyLimit(&a.cfg, &c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -120,10 +150,10 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
|
||||
c.ct = a
|
||||
c.stderr = &nullReadWriter{}
|
||||
c.slotHashId = getSlotQueueKey(&c)
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) Close() error {
|
||||
|
||||
// start closing the front gate first
|
||||
@@ -140,6 +170,7 @@ func (a *lbAgent) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) Submit(callI Call) error {
|
||||
if !a.shutWg.AddSession(1) {
|
||||
return models.ErrCallTimeoutServerBusy
|
||||
@@ -229,6 +260,7 @@ func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer
|
||||
}
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
|
||||
logrus.Error("Enqueue not implemented")
|
||||
return errors.New("Enqueue not implemented")
|
||||
@@ -260,3 +292,6 @@ func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isSt
|
||||
handleStatsDequeue(ctx, err)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
var _ Agent = &lbAgent{}
|
||||
var _ callTrigger = &lbAgent{}
|
||||
|
||||
@@ -128,6 +128,10 @@ func (c *mockRunnerCall) SlotHashId() string {
|
||||
return c.slotHashId
|
||||
}
|
||||
|
||||
func (c *mockRunnerCall) Extensions() map[string]string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockRunnerCall) RequestBody() io.ReadCloser {
|
||||
return c.r.Body
|
||||
}
|
||||
|
||||
@@ -460,41 +460,41 @@ func (ch *callHandle) getDataMsg() *runner.DataFrame {
|
||||
return msg
|
||||
}
|
||||
|
||||
// TODO: decomission/remove this once dependencies are cleaned up
|
||||
type CapacityGate interface {
|
||||
CheckAndReserveCapacity(units uint64) error
|
||||
ReleaseCapacity(units uint64)
|
||||
}
|
||||
|
||||
// pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it
|
||||
// and provides the gRPC server that implements the LB <-> Runner protocol.
|
||||
type pureRunner struct {
|
||||
gRPCServer *grpc.Server
|
||||
listen string
|
||||
creds credentials.TransportCredentials
|
||||
a Agent
|
||||
inflight int32
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) {
|
||||
return pr.a.GetAppID(ctx, appName)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
||||
return pr.a.GetAppByID(ctx, appID)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
|
||||
return pr.a.GetCall(opts...)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) {
|
||||
return pr.a.GetRoute(ctx, appID, path)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) Submit(Call) error {
|
||||
return errors.New("Submit cannot be called directly in a Pure Runner.")
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) Close() error {
|
||||
// First stop accepting requests
|
||||
pr.gRPCServer.GracefulStop()
|
||||
@@ -506,10 +506,12 @@ func (pr *pureRunner) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
|
||||
pr.a.AddCallListener(cl)
|
||||
}
|
||||
|
||||
// implements Agent
|
||||
func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
|
||||
return errors.New("Enqueue cannot be called directly in a Pure Runner.")
|
||||
}
|
||||
@@ -531,7 +533,11 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
|
||||
return err
|
||||
}
|
||||
|
||||
agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state), WithContext(state.ctx))
|
||||
agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR),
|
||||
WithWriter(state),
|
||||
WithContext(state.ctx),
|
||||
WithExtensions(tc.GetExtensions()),
|
||||
)
|
||||
if err != nil {
|
||||
state.enqueueCallResponse(err)
|
||||
return err
|
||||
@@ -551,6 +557,7 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error
|
||||
return nil
|
||||
}

// implements RunnerProtocolServer
// Handles a client engagement
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
grpc.EnableTracing = false
@@ -605,80 +612,87 @@ DataLoop:
return state.waitError()
}

// implements RunnerProtocolServer
func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {
return &runner.RunnerStatus{
Active: atomic.LoadInt32(&pr.inflight),
}, nil
}

func (pr *pureRunner) Start() error {
logrus.Info("Pure Runner listening on ", pr.listen)
lis, err := net.Listen("tcp", pr.listen)
if err != nil {
return fmt.Errorf("Could not listen on %s: %s", pr.listen, err)
}

if err := pr.gRPCServer.Serve(lis); err != nil {
return fmt.Errorf("grpc serve error: %s", err)
}
return err
}

func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error) {
return NewPureRunner(cancel, addr, da, "", "", "", nil)
}

func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (Agent, error) {
return NewPureRunner(cancel, addr, da, cert, key, ca, nil)

agent := New(da, WithoutAsyncDequeue())

// WARNING: SSL creds are optional.
if cert == "" || key == "" || ca == "" {
return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent))
}
return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent), PureRunnerWithSSL(cert, key, ca))
}

func ValidatePureRunnerConfig() AgentOption {
return func(a *agent) error {
type PureRunnerOption func(*pureRunner) error

if a.cfg.MaxResponseSize == 0 {
return errors.New("pure runner requires MaxResponseSize limits")
func PureRunnerWithSSL(cert string, key string, ca string) PureRunnerOption {
return func(pr *pureRunner) error {
c, err := createCreds(cert, key, ca)
if err != nil {
return fmt.Errorf("Failed to create pure runner credentials: %s", err)
}
if a.cfg.MaxRequestSize == 0 {
return errors.New("pure runner requires MaxRequestSize limits")
}

// pure runner requires a non-blocking resource tracker
if !a.cfg.EnableNBResourceTracker {
return errors.New("pure runner requires EnableNBResourceTracker true")
}

pr.creds = c
return nil
}
}

func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, unused CapacityGate) (Agent, error) {
// TODO: gate unused, decommission/remove it after cleaning up dependencies to it.
func PureRunnerWithAgent(a Agent) PureRunnerOption {
return func(pr *pureRunner) error {
if pr.a != nil {
return errors.New("Failed to create pure runner: agent already created")
}

a := createAgent(da, ValidatePureRunnerConfig())
var pr *pureRunner
var err error
if cert != "" && key != "" && ca != "" {
c, err := creds(cert, key, ca)
pr.a = a
return nil
}
}

func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) {

pr := &pureRunner{}

for _, option := range options {
err := option(pr)
if err != nil {
logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
return nil, err
}
pr, err = createPureRunner(addr, a, c)
if err != nil {
return nil, err
}
} else {
logrus.Warn("Running pure runner in insecure mode!")
pr, err = createPureRunner(addr, a, nil)
if err != nil {
return nil, err
logrus.WithError(err).Fatalf("error in pure runner options")
}
}

if pr.a == nil {
logrus.Fatal("agent not provided in pure runner options")
}

var opts []grpc.ServerOption

opts = append(opts, grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor))
opts = append(opts, grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor))

if pr.creds != nil {
opts = append(opts, grpc.Creds(pr.creds))
} else {
logrus.Warn("Running pure runner in insecure mode!")
}

pr.gRPCServer = grpc.NewServer(opts...)
runner.RegisterRunnerProtocolServer(pr.gRPCServer, pr)

lis, err := net.Listen("tcp", addr)
if err != nil {
logrus.WithError(err).Fatalf("Could not listen on %s", addr)
}

logrus.Info("Pure Runner listening on ", addr)

go func() {
err := pr.Start()
if err != nil {
logrus.WithError(err).Error("Failed to start pure runner")
if err := pr.gRPCServer.Serve(lis); err != nil {
logrus.WithError(err).Error("grpc serve error")
cancel()
}
}()
@@ -686,7 +700,11 @@ func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert s
return pr, nil
}

func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
func createCreds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
if cert == "" || key == "" || ca == "" {
return nil, errors.New("Failed to create credentials, cert/key/ca not provided")
}

// Load the certificates from disk
certificate, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
@@ -711,25 +729,5 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials
}), nil
}

func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) {
var srv *grpc.Server
var opts []grpc.ServerOption

sInterceptor := grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor)
uInterceptor := grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor)
opts = append(opts, sInterceptor)
opts = append(opts, uInterceptor)
if creds != nil {
opts = append(opts, grpc.Creds(creds))
}
srv = grpc.NewServer(opts...)

pr := &pureRunner{
gRPCServer: srv,
listen: addr,
a: a,
}

runner.RegisterRunnerProtocolServer(srv, pr)
return pr, nil
}
var _ runner.RunnerProtocolServer = &pureRunner{}
var _ Agent = &pureRunner{}
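Putting the new constructor surface together, a hedged wiring sketch: build an agent with WithoutAsyncDequeue, hand it to NewPureRunner via PureRunnerWithAgent, and add PureRunnerWithSSL only when TLS material is available. The package name, function name, and parameters below are placeholders, not part of this change:

package example

import (
	"context"

	"github.com/fnproject/fn/api/agent"
)

// startPureRunner wires a pure runner with the option-based constructor shown above.
// cert/key/ca may be left empty, in which case the runner starts without TLS.
func startPureRunner(cancel context.CancelFunc, da agent.DataAccess, addr, cert, key, ca string) (agent.Agent, error) {
	// Pure runners disable async dequeue on their inner agent.
	inner := agent.New(da, agent.WithoutAsyncDequeue())

	opts := []agent.PureRunnerOption{agent.PureRunnerWithAgent(inner)}
	if cert != "" && key != "" && ca != "" {
		opts = append(opts, agent.PureRunnerWithSSL(cert, key, ca))
	}
	return agent.NewPureRunner(cancel, addr, opts...)
}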

@@ -52,6 +52,7 @@ func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool
}, nil
}

// implements Runner
func (r *gRPCRunner) Close(context.Context) error {
r.shutWg.CloseGroup()
return r.conn.Close()
@@ -82,6 +83,7 @@ func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.Cl
return conn, protocolClient, nil
}

// implements Runner
func (r *gRPCRunner) Address() string {
return r.address
}
@@ -101,6 +103,7 @@ func isTooBusy(err error) bool {
return false
}

// implements Runner
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
log := common.Logger(ctx).WithField("runner_addr", r.address)

@@ -135,6 +138,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{
ModelsCallJson: string(modelJSON),
SlotHashId: hex.EncodeToString([]byte(call.SlotHashId())),
Extensions: call.Extensions(),
}}})
if err != nil {
log.WithError(err).Error("Failed to send message to runner node")
@@ -312,3 +316,5 @@ DataLoop:
tryQueueError(ErrorPureRunnerNoEOF, done)
}
}

var _ pool.Runner = &gRPCRunner{}

@@ -42,6 +42,7 @@ type Runner interface {
// processed by a RunnerPool
type RunnerCall interface {
SlotHashId() string
Extensions() map[string]string
RequestBody() io.ReadCloser
ResponseWriter() http.ResponseWriter
StdErr() io.ReadWriteCloser

@@ -17,6 +17,7 @@ import (
sdkmodels "github.com/fnproject/fn_go/models"
)

// See fn-test-utils for json response
func getEchoContent(respBytes []byte) (string, error) {

var respJs map[string]interface{}
@@ -39,6 +40,29 @@ func getEchoContent(respBytes []byte) (string, error) {
return echo, nil
}

// See fn-test-utils for json response
func getConfigContent(key string, respBytes []byte) (string, error) {

var respJs map[string]interface{}

err := json.Unmarshal(respBytes, &respJs)
if err != nil {
return "", err
}

cfg, ok := respJs["config"].(map[string]interface{})
if !ok {
return "", errors.New("unexpected json: config map")
}

val, ok := cfg[key].(string)
if !ok {
return "", fmt.Errorf("unexpected json: %s string", key)
}

return val, nil
}
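A hypothetical companion test (not part of this change, but in the same test package) showing how getConfigContent is meant to be used against the fn-test-utils response shape; the JSON payload is hand-written for the example:

func TestGetConfigContentParsesConfig(t *testing.T) {
	respBytes := []byte(`{"config":{"FN_CHEESE":"Tete de Moine"}}`)
	val, err := getConfigContent("FN_CHEESE", respBytes)
	if err != nil || val != "Tete de Moine" {
		t.Fatalf("getConfigContent failed: %q (%v)", val, err)
	}
}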

func TestCanExecuteFunction(t *testing.T) {
s := apiutils.SetupHarness()
s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName})
@@ -79,6 +103,13 @@ func TestCanExecuteFunction(t *testing.T) {
if resp.StatusCode != http.StatusOK {
t.Fatalf("StatusCode check failed on %v", resp.StatusCode)
}

// Now let's check FN_CHEESE, since LB and runners have override/extension mechanism
// to insert FN_CHEESE into config
cheese, err := getConfigContent("FN_CHEESE", output.Bytes())
if err != nil || cheese != "Tete de Moine" {
t.Fatalf("getConfigContent/FN_CHEESE check failed (%v) on %v", err, output)
}
}

func TestCanExecuteBigOutput(t *testing.T) {

@@ -6,12 +6,17 @@ import (
"fmt"

"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/agent/hybrid"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/api/server"
_ "github.com/fnproject/fn/api/server/defaultexts"

// We need docker client here, since we have a custom driver that wraps generic
// docker driver.
"github.com/fsouza/go-dockerclient"
"github.com/sirupsen/logrus"

"io/ioutil"
@@ -219,12 +224,14 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
keys := []string{"fn_appname", "fn_path"}
pool.RegisterPlacerViews(keys)

agent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer)
// Create an LB Agent with a Call Overrider to intercept calls in GetCall(). Overrider in this example
// scrubs CPU/TmpFsSize and adds FN_CHEESE key/value into extensions.
lbAgent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer, agent.WithCallOverrider(LBCallOverrider))
if err != nil {
return nil, err
}
opts = append(opts, server.WithAgent(agent))

opts = append(opts, server.WithAgent(lbAgent))
return server.New(ctx, opts...), nil
}

@@ -246,14 +253,30 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
return nil, err
}
grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum)
cancelCtx, cancel := context.WithCancel(ctx)

prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", nil)
// This is our Agent config, which we will use for both inner agent and docker.
cfg, err := agent.NewAgentConfig()
if err != nil {
return nil, err
}
opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx))

// custom driver that wraps the generic docker driver
drv := &customDriver{
drv: agent.NewDockerDriver(cfg),
}

// inner agent for pure-runners
innerAgent := agent.New(ds, agent.WithConfig(cfg), agent.WithDockerDriver(drv), agent.WithoutAsyncDequeue())

cancelCtx, cancel := context.WithCancel(ctx)

// now create pure-runner that wraps agent.
pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, agent.PureRunnerWithAgent(innerAgent))
if err != nil {
return nil, err
}

opts = append(opts, server.WithAgent(pureRunner), server.WithExtraCtx(cancelCtx))
return server.New(ctx, opts...), nil
}

@@ -326,3 +349,65 @@ func TestMain(m *testing.M) {
}
os.Exit(result)
}

// Memory Only LB Agent Call Option
func LBCallOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {

// Set TmpFsSize and CPU to unlimited. This means LB operates on Memory
// only. Operators/Service providers are expected to override this
// and apply their own filter to set/override CPU/TmpFsSize/Memory
// and Extension variables.
c.TmpFsSize = 0
c.CPUs = models.MilliCPUs(0)
delete(c.Config, "FN_CPUS")

if exts == nil {
exts = make(map[string]string)
}

// Add an FN_CHEESE extension to be intercepted and specially handled by Pure Runner customDriver below
exts["FN_CHEESE"] = "Tete de Moine"
return exts, nil
}
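As the comments above note, operators and service providers are expected to supply their own overrider. A hedged sketch of one that caps CPU instead of clearing it and forwards a deployment hint; the 500 mCPU cap and the FN_REGION key/value are invented for this example:

func CappedCPUOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {
	// Cap CPU rather than scrubbing it; unlimited (0) also becomes the cap.
	cpuCap := models.MilliCPUs(500)
	if c.CPUs == 0 || c.CPUs > cpuCap {
		c.CPUs = cpuCap
	}

	if exts == nil {
		exts = make(map[string]string)
	}
	// Forward a hint for the runner side to pick up, in the same way FN_CHEESE is used here.
	exts["FN_REGION"] = "us-test-1"
	return exts, nil
}

It would be registered the same way as LBCallOverrider, by passing agent.WithCallOverrider(CappedCPUOverrider) when constructing the LB agent.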

// An example Pure Runner docker driver. Using CreateCookie, it intercepts a generated cookie to
// add an environment variable FN_CHEESE if it finds a FN_CHEESE extension.
type customDriver struct {
drv drivers.Driver
}

// implements Driver
func (d *customDriver) CreateCookie(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
cookie, err := d.drv.CreateCookie(ctx, task)
if err != nil {
return cookie, err
}

// if the call extensions include FN_CHEESE, add an FN_CHEESE env var, which should
// end up in Env/Config.
ext := task.Extensions()
cheese, ok := ext["FN_CHEESE"]
if ok {
// docker driver specific data
obj := cookie.ContainerOptions()
opts, ok := obj.(docker.CreateContainerOptions)
if !ok {
logrus.Fatal("Unexpected driver, should be docker")
}
opts.Config.Env = append(opts.Config.Env, "FN_CHEESE="+cheese)
}

return cookie, nil
}

// implements Driver
func (d *customDriver) PrepareCookie(ctx context.Context, cookie drivers.Cookie) error {
return d.drv.PrepareCookie(ctx, cookie)
}

// implements Driver
func (d *customDriver) Close() error {
return d.drv.Close()
}
var _ drivers.Driver = &customDriver{}
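The same CreateCookie hook can adjust any other field of docker.CreateContainerOptions, not only Env. A hedged variant (not part of this change) that turns a hypothetical FN_TEAM extension into a container label; the extension key and label name are invented:

func addTeamLabel(task drivers.ContainerTask, cookie drivers.Cookie) {
	team, ok := task.Extensions()["FN_TEAM"]
	if !ok {
		return
	}
	// ContainerOptions returns the docker-specific options; Config is shared by pointer,
	// so changes made here are visible to the container created from this cookie.
	if opts, ok := cookie.ContainerOptions().(docker.CreateContainerOptions); ok {
		if opts.Config.Labels == nil {
			opts.Config.Labels = make(map[string]string)
		}
		opts.Config.Labels["fn.team"] = team
	}
}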