Mirror of https://github.com/fnproject/fn.git, synced 2022-10-28 21:29:17 +03:00
fn: cleanup of docker timeouts and docker health check (#1292)
Moves the timeout management of various docker operations into the agent. This allows finer control over which timeout each operation should use. For instance, for pause/unpause our tolerance is very low to avoid resource issues. For docker remove, a failure can lead to agent failure, so we wait up to 10 minutes. For cookie create/prepare (which includes docker-pull) we cap the timeout at 10 minutes by default. With the new UDS/FDK contract, the docker health check is now obsolete, since containers advertise health via UDS availability.
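For illustration, here is a minimal Go sketch (assumed names, not the actual fn agent code) of the pattern this change adopts: the agent, as the caller, wraps each docker operation in its own context.WithTimeout, so the tolerance can differ per operation (5 seconds for pause/unpause, 10 minutes for create/prepare and remove). The freezer interface below is a hypothetical stand-in for the driver cookie's Freeze/Unfreeze methods that appear in the diff.

// Sketch only (assumed names): per-operation timeouts are applied at the
// call site in the agent rather than inside the docker client wrapper.
package agentsketch

import (
    "context"
    "time"
)

const (
    pauseTimeout    = 5 * time.Second  // docker pause/unpause: keep tolerance low
    hotStartTimeout = 10 * time.Minute // cookie create/prepare, including docker-pull
)

// freezer stands in for the driver cookie's Freeze/Unfreeze methods.
type freezer interface {
    Freeze(ctx context.Context) error
    Unfreeze(ctx context.Context) error
}

// freezeWithTimeout bounds the pause so a slow docker daemon cannot stall
// the slot manager; the caller decides how to react to the error.
func freezeWithTimeout(ctx context.Context, c freezer) error {
    ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
    defer cancel()
    return c.Freeze(ctx)
}

In the diff itself, cookie create/prepare follows the same shape using a.cfg.HotStartTimeout over a background-derived context, so container startup stays bounded even if the triggering request has already gone away.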
@@ -27,6 +27,10 @@ import (
    "os"
)

const (
    pauseTimeout = 5 * time.Second // docker pause/unpause
)

// TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution)
// TODO async calls need to add route.Headers as well
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
@@ -764,18 +768,20 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
    }
    defer container.Close()

    // XXX(reed): we need to timeout the cookie create / prepare since docker client doesn't have timeout anymore,
    // and handle cookie close having a timed out context it still needs to delete the thing. fun stuff
    logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "idle_timeout": call.IdleTimeout})
    ctx = common.WithLogger(ctx, logger)

    ctx, cancel := context.WithTimeout(common.BackgroundContext(ctx), a.cfg.HotStartTimeout)
    defer cancel()

    cookie, err := a.driver.CreateCookie(ctx, container)
    if err != nil {
        call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err})
        return
    }

    defer cookie.Close(ctx)
    // WARNING: we wait forever.
    defer cookie.Close(common.BackgroundContext(ctx))

    err = a.driver.PrepareCookie(ctx, cookie)
    if err != nil {
@@ -783,7 +789,8 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
        return
    }

    ctx, shutdownContainer := context.WithCancel(ctx)
    cancel()
    ctx, shutdownContainer := context.WithCancel(common.BackgroundContext(ctx))
    defer shutdownContainer() // close this if our waiter returns, to call off slots, needs to follow cookie.Close so the cookie crumbles

    udsAwait := make(chan error)
@@ -978,7 +985,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
    case <-idleTimer.C:
    case <-freezeTimer.C:
        if !isFrozen {
            ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
            err = cookie.Freeze(ctx)
            cancel()
            if err != nil {
                return false
            }
@@ -1009,7 +1018,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
    // In case, timer/acquireSlot failure landed us here, make
    // sure to unfreeze.
    if isFrozen {
        ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
        err = cookie.Unfreeze(ctx)
        cancel()
        if err != nil {
            return false
        }
@@ -16,6 +16,7 @@ type Config struct {
    FreezeIdle         time.Duration `json:"freeze_idle_msecs"`
    HotPoll            time.Duration `json:"hot_poll_msecs"`
    HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
    HotStartTimeout    time.Duration `json:"hot_start_timeout_msecs"`
    AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
    MaxResponseSize    uint64        `json:"max_response_size_bytes"`
    MaxLogSize         uint64        `json:"max_log_size_bytes"`
@@ -48,8 +49,10 @@ const (
    // EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be
    // launched for a given function
    EnvHotPoll = "FN_HOT_POLL_MSECS"
    // EnvHotLauncherTimeout is the timeout for a hot container to become available for use
    // EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
    EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
    // EnvHotStartTimeout is the timeout for a hot container to become available for use including docker-pull
    EnvHotStartTimeout = "FN_HOT_START_TIMEOUT_MSECS"
    // EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations
    EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
    // EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
@@ -125,6 +128,7 @@ func NewConfig() (*Config, error) {
    err = setEnvMsecs(err, EnvFreezeIdle, &cfg.FreezeIdle, 50*time.Millisecond)
    err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
    err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
    err = setEnvMsecs(err, EnvHotStartTimeout, &cfg.HotStartTimeout, time.Duration(10)*time.Minute)
    err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
    err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
    err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
@@ -233,7 +233,6 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe
            ReadonlyRootfs: drv.conf.EnableReadOnlyRootFs,
            Init:           drv.conf.EnableTini,
        },
        Context: ctx,
    }

    cookie := &cookie{
@@ -276,6 +275,7 @@ func (drv *DockerDriver) PrepareCookie(ctx context.Context, c drivers.Cookie) er
        return err
    }

    cookie.opts.Context = ctx
    _, err = drv.docker.CreateContainer(cookie.opts)
    if err != nil {
        // since we retry under the hood, if the container gets created and retry fails, we can just ignore error
@@ -577,36 +577,7 @@ func (drv *DockerDriver) startTask(ctx context.Context, container string) error
            return err
        }
    }

    // see if there's any healthcheck, and if so, wait for it to complete
    return drv.awaitHealthcheck(ctx, container)
}

func (drv *DockerDriver) awaitHealthcheck(ctx context.Context, container string) error {
    // inspect the container and check if there is any health check presented,
    // if there is, then wait for it to move to healthy before returning.
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        cont, err := drv.docker.InspectContainerWithContext(container, ctx)
        if err != nil {
            // TODO unknown fiddling to be had
            return err
        }

        // if no health check for this image (""), or it's healthy, then stop waiting.
        // state machine is "starting" -> "healthy" | "unhealthy"
        if cont.State.Health.Status == "" || cont.State.Health.Status == "healthy" {
            break
        }

        time.Sleep(100 * time.Millisecond) // avoid spin loop in case docker is actually fast
    }
    return nil
    return err
}

func (w *waitResult) wait(ctx context.Context) (status string, err error) {
@@ -20,8 +20,6 @@ import (
)

const (
    retryTimeout    = 10 * time.Minute
    pauseTimeout    = 5 * time.Second
    eventRetryDelay = 1 * time.Second
)

@@ -43,7 +41,6 @@ type dockerClient interface {
    UnpauseContainer(id string, ctx context.Context) error
    PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
    InspectImage(ctx context.Context, name string) (*docker.Image, error)
    InspectContainerWithContext(container string, ctx context.Context) (*docker.Container, error)
    Stats(opts docker.StatsOptions) error
    Info(ctx context.Context) (*docker.DockerInfo, error)
    LoadImages(ctx context.Context, filePath string) error
@@ -386,15 +383,9 @@ func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthCon
}

func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err error) {
    // extract the span, but do not keep the context, since the enclosing context
    // may be timed out, and we still want to remove the container. TODO in caller? who cares?
    ctx := common.BackgroundContext(opts.Context)
    ctx, closer := makeTracker(ctx, "docker_remove_container")
    ctx, closer := makeTracker(opts.Context, "docker_remove_container")
    defer closer()

    ctx, cancel := context.WithTimeout(ctx, retryTimeout)
    defer cancel()

    logger := common.Logger(ctx).WithField("docker_cmd", "RemoveContainer")
    err = d.retry(ctx, logger, func() error {
        err = d.docker.RemoveContainer(opts)
@@ -407,9 +398,6 @@ func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error)
    ctx, closer := makeTracker(ctx, "docker_pause_container")
    defer closer()

    ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
    defer cancel()

    logger := common.Logger(ctx).WithField("docker_cmd", "PauseContainer")
    err = d.retry(ctx, logger, func() error {
        err = d.docker.PauseContainer(id)
@@ -422,9 +410,6 @@ func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error
    ctx, closer := makeTracker(ctx, "docker_unpause_container")
    defer closer()

    ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
    defer cancel()

    logger := common.Logger(ctx).WithField("docker_cmd", "UnpauseContainer")
    err = d.retry(ctx, logger, func() error {
        err = d.docker.UnpauseContainer(id)
@@ -437,9 +422,6 @@ func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.I
    ctx, closer := makeTracker(ctx, "docker_inspect_image")
    defer closer()

    ctx, cancel := context.WithTimeout(ctx, retryTimeout)
    defer cancel()

    logger := common.Logger(ctx).WithField("docker_cmd", "InspectImage")
    err = d.retry(ctx, logger, func() error {
        i, err = d.docker.InspectImage(name)
@@ -448,21 +430,6 @@ func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.I
    return i, err
}

func (d *dockerWrap) InspectContainerWithContext(container string, ctx context.Context) (c *docker.Container, err error) {
    ctx, closer := makeTracker(ctx, "docker_inspect_container")
    defer closer()

    ctx, cancel := context.WithTimeout(ctx, retryTimeout)
    defer cancel()

    logger := common.Logger(ctx).WithField("docker_cmd", "InspectContainer")
    err = d.retry(ctx, logger, func() error {
        c, err = d.docker.InspectContainerWithContext(container, ctx)
        return err
    })
    return c, err
}

func (d *dockerWrap) Stats(opts docker.StatsOptions) (err error) {
    // we can't retry this one this way since the callee closes the
    // stats chan, need a fancier retry mechanism where we can swap out
@@ -44,8 +44,6 @@ const (
const (
    LimitPerSec = 10
    LimitBurst  = 20

    ShutdownTimeout = time.Duration(1) * time.Second
)

type poolTask struct {
@@ -254,15 +252,11 @@ func (pool *dockerPool) performReadyState(ctx context.Context, driver *DockerDri
}

func (pool *dockerPool) performTeardown(ctx context.Context, driver *DockerDriver, task *poolTask) {

    ctx, cancel := context.WithTimeout(context.Background(), ShutdownTimeout)
    defer cancel()

    removeOpts := docker.RemoveContainerOptions{
        ID:            task.Id(),
        Force:         true,
        RemoveVolumes: true,
        Context:       ctx,
        Context:       context.Background(),
    }

    driver.docker.RemoveContainer(removeOpts)