diff --git a/api/agent/agent.go b/api/agent/agent.go
index f6b7f6c42..342ef26ef 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -3,7 +3,10 @@ package agent
 import (
 	"context"
 	"io"
+	"math"
 	"net/http"
+	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -26,8 +29,6 @@ import (
 // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
 // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
 // TODO storing logs / call can push call over the timeout
-// TODO if we don't cap the number of any one container we could get into a situation
-// where the machine is full but all the containers are idle up to the idle timeout. meh.
 // TODO async is still broken, but way less so. we need to modify mq semantics
 // to be much more robust. now we're at least running it if we delete the msg,
 // but we may never store info about that execution so still broked (if fn
@@ -115,6 +116,9 @@ type agent struct {
 	shutonce sync.Once
 	shutdown chan struct{}
 
+	freezeIdleMsecs time.Duration
+	ejectIdleMsecs  time.Duration
+
 	stats // TODO kill me
 
 	// Prometheus HTTP handler
@@ -127,13 +131,25 @@ func New(da DataAccess) Agent {
 		ServerVersion: "17.06.0",
 	})
 
+	freezeIdleMsecs, err := getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
+	if err != nil {
+		logrus.WithError(err).Fatal("error initializing freeze idle delay")
+	}
+
+	ejectIdleMsecs, err := getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
+	if err != nil {
+		logrus.WithError(err).Fatal("error initializing eject idle delay")
+	}
+
 	a := &agent{
-		da:          da,
-		driver:      driver,
-		slotMgr:     NewSlotQueueMgr(),
-		resources:   NewResourceTracker(),
-		shutdown:    make(chan struct{}),
-		promHandler: promhttp.Handler(),
+		da:              da,
+		driver:          driver,
+		slotMgr:         NewSlotQueueMgr(),
+		resources:       NewResourceTracker(),
+		shutdown:        make(chan struct{}),
+		freezeIdleMsecs: freezeIdleMsecs,
+		ejectIdleMsecs:  ejectIdleMsecs,
+		promHandler:     promhttp.Handler(),
 	}
 
 	// TODO assert that agent doesn't get started for API nodes up above ?
@@ -143,6 +159,26 @@ func New(da DataAccess) Agent {
 	return a
 }
 
+func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
+
+	delay := defaultVal
+
+	if dur := os.Getenv(name); dur != "" {
+		durInt, err := strconv.ParseInt(dur, 10, 64)
+		if err != nil {
+			return defaultVal, err
+		}
+		// disable if negative or set to msecs specified.
+		if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond {
+			delay = math.MaxInt64
+		} else {
+			delay = time.Duration(durInt) * time.Millisecond
+		}
+	}
+
+	return delay, nil
+}
+
 // TODO shuffle this around somewhere else (maybe)
 func (a *agent) Enqueue(ctx context.Context, call *models.Call) error {
 	return a.da.Enqueue(ctx, call)
@@ -681,25 +717,68 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
 	default: // ok
 	}
 
+	isFrozen := false
+	elapsed := time.Duration(0)
+	freezerTicker := a.freezeIdleMsecs
+	idleTimeout := time.Duration(call.IdleTimeout) * time.Second
+
 	done := make(chan struct{})
 	state.UpdateState(ctx, ContainerStateIdle, call.slots)
 	s := call.slots.queueSlot(&hotSlot{done, errC, container, nil})
 
-	select {
-	case <-s.trigger:
-	case <-time.After(time.Duration(call.IdleTimeout) * time.Second):
-		if call.slots.ejectSlot(s) {
-			logger.Info("Canceling inactive hot function")
-			return
-		}
-	case <-ctx.Done(): // container shutdown
-		if call.slots.ejectSlot(s) {
-			return
-		}
-	case <-a.shutdown: // server shutdown
-		if call.slots.ejectSlot(s) {
+	for {
+		select {
+		case <-s.trigger: // slot already consumed
+		case <-ctx.Done(): // container shutdown
+		case <-a.shutdown: // server shutdown
+		case <-time.After(idleTimeout): // in case idleTimeout < a.freezeIdleMsecs or idleTimeout < a.ejectIdleMsecs
+		case <-time.After(freezerTicker):
+			elapsed += a.freezeIdleMsecs
+
+			freezerTicker = math.MaxInt64 // do not fire again
+
+			if elapsed < idleTimeout { // in case idleTimeout <= a.freezeIdleMsecs
+				if !isFrozen {
+					err := cookie.Freeze(ctx)
+					if err != nil {
+						logger.WithError(err).Error("freeze error")
+						return
+					}
+					isFrozen = true
+				}
+				continue
+			}
+		case <-time.After(a.ejectIdleMsecs):
+			elapsed += a.ejectIdleMsecs
+
+			if elapsed < idleTimeout {
+				// if someone is waiting for resource in our slot queue, we must not terminate,
+				// otherwise, see if other slot queues have resource waiters that are blocked.
+				stats := call.slots.getStats()
+				if stats.containerStates[ContainerStateWait] > 0 ||
+					a.resources.GetResourceTokenWaiterCount() <= 0 {
+					continue
+				}
+				logger.Debug("attempting hot function eject")
+			}
+		}
+		break
+	}
+
+	// if we can eject token, that means we are here due to
+	// abort/shutdown/timeout, attempt to eject and terminate,
+	// otherwise continue processing the request
+	if call.slots.ejectSlot(s) {
+		return
+	}
+
+	if isFrozen {
+		err := cookie.Unfreeze(ctx)
+		if err != nil {
+			logger.WithError(err).Error("unfreeze error")
 			return
 		}
+		isFrozen = false
 	}
 
 	state.UpdateState(ctx, ContainerStateBusy, call.slots)
diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go
index 234f62fd9..179f14d9f 100644
--- a/api/agent/drivers/docker/docker.go
+++ b/api/agent/drivers/docker/docker.go
@@ -212,6 +212,28 @@ func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
 	return c.drv.run(ctx, c.id, c.task)
 }
 
+func (c *cookie) Freeze(ctx context.Context) error {
+	ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"})
+	log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker pause")
+
+	err := c.drv.docker.PauseContainer(c.id, ctx)
+	if err != nil {
+		logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error pausing container")
+	}
+	return err
+}
+
+func (c *cookie) Unfreeze(ctx context.Context) error {
+	ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"})
+	log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker unpause")
+
+	err := c.drv.docker.UnpauseContainer(c.id, ctx)
+	if err != nil {
+		logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error unpausing container")
+	}
+	return err
+}
+
 func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error {
 	err := drv.docker.RemoveContainer(docker.RemoveContainerOptions{
 		ID: container, Force: true, RemoveVolumes: true, Context: ctx})
diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go
index ce883f0ec..76b6ee842 100644
--- a/api/agent/drivers/docker/docker_client.go
+++ b/api/agent/drivers/docker/docker_client.go
@@ -19,6 +19,7 @@ import (
 
 const (
 	retryTimeout = 10 * time.Minute
+	pauseTimeout = 5 * time.Second
 )
 
 // wrap docker client calls so we can retry 500s, kind of sucks but fsouza doesn't
@@ -34,6 +35,8 @@ type dockerClient interface {
 	StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error
 	CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error)
 	RemoveContainer(opts docker.RemoveContainerOptions) error
+	PauseContainer(id string, ctx context.Context) error
+	UnpauseContainer(id string, ctx context.Context) error
 	PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
 	InspectImage(ctx context.Context, name string) (*docker.Image, error)
 	InspectContainerWithContext(container string, ctx context.Context) (*docker.Container, error)
@@ -260,6 +263,30 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er
 	return filterNoSuchContainer(ctx, err)
 }
 
+func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error) {
+	span, _ := opentracing.StartSpanFromContext(ctx, "docker_pause_container")
+	defer span.Finish()
+	ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
+	defer cancel()
+	err = d.retry(ctx, func() error {
+		err = d.docker.PauseContainer(id)
+		return err
+	})
+	return filterNoSuchContainer(ctx, err)
+}
+
+func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error) {
+	span, _ := opentracing.StartSpanFromContext(ctx, "docker_unpause_container")
+	defer span.Finish()
+	ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
+	defer cancel()
+	err = d.retry(ctx, func() error {
+		err = d.docker.UnpauseContainer(id)
+		return err
+	})
+	return filterNoSuchContainer(ctx, err)
+}
+
 func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.Image, err error) {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "docker_inspect_image")
 	defer span.Finish()
diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go
index dd389bf3c..74bbbb80a 100644
--- a/api/agent/drivers/driver.go
+++ b/api/agent/drivers/driver.go
@@ -34,6 +34,12 @@ type Cookie interface {
 	// Run() MUST monitor the context. task cancellation is indicated by
 	// cancelling the context.
 	Run(ctx context.Context) (WaitResult, error)
+
+	// Freeze the container to pause running processes
+	Freeze(ctx context.Context) error
+
+	// Unfreeze a frozen container to unpause frozen processes
+	Unfreeze(ctx context.Context) error
 }
 
 type WaitResult interface {
diff --git a/api/agent/drivers/mock/mocker.go b/api/agent/drivers/mock/mocker.go
index 2cb7fe867..d73e4e7e0 100644
--- a/api/agent/drivers/mock/mocker.go
+++ b/api/agent/drivers/mock/mocker.go
@@ -24,6 +24,14 @@ type cookie struct {
 	m *Mocker
 }
 
+func (c *cookie) Freeze(context.Context) error {
+	return nil
+}
+
+func (c *cookie) Unfreeze(context.Context) error {
+	return nil
+}
+
 func (c *cookie) Close(context.Context) error { return nil }
 
 func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
diff --git a/api/agent/resource.go b/api/agent/resource.go
index a22319c7e..5529c28ea 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -40,6 +40,9 @@ type ResourceTracker interface {
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
 	// Memory is expected to be provided in MB units.
 	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
+
+	// returns number of waiters waiting for a resource token blocked on condition variable
+	GetResourceTokenWaiterCount() uint64
 }
 
 type resourceTracker struct {
@@ -66,6 +69,8 @@ type resourceTracker struct {
 	cpuAsyncUsed uint64
 	// cpu in use for async area in which agent stops dequeuing async jobs
 	cpuAsyncHWMark uint64
+	// number of waiters waiting for a token blocked on the condition variable
+	tokenWaiterCount uint64
 }
 
 func NewResourceTracker() ResourceTracker {
@@ -123,6 +128,17 @@ func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isA
 	}
 }
 
+// returns number of waiters waiting for a resource token blocked on condition variable
+func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 {
+	var waiters uint64
+
+	a.cond.L.Lock()
+	waiters = a.tokenWaiterCount
+	a.cond.L.Unlock()
+
+	return waiters
+}
+
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
 func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
@@ -158,7 +174,9 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 
 		isWaiting = true
 		for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
+			a.tokenWaiterCount++
 			c.Wait()
+			a.tokenWaiterCount--
 		}
 		isWaiting = false
 
diff --git a/docs/operating/docker.md b/docs/operating/docker.md
index ea0197899..aaad237ca 100644
--- a/docs/operating/docker.md
+++ b/docs/operating/docker.md
@@ -4,7 +4,7 @@ To get the best performance, you'll want to ensure that Docker is configured pro
 
 1. Linux 4.7 or newer with aufs or overlay2 module.
 2. Ubuntu 16.04 LTS or newer with aufs or overlay2 module.
-3. Docker 1.12 or newer to be available.
+3. Docker 17.06 or newer to be available.
 
 It is important to reconfigure host's Docker with this filesystem module. Thus, in your Docker start scripts you must do as following:
 
diff --git a/docs/operating/options.md b/docs/operating/options.md
index 6caef3cf5..d75baa65a 100644
--- a/docs/operating/options.md
+++ b/docs/operating/options.md
@@ -29,6 +29,8 @@ docker run -e VAR_NAME=VALUE ...
 | `FN_LOG_DEST` | Set a url to send logs to, instead of stderr. [scheme://][host][:port][/path]; default scheme to udp:// if none given, possible schemes: { udp, tcp, file }
 | `FN_LOG_PREFIX` | If supplying a syslog url in `FN_LOG_DEST`, a prefix to add to each log line
 | `FN_API_CORS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None |
+| `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 |
+| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before attempting to terminate an idle hot container when the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 |
 | `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock |
 | `DOCKER_API_VERSION` | Docker remote API version. | 1.24 |
 | `DOCKER_TLS_VERIFY` | Set this option to enable/disable Docker remote API over TLS/SSL. | 0 |
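
For reference, the two new settings are plain environment variables read by the server at startup. A minimal sketch of an invocation exercising them (the image name, port mapping, and chosen values below are illustrative assumptions, not part of this patch):

    docker run -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 \
        -e FN_FREEZE_IDLE_MSECS=0 -e FN_EJECT_IDLE_MSECS=2000 \
        fnproject/fnserver

With these values, an idle hot container is paused immediately, and it becomes a candidate for ejection after 2000 ms of idleness when other calls are blocked waiting for memory/CPU.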