From 409c104df3e93026b8c15df930e2fc25a066825e Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Mon, 30 Jul 2018 16:04:27 -0700 Subject: [PATCH] make agent options/config pass lint checks (#1144) --- api/agent/agent.go | 26 +++++---- api/agent/agent_test.go | 6 +- api/agent/call.go | 2 +- api/agent/config.go | 86 +++++++++++++++++++---------- api/agent/lb_agent.go | 8 +-- api/agent/resource.go | 8 +-- test/fn-system-tests/system_test.go | 2 +- 7 files changed, 85 insertions(+), 53 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index f68674721..c7851b156 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -90,7 +90,7 @@ type Agent interface { } type agent struct { - cfg AgentConfig + cfg Config da CallHandler callListeners []fnext.CallListener @@ -112,13 +112,13 @@ type agent struct { onStartup []func() } -// AgentOption configures an agent at startup -type AgentOption func(*agent) error +// Option configures an agent at startup +type Option func(*agent) error // New creates an Agent that executes functions locally as Docker containers. -func New(da CallHandler, options ...AgentOption) Agent { +func New(da CallHandler, options ...Option) Agent { - cfg, err := NewAgentConfig() + cfg, err := NewConfig() if err != nil { logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg) } @@ -164,7 +164,7 @@ func (a *agent) addStartup(sup func()) { } // WithAsync Enables Async operations on the agent -func WithAsync(dqda DequeueDataAccess) AgentOption { +func WithAsync(dqda DequeueDataAccess) Option { return func(a *agent) error { if !a.shutWg.AddSession(1) { logrus.Fatalf("cannot start agent, unable to add session") @@ -175,7 +175,9 @@ func WithAsync(dqda DequeueDataAccess) AgentOption { return nil } } -func WithConfig(cfg *AgentConfig) AgentOption { + +// WithConfig sets the agent config to the provided config +func WithConfig(cfg *Config) Option { return func(a *agent) error { a.cfg = *cfg return nil @@ -183,7 +185,7 @@ func WithConfig(cfg *AgentConfig) AgentOption { } // WithDockerDriver Provides a customer driver to agent -func WithDockerDriver(drv drivers.Driver) AgentOption { +func WithDockerDriver(drv drivers.Driver) Option { return func(a *agent) error { if a.driver != nil { return errors.New("cannot add driver to agent, driver already exists") @@ -195,7 +197,7 @@ func WithDockerDriver(drv drivers.Driver) AgentOption { } // WithCallOverrider registers register a CallOverrider to modify a Call and extensions on call construction -func WithCallOverrider(fn CallOverrider) AgentOption { +func WithCallOverrider(fn CallOverrider) Option { return func(a *agent) error { if a.callOverrider != nil { return errors.New("lb-agent call overriders already exists") @@ -206,7 +208,7 @@ func WithCallOverrider(fn CallOverrider) AgentOption { } // NewDockerDriver creates a default docker driver from agent config -func NewDockerDriver(cfg *AgentConfig) (drivers.Driver, error) { +func NewDockerDriver(cfg *Config) (drivers.Driver, error) { return drivers.New("docker", drivers.Config{ DockerNetworks: cfg.DockerNetworks, DockerLoadFile: cfg.DockerLoadFile, @@ -1038,10 +1040,10 @@ type container struct { } //newHotContainer creates a container that can be used for multiple sequential events -func newHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*container, func()) { +func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container, func()) { // if freezer is enabled, be consistent with freezer behavior and // block stdout and stderr between calls. - isBlockIdleIO := MaxDisabledMsecs != cfg.FreezeIdle + isBlockIdleIO := MaxMsDisabled != cfg.FreezeIdle id := id.New().String() diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 8159cfc50..357bd7cc0 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -425,7 +425,7 @@ func TestReqTooLarge(t *testing.T) { Method: "GET", } - cfg, err := NewAgentConfig() + cfg, err := NewConfig() if err != nil { t.Fatal(err) } @@ -730,7 +730,7 @@ func TestTmpFsSize(t *testing.T) { TmpFsSize: 1, } - cfg, err := NewAgentConfig() + cfg, err := NewConfig() if err != nil { t.Fatal(err) } @@ -1141,7 +1141,7 @@ func TestNBIOResourceTracker(t *testing.T) { Memory: call.Memory, } - cfg, err := NewAgentConfig() + cfg, err := NewConfig() if err != nil { t.Fatalf("bad config %+v", cfg) } diff --git a/api/agent/call.go b/api/agent/call.go index 147c27782..622ebcd0c 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -379,7 +379,7 @@ func setupCtx(c *call) { c.req = c.req.WithContext(ctx) } -func setMaxBodyLimit(cfg *AgentConfig, c *call) error { +func setMaxBodyLimit(cfg *Config, c *call) error { if cfg.MaxRequestSize > 0 && c.req.ContentLength > 0 && uint64(c.req.ContentLength) > cfg.MaxRequestSize { return models.ErrRequestContentTooBig } diff --git a/api/agent/config.go b/api/agent/config.go index 36c9a6539..00f01d68c 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -8,7 +8,8 @@ import ( "time" ) -type AgentConfig struct { +// Config specifies various settings for an agent +type Config struct { MinDockerVersion string `json:"min_docker_version"` DockerNetworks string `json:"docker_networks"` DockerLoadFile string `json:"docker_load_file"` @@ -36,40 +37,69 @@ type AgentConfig struct { } const ( - EnvDockerNetworks = "FN_DOCKER_NETWORKS" - EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE" - EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" - EnvEjectIdle = "FN_EJECT_IDLE_MSECS" - EnvHotPoll = "FN_HOT_POLL_MSECS" - EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS" - EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS" - EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS" - EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING" - EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE" - EnvMaxRequestSize = "FN_MAX_REQUEST_SIZE" - EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES" - EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" - EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" - EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" - EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" - EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" - EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" - EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" - EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" + // EnvDockerNetworks is a comma separated list of networks to attach to each container started + EnvDockerNetworks = "FN_DOCKER_NETWORKS" + // EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup + EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE" + // EnvFreezeIdle is the delay between a container being last used and being frozen + EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" + // EnvEjectIdle is the delay before allowing an idle container to be evictable if another container + // requests the space for itself + EnvEjectIdle = "FN_EJECT_IDLE_MSECS" + // EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be + // launched for a given function + EnvHotPoll = "FN_HOT_POLL_MSECS" + // EnvHotLauncherTimeout is the timeout for a hot container to become available for use + EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS" + // EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations + EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS" + // EnvCallEndTimeout is the timeout after a call is completed to store information about that invocation + EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS" + // EnvMaxCallEndStacking is the maximum number of concurrent calls in call.End storing info + EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING" + // EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation + EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE" + // EnvMaxRequestSize is the maximum request size that may be passed to an agent TODO kill me from here + EnvMaxRequestSize = "FN_MAX_REQUEST_SIZE" + // EnvMaxLogSize is the maximum size that a function's log may reach + EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES" + // EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers + EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" + // EnvMaxTotalMemory is the maximum memory that will be reserved across all containers + EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" + // EnvMaxFsSize is the maximum filesystem size that a function may use + EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" + // EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency + EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" + // EnvPreForkImage is the image to use for the pre-fork pool + EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" + // EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time + EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" + // EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled + EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" + // EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers + EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" + // EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either + // available or it will return an error immediately EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER" - EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES" - EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS" + // EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container + EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES" + // EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only + EnvDisableReadOnlyRootFs = "FN_DISABLE_READONLY_ROOTFS" - MaxDisabledMsecs = time.Duration(math.MaxInt64) + // MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this manuever + MaxMsDisabled = time.Duration(math.MaxInt64) // defaults + // DefaultHotPoll is the default value for EnvHotPoll DefaultHotPoll = 200 * time.Millisecond ) -func NewAgentConfig() (*AgentConfig, error) { +// NewConfig returns a config set from env vars, plus defaults +func NewConfig() (*Config, error) { - cfg := &AgentConfig{ + cfg := &Config{ MinDockerVersion: "17.10.0-ce", MaxLogSize: 1 * 1024 * 1024, MaxCallEndStacking: 8192, @@ -160,8 +190,8 @@ func setEnvMsecs(err error, name string, dst *time.Duration, defaultVal time.Dur return fmt.Errorf("error invalid %s=%s err=%s", name, dur, err) } // disable if negative or set to msecs specified. - if durInt < 0 || time.Duration(durInt) >= MaxDisabledMsecs/time.Millisecond { - *dst = MaxDisabledMsecs + if durInt < 0 || time.Duration(durInt) >= MaxMsDisabled/time.Millisecond { + *dst = MaxMsDisabled } else { *dst = time.Duration(durInt) * time.Millisecond } diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index eaeb76169..7b286bed0 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -18,7 +18,7 @@ import ( ) type lbAgent struct { - cfg AgentConfig + cfg Config cda CallHandler callListeners []fnext.CallListener rp pool.RunnerPool @@ -30,7 +30,7 @@ type lbAgent struct { type LBAgentOption func(*lbAgent) error -func WithLBAgentConfig(cfg *AgentConfig) LBAgentOption { +func WithLBAgentConfig(cfg *Config) LBAgentOption { return func(a *lbAgent) error { a.cfg = *cfg return nil @@ -52,8 +52,8 @@ func WithLBCallOverrider(fn CallOverrider) LBAgentOption { // across a group of runner nodes. func NewLBAgent(da CallHandler, rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error) { - // Yes, LBAgent and Agent both use an AgentConfig. - cfg, err := NewAgentConfig() + // Yes, LBAgent and Agent both use an Config. + cfg, err := NewConfig() if err != nil { logrus.WithError(err).Fatalf("error in lb-agent config cfg=%+v", cfg) } diff --git a/api/agent/resource.go b/api/agent/resource.go index 51c985c60..b4ba6793a 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -76,7 +76,7 @@ type resourceTracker struct { cpuAsyncHWMark uint64 } -func NewResourceTracker(cfg *AgentConfig) ResourceTracker { +func NewResourceTracker(cfg *Config) ResourceTracker { obj := &resourceTracker{ cond: sync.NewCond(new(sync.Mutex)), @@ -339,7 +339,7 @@ func clampUint64(val, min, max uint64) uint64 { return val } -func (a *resourceTracker) initializeCPU(cfg *AgentConfig) { +func (a *resourceTracker) initializeCPU(cfg *Config) { var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64 @@ -410,7 +410,7 @@ func (a *resourceTracker) initializeCPU(cfg *AgentConfig) { a.cpuAsyncTotal = maxAsyncCPU } -func (a *resourceTracker) initializeMemory(cfg *AgentConfig) { +func (a *resourceTracker) initializeMemory(cfg *Config) { var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64 @@ -482,7 +482,7 @@ func (a *resourceTracker) initializeMemory(cfg *AgentConfig) { } // headroom estimation in order not to consume entire RAM if possible -func getMemoryHeadRoom(usableMemory uint64, cfg *AgentConfig) (uint64, error) { +func getMemoryHeadRoom(usableMemory uint64, cfg *Config) (uint64, error) { // get %10 of the RAM headRoom := uint64(usableMemory / 10) diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 62a4e7884..10f449850 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -255,7 +255,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) // This is our Agent config, which we will use for both inner agent and docker. - cfg, err := agent.NewAgentConfig() + cfg, err := agent.NewConfig() if err != nil { return nil, err }