Idle Hot Container Freeze/Preempt Support (#733)

* fn: freeze/unfreeze and eject idle under resource contention
This commit is contained in:
Tolga Ceylan
2018-02-07 17:21:53 -08:00
committed by GitHub
parent 105947d031
commit f27d47f2dd
8 changed files with 184 additions and 22 deletions

View File

@@ -3,7 +3,10 @@ package agent
import (
"context"
"io"
"math"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -26,8 +29,6 @@ import (
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
// TODO storing logs / call can push call over the timeout
// TODO if we don't cap the number of any one container we could get into a situation
// where the machine is full but all the containers are idle up to the idle timeout. meh.
// TODO async is still broken, but way less so. we need to modify mq semantics
// to be much more robust. now we're at least running it if we delete the msg,
// but we may never store info about that execution so still broked (if fn
@@ -115,6 +116,9 @@ type agent struct {
shutonce sync.Once
shutdown chan struct{}
freezeIdleMsecs time.Duration
ejectIdleMsecs time.Duration
stats // TODO kill me
// Prometheus HTTP handler
@@ -127,13 +131,25 @@ func New(da DataAccess) Agent {
ServerVersion: "17.06.0",
})
freezeIdleMsecs, err := getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
if err != nil {
logrus.WithError(err).Fatal("error initializing freeze idle delay")
}
ejectIdleMsecs, err := getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
if err != nil {
logrus.WithError(err).Fatal("error initializing eject idle delay")
}
a := &agent{
da: da,
driver: driver,
slotMgr: NewSlotQueueMgr(),
resources: NewResourceTracker(),
shutdown: make(chan struct{}),
promHandler: promhttp.Handler(),
da: da,
driver: driver,
slotMgr: NewSlotQueueMgr(),
resources: NewResourceTracker(),
shutdown: make(chan struct{}),
freezeIdleMsecs: freezeIdleMsecs,
ejectIdleMsecs: ejectIdleMsecs,
promHandler: promhttp.Handler(),
}
// TODO assert that agent doesn't get started for API nodes up above ?
@@ -143,6 +159,26 @@ func New(da DataAccess) Agent {
return a
}
func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
delay := defaultVal
if dur := os.Getenv(name); dur != "" {
durInt, err := strconv.ParseInt(dur, 10, 64)
if err != nil {
return defaultVal, err
}
// disable if negative or set to msecs specified.
if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond {
delay = math.MaxInt64
} else {
delay = time.Duration(durInt) * time.Millisecond
}
}
return delay, nil
}
// TODO shuffle this around somewhere else (maybe)
func (a *agent) Enqueue(ctx context.Context, call *models.Call) error {
return a.da.Enqueue(ctx, call)
@@ -681,25 +717,68 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
default: // ok
}
isFrozen := false
elapsed := time.Duration(0)
freezerTicker := a.freezeIdleMsecs
idleTimeout := time.Duration(call.IdleTimeout) * time.Second
done := make(chan struct{})
state.UpdateState(ctx, ContainerStateIdle, call.slots)
s := call.slots.queueSlot(&hotSlot{done, errC, container, nil})
select {
case <-s.trigger:
case <-time.After(time.Duration(call.IdleTimeout) * time.Second):
if call.slots.ejectSlot(s) {
logger.Info("Canceling inactive hot function")
return
}
case <-ctx.Done(): // container shutdown
if call.slots.ejectSlot(s) {
return
}
case <-a.shutdown: // server shutdown
if call.slots.ejectSlot(s) {
for {
select {
case <-s.trigger: // slot already consumed
case <-ctx.Done(): // container shutdown
case <-a.shutdown: // server shutdown
case <-time.After(idleTimeout): // in case idleTimeout < a.freezeIdleMsecs or idleTimeout < a.ejectIdleMsecs
case <-time.After(freezerTicker):
elapsed += a.freezeIdleMsecs
freezerTicker = math.MaxInt64 // do not fire again
if elapsed < idleTimeout { // in case idleTimeout <= a.freezeIdleMsecs
if !isFrozen {
err := cookie.Freeze(ctx)
if err != nil {
logger.WithError(err).Error("freeze error")
return
}
isFrozen = true
}
continue
}
case <-time.After(a.ejectIdleMsecs):
elapsed += a.ejectIdleMsecs
if elapsed < idleTimeout {
// if someone is waiting for resource in our slot queue, we must not terminate,
// otherwise, see if other slot queues have resource waiters that are blocked.
stats := call.slots.getStats()
if stats.containerStates[ContainerStateWait] > 0 ||
a.resources.GetResourceTokenWaiterCount() <= 0 {
continue
}
logger.Debug("attempting hot function eject")
}
}
break
}
// if we can eject token, that means we are here due to
// abort/shutdown/timeout, attempt to eject and terminate,
// otherwise continue processing the request
if call.slots.ejectSlot(s) {
return
}
if isFrozen {
err := cookie.Unfreeze(ctx)
if err != nil {
logger.WithError(err).Error("unfreeze error")
return
}
isFrozen = false
}
state.UpdateState(ctx, ContainerStateBusy, call.slots)

View File

@@ -212,6 +212,28 @@ func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {
return c.drv.run(ctx, c.id, c.task)
}
func (c *cookie) Freeze(ctx context.Context) error {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Freeze"})
log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker pause")
err := c.drv.docker.PauseContainer(c.id, ctx)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error pausing container")
}
return err
}
func (c *cookie) Unfreeze(ctx context.Context) error {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Unfreeze"})
log.WithFields(logrus.Fields{"call_id": c.id}).Debug("docker unpause")
err := c.drv.docker.UnpauseContainer(c.id, ctx)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{"call_id": c.id}).Error("error unpausing container")
}
return err
}
func (drv *DockerDriver) removeContainer(ctx context.Context, container string) error {
err := drv.docker.RemoveContainer(docker.RemoveContainerOptions{
ID: container, Force: true, RemoveVolumes: true, Context: ctx})

View File

@@ -19,6 +19,7 @@ import (
const (
retryTimeout = 10 * time.Minute
pauseTimeout = 5 * time.Second
)
// wrap docker client calls so we can retry 500s, kind of sucks but fsouza doesn't
@@ -34,6 +35,8 @@ type dockerClient interface {
StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error
CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error)
RemoveContainer(opts docker.RemoveContainerOptions) error
PauseContainer(id string, ctx context.Context) error
UnpauseContainer(id string, ctx context.Context) error
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
InspectImage(ctx context.Context, name string) (*docker.Image, error)
InspectContainerWithContext(container string, ctx context.Context) (*docker.Container, error)
@@ -260,6 +263,30 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er
return filterNoSuchContainer(ctx, err)
}
func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error) {
span, _ := opentracing.StartSpanFromContext(ctx, "docker_pause_container")
defer span.Finish()
ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
defer cancel()
err = d.retry(ctx, func() error {
err = d.docker.PauseContainer(id)
return err
})
return filterNoSuchContainer(ctx, err)
}
func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error) {
span, _ := opentracing.StartSpanFromContext(ctx, "docker_unpause_container")
defer span.Finish()
ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
defer cancel()
err = d.retry(ctx, func() error {
err = d.docker.UnpauseContainer(id)
return err
})
return filterNoSuchContainer(ctx, err)
}
func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.Image, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_inspect_image")
defer span.Finish()

