Experimental Pre-fork Pool: Recycle net ns (#890)

* fn: experimental prefork recycle and other improvements

*) Add a recycle option: do not hand the same pool container out again (use-once).
*) Two-state processing: initializing versus ready (start/kill).
*) The ready state is exempt from the rate limiter.
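
A rough usage sketch of the recycle option above (the pool size value is
illustrative; any non-zero use-once value makes the pool kill and re-create
a container after each use instead of returning it to the free list):

    # illustrative values; non-zero USE_ONCE enables recycling
    export FN_EXPERIMENTAL_PREFORK_POOL_SIZE=128
    export FN_EXPERIMENTAL_PREFORK_USE_ONCE=1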

* fn: experimental prefork pool multiple network support

In order to exceed the 1023-container (bridge port) limit of a single
bridge network, add multiple networks:

    for i in fn-net1 fn-net2 fn-net3 fn-net4
    do
            docker network create $i
    done

to the Docker startup (e.g. dind preentry.sh), then provide these
networks to the prefork pool using:

    export FN_EXPERIMENTAL_PREFORK_NETWORKS="fn-net1 fn-net2 fn-net3 fn-net4"

which should be able to spawn up to 1023 * 4 = 4092 containers.
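
For example (a sketch only; the pool size here is illustrative, and the pool
assigns the listed networks to its containers round-robin):

    export FN_EXPERIMENTAL_PREFORK_POOL_SIZE=4000
    export FN_EXPERIMENTAL_PREFORK_NETWORKS="fn-net1 fn-net2 fn-net3 fn-net4"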

* fn: fixup tests for cfg move

* fn: add ipc and pid namespaces into prefork pooling

* fn: revert ipc and pid namespaces for now

Sharing the pid/ipc namespaces opens up the function container to the pause container.
Authored by Tolga Ceylan on 2018-04-05 15:07:30 -07:00, committed by GitHub.
Parent: 629559ecc8
Commit: 584e4e75eb
7 changed files with 229 additions and 106 deletions


@@ -141,6 +141,8 @@ func createAgent(da DataAccess, withDocker bool) Agent {
             PreForkPoolSize: cfg.PreForkPoolSize,
             PreForkImage:    cfg.PreForkImage,
             PreForkCmd:      cfg.PreForkCmd,
+            PreForkUseOnce:  cfg.PreForkUseOnce,
+            PreForkNetworks: cfg.PreForkNetworks,
         })
     } else {
         driver = mock.New()


@@ -25,6 +25,8 @@ type AgentConfig struct {
     PreForkPoolSize    uint64 `json:"pre_fork_pool_size"`
     PreForkImage       string `json:"pre_fork_image"`
     PreForkCmd         string `json:"pre_fork_pool_cmd"`
+    PreForkUseOnce     uint64 `json:"pre_fork_use_once"`
+    PreForkNetworks    string `json:"pre_fork_networks"`
 }

 const (
@@ -43,6 +45,8 @@ const (
     EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
     EnvPreForkImage    = "FN_EXPERIMENTAL_PREFORK_IMAGE"
     EnvPreForkCmd      = "FN_EXPERIMENTAL_PREFORK_CMD"
+    EnvPreForkUseOnce  = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
+    EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"

     MaxDisabledMsecs = time.Duration(math.MaxInt64)
 )
@@ -53,6 +57,8 @@ func NewAgentConfig() (*AgentConfig, error) {
         MinDockerVersion:   "17.10.0-ce",
         MaxLogSize:         1 * 1024 * 1024,
         MaxCallEndStacking: 8192,
+        PreForkImage:       "busybox",
+        PreForkCmd:         "tail -f /dev/null",
     }

     var err error
@@ -70,14 +76,15 @@ func NewAgentConfig() (*AgentConfig, error) {
     err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
     err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
     err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)
+    err = setEnvStr(err, EnvPreForkImage, &cfg.PreForkImage)
+    err = setEnvStr(err, EnvPreForkCmd, &cfg.PreForkCmd)
+    err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce)
+    err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks)

     if err != nil {
         return cfg, err
     }

-    cfg.PreForkImage = os.Getenv(EnvPreForkImage)
-    cfg.PreForkCmd = os.Getenv(EnvPreForkCmd)
-
     if cfg.EjectIdle == time.Duration(0) {
         return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
     }
@@ -89,6 +96,16 @@ func NewAgentConfig() (*AgentConfig, error) {
     return cfg, nil
 }

+func setEnvStr(err error, name string, dst *string) error {
+    if err != nil {
+        return err
+    }
+    if tmp, ok := os.LookupEnv(name); ok {
+        *dst = tmp
+    }
+    return nil
+}
+
 func setEnvUint(err error, name string, dst *uint64) error {
     if err != nil {
         return err


@@ -131,6 +131,29 @@ func (drv *DockerDriver) Close() error {
     return err
 }

+func (drv *DockerDriver) tryUsePool(ctx context.Context, container *docker.CreateContainerOptions) string {
+    ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"})
+
+    if drv.pool != nil {
+        id, err := drv.pool.AllocPoolId()
+        if err == nil {
+            linker := fmt.Sprintf("container:%s", id)
+            // We are able to fetch a container from pool. Now, use its
+            // network, ipc and pid namespaces.
+            container.HostConfig.NetworkMode = linker
+            //container.HostConfig.IpcMode = linker
+            //container.HostConfig.PidMode = linker
+            return id
+        }
+        log.WithError(err).Error("Could not fetch pre fork pool container")
+    }
+
+    // hostname and container NetworkMode is not compatible.
+    container.Config.Hostname = drv.hostname
+    return ""
+}
+
 func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
     ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
     var cmd []string
@@ -171,19 +194,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
         Context: ctx,
     }

-    poolId := ""
-    if drv.pool != nil {
-        id, err := drv.pool.AllocPoolId()
-        if err != nil {
-            log.WithError(err).Error("Could not fetch pre fork pool container")
-        } else {
-            poolId = id
-            container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id)
-        }
-    } else {
-        // hostname and container NetworkMode is not compatible.
-        container.Config.Hostname = drv.hostname
-    }
+    poolId := drv.tryUsePool(ctx, &container)

     // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
     // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,


@@ -35,6 +35,7 @@ type dockerClient interface {
     AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error)
     WaitContainerWithContext(id string, ctx context.Context) (int, error)
     StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error
+    KillContainer(opts docker.KillContainerOptions) error
    CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error)
     RemoveContainer(opts docker.RemoveContainerOptions) error
     PauseContainer(id string, ctx context.Context) error
@@ -336,6 +337,18 @@ func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *doc
     return c, err
 }

+func (d *dockerWrap) KillContainer(opts docker.KillContainerOptions) (err error) {
+    ctx, span := trace.StartSpan(opts.Context, "docker_kill_container")
+    defer span.End()
+    logger := common.Logger(ctx).WithField("docker_cmd", "KillContainer")
+    err = d.retry(ctx, logger, func() error {
+        err = d.dockerNoTimeout.KillContainer(opts)
+        return err
+    })
+    return err
+}
+
 func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) (err error) {
     ctx, span := trace.StartSpan(opts.Context, "docker_pull_image")
     defer span.End()


@@ -34,13 +34,17 @@ var (
     ErrorPoolEmpty = errors.New("docker pre fork pool empty")
 )

+type PoolTaskStateType int
+
+const (
+    PoolTaskStateInit  PoolTaskStateType = iota // initializing
+    PoolTaskStateReady                          // ready to be run
+)
+
 const (
     LimitPerSec     = 10
     LimitBurst      = 20
-    DefaultImage    = "busybox"
-    DefaultCmd      = "tail -f /dev/null"
     ShutdownTimeout = time.Duration(1) * time.Second
 )
@@ -48,6 +52,8 @@ type poolTask struct {
     id      string
     image   string
     cmd     string
+    netMode string
+    state   PoolTaskStateType
 }

 func (c *poolTask) Id() string { return c.id }
@@ -65,13 +71,19 @@ func (c *poolTask) CPUs() uint64 { return 0
 func (c *poolTask) FsSize() uint64                                    { return 0 }
 func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {}

+type dockerPoolItem struct {
+    id     string
+    cancel func()
+}
+
 type dockerPool struct {
     lock    sync.Mutex
-    inuse   map[string]struct{}
-    free    []string
+    inuse   map[string]dockerPoolItem
+    free    []dockerPoolItem
     limiter *rate.Limiter
     cancel  func()
-    wg      sync.WaitGroup // TODO rename
+    wg      sync.WaitGroup
+    isRecycle bool
 }

 type DockerPoolStats struct {
@@ -81,7 +93,7 @@ type DockerPoolStats struct {
 type DockerPool interface {
     // fetch a pre-allocated free id from the pool
-    // may return too busy error
+    // may return too busy error.
     AllocPoolId() (string, error)

     // Release the id back to the pool
@@ -107,28 +119,31 @@ func NewDockerPool(conf drivers.Config, driver *DockerDriver) DockerPool {
     log.Error("WARNING: Experimental Prefork Docker Pool Enabled")

     pool := &dockerPool{
-        inuse:   make(map[string]struct{}, conf.PreForkPoolSize),
-        free:    make([]string, 0, conf.PreForkPoolSize),
+        inuse:   make(map[string]dockerPoolItem, conf.PreForkPoolSize),
+        free:    make([]dockerPoolItem, 0, conf.PreForkPoolSize),
         limiter: rate.NewLimiter(LimitPerSec, LimitBurst),
         cancel:  cancel,
     }

-    for i := uint64(0); i < conf.PreForkPoolSize; i++ {
+    if conf.PreForkUseOnce != 0 {
+        pool.isRecycle = true
+    }
+
+    networks := strings.Fields(conf.PreForkNetworks)
+    if len(networks) == 0 {
+        networks = append(networks, "")
+    }
+
+    pool.wg.Add(int(conf.PreForkPoolSize))
+
+    for i := 0; i < int(conf.PreForkPoolSize); i++ {
         task := &poolTask{
             id:      fmt.Sprintf("%d_prefork_%s", i, id.New().String()),
-            image:   DefaultImage,
-            cmd:     DefaultCmd,
+            image:   conf.PreForkImage,
+            cmd:     conf.PreForkCmd,
+            netMode: networks[i%len(networks)],
         }

-        if conf.PreForkImage != "" {
-            task.image = conf.PreForkImage
-        }
-        if conf.PreForkCmd != "" {
-            task.cmd = conf.PreForkCmd
-        }
-
-        pool.wg.Add(1)
         go pool.nannyContainer(ctx, driver, task)
     }
@@ -141,10 +156,15 @@ func (pool *dockerPool) Close() error {
     return nil
 }

-func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) {
-    defer pool.wg.Done()
-
-    log := common.Logger(ctx).WithFields(logrus.Fields{"name": task.Id()})
+func (pool *dockerPool) performInitState(ctx context.Context, driver *DockerDriver, task *poolTask) {
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    err := driver.ensureImage(ctx, task)
+    if err != nil {
+        log.WithError(err).Info("prefork pool image pull failed")
+        return
+    }

     containerOpts := docker.CreateContainerOptions{
         Name: task.Id(),
@@ -163,7 +183,7 @@ func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver
             LogConfig: docker.LogConfig{
                 Type: "none",
             },
-            AutoRemove: true,
+            NetworkMode: task.netMode,
         },
         Context: ctx,
     }
@@ -175,67 +195,109 @@ func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver
         Context: ctx,
     }

-    // We spin forever, keeping the pool resident and running at all times.
-    for {
-        err := pool.limiter.Wait(ctx)
-        if err != nil {
-            // should not really happen unless ctx has a deadline or burst is 0.
-            log.WithError(err).Info("prefork pool rate limiter failed")
-            break
-        }
-
-        // Let's try to clean up any left overs
-        err = driver.docker.RemoveContainer(removeOpts)
-        if err != nil {
-            log.WithError(err).Info("prefork pool container remove failed (this is probably OK)")
-        }
-
-        err = driver.ensureImage(ctx, task)
-        if err != nil {
-            log.WithError(err).Info("prefork pool image pull failed")
-            continue
-        }
+    // ignore failure here
+    driver.docker.RemoveContainer(removeOpts)

     _, err = driver.docker.CreateContainer(containerOpts)
     if err != nil {
         log.WithError(err).Info("prefork pool container create failed")
-        continue
+        return
     }

-    err = driver.docker.StartContainerWithContext(task.Id(), nil, ctx)
+    task.state = PoolTaskStateReady
+}
+
+func (pool *dockerPool) performReadyState(ctx context.Context, driver *DockerDriver, task *poolTask) {
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    killOpts := docker.KillContainerOptions{
+        ID:      task.Id(),
+        Context: ctx,
+    }
+
+    defer func() {
+        err := driver.docker.KillContainer(killOpts)
+        if err != nil {
+            log.WithError(err).Info("prefork pool container kill failed")
+            task.state = PoolTaskStateInit
+        }
+    }()
+
+    err := driver.docker.StartContainerWithContext(task.Id(), nil, ctx)
     if err != nil {
         log.WithError(err).Info("prefork pool container start failed")
-        continue
+        task.state = PoolTaskStateInit
+        return
     }

     log.Debug("prefork pool container ready")

     // IMPORTANT: container is now up and running. Register it to make it
     // available for function containers.
-    pool.register(task.Id())
-
-    // We are optimistic here where provided image and command really blocks
-    // and runs forever.
+    ctx, cancel := context.WithCancel(ctx)
+    pool.register(task.Id(), cancel)
+
     exitCode, err := driver.docker.WaitContainerWithContext(task.Id(), ctx)

+    // IMPORTANT: We have exited. This window is potentially very destructive, as any new
+    // function containers created during this window will fail. We must immediately
+    // proceed to unregister ourself to avoid further issues.
     pool.unregister(task.Id())

+    if ctx.Err() == nil {
         log.WithError(err).Infof("prefork pool container exited exit_code=%d", exitCode)
+        task.state = PoolTaskStateInit
+    }
 }

-    // final exit cleanup
+func (pool *dockerPool) performTeardown(ctx context.Context, driver *DockerDriver, task *poolTask) {
     ctx, cancel := context.WithTimeout(context.Background(), ShutdownTimeout)
     defer cancel()
-    removeOpts.Context = ctx
+
+    removeOpts := docker.RemoveContainerOptions{
+        ID:            task.Id(),
+        Force:         true,
+        RemoveVolumes: true,
+        Context:       ctx,
+    }
+
     driver.docker.RemoveContainer(removeOpts)
 }

-func (pool *dockerPool) register(id string) {
+func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) {
+    defer pool.performTeardown(ctx, driver, task)
+    defer pool.wg.Done()
+
+    log := common.Logger(ctx).WithFields(logrus.Fields{"id": task.Id(), "net": task.netMode})
+
+    // We spin forever, keeping the pool resident and running at all times.
+    for ctx.Err() == nil {
+        if task.state != PoolTaskStateReady {
+            err := pool.limiter.Wait(ctx)
+            if err != nil {
+                // should not really happen unless ctx has a deadline or burst is 0.
+                log.WithError(err).Info("prefork pool rate limiter failed")
+                break
+            }
+        }
+
+        if task.state != PoolTaskStateReady {
+            pool.performInitState(ctx, driver, task)
+        }
+        if task.state == PoolTaskStateReady {
+            pool.performReadyState(ctx, driver, task)
+        }
+    }
+}
+
+func (pool *dockerPool) register(id string, cancel func()) {
+    item := dockerPoolItem{
+        id:     id,
+        cancel: cancel,
+    }
+
     pool.lock.Lock()
-    pool.free = append(pool.free, id)
+    pool.free = append(pool.free, item)
     pool.lock.Unlock()
 }
@@ -247,7 +309,7 @@ func (pool *dockerPool) unregister(id string) {
         delete(pool.inuse, id)
     } else {
         for i := 0; i < len(pool.free); i += 1 {
-            if pool.free[i] == id {
+            if pool.free[i].id == id {
                 pool.free = append(pool.free[:i], pool.free[i+1:]...)
                 break
             }
@@ -268,18 +330,26 @@ func (pool *dockerPool) AllocPoolId() (string, error) {
     id := pool.free[len(pool.free)-1]
     pool.free = pool.free[:len(pool.free)-1]
-    pool.inuse[id] = struct{}{}
-    return id, nil
+    pool.inuse[id.id] = id
+    return id.id, nil
 }

 func (pool *dockerPool) FreePoolId(id string) {
+    isRecycle := pool.isRecycle
+
     pool.lock.Lock()
-    _, ok := pool.inuse[id]
+    item, ok := pool.inuse[id]
     if ok {
+        if item.cancel != nil && isRecycle {
+            item.cancel()
+        }
         delete(pool.inuse, id)
-        pool.free = append(pool.free, id)
+        if !isRecycle {
+            pool.free = append(pool.free, item)
+        }
     }
     pool.lock.Unlock()


@@ -8,13 +8,21 @@ import (
     "github.com/fnproject/fn/api/agent/drivers"
 )

+func getDefaultCfg() *drivers.Config {
+    cfg := &drivers.Config{
+        PreForkImage: "busybox",
+        PreForkCmd:   "tail -f /dev/null",
+    }
+    return cfg
+}
+
 func TestRunnerDockerPool(t *testing.T) {
     if runtime.GOOS != "linux" {
         t.Skip("prefork only supported on Linux")
         return
     }

-    cfg := &drivers.Config{}
+    cfg := getDefaultCfg()

     // shouldn't spin up a pool since cfg is empty
     drv := NewDocker(*cfg)
@@ -95,7 +103,7 @@ func TestRunnerDockerPoolFaulty(t *testing.T) {
         return
     }

-    cfg := &drivers.Config{}
+    cfg := getDefaultCfg()

     // shouldn't spin up a pool since cfg is empty
     drv := NewDocker(*cfg)


@@ -197,6 +197,8 @@ type Config struct {
     PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
     PreForkImage    string `json:"pre_fork_image"`
     PreForkCmd      string `json:"pre_fork_cmd"`
+    PreForkUseOnce  uint64 `json:"pre_fork_use_once"`
+    PreForkNetworks string `json:"pre_fork_networks"`
 }

 func average(samples []Stat) (Stat, bool) {