From 584e4e75eb9fec4968cb518c73294e3c25b70446 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Thu, 5 Apr 2018 15:07:30 -0700
Subject: [PATCH] Experimental Pre-fork Pool: Recycle net ns (#890)

* fn: experimental prefork recycle and other improvements

*) Add a recycle option: do not reuse the same pool container again.
*) Two-state processing: initializing versus ready (start/kill).
*) The ready state is exempt from the rate limiter.

* fn: experimental prefork pool multiple network support

In order to exceed the 1023-container (bridge port) limit, add
multiple networks:

    for i in fn-net1 fn-net2 fn-net3 fn-net4
    do
        docker network create $i
    done

to the Docker startup (e.g. dind preentry.sh), then provide them to
the prefork pool using:

    export FN_EXPERIMENTAL_PREFORK_NETWORKS="fn-net1 fn-net2 fn-net3 fn-net4"

which should be able to spawn 1023 * 4 containers.

* fn: fixup tests for cfg move

* fn: add ipc and pid namespaces into prefork pooling

* fn: revert ipc and pid namespaces for now

Sharing the pid/ipc namespaces opens up the function container to the
pause container.
---
 api/agent/agent.go                           |   2 +
 api/agent/config.go                          |  23 +-
 api/agent/drivers/docker/docker.go           |  37 ++-
 api/agent/drivers/docker/docker_client.go    |  13 +
 api/agent/drivers/docker/docker_pool.go      | 246 ++++++++++++-------
 api/agent/drivers/docker/docker_pool_test.go |  12 +-
 api/agent/drivers/driver.go                  |   2 +
 7 files changed, 229 insertions(+), 106 deletions(-)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index 0e9c4b112..1ff4434c4 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -141,6 +141,8 @@ func createAgent(da DataAccess, withDocker bool) Agent {
             PreForkPoolSize: cfg.PreForkPoolSize,
             PreForkImage:    cfg.PreForkImage,
             PreForkCmd:      cfg.PreForkCmd,
+            PreForkUseOnce:  cfg.PreForkUseOnce,
+            PreForkNetworks: cfg.PreForkNetworks,
         })
     } else {
         driver = mock.New()
diff --git a/api/agent/config.go b/api/agent/config.go
index 32ad1138b..5b2fabef4 100644
--- a/api/agent/config.go
+++ b/api/agent/config.go
@@ -25,6 +25,8 @@ type AgentConfig struct {
     PreForkPoolSize    uint64 `json:"pre_fork_pool_size"`
     PreForkImage       string `json:"pre_fork_image"`
     PreForkCmd         string `json:"pre_fork_pool_cmd"`
+    PreForkUseOnce     uint64 `json:"pre_fork_use_once"`
+    PreForkNetworks    string `json:"pre_fork_networks"`
 }
 
 const (
@@ -43,6 +45,8 @@ const (
     EnvPreForkPoolSize    = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
     EnvPreForkImage       = "FN_EXPERIMENTAL_PREFORK_IMAGE"
     EnvPreForkCmd         = "FN_EXPERIMENTAL_PREFORK_CMD"
+    EnvPreForkUseOnce     = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
+    EnvPreForkNetworks    = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
 
     MaxDisabledMsecs = time.Duration(math.MaxInt64)
 )
@@ -53,6 +57,8 @@ func NewAgentConfig() (*AgentConfig, error) {
         MinDockerVersion:   "17.10.0-ce",
         MaxLogSize:         1 * 1024 * 1024,
         MaxCallEndStacking: 8192,
+        PreForkImage:       "busybox",
+        PreForkCmd:         "tail -f /dev/null",
     }
 
     var err error
@@ -70,14 +76,15 @@ func NewAgentConfig() (*AgentConfig, error) {
     err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
     err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
     err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)
+    err = setEnvStr(err, EnvPreForkImage, &cfg.PreForkImage)
+    err = setEnvStr(err, EnvPreForkCmd, &cfg.PreForkCmd)
+    err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce)
+    err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks)
 
     if err != nil {
         return cfg, err
     }
 
-    cfg.PreForkImage = os.Getenv(EnvPreForkImage)
-    cfg.PreForkCmd = os.Getenv(EnvPreForkCmd)
-
     if cfg.EjectIdle == time.Duration(0) {
         return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
     }
@@ -89,6 +96,16 @@ func NewAgentConfig() (*AgentConfig, error) {
     return cfg, nil
 }
 
+func setEnvStr(err error, name string, dst *string) error {
+    if err != nil {
+        return err
+    }
+    if tmp, ok := os.LookupEnv(name); ok {
+        *dst = tmp
+    }
+    return nil
+}
+
 func setEnvUint(err error, name string, dst *uint64) error {
     if err != nil {
         return err
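For illustration, the error-chaining setEnv pattern used above (one err threaded through every setter, checked once at the end) can be exercised standalone. A minimal sketch; the FN_DEMO_* names and the ParseUint handling are assumptions for the example, not the repo's exact code:

    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    // setEnvStr overwrites *dst only when the variable is set; a prior
    // error short-circuits the whole chain, as in config.go above.
    func setEnvStr(err error, name string, dst *string) error {
        if err != nil {
            return err
        }
        if tmp, ok := os.LookupEnv(name); ok {
            *dst = tmp
        }
        return nil
    }

    func setEnvUint(err error, name string, dst *uint64) error {
        if err != nil {
            return err
        }
        if tmp, ok := os.LookupEnv(name); ok {
            val, perr := strconv.ParseUint(tmp, 10, 64)
            if perr != nil {
                return fmt.Errorf("error invalid %s=%s", name, tmp)
            }
            *dst = val
        }
        return nil
    }

    func main() {
        image, size := "busybox", uint64(0) // defaults survive unset vars
        var err error
        err = setEnvStr(err, "FN_DEMO_IMAGE", &image)
        err = setEnvUint(err, "FN_DEMO_POOL_SIZE", &size)
        if err != nil {
            fmt.Println("config error:", err)
            return
        }
        fmt.Println(image, size)
    }

The payoff of this style, visible in the hunk above, is that defaults set in the struct literal survive unset variables, and a single error check replaces one per variable.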
diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go
index 9ecd6739b..bb0878f80 100644
--- a/api/agent/drivers/docker/docker.go
+++ b/api/agent/drivers/docker/docker.go
@@ -131,6 +131,29 @@ func (drv *DockerDriver) Close() error {
     return err
 }
 
+func (drv *DockerDriver) tryUsePool(ctx context.Context, container *docker.CreateContainerOptions) string {
+    ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"})
+
+    if drv.pool != nil {
+        id, err := drv.pool.AllocPoolId()
+        if err == nil {
+            linker := fmt.Sprintf("container:%s", id)
+            // We are able to fetch a container from the pool. Now, use its
+            // network, ipc and pid namespaces.
+            container.HostConfig.NetworkMode = linker
+            //container.HostConfig.IpcMode = linker
+            //container.HostConfig.PidMode = linker
+            return id
+        }
+
+        log.WithError(err).Error("Could not fetch pre fork pool container")
+    }
+
+    // hostname and container NetworkMode are not compatible.
+    container.Config.Hostname = drv.hostname
+    return ""
+}
+
 func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
     ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
     var cmd []string
@@ -171,19 +194,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
         Context: ctx,
     }
 
-    poolId := ""
-    if drv.pool != nil {
-        id, err := drv.pool.AllocPoolId()
-        if err != nil {
-            log.WithError(err).Error("Could not fetch pre fork pool container")
-        } else {
-            poolId = id
-            container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id)
-        }
-    } else {
-        // hostname and container NetworkMode is not compatible.
-        container.Config.Hostname = drv.hostname
-    }
+    poolId := drv.tryUsePool(ctx, &container)
 
     // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
     // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,
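To illustrate what the `container:<id>` NetworkMode above does, here is a standalone sketch using go-dockerclient that joins the network namespace of an already-running container. The "pool0" name and busybox image are placeholders; this is not the driver's code path, only the underlying Docker mechanism it relies on:

    package main

    import (
        "fmt"
        "log"

        docker "github.com/fsouza/go-dockerclient"
    )

    func main() {
        client, err := docker.NewClientFromEnv()
        if err != nil {
            log.Fatal(err)
        }

        // Assume "pool0" is a running pool container. Joining its network
        // namespace means the new container needs no network setup of its
        // own, which is the entire point of the pre-fork pool.
        opts := docker.CreateContainerOptions{
            Name: "fn-demo-task",
            Config: &docker.Config{
                Image: "busybox",
                Cmd:   []string{"ip", "addr"},
                // Hostname must stay empty here: it conflicts with a
                // container NetworkMode, as noted in tryUsePool above.
            },
            HostConfig: &docker.HostConfig{
                NetworkMode: fmt.Sprintf("container:%s", "pool0"),
            },
        }
        c, err := client.CreateContainer(opts)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("created", c.ID)
    }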
diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go
index 5d0a22734..e50684e4d 100644
--- a/api/agent/drivers/docker/docker_client.go
+++ b/api/agent/drivers/docker/docker_client.go
@@ -35,6 +35,7 @@ type dockerClient interface {
     AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error)
     WaitContainerWithContext(id string, ctx context.Context) (int, error)
     StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error
+    KillContainer(opts docker.KillContainerOptions) error
     CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error)
     RemoveContainer(opts docker.RemoveContainerOptions) error
     PauseContainer(id string, ctx context.Context) error
@@ -336,6 +337,18 @@ func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *doc
     return c, err
 }
 
+func (d *dockerWrap) KillContainer(opts docker.KillContainerOptions) (err error) {
+    ctx, span := trace.StartSpan(opts.Context, "docker_kill_container")
+    defer span.End()
+
+    logger := common.Logger(ctx).WithField("docker_cmd", "KillContainer")
+    err = d.retry(ctx, logger, func() error {
+        err = d.dockerNoTimeout.KillContainer(opts)
+        return err
+    })
+    return err
+}
+
 func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) (err error) {
     ctx, span := trace.StartSpan(opts.Context, "docker_pull_image")
     defer span.End()
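The new KillContainer wrapper follows the same span-plus-retry shape as the other dockerWrap methods. A self-contained sketch of that pattern; the fixed three-attempt backoff below is an assumption for illustration, not the repo's actual retry policy:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "go.opencensus.io/trace"
    )

    // withRetry opens a trace span around op and retries it with a fixed
    // backoff, respecting context cancellation between attempts.
    func withRetry(ctx context.Context, name string, op func() error) error {
        ctx, span := trace.StartSpan(ctx, name)
        defer span.End()

        var err error
        for attempt := 0; attempt < 3; attempt++ {
            if err = op(); err == nil {
                return nil
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(100 * time.Millisecond):
            }
        }
        return err
    }

    func main() {
        calls := 0
        err := withRetry(context.Background(), "docker_kill_container", func() error {
            calls++
            if calls < 2 {
                return errors.New("transient")
            }
            return nil
        })
        fmt.Println("calls:", calls, "err:", err)
    }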
diff --git a/api/agent/drivers/docker/docker_pool.go b/api/agent/drivers/docker/docker_pool.go
index 9cee36ad4..6cad9e551 100644
--- a/api/agent/drivers/docker/docker_pool.go
+++ b/api/agent/drivers/docker/docker_pool.go
@@ -34,20 +34,26 @@ var (
     ErrorPoolEmpty = errors.New("docker pre fork pool empty")
 )
 
+type PoolTaskStateType int
+
+const (
+    PoolTaskStateInit  PoolTaskStateType = iota // initializing
+    PoolTaskStateReady                          // ready to be run
+)
+
 const (
     LimitPerSec = 10
     LimitBurst  = 20
 
-    DefaultImage = "busybox"
-    DefaultCmd   = "tail -f /dev/null"
-
     ShutdownTimeout = time.Duration(1) * time.Second
 )
 
 type poolTask struct {
-    id    string
-    image string
-    cmd   string
+    id      string
+    image   string
+    cmd     string
+    netMode string
+    state   PoolTaskStateType
 }
 
 func (c *poolTask) Id() string { return c.id }
@@ -65,13 +71,19 @@ func (c *poolTask) CPUs() uint64 { return 0 }
 func (c *poolTask) FsSize() uint64 { return 0 }
 func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {}
 
+type dockerPoolItem struct {
+    id     string
+    cancel func()
+}
+
 type dockerPool struct {
-    lock    sync.Mutex
-    inuse   map[string]struct{}
-    free    []string
-    limiter *rate.Limiter
-    cancel  func()
-    wg      sync.WaitGroup // TODO rename
+    lock      sync.Mutex
+    inuse     map[string]dockerPoolItem
+    free      []dockerPoolItem
+    limiter   *rate.Limiter
+    cancel    func()
+    wg        sync.WaitGroup
+    isRecycle bool
 }
 
 type DockerPoolStats struct {
@@ -81,7 +93,7 @@ type DockerPoolStats struct {
 
 type DockerPool interface {
     // fetch a pre-allocated free id from the pool
-    // may return too busy error
+    // may return a too-busy error.
     AllocPoolId() (string, error)
 
     // Release the id back to the pool
@@ -107,28 +119,31 @@ func NewDockerPool(conf drivers.Config, driver *DockerDriver) DockerPool {
     log.Error("WARNING: Experimental Prefork Docker Pool Enabled")
 
     pool := &dockerPool{
-        inuse:   make(map[string]struct{}, conf.PreForkPoolSize),
-        free:    make([]string, 0, conf.PreForkPoolSize),
+        inuse:   make(map[string]dockerPoolItem, conf.PreForkPoolSize),
+        free:    make([]dockerPoolItem, 0, conf.PreForkPoolSize),
         limiter: rate.NewLimiter(LimitPerSec, LimitBurst),
         cancel:  cancel,
     }
 
-    for i := uint64(0); i < conf.PreForkPoolSize; i++ {
+    if conf.PreForkUseOnce != 0 {
+        pool.isRecycle = true
+    }
+
+    networks := strings.Fields(conf.PreForkNetworks)
+    if len(networks) == 0 {
+        networks = append(networks, "")
+    }
+
+    pool.wg.Add(int(conf.PreForkPoolSize))
+    for i := 0; i < int(conf.PreForkPoolSize); i++ {
         task := &poolTask{
-            id:    fmt.Sprintf("%d_prefork_%s", i, id.New().String()),
-            image: DefaultImage,
-            cmd:   DefaultCmd,
+            id:      fmt.Sprintf("%d_prefork_%s", i, id.New().String()),
+            image:   conf.PreForkImage,
+            cmd:     conf.PreForkCmd,
+            netMode: networks[i%len(networks)],
         }
 
-        if conf.PreForkImage != "" {
-            task.image = conf.PreForkImage
-        }
-        if conf.PreForkCmd != "" {
-            task.cmd = conf.PreForkCmd
-        }
-
-        pool.wg.Add(1)
         go pool.nannyContainer(ctx, driver, task)
     }
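The network spreading in NewDockerPool above is a plain round-robin over the space-separated FN_EXPERIMENTAL_PREFORK_NETWORKS list. A minimal standalone sketch of the same assignment, using the network names from the commit message:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Mirrors NewDockerPool: split the space-separated network list
        // and assign pool slots round-robin. An empty list degrades to a
        // single "" entry, meaning the default Docker network.
        networks := strings.Fields("fn-net1 fn-net2 fn-net3 fn-net4")
        if len(networks) == 0 {
            networks = append(networks, "")
        }

        poolSize := 10
        for i := 0; i < poolSize; i++ {
            fmt.Printf("container %d -> network %q\n", i, networks[i%len(networks)])
        }
    }

With each bridge network capped at 1023 ports, spreading N networks this way raises the ceiling to roughly 1023 * N pool containers.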
@@ -141,10 +156,15 @@ func (pool *dockerPool) Close() error {
     return nil
 }
 
-func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) {
-    defer pool.wg.Done()
+func (pool *dockerPool) performInitState(ctx context.Context, driver *DockerDriver, task *poolTask) {
 
-    log := common.Logger(ctx).WithFields(logrus.Fields{"name": task.Id()})
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    err := driver.ensureImage(ctx, task)
+    if err != nil {
+        log.WithError(err).Info("prefork pool image pull failed")
+        return
+    }
 
     containerOpts := docker.CreateContainerOptions{
         Name: task.Id(),
@@ -163,7 +183,7 @@ func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver
             LogConfig: docker.LogConfig{
                 Type: "none",
             },
-            AutoRemove: true,
+            NetworkMode: task.netMode,
         },
         Context: ctx,
     }
@@ -175,67 +195,109 @@ func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver
         Context: ctx,
     }
 
-    // We spin forever, keeping the pool resident and running at all times.
-    for {
-        err := pool.limiter.Wait(ctx)
-        if err != nil {
-            // should not really happen unless ctx has a deadline or burst is 0.
-            log.WithError(err).Info("prefork pool rate limiter failed")
-            break
-        }
+    // ignore failure here
+    driver.docker.RemoveContainer(removeOpts)
 
-        // Let's try to clean up any left overs
-        err = driver.docker.RemoveContainer(removeOpts)
-        if err != nil {
-            log.WithError(err).Info("prefork pool container remove failed (this is probably OK)")
-        }
-
-        err = driver.ensureImage(ctx, task)
-        if err != nil {
-            log.WithError(err).Info("prefork pool image pull failed")
-            continue
-        }
-
-        _, err = driver.docker.CreateContainer(containerOpts)
-        if err != nil {
-            log.WithError(err).Info("prefork pool container create failed")
-            continue
-        }
-
-        err = driver.docker.StartContainerWithContext(task.Id(), nil, ctx)
-        if err != nil {
-            log.WithError(err).Info("prefork pool container start failed")
-            continue
-        }
-
-        log.Debug("prefork pool container ready")
-
-        // IMPORTANT: container is now up and running. Register it to make it
-        // available for function containers.
-        pool.register(task.Id())
-
-        // We are optimistic here where provided image and command really blocks
-        // and runs forever.
-        exitCode, err := driver.docker.WaitContainerWithContext(task.Id(), ctx)
-
-        // IMPORTANT: We have exited. This window is potentially very destructive, as any new
-        // function containers created during this window will fail. We must immediately
-        // proceed to unregister ourself to avoid further issues.
-        pool.unregister(task.Id())
-
-        log.WithError(err).Infof("prefork pool container exited exit_code=%d", exitCode)
+    _, err = driver.docker.CreateContainer(containerOpts)
+    if err != nil {
+        log.WithError(err).Info("prefork pool container create failed")
+        return
     }
 
-    // final exit cleanup
+    task.state = PoolTaskStateReady
+}
+
+func (pool *dockerPool) performReadyState(ctx context.Context, driver *DockerDriver, task *poolTask) {
+
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    killOpts := docker.KillContainerOptions{
+        ID:      task.Id(),
+        Context: ctx,
+    }
+
+    defer func() {
+        err := driver.docker.KillContainer(killOpts)
+        if err != nil {
+            log.WithError(err).Info("prefork pool container kill failed")
+            task.state = PoolTaskStateInit
+        }
+    }()
+
+    err := driver.docker.StartContainerWithContext(task.Id(), nil, ctx)
+    if err != nil {
+        log.WithError(err).Info("prefork pool container start failed")
+        task.state = PoolTaskStateInit
+        return
+    }
+
+    log.Debug("prefork pool container ready")
+
+    // IMPORTANT: container is now up and running. Register it to make it
+    // available for function containers.
+    ctx, cancel := context.WithCancel(ctx)
+
+    pool.register(task.Id(), cancel)
+    exitCode, err := driver.docker.WaitContainerWithContext(task.Id(), ctx)
+    pool.unregister(task.Id())
+
+    if ctx.Err() == nil {
+        log.WithError(err).Infof("prefork pool container exited exit_code=%d", exitCode)
+        task.state = PoolTaskStateInit
+    }
+}
+
+func (pool *dockerPool) performTeardown(ctx context.Context, driver *DockerDriver, task *poolTask) {
+    ctx, cancel := context.WithTimeout(context.Background(), ShutdownTimeout)
     defer cancel()
-    removeOpts.Context = ctx
+
+    removeOpts := docker.RemoveContainerOptions{
+        ID:            task.Id(),
+        Force:         true,
+        RemoveVolumes: true,
+        Context:       ctx,
+    }
+
     driver.docker.RemoveContainer(removeOpts)
 }
 
-func (pool *dockerPool) register(id string) {
+func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) {
+    defer pool.performTeardown(ctx, driver, task)
+    defer pool.wg.Done()
+
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    // We spin forever, keeping the pool resident and running at all times.
+    for ctx.Err() == nil {
+
+        if task.state != PoolTaskStateReady {
+            err := pool.limiter.Wait(ctx)
+            if err != nil {
+                // should not really happen unless ctx has a deadline or burst is 0.
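+                log.WithError(err).Info("prefork pool rate limiter failed")
+                break
+            }
+        }
+
+        if task.state != PoolTaskStateReady {
+            pool.performInitState(ctx, driver, task)
+        }
+
+        if task.state == PoolTaskStateReady {
+            pool.performReadyState(ctx, driver, task)
+        }
+    }
+}
+
+func (pool *dockerPool) register(id string, cancel func()) {
+    item := dockerPoolItem{
+        id:     id,
+        cancel: cancel,
+    }
+
     pool.lock.Lock()
-    pool.free = append(pool.free, id)
+    pool.free = append(pool.free, item)
     pool.lock.Unlock()
 }

The recycle path hinges on the cancel function registered above: FreePoolId (below) cancels the per-container context, which unblocks WaitContainerWithContext in performReadyState so the nanny can kill and re-create the container. A minimal sketch of that hand-off; the select stands in for WaitContainerWithContext and is not the driver's code:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        // Per-container context, as created in performReadyState.
        ctx, cancel := context.WithCancel(context.Background())

        done := make(chan struct{})
        go func() {
            defer close(done)
            // Stand-in for WaitContainerWithContext: blocks until the
            // container exits on its own or the context is cancelled.
            select {
            case <-ctx.Done():
                fmt.Println("nanny: wait unblocked by cancel; recycling container")
            case <-time.After(10 * time.Second):
                fmt.Println("nanny: container exited on its own")
            }
        }()

        // Stand-in for FreePoolId with recycle enabled: cancel instead of
        // returning the id to the free list.
        cancel()
        <-done
    }

Note how performReadyState uses ctx.Err() to tell the two cases apart: a deliberate cancel (recycle or shutdown) leaves the state machine alone, while an unexpected container exit flips the task back to PoolTaskStateInit.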
@@ -247,7 +309,7 @@ func (pool *dockerPool) unregister(id string) {
         delete(pool.inuse, id)
     } else {
         for i := 0; i < len(pool.free); i += 1 {
-            if pool.free[i] == id {
+            if pool.free[i].id == id {
                 pool.free = append(pool.free[:i], pool.free[i+1:]...)
                 break
             }
@@ -268,18 +330,26 @@ func (pool *dockerPool) AllocPoolId() (string, error) {
 
     id := pool.free[len(pool.free)-1]
     pool.free = pool.free[:len(pool.free)-1]
-    pool.inuse[id] = struct{}{}
+    pool.inuse[id.id] = id
 
-    return id, nil
+    return id.id, nil
 }
 
 func (pool *dockerPool) FreePoolId(id string) {
+
+    isRecycle := pool.isRecycle
+
     pool.lock.Lock()
-    _, ok := pool.inuse[id]
+    item, ok := pool.inuse[id]
     if ok {
+        if item.cancel != nil && isRecycle {
+            item.cancel()
+        }
         delete(pool.inuse, id)
-        pool.free = append(pool.free, id)
+        if !isRecycle {
+            pool.free = append(pool.free, item)
+        }
     }
     pool.lock.Unlock()
diff --git a/api/agent/drivers/docker/docker_pool_test.go b/api/agent/drivers/docker/docker_pool_test.go
index b0a9e6c08..0b2799564 100644
--- a/api/agent/drivers/docker/docker_pool_test.go
+++ b/api/agent/drivers/docker/docker_pool_test.go
@@ -8,13 +8,21 @@ import (
     "github.com/fnproject/fn/api/agent/drivers"
 )
 
+func getDefaultCfg() *drivers.Config {
+    cfg := &drivers.Config{
+        PreForkImage: "busybox",
+        PreForkCmd:   "tail -f /dev/null",
+    }
+    return cfg
+}
+
 func TestRunnerDockerPool(t *testing.T) {
     if runtime.GOOS != "linux" {
         t.Skip("prefork only supported on Linux")
         return
     }
 
-    cfg := &drivers.Config{}
+    cfg := getDefaultCfg()
 
     // shouldn't spin up a pool since cfg is empty
     drv := NewDocker(*cfg)
@@ -95,7 +103,7 @@ func TestRunnerDockerPoolFaulty(t *testing.T) {
         return
     }
 
-    cfg := &drivers.Config{}
+    cfg := getDefaultCfg()
 
     // shouldn't spin up a pool since cfg is empty
     drv := NewDocker(*cfg)
diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go
index 74cccddaf..4dd327a49 100644
--- a/api/agent/drivers/driver.go
+++ b/api/agent/drivers/driver.go
@@ -197,6 +197,8 @@ type Config struct {
     PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
     PreForkImage    string `json:"pre_fork_image"`
     PreForkCmd      string `json:"pre_fork_cmd"`
+    PreForkUseOnce  uint64 `json:"pre_fork_use_once"`
+    PreForkNetworks string `json:"pre_fork_networks"`
 }
 
 func average(samples []Stat) (Stat, bool) {
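Putting it together, enabling the recycling pool from code looks roughly like the test setup above. A hypothetical configuration; the pool size, network names, and the drv.Close() call are illustrative assumptions, not taken verbatim from the repo:

    package main

    import (
        "github.com/fnproject/fn/api/agent/drivers"
        "github.com/fnproject/fn/api/agent/drivers/docker"
    )

    func main() {
        cfg := drivers.Config{
            PreForkPoolSize: 64,                  // spin up 64 resident containers
            PreForkImage:    "busybox",           // the default image
            PreForkCmd:      "tail -f /dev/null", // must block forever
            PreForkUseOnce:  1,                   // non-zero enables recycle mode
            PreForkNetworks: "fn-net1 fn-net2",   // round-robin across networks
        }

        // NewDocker creates the prefork pool when PreForkPoolSize > 0;
        // Close tears the pool down via each nanny's performTeardown.
        drv := docker.NewDocker(cfg)
        defer drv.Close()
    }

The same settings are normally driven through the FN_EXPERIMENTAL_PREFORK_* environment variables parsed in config.go, which is the intended way to enable this experimental feature.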