From 0addcb8911684062919d24da046e49bbf53a14eb Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 23 Mar 2018 16:35:35 -0700 Subject: [PATCH] fn: pre-fork pool for namespace/network speedup (#874) * fn: pre-fork pool experimental implementation --- Gopkg.lock | 8 +- Makefile | 4 +- api/agent/agent.go | 11 +- api/agent/config.go | 10 + api/agent/drivers/docker/docker.go | 40 +- api/agent/drivers/docker/docker_client.go | 2 +- api/agent/drivers/docker/docker_pool.go | 297 ++++++++++++ api/agent/drivers/docker/docker_pool_test.go | 131 ++++++ api/agent/drivers/driver.go | 10 +- api/agent/drivers/mock/mocker.go | 4 + api/agent/resource.go | 12 +- vendor/golang.org/x/time/AUTHORS | 3 + vendor/golang.org/x/time/CONTRIBUTING.md | 26 ++ vendor/golang.org/x/time/CONTRIBUTORS | 3 + vendor/golang.org/x/time/LICENSE | 27 ++ vendor/golang.org/x/time/PATENTS | 22 + vendor/golang.org/x/time/README.md | 17 + vendor/golang.org/x/time/rate/rate.go | 380 ++++++++++++++++ vendor/golang.org/x/time/rate/rate_go16.go | 21 + vendor/golang.org/x/time/rate/rate_go17.go | 21 + vendor/golang.org/x/time/rate/rate_test.go | 449 +++++++++++++++++++ 21 files changed, 1484 insertions(+), 14 deletions(-) create mode 100644 api/agent/drivers/docker/docker_pool.go create mode 100644 api/agent/drivers/docker/docker_pool_test.go create mode 100644 vendor/golang.org/x/time/AUTHORS create mode 100644 vendor/golang.org/x/time/CONTRIBUTING.md create mode 100644 vendor/golang.org/x/time/CONTRIBUTORS create mode 100644 vendor/golang.org/x/time/LICENSE create mode 100644 vendor/golang.org/x/time/PATENTS create mode 100644 vendor/golang.org/x/time/README.md create mode 100644 vendor/golang.org/x/time/rate/rate.go create mode 100644 vendor/golang.org/x/time/rate/rate_go16.go create mode 100644 vendor/golang.org/x/time/rate/rate_go17.go create mode 100644 vendor/golang.org/x/time/rate/rate_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 96958a57f..0afd1fdda 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -587,6 +587,12 @@ revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" version = "v0.3.0" +[[projects]] + branch = "master" + name = "golang.org/x/time" + packages = ["rate"] + revision = "26559e0f760e39c24d730d3224364aef164ee23f" + [[projects]] branch = "master" name = "google.golang.org/api" @@ -653,6 +659,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "bd152d9d0bb0ac9975ecc30f368747c21a047f30bd5037fb19e4835631baae52" + inputs-digest = "a7ba96a9cacdff4bbcb3f1824aea4095601292e2ac97987245bb27f9853d887b" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 6f164ed62..26841d589 100644 --- a/Makefile +++ b/Makefile @@ -57,6 +57,8 @@ test-system: test-basic full-test: test test-api test-system +img-busybox: + docker pull busybox img-hello: docker pull fnproject/hello img-mysql: @@ -66,7 +68,7 @@ img-postgres: img-minio: docker pull minio/minio -pull-images: img-hello img-mysql img-postgres img-minio +pull-images: img-hello img-mysql img-postgres img-minio img-busybox test-datastore: cd api/datastore && go test -v ./... diff --git a/api/agent/agent.go b/api/agent/agent.go index 2644bb017..12c61b1de 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -129,7 +129,10 @@ func createAgent(da DataAccess, withDocker bool) Agent { var driver drivers.Driver if withDocker { driver = docker.NewDocker(drivers.Config{ - ServerVersion: cfg.MinDockerVersion, + ServerVersion: cfg.MinDockerVersion, + PreForkPoolSize: cfg.PreForkPoolSize, + PreForkImage: cfg.PreForkImage, + PreForkCmd: cfg.PreForkCmd, }) } else { driver = mock.New() @@ -154,12 +157,16 @@ func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { } func (a *agent) Close() error { + var err error a.shutonce.Do(func() { + if a.driver != nil { + err = a.driver.Close() + } close(a.shutdown) }) a.wg.Wait() - return nil + return err } func (a *agent) Submit(callI Call) error { diff --git a/api/agent/config.go b/api/agent/config.go index 8e2837380..06327e82c 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -20,6 +20,9 @@ type AgentConfig struct { MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"` MaxTotalMemory uint64 `json:"max_total_memory_bytes"` MaxFsSize uint64 `json:"max_fs_size_mb"` + PreForkPoolSize uint64 `json:"pre_fork_pool_size"` + PreForkImage string `json:"pre_fork_image"` + PreForkCmd string `json:"pre_fork_pool_cmd"` } const ( @@ -33,6 +36,9 @@ const ( EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" + EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" + EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" + EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" MaxDisabledMsecs = time.Duration(math.MaxInt64) ) @@ -56,11 +62,15 @@ func NewAgentConfig() (*AgentConfig, error) { err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU) err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory) err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize) + err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize) if err != nil { return cfg, err } + cfg.PreForkImage = os.Getenv(EnvPreForkImage) + cfg.PreForkCmd = os.Getenv(EnvPreForkCmd) + if cfg.EjectIdle == time.Duration(0) { return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle) } diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index c1c1d9b74..9ecd6739b 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -52,6 +52,7 @@ type DockerDriver struct { docker dockerClient // retries on *docker.Client, restricts ad hoc *docker.Client usage / retries hostname string auths map[string]docker.AuthConfiguration + pool DockerPool } // implements drivers.Driver @@ -75,6 +76,10 @@ func NewDocker(conf drivers.Config) *DockerDriver { } } + if conf.PreForkPoolSize != 0 { + driver.pool = NewDockerPool(conf, driver) + } + return driver } @@ -118,6 +123,14 @@ func registryFromEnv() map[string]docker.AuthConfiguration { return auths.Configs } +func (drv *DockerDriver) Close() error { + var err error + if drv.pool != nil { + err = drv.pool.Close() + } + return err +} + func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"}) var cmd []string @@ -141,7 +154,6 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask MemorySwap: int64(task.Memory()), // disables swap KernelMemory: int64(task.Memory()), CPUShares: drv.conf.CPUShares, - Hostname: drv.hostname, Image: task.Image(), Volumes: map[string]struct{}{}, OpenStdin: true, @@ -159,6 +171,20 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Context: ctx, } + poolId := "" + if drv.pool != nil { + id, err := drv.pool.AllocPoolId() + if err != nil { + log.WithError(err).Error("Could not fetch pre fork pool container") + } else { + poolId = id + container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id) + } + } else { + // hostname and container NetworkMode is not compatible. + container.Config.Hostname = drv.hostname + } + // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, // which is approx 8 CPUS in CFS world. @@ -210,16 +236,20 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask } // discard removal error - return &cookie{id: task.Id(), task: task, drv: drv}, nil + return &cookie{id: task.Id(), task: task, drv: drv, poolId: poolId}, nil } type cookie struct { - id string - task drivers.ContainerTask - drv *DockerDriver + id string + poolId string + task drivers.ContainerTask + drv *DockerDriver } func (c *cookie) Close(ctx context.Context) error { + if c.poolId != "" && c.drv.pool != nil { + defer c.drv.pool.FreePoolId(c.poolId) + } return c.drv.removeContainer(ctx, c.id) } diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go index 6ba04b3c1..88f5cd8e3 100644 --- a/api/agent/drivers/docker/docker_client.go +++ b/api/agent/drivers/docker/docker_client.go @@ -275,7 +275,7 @@ func filterNoSuchContainer(ctx context.Context, err error) error { _, containerNotFound := err.(*docker.NoSuchContainer) dockerErr, ok := err.(*docker.Error) if containerNotFound || (ok && dockerErr.Status == 404) { - log.WithError(err).Error("filtering error") + log.WithError(err).Info("filtering error") return nil } return err diff --git a/api/agent/drivers/docker/docker_pool.go b/api/agent/drivers/docker/docker_pool.go new file mode 100644 index 000000000..9cee36ad4 --- /dev/null +++ b/api/agent/drivers/docker/docker_pool.go @@ -0,0 +1,297 @@ +package docker + +import ( + "context" + "errors" + "fmt" + "io" + "runtime" + "strings" + "sync" + "time" + + "github.com/fnproject/fn/api/agent/drivers" + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/id" + + "github.com/fsouza/go-dockerclient" + "github.com/sirupsen/logrus" + "golang.org/x/time/rate" +) + +// Prefork Pool is used in namespace optimizations to avoid creating and +// tearing down namespaces with every function container run. Instead, we reuse +// already available namespaces from these running container instances. These +// containers are not designed to run anything but placeholders for namespaces +// such as a minimal busybox container with 'tail -f /dev/null' which blocks +// forever. In other words, every function container is paired with a pool buddy +// where pool buddy provides already creates namespaces. These are currently +// network and user namespaces, but perhaps can be extended to also use pid and ipc. +// (see docker.go Prepare() on how this is currently being used.) +// Currently the pool is a set size and it does not grow on demand. + +var ( + ErrorPoolEmpty = errors.New("docker pre fork pool empty") +) + +const ( + LimitPerSec = 10 + LimitBurst = 20 + + DefaultImage = "busybox" + DefaultCmd = "tail -f /dev/null" + + ShutdownTimeout = time.Duration(1) * time.Second +) + +type poolTask struct { + id string + image string + cmd string +} + +func (c *poolTask) Id() string { return c.id } +func (c *poolTask) Command() string { return c.cmd } +func (c *poolTask) Input() io.Reader { return nil } +func (c *poolTask) Logger() (io.Writer, io.Writer) { return nil, nil } +func (c *poolTask) Volumes() [][2]string { return nil } +func (c *poolTask) WorkDir() string { return "" } +func (c *poolTask) Close() {} +func (c *poolTask) Image() string { return c.image } +func (c *poolTask) Timeout() time.Duration { return 0 } +func (c *poolTask) EnvVars() map[string]string { return nil } +func (c *poolTask) Memory() uint64 { return 0 } +func (c *poolTask) CPUs() uint64 { return 0 } +func (c *poolTask) FsSize() uint64 { return 0 } +func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {} + +type dockerPool struct { + lock sync.Mutex + inuse map[string]struct{} + free []string + limiter *rate.Limiter + cancel func() + wg sync.WaitGroup // TODO rename +} + +type DockerPoolStats struct { + inuse int + free int +} + +type DockerPool interface { + // fetch a pre-allocated free id from the pool + // may return too busy error + AllocPoolId() (string, error) + + // Release the id back to the pool + FreePoolId(id string) + + // stop and terminate the pool + Close() error + + // returns inuse versus free + Usage() DockerPoolStats +} + +func NewDockerPool(conf drivers.Config, driver *DockerDriver) DockerPool { + + // Docker pool is an optimization & feature only for Linux + if runtime.GOOS != "linux" || conf.PreForkPoolSize == 0 { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + + log := common.Logger(ctx) + log.Error("WARNING: Experimental Prefork Docker Pool Enabled") + + pool := &dockerPool{ + inuse: make(map[string]struct{}, conf.PreForkPoolSize), + free: make([]string, 0, conf.PreForkPoolSize), + limiter: rate.NewLimiter(LimitPerSec, LimitBurst), + cancel: cancel, + } + + for i := uint64(0); i < conf.PreForkPoolSize; i++ { + + task := &poolTask{ + id: fmt.Sprintf("%d_prefork_%s", i, id.New().String()), + image: DefaultImage, + cmd: DefaultCmd, + } + + if conf.PreForkImage != "" { + task.image = conf.PreForkImage + } + if conf.PreForkCmd != "" { + task.cmd = conf.PreForkCmd + } + + pool.wg.Add(1) + go pool.nannyContainer(ctx, driver, task) + } + + return pool +} + +func (pool *dockerPool) Close() error { + pool.cancel() + pool.wg.Wait() + return nil +} + +func (pool *dockerPool) nannyContainer(ctx context.Context, driver *DockerDriver, task *poolTask) { + defer pool.wg.Done() + + log := common.Logger(ctx).WithFields(logrus.Fields{"name": task.Id()}) + + containerOpts := docker.CreateContainerOptions{ + Name: task.Id(), + Config: &docker.Config{ + Cmd: strings.Fields(task.Command()), + Hostname: task.Id(), + Image: task.Image(), + Volumes: map[string]struct{}{}, + OpenStdin: false, + AttachStdout: false, + AttachStderr: false, + AttachStdin: false, + StdinOnce: false, + }, + HostConfig: &docker.HostConfig{ + LogConfig: docker.LogConfig{ + Type: "none", + }, + AutoRemove: true, + }, + Context: ctx, + } + + removeOpts := docker.RemoveContainerOptions{ + ID: task.Id(), + Force: true, + RemoveVolumes: true, + Context: ctx, + } + + // We spin forever, keeping the pool resident and running at all times. + for { + err := pool.limiter.Wait(ctx) + if err != nil { + // should not really happen unless ctx has a deadline or burst is 0. + log.WithError(err).Info("prefork pool rate limiter failed") + break + } + + // Let's try to clean up any left overs + err = driver.docker.RemoveContainer(removeOpts) + if err != nil { + log.WithError(err).Info("prefork pool container remove failed (this is probably OK)") + } + + err = driver.ensureImage(ctx, task) + if err != nil { + log.WithError(err).Info("prefork pool image pull failed") + continue + } + + _, err = driver.docker.CreateContainer(containerOpts) + if err != nil { + log.WithError(err).Info("prefork pool container create failed") + continue + } + + err = driver.docker.StartContainerWithContext(task.Id(), nil, ctx) + if err != nil { + log.WithError(err).Info("prefork pool container start failed") + continue + } + + log.Debug("prefork pool container ready") + + // IMPORTANT: container is now up and running. Register it to make it + // available for function containers. + pool.register(task.Id()) + + // We are optimistic here where provided image and command really blocks + // and runs forever. + exitCode, err := driver.docker.WaitContainerWithContext(task.Id(), ctx) + + // IMPORTANT: We have exited. This window is potentially very destructive, as any new + // function containers created during this window will fail. We must immediately + // proceed to unregister ourself to avoid further issues. + pool.unregister(task.Id()) + + log.WithError(err).Infof("prefork pool container exited exit_code=%d", exitCode) + } + + // final exit cleanup + ctx, cancel := context.WithTimeout(context.Background(), ShutdownTimeout) + defer cancel() + removeOpts.Context = ctx + driver.docker.RemoveContainer(removeOpts) +} + +func (pool *dockerPool) register(id string) { + pool.lock.Lock() + pool.free = append(pool.free, id) + pool.lock.Unlock() +} + +func (pool *dockerPool) unregister(id string) { + pool.lock.Lock() + + _, ok := pool.inuse[id] + if ok { + delete(pool.inuse, id) + } else { + for i := 0; i < len(pool.free); i += 1 { + if pool.free[i] == id { + pool.free = append(pool.free[:i], pool.free[i+1:]...) + break + } + } + } + + pool.lock.Unlock() +} + +func (pool *dockerPool) AllocPoolId() (string, error) { + pool.lock.Lock() + defer pool.lock.Unlock() + + // We currently do not grow the pool if we run out of pre-forked containers + if len(pool.free) == 0 { + return "", ErrorPoolEmpty + } + + id := pool.free[len(pool.free)-1] + pool.free = pool.free[:len(pool.free)-1] + pool.inuse[id] = struct{}{} + + return id, nil +} + +func (pool *dockerPool) FreePoolId(id string) { + pool.lock.Lock() + + _, ok := pool.inuse[id] + if ok { + delete(pool.inuse, id) + pool.free = append(pool.free, id) + } + + pool.lock.Unlock() +} + +func (pool *dockerPool) Usage() DockerPoolStats { + var stats DockerPoolStats + pool.lock.Lock() + + stats.inuse = len(pool.inuse) + stats.free = len(pool.free) + + pool.lock.Unlock() + return stats +} diff --git a/api/agent/drivers/docker/docker_pool_test.go b/api/agent/drivers/docker/docker_pool_test.go new file mode 100644 index 000000000..a1c0e34c2 --- /dev/null +++ b/api/agent/drivers/docker/docker_pool_test.go @@ -0,0 +1,131 @@ +package docker + +import ( + "testing" + "time" + + "github.com/fnproject/fn/api/agent/drivers" +) + +func TestRunnerDockerPool(t *testing.T) { + + cfg := &drivers.Config{} + + // shouldn't spin up a pool since cfg is empty + drv := NewDocker(*cfg) + + cfg.PreForkPoolSize = 2 + pool := NewDockerPool(*cfg, drv) + + // primitive wait here + i := 0 + for ; i < 10; i++ { + stats := pool.Usage() + if stats.free == 2 { + break + } + + <-time.After(time.Duration(500) * time.Millisecond) + } + if i == 10 { + t.Fatalf("pool initialize timeout stats=%+v", pool.Usage()) + } + + id1, err := pool.AllocPoolId() + if err != nil { + t.Fatalf("pool AllocPoolId id1 err=%s", err.Error()) + } + t.Logf("pool AllocPoolId id1=%s", id1) + + id2, err := pool.AllocPoolId() + if err != nil { + t.Fatalf("pool AllocPoolId id2 err=%s", err.Error()) + } + t.Logf("pool AllocPoolId id2=%s", id2) + + id3, err := pool.AllocPoolId() + if err == nil { + t.Fatalf("pool AllocPoolId id3 should be err, but got id=%s", id3) + } + t.Logf("pool AllocPoolId id3=%s", id3) + + pool.FreePoolId("nonsense") + + id4, err := pool.AllocPoolId() + if err == nil { + t.Fatalf("pool AllocPoolId id4 should be err, but got id=%s", id3) + } + t.Logf("pool AllocPoolId id4=%s", id4) + + pool.FreePoolId(id1) + + id5, err := pool.AllocPoolId() + if err != nil { + t.Fatalf("pool AllocPoolId id5 err=%s", err.Error()) + } + t.Logf("pool AllocPoolId id5=%s", id5) + if id5 != id1 { + t.Fatalf("pool AllocPoolId id5 != id1 (%s != %s)", id5, id1) + } + + err = pool.Close() + if err != nil { + t.Fatalf("pool close err=%s", err.Error()) + } + + err = drv.Close() + if err != nil { + t.Fatalf("drv close err=%s", err.Error()) + } + + stats := pool.Usage() + if stats.free != 0 && stats.inuse != 0 { + t.Fatalf("pool shutdown timeout stats=%+v", stats) + } +} + +func TestRunnerDockerPoolFaulty(t *testing.T) { + + cfg := &drivers.Config{} + + // shouldn't spin up a pool since cfg is empty + drv := NewDocker(*cfg) + + cfg.PreForkPoolSize = 2 + cfg.PreForkCmd = "sleep 0" + + pool := NewDockerPool(*cfg, drv) + + <-time.After(time.Duration(500) * time.Millisecond) + + // Not much to see if pre-fork has exited, but let's close + // and wait at least to make sure we don't crash and burn. + id1, err := pool.AllocPoolId() + t.Logf("pool AllocPoolId id=%s err=%v", id1, err) + if id1 != "" { + pool.FreePoolId(id1) + } + + <-time.After(time.Duration(500) * time.Millisecond) + + id2, err := pool.AllocPoolId() + t.Logf("pool AllocPoolId id=%s err=%v", id2, err) + if id2 != "" { + pool.FreePoolId(id2) + } + + err = pool.Close() + if err != nil { + t.Fatalf("pool close err=%s", err.Error()) + } + + err = drv.Close() + if err != nil { + t.Fatalf("drv close err=%s", err.Error()) + } + + stats := pool.Usage() + if stats.free != 0 && stats.inuse != 0 { + t.Fatalf("pool shutdown timeout stats=%+v", stats) + } +} diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 1d6497f17..74cccddaf 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -60,6 +60,9 @@ type Driver interface { // // The returned cookie should respect the task's timeout when it is run. Prepare(ctx context.Context, task ContainerTask) (Cookie, error) + + // close & shutdown the driver + Close() error } // RunResult indicates only the final state of the task. @@ -189,8 +192,11 @@ const ( type Config struct { Docker string `json:"docker"` // TODO CPUShares should likely be on a per container basis - CPUShares int64 `json:"cpu_shares"` - ServerVersion string `json:"server_version"` + CPUShares int64 `json:"cpu_shares"` + ServerVersion string `json:"server_version"` + PreForkPoolSize uint64 `json:"pre_fork_pool_size"` + PreForkImage string `json:"pre_fork_image"` + PreForkCmd string `json:"pre_fork_cmd"` } func average(samples []Stat) (Stat, bool) { diff --git a/api/agent/drivers/mock/mocker.go b/api/agent/drivers/mock/mocker.go index 79415af7e..dfc2568d4 100644 --- a/api/agent/drivers/mock/mocker.go +++ b/api/agent/drivers/mock/mocker.go @@ -20,6 +20,10 @@ func (m *Mocker) Prepare(context.Context, drivers.ContainerTask) (drivers.Cookie return &cookie{m}, nil } +func (m *Mocker) Close() error { + return nil +} + type cookie struct { m *Mocker } diff --git a/api/agent/resource.go b/api/agent/resource.go index cd22191f6..72da51e45 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -388,7 +388,7 @@ func (a *resourceTracker) initializeMemory(cfg *AgentConfig) { } // clamp the available memory by head room (for docker, ourselves, other processes) - headRoom, err := getMemoryHeadRoom(availMemory) + headRoom, err := getMemoryHeadRoom(availMemory, cfg) if err != nil { logrus.WithError(err).Fatal("Out of memory") } @@ -441,11 +441,19 @@ func (a *resourceTracker) initializeMemory(cfg *AgentConfig) { } // headroom estimation in order not to consume entire RAM if possible -func getMemoryHeadRoom(usableMemory uint64) (uint64, error) { +func getMemoryHeadRoom(usableMemory uint64, cfg *AgentConfig) (uint64, error) { // get %10 of the RAM headRoom := uint64(usableMemory / 10) + // TODO: improve this pre-fork calculation, we should fetch/query this + // instead of estimate below. + // if pre-fork pool is enabled, add 1 MB per pool-item + if cfg != nil && cfg.PreForkPoolSize != 0 { + headRoom += Mem1MB * cfg.PreForkPoolSize + } + + // TODO: improve these calculations. // clamp this with 256MB min -- 5GB max maxHeadRoom := uint64(5 * Mem1GB) minHeadRoom := uint64(256 * Mem1MB) diff --git a/vendor/golang.org/x/time/AUTHORS b/vendor/golang.org/x/time/AUTHORS new file mode 100644 index 000000000..15167cd74 --- /dev/null +++ b/vendor/golang.org/x/time/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/time/CONTRIBUTING.md b/vendor/golang.org/x/time/CONTRIBUTING.md new file mode 100644 index 000000000..d0485e887 --- /dev/null +++ b/vendor/golang.org/x/time/CONTRIBUTING.md @@ -0,0 +1,26 @@ +# Contributing to Go + +Go is an open source project. + +It is the work of hundreds of contributors. We appreciate your help! + +## Filing issues + +When [filing an issue](https://golang.org/issue/new), make sure to answer these five questions: + +1. What version of Go are you using (`go version`)? +2. What operating system and processor architecture are you using? +3. What did you do? +4. What did you expect to see? +5. What did you see instead? + +General questions should go to the [golang-nuts mailing list](https://groups.google.com/group/golang-nuts) instead of the issue tracker. +The gophers there will answer or ask you to file an issue if you've tripped over a bug. + +## Contributing code + +Please read the [Contribution Guidelines](https://golang.org/doc/contribute.html) +before sending patches. + +Unless otherwise noted, the Go source files are distributed under +the BSD-style license found in the LICENSE file. diff --git a/vendor/golang.org/x/time/CONTRIBUTORS b/vendor/golang.org/x/time/CONTRIBUTORS new file mode 100644 index 000000000..1c4577e96 --- /dev/null +++ b/vendor/golang.org/x/time/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/README.md b/vendor/golang.org/x/time/README.md new file mode 100644 index 000000000..ce9becdde --- /dev/null +++ b/vendor/golang.org/x/time/README.md @@ -0,0 +1,17 @@ +# Go Time + +This repository provides supplementary Go time packages. + +## Download/Install + +The easiest way to install is to run `go get -u golang.org/x/time`. You can +also manually git clone the repository to `$GOPATH/src/golang.org/x/time`. + +## Report Issues / Send Patches + +This repository uses Gerrit for code changes. To learn how to submit changes to +this repository, see https://golang.org/doc/contribute.html. + +The main issue tracker for the time repository is located at +https://github.com/golang/go/issues. Prefix your issue with "x/time:" in the +subject line, so it is easy to find. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 000000000..eabcd1147 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,380 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// contextContext is a temporary(?) copy of the context.Context type +// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ +// with the built-in context package. If people ever stop using Go 1.6 +// we can remove this. +type contextContext interface { + Deadline() (deadline time.Time, ok bool) + Done() <-chan struct{} + Err() error + Value(key interface{}) interface{} +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) wait(ctx contextContext) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/rate_go16.go b/vendor/golang.org/x/time/rate/rate_go16.go new file mode 100644 index 000000000..6bab1850f --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go16.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package rate + +import "golang.org/x/net/context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/golang.org/x/time/rate/rate_go17.go b/vendor/golang.org/x/time/rate/rate_go17.go new file mode 100644 index 000000000..f90d85f51 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go17.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package rate + +import "context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/golang.org/x/time/rate/rate_test.go b/vendor/golang.org/x/time/rate/rate_test.go new file mode 100644 index 000000000..e8add694f --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_test.go @@ -0,0 +1,449 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package rate + +import ( + "context" + "math" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestLimit(t *testing.T) { + if Limit(10) == Inf { + t.Errorf("Limit(10) == Inf should be false") + } +} + +func closeEnough(a, b Limit) bool { + return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9 +} + +func TestEvery(t *testing.T) { + cases := []struct { + interval time.Duration + lim Limit + }{ + {0, Inf}, + {-1, Inf}, + {1 * time.Nanosecond, Limit(1e9)}, + {1 * time.Microsecond, Limit(1e6)}, + {1 * time.Millisecond, Limit(1e3)}, + {10 * time.Millisecond, Limit(100)}, + {100 * time.Millisecond, Limit(10)}, + {1 * time.Second, Limit(1)}, + {2 * time.Second, Limit(0.5)}, + {time.Duration(2.5 * float64(time.Second)), Limit(0.4)}, + {4 * time.Second, Limit(0.25)}, + {10 * time.Second, Limit(0.1)}, + {time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))}, + } + for _, tc := range cases { + lim := Every(tc.interval) + if !closeEnough(lim, tc.lim) { + t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim) + } + } +} + +const ( + d = 100 * time.Millisecond +) + +var ( + t0 = time.Now() + t1 = t0.Add(time.Duration(1) * d) + t2 = t0.Add(time.Duration(2) * d) + t3 = t0.Add(time.Duration(3) * d) + t4 = t0.Add(time.Duration(4) * d) + t5 = t0.Add(time.Duration(5) * d) + t9 = t0.Add(time.Duration(9) * d) +) + +type allow struct { + t time.Time + n int + ok bool +} + +func run(t *testing.T, lim *Limiter, allows []allow) { + for i, allow := range allows { + ok := lim.AllowN(allow.t, allow.n) + if ok != allow.ok { + t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v", + i, allow.t, allow.n, ok, allow.ok) + } + } +} + +func TestLimiterBurst1(t *testing.T) { + run(t, NewLimiter(10, 1), []allow{ + {t0, 1, true}, + {t0, 1, false}, + {t0, 1, false}, + {t1, 1, true}, + {t1, 1, false}, + {t1, 1, false}, + {t2, 2, false}, // burst size is 1, so n=2 always fails + {t2, 1, true}, + {t2, 1, false}, + }) +} + +func TestLimiterBurst3(t *testing.T) { + run(t, NewLimiter(10, 3), []allow{ + {t0, 2, true}, + {t0, 2, false}, + {t0, 1, true}, + {t0, 1, false}, + {t1, 4, false}, + {t2, 1, true}, + {t3, 1, true}, + {t4, 1, true}, + {t4, 1, true}, + {t4, 1, false}, + {t4, 1, false}, + {t9, 3, true}, + {t9, 0, true}, + }) +} + +func TestLimiterJumpBackwards(t *testing.T) { + run(t, NewLimiter(10, 3), []allow{ + {t1, 1, true}, // start at t1 + {t0, 1, true}, // jump back to t0, two tokens remain + {t0, 1, true}, + {t0, 1, false}, + {t0, 1, false}, + {t1, 1, true}, // got a token + {t1, 1, false}, + {t1, 1, false}, + {t2, 1, true}, // got another token + {t2, 1, false}, + {t2, 1, false}, + }) +} + +func TestSimultaneousRequests(t *testing.T) { + const ( + limit = 1 + burst = 5 + numRequests = 15 + ) + var ( + wg sync.WaitGroup + numOK = uint32(0) + ) + + // Very slow replenishing bucket. + lim := NewLimiter(limit, burst) + + // Tries to take a token, atomically updates the counter and decreases the wait + // group counter. + f := func() { + defer wg.Done() + if ok := lim.Allow(); ok { + atomic.AddUint32(&numOK, 1) + } + } + + wg.Add(numRequests) + for i := 0; i < numRequests; i++ { + go f() + } + wg.Wait() + if numOK != burst { + t.Errorf("numOK = %d, want %d", numOK, burst) + } +} + +func TestLongRunningQPS(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + if runtime.GOOS == "openbsd" { + t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)") + return + } + + // The test runs for a few seconds executing many requests and then checks + // that overall number of requests is reasonable. + const ( + limit = 100 + burst = 100 + ) + var numOK = int32(0) + + lim := NewLimiter(limit, burst) + + var wg sync.WaitGroup + f := func() { + if ok := lim.Allow(); ok { + atomic.AddInt32(&numOK, 1) + } + wg.Done() + } + + start := time.Now() + end := start.Add(5 * time.Second) + for time.Now().Before(end) { + wg.Add(1) + go f() + + // This will still offer ~500 requests per second, but won't consume + // outrageous amount of CPU. + time.Sleep(2 * time.Millisecond) + } + wg.Wait() + elapsed := time.Since(start) + ideal := burst + (limit * float64(elapsed) / float64(time.Second)) + + // We should never get more requests than allowed. + if want := int32(ideal + 1); numOK > want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } + // We should get very close to the number of requests allowed. + if want := int32(0.999 * ideal); numOK < want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } +} + +type request struct { + t time.Time + n int + act time.Time + ok bool +} + +// dFromDuration converts a duration to a multiple of the global constant d +func dFromDuration(dur time.Duration) int { + // Adding a millisecond to be swallowed by the integer division + // because we don't care about small inaccuracies + return int((dur + time.Millisecond) / d) +} + +// dSince returns multiples of d since t0 +func dSince(t time.Time) int { + return dFromDuration(t.Sub(t0)) +} + +func runReserve(t *testing.T, lim *Limiter, req request) *Reservation { + return runReserveMax(t, lim, req, InfDuration) +} + +func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation { + r := lim.reserveN(req.t, req.n, maxReserve) + if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok { + t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)", + dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok) + } + return &r +} + +func TestSimpleReserve(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + runReserve(t, lim, request{t0, 2, t2, true}) + runReserve(t, lim, request{t3, 2, t4, true}) +} + +func TestMix(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst + runReserve(t, lim, request{t0, 2, t0, true}) + run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow + runReserve(t, lim, request{t1, 2, t2, true}) + run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow + run(t, lim, []allow{{t3, 1, true}}) +} + +func TestCancelInvalid(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 3, t3, false}) + r.CancelAt(t0) // should have no effect + runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens +} + +func TestCancelLast(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 2, t2, true}) + r.CancelAt(t1) // got 2 tokens back + runReserve(t, lim, request{t1, 2, t2, true}) +} + +func TestCancelTooLate(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 2, t2, true}) + r.CancelAt(t3) // too late to cancel - should have no effect + runReserve(t, lim, request{t3, 2, t4, true}) +} + +func TestCancel0Tokens(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 1, t1, true}) + runReserve(t, lim, request{t0, 1, t2, true}) + r.CancelAt(t0) // got 0 tokens back + runReserve(t, lim, request{t0, 1, t3, true}) +} + +func TestCancel1Token(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 2, t2, true}) + runReserve(t, lim, request{t0, 1, t3, true}) + r.CancelAt(t2) // got 1 token back + runReserve(t, lim, request{t2, 2, t4, true}) +} + +func TestCancelMulti(t *testing.T) { + lim := NewLimiter(10, 4) + + runReserve(t, lim, request{t0, 4, t0, true}) + rA := runReserve(t, lim, request{t0, 3, t3, true}) + runReserve(t, lim, request{t0, 1, t4, true}) + rC := runReserve(t, lim, request{t0, 1, t5, true}) + rC.CancelAt(t1) // get 1 token back + rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved + runReserve(t, lim, request{t1, 3, t5, true}) +} + +func TestReserveJumpBack(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 + runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst + runReserve(t, lim, request{t2, 2, t3, true}) +} + +func TestReserveJumpBackCancel(t *testing.T) { + lim := NewLimiter(10, 2) + + runReserve(t, lim, request{t1, 2, t1, true}) // start at t1 + r := runReserve(t, lim, request{t1, 2, t3, true}) + runReserve(t, lim, request{t1, 1, t4, true}) + r.CancelAt(t0) // cancel at t0, get 1 token back + runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst +} + +func TestReserveSetLimit(t *testing.T) { + lim := NewLimiter(5, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + runReserve(t, lim, request{t0, 2, t4, true}) + lim.SetLimitAt(t2, 10) + runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst +} + +func TestReserveSetLimitCancel(t *testing.T) { + lim := NewLimiter(5, 2) + + runReserve(t, lim, request{t0, 2, t0, true}) + r := runReserve(t, lim, request{t0, 2, t4, true}) + lim.SetLimitAt(t2, 10) + r.CancelAt(t2) // 2 tokens back + runReserve(t, lim, request{t2, 2, t3, true}) +} + +func TestReserveMax(t *testing.T) { + lim := NewLimiter(10, 2) + maxT := d + + runReserveMax(t, lim, request{t0, 2, t0, true}, maxT) + runReserveMax(t, lim, request{t0, 1, t1, true}, maxT) // reserve for close future + runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future +} + +type wait struct { + name string + ctx context.Context + n int + delay int // in multiples of d + nilErr bool +} + +func runWait(t *testing.T, lim *Limiter, w wait) { + start := time.Now() + err := lim.WaitN(w.ctx, w.n) + delay := time.Now().Sub(start) + if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) { + errString := "" + if !w.nilErr { + errString = "" + } + t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v", + w.name, w.n, err, delay, errString, d*time.Duration(w.delay)) + } +} + +func TestWaitSimple(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false}) + + runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false}) + + runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true}) + runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true}) +} + +func TestWaitCancel(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithCancel(context.Background()) + runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1 + go func() { + time.Sleep(d) + cancel() + }() + runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false}) + // should get 3 tokens back, and have lim.tokens = 2 + t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent) + runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true}) +} + +func TestWaitTimeout(t *testing.T) { + lim := NewLimiter(10, 3) + + ctx, cancel := context.WithTimeout(context.Background(), d) + defer cancel() + runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) + runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false}) +} + +func TestWaitInf(t *testing.T) { + lim := NewLimiter(Inf, 0) + + runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true}) +} + +func BenchmarkAllowN(b *testing.B) { + lim := NewLimiter(Every(1*time.Second), 1) + now := time.Now() + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + lim.AllowN(now, 1) + } + }) +}