View File

@@ -34,6 +34,12 @@ type Cookie interface {
// Run() MUST monitor the context. task cancellation is indicated by
// cancelling the context.
Run(ctx context.Context) (WaitResult, error)
// Freeze the container to pause running processes
Freeze(ctx context.Context) error
// Unfreeze a frozen container to unpause frozen processes
Unfreeze(ctx context.Context) error
}
type WaitResult interface {

View File

@@ -24,6 +24,14 @@ type cookie struct {
m *Mocker
}
func (c *cookie) Freeze(context.Context) error {
return nil
}
func (c *cookie) Unfreeze(context.Context) error {
return nil
}
func (c *cookie) Close(context.Context) error { return nil }
func (c *cookie) Run(ctx context.Context) (drivers.WaitResult, error) {

View File

@@ -40,6 +40,9 @@ type ResourceTracker interface {
// machine. It must be called before GetResourceToken or GetResourceToken may hang.
// Memory is expected to be provided in MB units.
IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
// returns number of waiters waiting for a resource token blocked on condition variable
GetResourceTokenWaiterCount() uint64
}
type resourceTracker struct {
@@ -66,6 +69,8 @@ type resourceTracker struct {
cpuAsyncUsed uint64
// cpu in use for async area in which agent stops dequeuing async jobs
cpuAsyncHWMark uint64
// number of waiters waiting for a token blocked on the condition variable
tokenWaiterCount uint64
}
func NewResourceTracker() ResourceTracker {
@@ -123,6 +128,17 @@ func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isA
}
}
// returns number of waiters waiting for a resource token blocked on condition variable
func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 {
var waiters uint64
a.cond.L.Lock()
waiters = a.tokenWaiterCount
a.cond.L.Unlock()
return waiters
}
// the received token should be passed directly to launch (unconditionally), launch
// will close this token (i.e. the receiver should not call Close)
func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
@@ -158,7 +174,9 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
isWaiting = true
for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
a.tokenWaiterCount++
c.Wait()
a.tokenWaiterCount--
}
isWaiting = false