From e67d0e5f3fbc96bb2cdb09ecbbe8ec11f1edb949 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 18 Jun 2018 14:42:28 -0700 Subject: [PATCH] fn: Call extensions/overriding and more customization friendly docker driver (#1065) In pure-runner and LB agent, service providers might want to set specific driver options. For example, to add cpu-shares to functions, LB can add the information as extensions to the Call and pass this via gRPC to runners. Runners then pick these extensions from gRPC call and pass it to driver. Using a custom driver implementation, pure-runners can process these extensions to modify docker.CreateContainerOptions. To achieve this, LB agents can now be configured using a call overrider. Pure-runners can be configured using a custom docker driver. RunnerCall and Call interfaces both expose call extensions. An example to demonstrate this is implemented in test/fn-system-tests/system_test.go which registers a call overrider for LB agent as well as a simple custom docker driver. In this example, LB agent adds a key-value to extensions and runners add this key-value as an environment variable to the container. --- api/agent/agent.go | 125 +++++++---- api/agent/call.go | 15 ++ api/agent/drivers/docker/cookie.go | 189 +++++++++++++++++ api/agent/drivers/docker/docker.go | 271 ++++++------------------ api/agent/drivers/docker/docker_pool.go | 1 + api/agent/drivers/docker/docker_test.go | 41 +++- api/agent/drivers/driver.go | 17 +- api/agent/drivers/mock/mocker.go | 12 +- api/agent/grpc/runner.pb.go | 84 ++++---- api/agent/grpc/runner.proto | 1 + api/agent/lb_agent.go | 93 +++++--- api/agent/lb_agent_test.go | 4 + api/agent/pure_runner.go | 166 +++++++-------- api/agent/runner_client.go | 6 + api/runnerpool/runner_pool.go | 1 + test/fn-system-tests/exec_test.go | 31 +++ test/fn-system-tests/system_test.go | 95 ++++++++- 17 files changed, 741 insertions(+), 411 deletions(-) create mode 100644 api/agent/drivers/docker/cookie.go diff --git a/api/agent/agent.go b/api/agent/agent.go index 685721257..25b489552 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -3,6 +3,7 @@ package agent import ( "bytes" "context" + "errors" "io" "log/syslog" "strings" @@ -115,31 +116,17 @@ type agent struct { resources ResourceTracker // used to track running calls / safe shutdown - shutWg *common.WaitGroup - shutonce sync.Once - callEndCount int64 + shutWg *common.WaitGroup + shutonce sync.Once + callEndCount int64 + disableAsyncDequeue bool } type AgentOption func(*agent) error // New creates an Agent that executes functions locally as Docker containers. 
func New(da DataAccess, options ...AgentOption) Agent { - a := createAgent(da, options...).(*agent) - if !a.shutWg.AddSession(1) { - logrus.Fatalf("cannot start agent, unable to add session") - } - go a.asyncDequeue() // safe shutdown can nanny this fine - return a -} -func WithConfig(cfg *AgentConfig) AgentOption { - return func(a *agent) error { - a.cfg = *cfg - return nil - } -} - -func createAgent(da DataAccess, options ...AgentOption) Agent { cfg, err := NewAgentConfig() if err != nil { logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg) @@ -159,18 +146,9 @@ func createAgent(da DataAccess, options ...AgentOption) Agent { logrus.Infof("agent starting cfg=%+v", a.cfg) - // TODO: Create drivers.New(runnerConfig) - a.driver = docker.NewDocker(drivers.Config{ - DockerNetworks: a.cfg.DockerNetworks, - ServerVersion: a.cfg.MinDockerVersion, - PreForkPoolSize: a.cfg.PreForkPoolSize, - PreForkImage: a.cfg.PreForkImage, - PreForkCmd: a.cfg.PreForkCmd, - PreForkUseOnce: a.cfg.PreForkUseOnce, - PreForkNetworks: a.cfg.PreForkNetworks, - MaxTmpFsInodes: a.cfg.MaxTmpFsInodes, - EnableReadOnlyRootFs: !a.cfg.DisableReadOnlyRootFs, - }) + if a.driver == nil { + a.driver = NewDockerDriver(&a.cfg) + } a.da = da a.slotMgr = NewSlotQueueMgr() @@ -178,9 +156,59 @@ func createAgent(da DataAccess, options ...AgentOption) Agent { a.shutWg = common.NewWaitGroup() // TODO assert that agent doesn't get started for API nodes up above ? + if a.disableAsyncDequeue { + return a + } + + if !a.shutWg.AddSession(1) { + logrus.Fatalf("cannot start agent, unable to add session") + } + go a.asyncDequeue() // safe shutdown can nanny this fine + return a } +func WithConfig(cfg *AgentConfig) AgentOption { + return func(a *agent) error { + a.cfg = *cfg + return nil + } +} + +// Provide a customer driver to agent +func WithDockerDriver(drv drivers.Driver) AgentOption { + return func(a *agent) error { + if a.driver != nil { + return errors.New("cannot add driver to agent, driver already exists") + } + + a.driver = drv + return nil + } +} + +func WithoutAsyncDequeue() AgentOption { + return func(a *agent) error { + a.disableAsyncDequeue = true + return nil + } +} + +// Create a default docker driver from agent config +func NewDockerDriver(cfg *AgentConfig) *docker.DockerDriver { + return docker.NewDocker(drivers.Config{ + DockerNetworks: cfg.DockerNetworks, + ServerVersion: cfg.MinDockerVersion, + PreForkPoolSize: cfg.PreForkPoolSize, + PreForkImage: cfg.PreForkImage, + PreForkCmd: cfg.PreForkCmd, + PreForkUseOnce: cfg.PreForkUseOnce, + PreForkNetworks: cfg.PreForkNetworks, + MaxTmpFsInodes: cfg.MaxTmpFsInodes, + EnableReadOnlyRootFs: !cfg.DisableReadOnlyRootFs, + }) +} + func (a *agent) GetAppByID(ctx context.Context, appID string) (*models.App, error) { return a.da.GetAppByID(ctx, appID) } @@ -788,9 +816,12 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch stats: &call.Stats, } - // pull & create container before we return a slot, so as to be friendly - // about timing out if this takes a while... - cookie, err := a.driver.Prepare(ctx, container) + cookie, err := a.driver.CreateCookie(ctx, container) + if err == nil { + // pull & create container before we return a slot, so as to be friendly + // about timing out if this takes a while... 
+ err = a.driver.PrepareCookie(ctx, cookie) + } call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots) @@ -819,13 +850,20 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout}) ctx = common.WithLogger(ctx, logger) - cookie, err := a.driver.Prepare(ctx, container) + cookie, err := a.driver.CreateCookie(ctx, container) if err != nil { call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err}) return } + defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out + err = a.driver.PrepareCookie(ctx, cookie) + if err != nil { + call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err}) + return + } + waiter, err := cookie.Run(ctx) if err != nil { call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err}) @@ -971,14 +1009,15 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, // and stderr can be swapped out by new calls in the container. input and // output must be copied in and out. type container struct { - id string // contrived - image string - env map[string]string - memory uint64 - cpus uint64 - fsSize uint64 - tmpFsSize uint64 - timeout time.Duration // cold only (superfluous, but in case) + id string // contrived + image string + env map[string]string + extensions map[string]string + memory uint64 + cpus uint64 + fsSize uint64 + tmpFsSize uint64 + timeout time.Duration // cold only (superfluous, but in case) stdin io.Reader stdout io.Writer @@ -1053,6 +1092,7 @@ func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*contai id: id, // XXX we could just let docker generate ids... image: call.Image, env: map[string]string(call.Config), + extensions: call.extensions, memory: call.Memory, cpus: uint64(call.CPUs), fsSize: cfg.MaxFsSize, @@ -1107,6 +1147,7 @@ func (c *container) Memory() uint64 { return c.memory * 1024 * 1 func (c *container) CPUs() uint64 { return c.cpus } func (c *container) FsSize() uint64 { return c.fsSize } func (c *container) TmpFsSize() uint64 { return c.tmpFsSize } +func (c *container) Extensions() map[string]string { return c.extensions } // WriteStat publishes each metric in the specified Stats structure as a histogram metric func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { diff --git a/api/agent/call.go b/api/agent/call.go index 6febd115e..c387c3f00 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -235,6 +235,14 @@ func WithContext(ctx context.Context) CallOpt { } } +// Pure runner can use this to pass an extension to the call +func WithExtensions(extensions map[string]string) CallOpt { + return func(c *call) error { + c.extensions = extensions + return nil + } +} + // GetCall builds a Call that can be used to submit jobs to the agent. // // TODO where to put this? 
async and sync both call this @@ -310,12 +318,19 @@ type call struct { containerState ContainerState slotHashId string isLB bool + + // LB & Pure Runner Extra Config + extensions map[string]string } func (c *call) SlotHashId() string { return c.slotHashId } +func (c *call) Extensions() map[string]string { + return c.extensions +} + func (c *call) RequestBody() io.ReadCloser { if c.req.Body != nil && c.req.GetBody != nil { rdr, err := c.req.GetBody() diff --git a/api/agent/drivers/docker/cookie.go b/api/agent/drivers/docker/cookie.go new file mode 100644 index 000000000..03f015ca3 --- /dev/null +++ b/api/agent/drivers/docker/cookie.go @@ -0,0 +1,189 @@ +package docker + +import ( + "context" + "fmt" + "strings" + + "github.com/fnproject/fn/api/agent/drivers" + "github.com/fnproject/fn/api/common" + "github.com/fsouza/go-dockerclient" + "github.com/sirupsen/logrus" +) + +// A cookie identifies a unique request to run a task. +type cookie struct { + // namespace id used from prefork pool if applicable + poolId string + // network name from docker networks if applicable + netId string + // docker container create options created by Driver.CreateCookie, required for Driver.Prepare() + opts docker.CreateContainerOptions + // task associated with this cookie + task drivers.ContainerTask + // pointer to docker driver + drv *DockerDriver +} + +func (c *cookie) configureFsSize(log logrus.FieldLogger) { + if c.task.FsSize() == 0 { + return + } + + // If defined, impose file system size limit. In MB units. + if c.opts.HostConfig.StorageOpt == nil { + c.opts.HostConfig.StorageOpt = make(map[string]string) + } + + opt := fmt.Sprintf("%vM", c.task.FsSize()) + log.WithFields(logrus.Fields{"size": opt, "call_id": c.task.Id()}).Debug("setting storage option") + c.opts.HostConfig.StorageOpt["size"] = opt +} + +func (c *cookie) configureTmpFs(log logrus.FieldLogger) { + // if RO Root is NOT enabled and TmpFsSize does not have any limit, then we do not need + // any tmpfs in the container since function can freely write whereever it wants. + if c.task.TmpFsSize() == 0 && !c.drv.conf.EnableReadOnlyRootFs { + return + } + + if c.opts.HostConfig.Tmpfs == nil { + c.opts.HostConfig.Tmpfs = make(map[string]string) + } + + var tmpFsOption string + if c.task.TmpFsSize() != 0 { + if c.drv.conf.MaxTmpFsInodes != 0 { + tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", c.task.TmpFsSize(), c.drv.conf.MaxTmpFsInodes) + } else { + tmpFsOption = fmt.Sprintf("size=%dm", c.task.TmpFsSize()) + } + } + + log.WithFields(logrus.Fields{"target": "/tmp", "options": tmpFsOption, "call_id": c.task.Id()}).Debug("setting tmpfs") + c.opts.HostConfig.Tmpfs["/tmp"] = tmpFsOption +} + +func (c *cookie) configureVolumes(log logrus.FieldLogger) { + if len(c.task.Volumes()) == 0 { + return + } + + if c.opts.Config.Volumes == nil { + c.opts.Config.Volumes = map[string]struct{}{} + } + + for _, mapping := range c.task.Volumes() { + hostDir := mapping[0] + containerDir := mapping[1] + c.opts.Config.Volumes[containerDir] = struct{}{} + mapn := fmt.Sprintf("%s:%s", hostDir, containerDir) + c.opts.HostConfig.Binds = append(c.opts.HostConfig.Binds, mapn) + log.WithFields(logrus.Fields{"volumes": mapn, "call_id": c.task.Id()}).Debug("setting volumes") + } +} + +func (c *cookie) configureCPU(log logrus.FieldLogger) { + // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) + // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, + // which is approx 8 CPUS in CFS world. 
+ // Also see docker run options --cpu-quota and --cpu-period + if c.task.CPUs() == 0 { + return + } + + quota := int64(c.task.CPUs() * 100) + period := int64(100000) + + log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": c.task.Id()}).Debug("setting CPU") + c.opts.HostConfig.CPUQuota = quota + c.opts.HostConfig.CPUPeriod = period +} + +func (c *cookie) configureWorkDir(log logrus.FieldLogger) { + wd := c.task.WorkDir() + if wd == "" { + return + } + + log.WithFields(logrus.Fields{"wd": wd, "call_id": c.task.Id()}).Debug("setting work dir") + c.opts.Config.WorkingDir = wd +} + +func (c *cookie) configureHostname(log logrus.FieldLogger) { + // hostname and container NetworkMode is not compatible. + if c.opts.HostConfig.NetworkMode != "" { + return + } + + log.WithFields(logrus.Fields{"hostname": c.drv.hostname, "call_id": c.task.Id()}).Debug("setting hostname") + c.opts.Config.Hostname = c.drv.hostname +} + +func (c *cookie) configureCmd(log logrus.FieldLogger) { + if c.task.Command() == "" { + return + } + + // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests + cmd := strings.Fields(c.task.Command()) + log.WithFields(logrus.Fields{"call_id": c.task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") + c.opts.Config.Cmd = cmd +} + +func (c *cookie) configureEnv(log logrus.FieldLogger) { + if len(c.task.EnvVars()) == 0 { + return + } + + envvars := make([]string, 0, len(c.task.EnvVars())) + for name, val := range c.task.EnvVars() { + envvars = append(envvars, name+"="+val) + } + + c.opts.Config.Env = envvars +} + +// implements Cookie +func (c *cookie) Close(ctx context.Context) error { + err := c.drv.removeContainer(ctx, c.task.Id()) + c.drv.unpickPool(c) + c.drv.unpickNetwork(c) + return err +} + +// implements Cookie +func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) { + return c.drv.run(ctx, c.task.Id(), c.task) +} + +// implements Cookie +func (c *cookie) ContainerOptions() interface{} { + return c.opts +} + +// implements Cookie +func (c *cookie) Freeze(ctx context.Context) error { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"}) + log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker pause") + + err := c.drv.docker.PauseContainer(c.task.Id(), ctx) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error pausing container") + } + return err +} + +// implements Cookie +func (c *cookie) Unfreeze(ctx context.Context) error { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"}) + log.WithFields(logrus.Fields{"call_id": c.task.Id()}).Debug("docker unpause") + + err := c.drv.docker.UnpauseContainer(c.task.Id(), ctx) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.task.Id()}).Error("error unpausing container") + } + return err +} + +var _ drivers.Cookie = &cookie{} diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index a3e4097c2..760518014 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -144,37 +144,37 @@ func (drv *DockerDriver) Close() error { return err } -func (drv *DockerDriver) pickPool(ctx context.Context, container *docker.CreateContainerOptions) string { +func (drv *DockerDriver) pickPool(ctx context.Context, c *cookie) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"}) - if drv.pool == nil || container.HostConfig.NetworkMode != "" { - 
return "" + if drv.pool == nil || c.opts.HostConfig.NetworkMode != "" { + return } id, err := drv.pool.AllocPoolId() if err != nil { log.WithError(err).Error("Could not fetch pre fork pool container") - return "" + return } // We are able to fetch a container from pool. Now, use its // network, ipc and pid namespaces. - container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id) - //container.HostConfig.IpcMode = linker - //container.HostConfig.PidMode = linker - return id + c.opts.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id) + //c.opts.HostConfig.IpcMode = linker + //c.opts.HostConfig.PidMode = linker + c.poolId = id } -func (drv *DockerDriver) unpickPool(poolId string) { - if poolId != "" && drv.pool != nil { - drv.pool.FreePoolId(poolId) +func (drv *DockerDriver) unpickPool(c *cookie) { + if c.poolId != "" && drv.pool != nil { + drv.pool.FreePoolId(c.poolId) } } -func (drv *DockerDriver) pickNetwork(container *docker.CreateContainerOptions) string { +func (drv *DockerDriver) pickNetwork(c *cookie) { - if len(drv.networks) == 0 || container.HostConfig.NetworkMode != "" { - return "" + if len(drv.networks) == 0 || c.opts.HostConfig.NetworkMode != "" { + return } var id string @@ -190,132 +190,23 @@ func (drv *DockerDriver) pickNetwork(container *docker.CreateContainerOptions) s drv.networks[id]++ drv.networksLock.Unlock() - container.HostConfig.NetworkMode = id - return id + c.opts.HostConfig.NetworkMode = id + c.netId = id } -func (drv *DockerDriver) unpickNetwork(netId string) { - if netId != "" { - drv.networksLock.Lock() - drv.networks[netId]-- - drv.networksLock.Unlock() +func (drv *DockerDriver) unpickNetwork(c *cookie) { + if c.netId != "" { + c.drv.networksLock.Lock() + c.drv.networks[c.netId]-- + c.drv.networksLock.Unlock() } } -func (drv *DockerDriver) configureFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if task.FsSize() != 0 { - // If defined, impose file system size limit. In MB units. 
- if container.HostConfig.StorageOpt == nil { - container.HostConfig.StorageOpt = make(map[string]string) - } +func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { - opt := fmt.Sprintf("%vM", task.FsSize()) + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CreateCookie"}) - log.WithFields(logrus.Fields{"size": opt, "call_id": task.Id()}).Debug("setting storage option") - container.HostConfig.StorageOpt["size"] = opt - } - - container.HostConfig.ReadonlyRootfs = drv.conf.EnableReadOnlyRootFs -} - -func (drv *DockerDriver) configureTmpFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if task.TmpFsSize() == 0 && !drv.conf.EnableReadOnlyRootFs { - return - } - - if container.HostConfig.Tmpfs == nil { - container.HostConfig.Tmpfs = make(map[string]string) - } - - var tmpFsOption string - if task.TmpFsSize() != 0 { - if drv.conf.MaxTmpFsInodes != 0 { - tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", task.TmpFsSize(), drv.conf.MaxTmpFsInodes) - } else { - tmpFsOption = fmt.Sprintf("size=%dm", task.TmpFsSize()) - } - } - target := "/tmp" - - log.WithFields(logrus.Fields{"target": target, "options": tmpFsOption, "call_id": task.Id()}).Debug("setting tmpfs") - container.HostConfig.Tmpfs[target] = tmpFsOption -} - -func (drv *DockerDriver) configureVolumes(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if len(task.Volumes()) == 0 { - return - } - - if container.Config.Volumes == nil { - container.Config.Volumes = map[string]struct{}{} - } - - for _, mapping := range task.Volumes() { - hostDir := mapping[0] - containerDir := mapping[1] - container.Config.Volumes[containerDir] = struct{}{} - mapn := fmt.Sprintf("%s:%s", hostDir, containerDir) - container.HostConfig.Binds = append(container.HostConfig.Binds, mapn) - log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes") - } -} - -func (drv *DockerDriver) configureCPU(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) - // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, - // which is approx 8 CPUS in CFS world. - // Also see docker run options --cpu-quota and --cpu-period - if task.CPUs() == 0 { - return - } - - quota := int64(task.CPUs() * 100) - period := int64(100000) - - log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": task.Id()}).Debug("setting CPU") - container.HostConfig.CPUQuota = quota - container.HostConfig.CPUPeriod = period -} - -func (drv *DockerDriver) configureWorkDir(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if wd := task.WorkDir(); wd != "" { - log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir") - container.Config.WorkingDir = wd - } -} - -func (drv *DockerDriver) configureHostname(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if container.HostConfig.NetworkMode == "" { - // hostname and container NetworkMode is not compatible. 
- log.WithFields(logrus.Fields{"hostname": drv.hostname, "call_id": task.Id()}).Debug("setting hostname") - container.Config.Hostname = drv.hostname - } -} - -func (drv *DockerDriver) configureCmd(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - if task.Command() == "" { - return - } - - // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests - cmd := strings.Fields(task.Command()) - log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") - container.Config.Cmd = cmd -} - -func (drv *DockerDriver) configureEnv(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { - envvars := make([]string, 0, len(task.EnvVars())) - for name, val := range task.EnvVars() { - envvars = append(envvars, name+"="+val) - } - - container.Config.Env = envvars -} - -func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"}) - - container := docker.CreateContainerOptions{ + opts := docker.CreateContainerOptions{ Name: task.Id(), Config: &docker.Config{ Memory: int64(task.Memory()), @@ -333,91 +224,63 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask LogConfig: docker.LogConfig{ Type: "none", }, + ReadonlyRootfs: drv.conf.EnableReadOnlyRootFs, }, Context: ctx, } - drv.configureCmd(log, &container, task) - drv.configureEnv(log, &container, task) - drv.configureCPU(log, &container, task) - drv.configureFs(log, &container, task) - drv.configureTmpFs(log, &container, task) - drv.configureVolumes(log, &container, task) - drv.configureWorkDir(log, &container, task) - - poolId := drv.pickPool(ctx, &container) - netId := drv.pickNetwork(&container) - - drv.configureHostname(log, &container, task) - - err := drv.ensureImage(ctx, task) - if err != nil { - drv.unpickPool(poolId) - drv.unpickNetwork(netId) - return nil, err + cookie := &cookie{ + opts: opts, + task: task, + drv: drv, } - _, err = drv.docker.CreateContainer(container) + cookie.configureCmd(log) + cookie.configureEnv(log) + cookie.configureCPU(log) + cookie.configureFsSize(log) + cookie.configureTmpFs(log) + cookie.configureVolumes(log) + cookie.configureWorkDir(log) + + // Order is important, if pool is enabled, it overrides pick network + drv.pickPool(ctx, cookie) + drv.pickNetwork(cookie) + + // Order is important, Hostname doesn't play well with Network config + cookie.configureHostname(log) + + return cookie, nil +} + +func (drv *DockerDriver) PrepareCookie(ctx context.Context, c drivers.Cookie) error { + + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "PrepareCookie"}) + cookie, ok := c.(*cookie) + if !ok { + return errors.New("unknown cookie implementation") + } + + err := drv.ensureImage(ctx, cookie.task) + if err != nil { + return err + } + + _, err = drv.docker.CreateContainer(cookie.opts) if err != nil { // since we retry under the hood, if the container gets created and retry fails, we can just ignore error if err != docker.ErrContainerAlreadyExists { - log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory, - "cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name, - "image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name, + 
log.WithFields(logrus.Fields{"call_id": cookie.task.Id(), "command": cookie.opts.Config.Cmd, "memory": cookie.opts.Config.Memory, + "cpu_quota": cookie.task.CPUs(), "hostname": cookie.opts.Config.Hostname, "name": cookie.opts.Name, + "image": cookie.opts.Config.Image, "volumes": cookie.opts.Config.Volumes, "binds": cookie.opts.HostConfig.Binds, "container": cookie.opts.Name, }).WithError(err).Error("Could not create container") - // NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container - drv.unpickPool(poolId) - drv.unpickNetwork(netId) - return nil, err + return err } } // discard removal error - return &cookie{id: task.Id(), task: task, drv: drv, poolId: poolId, netId: netId}, nil -} - -type cookie struct { - id string - poolId string - netId string - task drivers.ContainerTask - drv *DockerDriver -} - -func (c *cookie) Close(ctx context.Context) error { - defer func() { - c.drv.unpickPool(c.poolId) - c.drv.unpickNetwork(c.netId) - }() - - return c.drv.removeContainer(ctx, c.id) -} - -func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) { - return c.drv.run(ctx, c.id, c.task) -} - -func (c *cookie) Freeze(ctx context.Context) error { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"}) - log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker pause") - - err := c.drv.docker.PauseContainer(c.id, ctx) - if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error pausing container") - } - return err -} - -func (c *cookie) Unfreeze(ctx context.Context) error { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"}) - log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker unpause") - - err := c.drv.docker.UnpauseContainer(c.id, ctx) - if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error unpausing container") - } - return err + return nil } func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error { @@ -782,3 +645,5 @@ func (w *waitResult) wait(ctx context.Context) (status string, err error) { return drivers.StatusKilled, models.NewAPIError(http.StatusBadGateway, err) } } + +var _ drivers.Driver = &DockerDriver{} diff --git a/api/agent/drivers/docker/docker_pool.go b/api/agent/drivers/docker/docker_pool.go index 788085d89..45c16dd69 100644 --- a/api/agent/drivers/docker/docker_pool.go +++ b/api/agent/drivers/docker/docker_pool.go @@ -70,6 +70,7 @@ func (c *poolTask) Memory() uint64 { return 0 func (c *poolTask) CPUs() uint64 { return 0 } func (c *poolTask) FsSize() uint64 { return 0 } func (c *poolTask) TmpFsSize() uint64 { return 0 } +func (c *poolTask) Extensions() map[string]string { return nil } func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {} type dockerPoolItem struct { diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index 1bfda3c13..ccac688ac 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -22,7 +22,6 @@ func (f *taskDockerTest) Command() string { return "" } func (f *taskDockerTest) EnvVars() map[string]string { return map[string]string{"FN_FORMAT": "default"} } -func (f *taskDockerTest) Labels() map[string]string { return nil } func (f *taskDockerTest) Id() string { return f.id } func (f *taskDockerTest) Group() string { return "" } func (f *taskDockerTest) Image() string { return "fnproject/fn-test-utils" } @@ 
-37,6 +36,7 @@ func (f *taskDockerTest) TmpFsSize() uint64 { return 0 } func (f *taskDockerTest) WorkDir() string { return "" } func (f *taskDockerTest) Close() {} func (f *taskDockerTest) Input() io.Reader { return f.input } +func (f *taskDockerTest) Extensions() map[string]string { return nil } func TestRunnerDocker(t *testing.T) { dkr := NewDocker(drivers.Config{}) @@ -46,11 +46,17 @@ func TestRunnerDocker(t *testing.T) { task := &taskDockerTest{"test-docker", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors} - cookie, err := dkr.Prepare(ctx, task) + cookie, err := dkr.CreateCookie(ctx, task) + if err != nil { + t.Fatal("Couldn't create task cookie") + } + + defer cookie.Close(ctx) + + err = dkr.PrepareCookie(ctx, cookie) if err != nil { t.Fatal("Couldn't prepare task test") } - defer cookie.Close(ctx) waiter, err := cookie.Run(ctx) if err != nil { @@ -80,13 +86,26 @@ func TestRunnerDockerNetworks(t *testing.T) { task1 := &taskDockerTest{"test-docker1", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors} task2 := &taskDockerTest{"test-docker2", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors} - cookie1, err := dkr.Prepare(ctx, task1) + cookie1, err := dkr.CreateCookie(ctx, task1) + if err != nil { + t.Fatal("Couldn't create task1 cookie") + } + + defer cookie1.Close(ctx) + + err = dkr.PrepareCookie(ctx, cookie1) if err != nil { t.Fatal("Couldn't prepare task1 test") } - defer cookie1.Close(ctx) - cookie2, err := dkr.Prepare(ctx, task2) + cookie2, err := dkr.CreateCookie(ctx, task2) + if err != nil { + t.Fatal("Couldn't create task2 cookie") + } + + defer cookie2.Close(ctx) + + err = dkr.PrepareCookie(ctx, cookie2) if err != nil { t.Fatal("Couldn't prepare task2 test") } @@ -141,11 +160,17 @@ func TestRunnerDockerStdin(t *testing.T) { task := &taskDockerTest{"test-docker-stdin", bytes.NewBufferString(input), &output, &errors} - cookie, err := dkr.Prepare(ctx, task) + cookie, err := dkr.CreateCookie(ctx, task) + if err != nil { + t.Fatal("Couldn't create task cookie") + } + + defer cookie.Close(ctx) + + err = dkr.PrepareCookie(ctx, cookie) if err != nil { t.Fatal("Couldn't prepare task test") } - defer cookie.Close(ctx) waiter, err := cookie.Run(ctx) if err != nil { diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index a25ca6507..b5977fa9d 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -40,6 +40,12 @@ type Cookie interface { // Unfreeze a frozen container to unpause frozen processes Unfreeze(ctx context.Context) error + + // Fetch driver specific container configuration. Use this to + // access the container create options. If Driver.Prepare() is not + // yet called with the cookie, then this can be used to modify container + // create options. + ContainerOptions() interface{} } type WaitResult interface { @@ -51,7 +57,11 @@ type WaitResult interface { } type Driver interface { - // Prepare can be used in order to do any preparation that a specific driver + // Create a new cookie with defaults and/or settings from container task. + // Callers should Close the cookie regardless of whether they prepare or run it. + CreateCookie(ctx context.Context, task ContainerTask) (Cookie, error) + + // PrepareCookie can be used in order to do any preparation that a specific driver // may need to do before running the task, and can be useful to put // preparation that the task can recover from into (i.e. if pulling an image // fails because a registry is down, the task doesn't need to be failed). 
It @@ -59,7 +69,7 @@ type Driver interface { // Callers should Close the cookie regardless of whether they run it. // // The returned cookie should respect the task's timeout when it is run. - Prepare(ctx context.Context, task ContainerTask) (Cookie, error) + PrepareCookie(ctx context.Context, cookie Cookie) error // close & shutdown the driver Close() error @@ -129,6 +139,9 @@ type ContainerTask interface { // Close is used to perform cleanup after task execution. // Close should be safe to call multiple times. Close() + + // Extra Configuration Options + Extensions() map[string]string } // Stat is a bucket of stats from a driver at a point in time for a certain task. diff --git a/api/agent/drivers/mock/mocker.go b/api/agent/drivers/mock/mocker.go index b4199539f..dfe44739b 100644 --- a/api/agent/drivers/mock/mocker.go +++ b/api/agent/drivers/mock/mocker.go @@ -17,14 +17,20 @@ type Mocker struct { count int } -func (m *Mocker) Prepare(context.Context, drivers.ContainerTask) (drivers.Cookie, error) { +func (m *Mocker) CreateCookie(context.Context, drivers.ContainerTask) (drivers.Cookie, error) { return &cookie{m}, nil } +func (m *Mocker) PrepareCookie(context.Context, drivers.Cookie) error { + return nil +} + func (m *Mocker) Close() error { return nil } +var _ drivers.Driver = &Mocker{} + type cookie struct { m *Mocker } @@ -39,6 +45,8 @@ func (c *cookie) Unfreeze(context.Context) error { func (c *cookie) Close(context.Context) error { return nil } +func (c *cookie) ContainerOptions() interface{} { return nil } + func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) { c.m.count++ if c.m.count%100 == 0 { @@ -51,6 +59,8 @@ func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) { }, nil } +var _ drivers.Cookie = &cookie{} + type runResult struct { err error status string diff --git a/api/agent/grpc/runner.pb.go b/api/agent/grpc/runner.pb.go index 1a997c5ba..3059aac06 100644 --- a/api/agent/grpc/runner.pb.go +++ b/api/agent/grpc/runner.pb.go @@ -43,8 +43,9 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // Request to allocate a slot for a call type TryCall struct { - ModelsCallJson string `protobuf:"bytes,1,opt,name=models_call_json,json=modelsCallJson" json:"models_call_json,omitempty"` - SlotHashId string `protobuf:"bytes,2,opt,name=slot_hash_id,json=slotHashId" json:"slot_hash_id,omitempty"` + ModelsCallJson string `protobuf:"bytes,1,opt,name=models_call_json,json=modelsCallJson" json:"models_call_json,omitempty"` + SlotHashId string `protobuf:"bytes,2,opt,name=slot_hash_id,json=slotHashId" json:"slot_hash_id,omitempty"` + Extensions map[string]string `protobuf:"bytes,3,rep,name=extensions" json:"extensions,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` } func (m *TryCall) Reset() { *m = TryCall{} } @@ -66,6 +67,13 @@ func (m *TryCall) GetSlotHashId() string { return "" } +func (m *TryCall) GetExtensions() map[string]string { + if m != nil { + return m.Extensions + } + return nil +} + // Data sent C2S and S2C - as soon as the runner sees the first of these it // will start running. If empty content, there must be one of these with eof. 
// The runner will send these for the body of the response, AFTER it has sent @@ -713,39 +721,41 @@ var _RunnerProtocol_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("runner.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 529 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x5d, 0x8b, 0x13, 0x31, - 0x14, 0xed, 0xf4, 0xbb, 0x77, 0x66, 0xd7, 0x12, 0x44, 0x86, 0xba, 0x60, 0x19, 0x3f, 0x28, 0x08, - 0xa9, 0x56, 0x7d, 0xf5, 0xc1, 0xba, 0xcb, 0x28, 0x2c, 0x48, 0xaa, 0xbe, 0x0e, 0xe9, 0x24, 0x9d, - 0x19, 0x4d, 0x27, 0x25, 0xc9, 0x2c, 0x14, 0xfc, 0x23, 0xfe, 0x5b, 0x49, 0x66, 0x3a, 0x5b, 0x7c, - 0xd9, 0xb7, 0x9c, 0x7b, 0x72, 0x3f, 0xce, 0xb9, 0x09, 0x04, 0xaa, 0x2a, 0x4b, 0xae, 0xf0, 0x41, - 0x49, 0x23, 0x67, 0x4f, 0x33, 0x29, 0x33, 0xc1, 0x97, 0x0e, 0x6d, 0xab, 0xdd, 0x92, 0xef, 0x0f, - 0xe6, 0x58, 0x93, 0xd1, 0x0f, 0x18, 0x7d, 0x57, 0xc7, 0x35, 0x15, 0x02, 0x2d, 0x60, 0xba, 0x97, - 0x8c, 0x0b, 0x9d, 0xa4, 0x54, 0x88, 0xe4, 0x97, 0x96, 0x65, 0xe8, 0xcd, 0xbd, 0xc5, 0x84, 0x5c, - 0xd6, 0x71, 0x7b, 0xeb, 0xab, 0x96, 0x25, 0x9a, 0x43, 0xa0, 0x85, 0x34, 0x49, 0x4e, 0x75, 0x9e, - 0x14, 0x2c, 0xec, 0xba, 0x5b, 0x60, 0x63, 0x31, 0xd5, 0xf9, 0x17, 0x16, 0xbd, 0x85, 0xc9, 0x67, - 0x6a, 0xe8, 0x8d, 0xa2, 0x7b, 0x8e, 0x10, 0xf4, 0x19, 0x35, 0xd4, 0x15, 0x0b, 0x88, 0x3b, 0xa3, - 0x29, 0xf4, 0xb8, 0xdc, 0xb9, 0xcc, 0x31, 0xb1, 0xc7, 0xe8, 0x3d, 0x40, 0x6c, 0xcc, 0x21, 0xe6, - 0x94, 0x71, 0x65, 0xf9, 0xdf, 0xfc, 0xd8, 0xf4, 0xb7, 0x47, 0xf4, 0x18, 0x06, 0x77, 0x54, 0x54, - 0xbc, 0xe9, 0x56, 0x83, 0xe8, 0x27, 0x04, 0x36, 0x8b, 0x70, 0x7d, 0xb8, 0xe5, 0x86, 0xa2, 0x67, - 0xe0, 0x6b, 0x43, 0x4d, 0xa5, 0x93, 0x54, 0x32, 0xee, 0xf2, 0x07, 0x04, 0xea, 0xd0, 0x5a, 0x32, - 0x8e, 0x5e, 0xc2, 0x28, 0x77, 0x2d, 0x74, 0xd8, 0x9d, 0xf7, 0x16, 0xfe, 0xca, 0xc7, 0xf7, 0x6d, - 0xc9, 0x89, 0x8b, 0x3e, 0xc2, 0x23, 0x2b, 0x97, 0x70, 0x5d, 0x09, 0xb3, 0x31, 0x54, 0x19, 0xf4, - 0x1c, 0xfa, 0xb9, 0x31, 0x87, 0x90, 0xcd, 0xbd, 0x85, 0xbf, 0xba, 0xc0, 0xe7, 0x7d, 0xe3, 0x0e, - 0x71, 0xe4, 0xa7, 0x21, 0xf4, 0xf7, 0xdc, 0xd0, 0xe8, 0x0f, 0x04, 0x36, 0xff, 0xa6, 0x28, 0x0b, - 0x9d, 0x73, 0x86, 0x42, 0x18, 0xe9, 0x2a, 0x4d, 0xb9, 0xd6, 0x6e, 0xa6, 0x31, 0x39, 0x41, 0xcb, - 0x30, 0x6e, 0x68, 0x21, 0x74, 0xa3, 0xec, 0x04, 0xd1, 0x15, 0x4c, 0xb8, 0x52, 0x52, 0xd9, 0xb9, - 0xc3, 0x9e, 0x53, 0x72, 0x1f, 0x40, 0x33, 0x18, 0x3b, 0xb0, 0x31, 0x2a, 0xec, 0xbb, 0xc4, 0x16, - 0x47, 0x1b, 0x98, 0xac, 0x45, 0xc1, 0x4b, 0x73, 0xab, 0x33, 0x74, 0x05, 0x3d, 0xa3, 0x6a, 0x2b, - 0xfd, 0xd5, 0x18, 0x37, 0xeb, 0x8e, 0x3b, 0xc4, 0x86, 0xd1, 0xbc, 0x59, 0x4e, 0xd7, 0xd1, 0x80, - 0xdb, 0xb5, 0x59, 0x49, 0x96, 0xb1, 0x92, 0xb6, 0x92, 0x1d, 0xa3, 0xbf, 0x1e, 0x4c, 0x88, 0x7b, - 0x58, 0xb6, 0xea, 0x07, 0x08, 0x94, 0x33, 0x27, 0xd1, 0xd6, 0x9d, 0xa6, 0xfc, 0x14, 0xff, 0xe7, - 0x5a, 0xdc, 0x21, 0xbe, 0x3a, 0x33, 0xf1, 0xc1, 0x76, 0xe8, 0x35, 0x8c, 0x77, 0x8d, 0x6b, 0x4e, - 0xb4, 0xb5, 0xfa, 0xdc, 0xca, 0xb8, 0x43, 0xda, 0x0b, 0xed, 0x6c, 0xaf, 0x20, 0xa8, 0x47, 0xdb, - 0xb8, 0x4d, 0xa3, 0x27, 0x30, 0xa4, 0xa9, 0x29, 0xee, 0xea, 0xd7, 0x32, 0x20, 0x0d, 0x5a, 0x65, - 0x70, 0x59, 0xdf, 0xfb, 0x66, 0x5f, 0x7f, 0x2a, 0x05, 0x7a, 0x01, 0xc3, 0xeb, 0x32, 0xa3, 0x19, - 0x47, 0x80, 0x5b, 0xcf, 0x66, 0x80, 0x5b, 0xa5, 0x0b, 0xef, 0x8d, 0x87, 0x96, 0x30, 0x3c, 0x55, - 0xc6, 0xf5, 0x77, 0xc2, 0xa7, 0xef, 0x84, 0xaf, 0xed, 0x77, 0x9a, 0x5d, 0xe0, 0xf3, 0x01, 0xb6, - 0x43, 0x47, 0xbf, 0xfb, 0x17, 0x00, 0x00, 0xff, 0xff, 0x0a, 0xb4, 0xdf, 0x3c, 0x8b, 0x03, 0x00, - 0x00, + // 576 bytes 
of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xdb, 0x8a, 0x13, 0x4d, + 0x10, 0xce, 0x24, 0xd9, 0x6c, 0x52, 0x99, 0x3d, 0xd0, 0xfc, 0xfc, 0x0c, 0x71, 0xc1, 0x30, 0x1e, + 0x08, 0x08, 0xbd, 0x1a, 0x15, 0x16, 0x41, 0x2f, 0x5c, 0xb3, 0x8c, 0xc2, 0x82, 0x74, 0xc4, 0xdb, + 0xd0, 0x3b, 0x53, 0x99, 0x19, 0xed, 0x4c, 0x87, 0xee, 0x9e, 0xc5, 0x80, 0x2f, 0xe2, 0x4b, 0xf9, + 0x4c, 0xd2, 0x3d, 0x87, 0x0d, 0x7b, 0xa3, 0x77, 0x5d, 0xf5, 0x55, 0xd5, 0x57, 0xf5, 0x55, 0x17, + 0xf8, 0xaa, 0x2c, 0x0a, 0x54, 0x74, 0xab, 0xa4, 0x91, 0x93, 0x07, 0xa9, 0x94, 0xa9, 0xc0, 0x73, + 0x67, 0xdd, 0x94, 0xeb, 0x73, 0xdc, 0x6c, 0xcd, 0xae, 0x02, 0xc3, 0xdf, 0x1e, 0x1c, 0x7e, 0x51, + 0xbb, 0x4b, 0x2e, 0x04, 0x99, 0xc1, 0xe9, 0x46, 0x26, 0x28, 0xf4, 0x2a, 0xe6, 0x42, 0xac, 0xbe, + 0x69, 0x59, 0x04, 0xde, 0xd4, 0x9b, 0x8d, 0xd8, 0x71, 0xe5, 0xb7, 0x51, 0x9f, 0xb4, 0x2c, 0xc8, + 0x14, 0x7c, 0x2d, 0xa4, 0x59, 0x65, 0x5c, 0x67, 0xab, 0x3c, 0x09, 0xba, 0x2e, 0x0a, 0xac, 0x2f, + 0xe2, 0x3a, 0xfb, 0x98, 0x90, 0x0b, 0x00, 0xfc, 0x61, 0xb0, 0xd0, 0xb9, 0x2c, 0x74, 0xd0, 0x9b, + 0xf6, 0x66, 0xe3, 0x79, 0x40, 0x6b, 0x26, 0xba, 0x68, 0xa1, 0x45, 0x61, 0xd4, 0x8e, 0xed, 0xc5, + 0x4e, 0xde, 0xc2, 0xc9, 0x3d, 0x98, 0x9c, 0x42, 0xef, 0x3b, 0xee, 0xea, 0x5e, 0xec, 0x93, 0xfc, + 0x07, 0x07, 0xb7, 0x5c, 0x94, 0x58, 0x33, 0x57, 0xc6, 0x9b, 0xee, 0x85, 0x17, 0xbe, 0x80, 0xd1, + 0x07, 0x6e, 0xf8, 0x95, 0xe2, 0x1b, 0x24, 0x04, 0xfa, 0x09, 0x37, 0xdc, 0x65, 0xfa, 0xcc, 0xbd, + 0x6d, 0x31, 0x94, 0x6b, 0x97, 0x38, 0x64, 0xf6, 0x19, 0xbe, 0x02, 0x88, 0x8c, 0xd9, 0x46, 0xc8, + 0x13, 0x54, 0xff, 0x4a, 0x16, 0x7e, 0x05, 0xdf, 0x66, 0x31, 0xd4, 0xdb, 0x6b, 0x34, 0x9c, 0x3c, + 0x84, 0xb1, 0x36, 0xdc, 0x94, 0x7a, 0x15, 0xcb, 0x04, 0x5d, 0xfe, 0x01, 0x83, 0xca, 0x75, 0x29, + 0x13, 0x24, 0x4f, 0xe0, 0x30, 0x73, 0x14, 0x3a, 0xe8, 0x3a, 0x3d, 0xc6, 0xf4, 0x8e, 0x96, 0x35, + 0x58, 0xf8, 0x0e, 0x4e, 0xac, 0x46, 0x0c, 0x75, 0x29, 0xcc, 0xd2, 0x70, 0x65, 0xc8, 0x23, 0xe8, + 0x67, 0xc6, 0x6c, 0x83, 0x64, 0xea, 0xcd, 0xc6, 0xf3, 0x23, 0xba, 0xcf, 0x1b, 0x75, 0x98, 0x03, + 0xdf, 0x0f, 0xa0, 0xbf, 0x41, 0xc3, 0xc3, 0x9f, 0xe0, 0xdb, 0xfc, 0xab, 0xbc, 0xc8, 0x75, 0x86, + 0x09, 0x09, 0xe0, 0x50, 0x97, 0x71, 0x8c, 0x5a, 0xbb, 0x9e, 0x86, 0xac, 0x31, 0x2d, 0x92, 0xa0, + 0xe1, 0xb9, 0xd0, 0xf5, 0x64, 0x8d, 0x49, 0xce, 0x60, 0x84, 0x4a, 0x49, 0x65, 0xfb, 0x0e, 0x7a, + 0x6e, 0x92, 0x3b, 0x07, 0x99, 0xc0, 0xd0, 0x19, 0x4b, 0xa3, 0x82, 0xbe, 0x4b, 0x6c, 0xed, 0x70, + 0x09, 0xa3, 0x4b, 0x91, 0x63, 0x61, 0xae, 0x75, 0x4a, 0xce, 0xa0, 0x67, 0x54, 0x25, 0xe5, 0x78, + 0x3e, 0x6c, 0xb6, 0x1f, 0x75, 0x98, 0x75, 0x93, 0x69, 0xbd, 0x9c, 0xae, 0x83, 0x81, 0xb6, 0x6b, + 0xb3, 0x23, 0x59, 0xc4, 0x8e, 0x74, 0x23, 0x93, 0x5d, 0xf8, 0xcb, 0x83, 0x11, 0x73, 0x5f, 0xda, + 0x56, 0x7d, 0x0d, 0xbe, 0x72, 0xe2, 0xac, 0xb4, 0x55, 0xa7, 0x2e, 0x7f, 0x4a, 0xef, 0xa9, 0x16, + 0x75, 0xd8, 0x58, 0xed, 0x89, 0xf8, 0x57, 0x3a, 0xf2, 0x0c, 0x86, 0xeb, 0x5a, 0x35, 0x37, 0xb4, + 0x95, 0x7a, 0x5f, 0xca, 0xa8, 0xc3, 0xda, 0x80, 0xb6, 0xb7, 0xa7, 0xe0, 0x57, 0xad, 0x2d, 0xdd, + 0xa6, 0xc9, 0xff, 0x30, 0xe0, 0xb1, 0xc9, 0x6f, 0xab, 0xdf, 0x72, 0xc0, 0x6a, 0x6b, 0x9e, 0xc2, + 0x71, 0x15, 0xf7, 0xd9, 0xde, 0x5d, 0x2c, 0x05, 0x79, 0x0c, 0x83, 0x45, 0x91, 0xf2, 0x14, 0x09, + 0xd0, 0x56, 0xb3, 0x09, 0xd0, 0x76, 0xd2, 0x99, 0xf7, 0xdc, 0x23, 0xe7, 0x30, 0x68, 0x2a, 0xd3, + 0xea, 0x90, 0x69, 0x73, 0xc8, 0x74, 0x61, 0x0f, 0x79, 0x72, 0x44, 0xf7, 0x1b, 0xb8, 0x19, 0x38, + 0xf8, 0xe5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x6e, 0xb0, 0x89, 0x05, 0x04, 0x00, 
0x00, } diff --git a/api/agent/grpc/runner.proto b/api/agent/grpc/runner.proto index 569a5c9d7..e174e631b 100644 --- a/api/agent/grpc/runner.proto +++ b/api/agent/grpc/runner.proto @@ -6,6 +6,7 @@ import "google/protobuf/empty.proto"; message TryCall { string models_call_json = 1; string slot_hash_id = 2; + map extensions = 3; } // Data sent C2S and S2C - as soon as the runner sees the first of these it diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 2bbf0faa7..dbea7fe9a 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -17,34 +17,49 @@ import ( "github.com/fnproject/fn/fnext" ) +type CallOverrider func(*models.Call, map[string]string) (map[string]string, error) + type lbAgent struct { cfg AgentConfig da DataAccess callListeners []fnext.CallListener rp pool.RunnerPool placer pool.Placer - - shutWg *common.WaitGroup - callEndCount int64 + callOverrider CallOverrider + shutWg *common.WaitGroup + callEndCount int64 } -func NewLBAgentConfig() (*AgentConfig, error) { +type LBAgentOption func(*lbAgent) error + +func WithLBAgentConfig(cfg *AgentConfig) LBAgentOption { + return func(a *lbAgent) error { + a.cfg = *cfg + return nil + } +} + +// LB agents can use this to register a CallOverrider to modify a Call and extensions +func WithCallOverrider(fn CallOverrider) LBAgentOption { + return func(a *lbAgent) error { + if a.callOverrider != nil { + return errors.New("lb-agent call overriders already exists") + } + a.callOverrider = fn + return nil + } +} + +// NewLBAgent creates an Agent that knows how to load-balance function calls +// across a group of runner nodes. +func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error) { + + // Yes, LBAgent and Agent both use an AgentConfig. cfg, err := NewAgentConfig() if err != nil { - return cfg, err + logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) } - if cfg.MaxRequestSize == 0 { - return cfg, errors.New("lb-agent requires MaxRequestSize limit") - } - if cfg.MaxResponseSize == 0 { - return cfg, errors.New("lb-agent requires MaxResponseSize limit") - } - return cfg, nil -} -// NewLBAgentWithConfig creates an Agent configured with a supplied AgentConfig -func NewLBAgentWithConfig(da DataAccess, rp pool.RunnerPool, p pool.Placer, cfg *AgentConfig) (Agent, error) { - logrus.Infof("lb-agent starting cfg=%+v", cfg) a := &lbAgent{ cfg: *cfg, da: da, @@ -52,47 +67,52 @@ func NewLBAgentWithConfig(da DataAccess, rp pool.RunnerPool, p pool.Placer, cfg placer: p, shutWg: common.NewWaitGroup(), } + + // Allow overriding config + for _, option := range options { + err = option(a) + if err != nil { + logrus.WithError(err).Fatalf("error in lb-agent options") + } + } + + logrus.Infof("lb-agent starting cfg=%+v", a.cfg) return a, nil } -// NewLBAgent creates an Agent that knows how to load-balance function calls -// across a group of runner nodes. 
-func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) { - - // TODO: Move the constants above to Agent Config or an LB specific LBAgentConfig - cfg, err := NewLBAgentConfig() - if err != nil { - logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) - } - return NewLBAgentWithConfig(da, rp, p, cfg) -} - +// implements Agent func (a *lbAgent) AddCallListener(listener fnext.CallListener) { a.callListeners = append(a.callListeners, listener) } +// implements callTrigger func (a *lbAgent) fireBeforeCall(ctx context.Context, call *models.Call) error { return fireBeforeCallFun(a.callListeners, ctx, call) } +// implements callTrigger func (a *lbAgent) fireAfterCall(ctx context.Context, call *models.Call) error { return fireAfterCallFun(a.callListeners, ctx, call) } +// implements Agent // GetAppID is to get the match of an app name to its ID func (a *lbAgent) GetAppID(ctx context.Context, appName string) (string, error) { return a.da.GetAppID(ctx, appName) } +// implements Agent // GetAppByID is to get the app by ID func (a *lbAgent) GetAppByID(ctx context.Context, appID string) (*models.App, error) { return a.da.GetAppByID(ctx, appID) } +// implements Agent func (a *lbAgent) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { return a.da.GetRoute(ctx, appID, path) } +// implements Agent func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { var c call @@ -108,6 +128,16 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } + // If overrider is present, let's allow it to modify models.Call + // and call extensions + if a.callOverrider != nil { + ext, err := a.callOverrider(c.Call, c.extensions) + if err != nil { + return nil, err + } + c.extensions = ext + } + err := setMaxBodyLimit(&a.cfg, &c) if err != nil { return nil, err @@ -120,10 +150,10 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { c.ct = a c.stderr = &nullReadWriter{} c.slotHashId = getSlotQueueKey(&c) - return &c, nil } +// implements Agent func (a *lbAgent) Close() error { // start closing the front gate first @@ -140,6 +170,7 @@ func (a *lbAgent) Close() error { return err } +// implements Agent func (a *lbAgent) Submit(callI Call) error { if !a.shutWg.AddSession(1) { return models.ErrCallTimeoutServerBusy @@ -229,6 +260,7 @@ func (a *lbAgent) setRequestBody(ctx context.Context, call *call) (*bytes.Buffer } } +// implements Agent func (a *lbAgent) Enqueue(context.Context, *models.Call) error { logrus.Error("Enqueue not implemented") return errors.New("Enqueue not implemented") @@ -260,3 +292,6 @@ func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isSt handleStatsDequeue(ctx, err) return transformTimeout(err, true) } + +var _ Agent = &lbAgent{} +var _ callTrigger = &lbAgent{} diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index 62e45ac1f..4c11b5d17 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -128,6 +128,10 @@ func (c *mockRunnerCall) SlotHashId() string { return c.slotHashId } +func (c *mockRunnerCall) Extensions() map[string]string { + return nil +} + func (c *mockRunnerCall) RequestBody() io.ReadCloser { return c.r.Body } diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 0ce1a81e9..cd58100f5 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -460,41 +460,41 @@ func (ch *callHandle) getDataMsg() *runner.DataFrame { return msg } -// TODO: 
decomission/remove this once dependencies are cleaned up -type CapacityGate interface { - CheckAndReserveCapacity(units uint64) error - ReleaseCapacity(units uint64) -} - // pureRunner implements Agent and delegates execution of functions to an internal Agent; basically it wraps around it // and provides the gRPC server that implements the LB <-> Runner protocol. type pureRunner struct { gRPCServer *grpc.Server - listen string + creds credentials.TransportCredentials a Agent inflight int32 } +// implements Agent func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) { return pr.a.GetAppID(ctx, appName) } +// implements Agent func (pr *pureRunner) GetAppByID(ctx context.Context, appID string) (*models.App, error) { return pr.a.GetAppByID(ctx, appID) } +// implements Agent func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) { return pr.a.GetCall(opts...) } +// implements Agent func (pr *pureRunner) GetRoute(ctx context.Context, appID string, path string) (*models.Route, error) { return pr.a.GetRoute(ctx, appID, path) } +// implements Agent func (pr *pureRunner) Submit(Call) error { return errors.New("Submit cannot be called directly in a Pure Runner.") } +// implements Agent func (pr *pureRunner) Close() error { // First stop accepting requests pr.gRPCServer.GracefulStop() @@ -506,10 +506,12 @@ func (pr *pureRunner) Close() error { return nil } +// implements Agent func (pr *pureRunner) AddCallListener(cl fnext.CallListener) { pr.a.AddCallListener(cl) } +// implements Agent func (pr *pureRunner) Enqueue(context.Context, *models.Call) error { return errors.New("Enqueue cannot be called directly in a Pure Runner.") } @@ -531,7 +533,11 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error return err } - agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), WithWriter(state), WithContext(state.ctx)) + agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), + WithWriter(state), + WithContext(state.ctx), + WithExtensions(tc.GetExtensions()), + ) if err != nil { state.enqueueCallResponse(err) return err @@ -551,6 +557,7 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error return nil } +// implements RunnerProtocolServer // Handles a client engagement func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error { grpc.EnableTracing = false @@ -605,80 +612,87 @@ DataLoop: return state.waitError() } +// implements RunnerProtocolServer func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) { return &runner.RunnerStatus{ Active: atomic.LoadInt32(&pr.inflight), }, nil } -func (pr *pureRunner) Start() error { - logrus.Info("Pure Runner listening on ", pr.listen) - lis, err := net.Listen("tcp", pr.listen) - if err != nil { - return fmt.Errorf("Could not listen on %s: %s", pr.listen, err) - } - - if err := pr.gRPCServer.Serve(lis); err != nil { - return fmt.Errorf("grpc serve error: %s", err) - } - return err -} - -func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error) { - return NewPureRunner(cancel, addr, da, "", "", "", nil) -} - func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (Agent, error) { - return NewPureRunner(cancel, addr, da, cert, key, ca, nil) + + agent := New(da, WithoutAsyncDequeue()) + + // WARNING: SSL creds are optional. 
+ if cert == "" || key == "" || ca == "" { + return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent)) + } + return NewPureRunner(cancel, addr, PureRunnerWithAgent(agent), PureRunnerWithSSL(cert, key, ca)) } -func ValidatePureRunnerConfig() AgentOption { - return func(a *agent) error { +type PureRunnerOption func(*pureRunner) error - if a.cfg.MaxResponseSize == 0 { - return errors.New("pure runner requires MaxResponseSize limits") +func PureRunnerWithSSL(cert string, key string, ca string) PureRunnerOption { + return func(pr *pureRunner) error { + c, err := createCreds(cert, key, ca) + if err != nil { + return fmt.Errorf("Failed to create pure runner credentials: %s", err) } - if a.cfg.MaxRequestSize == 0 { - return errors.New("pure runner requires MaxRequestSize limits") - } - - // pure runner requires a non-blocking resource tracker - if !a.cfg.EnableNBResourceTracker { - return errors.New("pure runner requires EnableNBResourceTracker true") - } - + pr.creds = c return nil } } -func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, unused CapacityGate) (Agent, error) { - // TODO: gate unused, decommission/remove it after cleaning up dependencies to it. +func PureRunnerWithAgent(a Agent) PureRunnerOption { + return func(pr *pureRunner) error { + if pr.a != nil { + return errors.New("Failed to create pure runner: agent already created") + } - a := createAgent(da, ValidatePureRunnerConfig()) - var pr *pureRunner - var err error - if cert != "" && key != "" && ca != "" { - c, err := creds(cert, key, ca) + pr.a = a + return nil + } +} + +func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error) { + + pr := &pureRunner{} + + for _, option := range options { + err := option(pr) if err != nil { - logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!") - return nil, err - } - pr, err = createPureRunner(addr, a, c) - if err != nil { - return nil, err - } - } else { - logrus.Warn("Running pure runner in insecure mode!") - pr, err = createPureRunner(addr, a, nil) - if err != nil { - return nil, err + logrus.WithError(err).Fatalf("error in pure runner options") } } + if pr.a == nil { + logrus.Fatal("agent not provided in pure runner options") + } + + var opts []grpc.ServerOption + + opts = append(opts, grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor)) + opts = append(opts, grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor)) + + if pr.creds != nil { + opts = append(opts, grpc.Creds(pr.creds)) + } else { + logrus.Warn("Running pure runner in insecure mode!") + } + + pr.gRPCServer = grpc.NewServer(opts...) 
+ runner.RegisterRunnerProtocolServer(pr.gRPCServer, pr) + + lis, err := net.Listen("tcp", addr) + if err != nil { + logrus.WithError(err).Fatalf("Could not listen on %s", addr) + } + + logrus.Info("Pure Runner listening on ", addr) + go func() { - err := pr.Start() - if err != nil { - logrus.WithError(err).Error("Failed to start pure runner") + if err := pr.gRPCServer.Serve(lis); err != nil { + logrus.WithError(err).Error("grpc serve error") cancel() } }() @@ -686,7 +700,11 @@ func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert s return pr, nil } -func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) { +func createCreds(cert string, key string, ca string) (credentials.TransportCredentials, error) { + if cert == "" || key == "" || ca == "" { + return nil, errors.New("Failed to create credentials, cert/key/ca not provided") + } + // Load the certificates from disk certificate, err := tls.LoadX509KeyPair(cert, key) if err != nil { @@ -711,25 +729,5 @@ func creds(cert string, key string, ca string) (credentials.TransportCredentials }), nil } -func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials) (*pureRunner, error) { - var srv *grpc.Server - var opts []grpc.ServerOption - - sInterceptor := grpc.StreamInterceptor(grpcutil.RIDStreamServerInterceptor) - uInterceptor := grpc.UnaryInterceptor(grpcutil.RIDUnaryServerInterceptor) - opts = append(opts, sInterceptor) - opts = append(opts, uInterceptor) - if creds != nil { - opts = append(opts, grpc.Creds(creds)) - } - srv = grpc.NewServer(opts...) - - pr := &pureRunner{ - gRPCServer: srv, - listen: addr, - a: a, - } - - runner.RegisterRunnerProtocolServer(srv, pr) - return pr, nil -} +var _ runner.RunnerProtocolServer = &pureRunner{} +var _ Agent = &pureRunner{} diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index dd60de6af..178fd60c1 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -52,6 +52,7 @@ func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool }, nil } +// implements Runner func (r *gRPCRunner) Close(context.Context) error { r.shutWg.CloseGroup() return r.conn.Close() @@ -82,6 +83,7 @@ func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.Cl return conn, protocolClient, nil } +// implements Runner func (r *gRPCRunner) Address() string { return r.address } @@ -101,6 +103,7 @@ func isTooBusy(err error) bool { return false } +// implements Runner func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) { log := common.Logger(ctx).WithField("runner_addr", r.address) @@ -135,6 +138,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ ModelsCallJson: string(modelJSON), SlotHashId: hex.EncodeToString([]byte(call.SlotHashId())), + Extensions: call.Extensions(), }}}) if err != nil { log.WithError(err).Error("Failed to send message to runner node") @@ -312,3 +316,5 @@ DataLoop: tryQueueError(ErrorPureRunnerNoEOF, done) } } + +var _ pool.Runner = &gRPCRunner{} diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index 961c76ffd..e09000493 100644 --- a/api/runnerpool/runner_pool.go +++ b/api/runnerpool/runner_pool.go @@ -42,6 +42,7 @@ type Runner interface { // processed by a RunnerPool type RunnerCall interface { SlotHashId() string + Extensions() map[string]string RequestBody() io.ReadCloser 
ResponseWriter() http.ResponseWriter StdErr() io.ReadWriteCloser diff --git a/test/fn-system-tests/exec_test.go b/test/fn-system-tests/exec_test.go index 6f5d99508..5acbb0bdd 100644 --- a/test/fn-system-tests/exec_test.go +++ b/test/fn-system-tests/exec_test.go @@ -17,6 +17,7 @@ import ( sdkmodels "github.com/fnproject/fn_go/models" ) +// See fn-test-utils for json response func getEchoContent(respBytes []byte) (string, error) { var respJs map[string]interface{} @@ -39,6 +40,29 @@ func getEchoContent(respBytes []byte) (string, error) { return echo, nil } +// See fn-test-utils for json response +func getConfigContent(key string, respBytes []byte) (string, error) { + + var respJs map[string]interface{} + + err := json.Unmarshal(respBytes, &respJs) + if err != nil { + return "", err + } + + cfg, ok := respJs["config"].(map[string]interface{}) + if !ok { + return "", errors.New("unexpected json: config map") + } + + val, ok := cfg[key].(string) + if !ok { + return "", fmt.Errorf("unexpected json: %s string", key) + } + + return val, nil +} + func TestCanExecuteFunction(t *testing.T) { s := apiutils.SetupHarness() s.GivenAppExists(t, &sdkmodels.App{Name: s.AppName}) @@ -79,6 +103,13 @@ func TestCanExecuteFunction(t *testing.T) { if resp.StatusCode != http.StatusOK { t.Fatalf("StatusCode check failed on %v", resp.StatusCode) } + + // Now let's check FN_CHEESE, since LB and runners have override/extension mechanism + // to insert FN_CHEESE into config + cheese, err := getConfigContent("FN_CHEESE", output.Bytes()) + if err != nil || cheese != "Tete de Moine" { + t.Fatalf("getConfigContent/FN_CHEESE check failed (%v) on %v", err, output) + } } func TestCanExecuteBigOutput(t *testing.T) { diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 318af9d44..d186d12e2 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -6,12 +6,17 @@ import ( "fmt" "github.com/fnproject/fn/api/agent" + "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/agent/hybrid" "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" pool "github.com/fnproject/fn/api/runnerpool" "github.com/fnproject/fn/api/server" _ "github.com/fnproject/fn/api/server/defaultexts" + // We need docker client here, since we have a custom driver that wraps generic + // docker driver. + "github.com/fsouza/go-dockerclient" "github.com/sirupsen/logrus" "io/ioutil" @@ -219,12 +224,14 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { keys := []string{"fn_appname", "fn_path"} pool.RegisterPlacerViews(keys) - agent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer) + // Create an LB Agent with a Call Overrider to intercept calls in GetCall(). Overrider in this example + // scrubs CPU/TmpFsSize and adds FN_CHEESE key/value into extensions. + lbAgent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer, agent.WithCallOverrider(LBCallOverrider)) if err != nil { return nil, err } - opts = append(opts, server.WithAgent(agent)) + opts = append(opts, server.WithAgent(lbAgent)) return server.New(ctx, opts...), nil } @@ -246,14 +253,30 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro return nil, err } grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) - cancelCtx, cancel := context.WithCancel(ctx) - prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", nil) + // This is our Agent config, which we will use for both inner agent and docker. 
+ cfg, err := agent.NewAgentConfig()
 if err != nil {
 return nil, err
 }
- opts = append(opts, server.WithAgent(prAgent), server.WithExtraCtx(cancelCtx))
+ // custom driver that overrides the generic docker driver
+ drv := &customDriver{
+ drv: agent.NewDockerDriver(cfg),
+ }
+
+ // inner agent for pure-runners
+ innerAgent := agent.New(ds, agent.WithConfig(cfg), agent.WithDockerDriver(drv), agent.WithoutAsyncDequeue())
+
+ cancelCtx, cancel := context.WithCancel(ctx)
+
+ // now create pure-runner that wraps the agent.
+ pureRunner, err := agent.NewPureRunner(cancel, grpcAddr, agent.PureRunnerWithAgent(innerAgent))
+ if err != nil {
+ return nil, err
+ }
+
+ opts = append(opts, server.WithAgent(pureRunner), server.WithExtraCtx(cancelCtx))
 return server.New(ctx, opts...), nil
 }
@@ -326,3 +349,65 @@ func TestMain(m *testing.M) {
 }
 os.Exit(result)
 }
+
+// Memory Only LB Agent Call Overrider
+func LBCallOverrider(c *models.Call, exts map[string]string) (map[string]string, error) {
+
+ // Set TmpFsSize and CPU to unlimited. This means LB operates on Memory
+ // only. Operators/Service providers are expected to override this
+ // and apply their own filter to set/override CPU/TmpFsSize/Memory
+ // and Extension variables.
+ c.TmpFsSize = 0
+ c.CPUs = models.MilliCPUs(0)
+ delete(c.Config, "FN_CPUS")
+
+ if exts == nil {
+ exts = make(map[string]string)
+ }
+
+ // Add an FN_CHEESE extension to be intercepted and specially handled by the Pure Runner customDriver below
+ exts["FN_CHEESE"] = "Tete de Moine"
+ return exts, nil
+}
+
+// An example Pure Runner docker driver. Using CreateCookie, it intercepts a generated cookie to
+// add an environment variable FN_CHEESE if it finds a FN_CHEESE extension.
+type customDriver struct {
+ drv drivers.Driver
+}
+
+// implements Driver
+func (d *customDriver) CreateCookie(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
+ cookie, err := d.drv.CreateCookie(ctx, task)
+ if err != nil {
+ return cookie, err
+ }
+
+ // if call extensions include 'FN_CHEESE', then let's add an FN_CHEESE env var, which should
+ // end up in Env/Config.
+ ext := task.Extensions()
+ cheese, ok := ext["FN_CHEESE"]
+ if ok {
+ // docker driver specific data
+ obj := cookie.ContainerOptions()
+ opts, ok := obj.(docker.CreateContainerOptions)
+ if !ok {
+ logrus.Fatal("Unexpected driver, should be docker")
+ }
+ opts.Config.Env = append(opts.Config.Env, "FN_CHEESE="+cheese)
+ }
+
+ return cookie, nil
+}
+
+// implements Driver
+func (d *customDriver) PrepareCookie(ctx context.Context, cookie drivers.Cookie) error {
+ return d.drv.PrepareCookie(ctx, cookie)
+}
+
+// implements Driver
+func (d *customDriver) Close() error {
+ return d.drv.Close()
+}
+
+var _ drivers.Driver = &customDriver{}
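
Taken together, the new options make runner wiring explicit. Below is a minimal sketch of how a deployment might compose them outside the test harness, here also enabling runner-side TLS. The function name, listen address, certificate paths, and the `ds` DataAccess value are illustrative assumptions, not part of this patch; it assumes the same imports already used in system_test.go.

func newSecureRunnerNode(ds agent.DataAccess) (agent.Agent, error) {
	cfg, err := agent.NewAgentConfig()
	if err != nil {
		return nil, err
	}

	// Wrap the stock docker driver with the example customDriver defined above.
	drv := &customDriver{drv: agent.NewDockerDriver(cfg)}

	// Inner agent: custom driver injected, async dequeue disabled for a pure runner.
	inner := agent.New(ds, agent.WithConfig(cfg), agent.WithDockerDriver(drv), agent.WithoutAsyncDequeue())

	// The cancel func is invoked by the pure runner if its gRPC server stops serving.
	_, cancel := context.WithCancel(context.Background())

	// PureRunnerWithSSL is optional; omitting it runs the gRPC server in insecure mode.
	return agent.NewPureRunner(cancel, ":9190",
		agent.PureRunnerWithAgent(inner),
		agent.PureRunnerWithSSL("server.pem", "server.key", "ca.pem"))
}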