diff --git a/api/agent/agent.go b/api/agent/agent.go index 565aafc4a..4024c6d44 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -137,6 +137,7 @@ func createAgent(da DataAccess) Agent { // TODO: Create drivers.New(runnerConfig) driver := docker.NewDocker(drivers.Config{ + DockerNetworks: cfg.DockerNetworks, ServerVersion: cfg.MinDockerVersion, PreForkPoolSize: cfg.PreForkPoolSize, PreForkImage: cfg.PreForkImage, diff --git a/api/agent/config.go b/api/agent/config.go index eceadbaeb..5e192a645 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -10,6 +10,7 @@ import ( type AgentConfig struct { MinDockerVersion string `json:"min_docker_version"` + DockerNetworks string `json:"docker_networks"` FreezeIdle time.Duration `json:"freeze_idle_msecs"` EjectIdle time.Duration `json:"eject_idle_msecs"` HotPoll time.Duration `json:"hot_poll_msecs"` @@ -31,6 +32,7 @@ type AgentConfig struct { } const ( + EnvDockerNetworks = "FN_DOCKER_NETWORKS" EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" EnvEjectIdle = "FN_EJECT_IDLE_MSECS" EnvHotPoll = "FN_HOT_POLL_MSECS" @@ -82,6 +84,7 @@ func NewAgentConfig() (*AgentConfig, error) { err = setEnvStr(err, EnvPreForkCmd, &cfg.PreForkCmd) err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce) err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks) + err = setEnvStr(err, EnvDockerNetworks, &cfg.DockerNetworks) if err != nil { return cfg, err diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index bb0878f80..d68952236 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -6,10 +6,12 @@ import ( "errors" "fmt" "io" + "math" "net/http" "os" "path" "strings" + "sync" "time" "go.opencensus.io/stats" @@ -53,6 +55,9 @@ type DockerDriver struct { hostname string auths map[string]docker.AuthConfiguration pool DockerPool + // protects networks map + networksLock sync.Mutex + networks map[string]uint64 } // implements drivers.Driver @@ -80,6 +85,14 @@ func NewDocker(conf drivers.Config) *DockerDriver { driver.pool = NewDockerPool(conf, driver) } + nets := strings.Fields(conf.DockerNetworks) + if len(nets) > 0 { + driver.networks = make(map[string]uint64, len(nets)) + for _, net := range nets { + driver.networks[net] = 0 + } + } + return driver } @@ -131,27 +144,62 @@ func (drv *DockerDriver) Close() error { return err } -func (drv *DockerDriver) tryUsePool(ctx context.Context, container *docker.CreateContainerOptions) string { +func (drv *DockerDriver) pickPool(ctx context.Context, container *docker.CreateContainerOptions) string { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "tryUsePool"}) - if drv.pool != nil { - id, err := drv.pool.AllocPoolId() - if err == nil { - linker := fmt.Sprintf("container:%s", id) - // We are able to fetch a container from pool. Now, use its - // network, ipc and pid namespaces. - container.HostConfig.NetworkMode = linker - //container.HostConfig.IpcMode = linker - //container.HostConfig.PidMode = linker - return id - } - - log.WithError(err).Error("Could not fetch pre fork pool container") + if drv.pool == nil || container.HostConfig.NetworkMode != "" { + return "" } - // hostname and container NetworkMode is not compatible. - container.Config.Hostname = drv.hostname - return "" + id, err := drv.pool.AllocPoolId() + if err != nil { + log.WithError(err).Error("Could not fetch pre fork pool container") + return "" + } + + // We are able to fetch a container from pool. Now, use its + // network, ipc and pid namespaces. + container.HostConfig.NetworkMode = fmt.Sprintf("container:%s", id) + //container.HostConfig.IpcMode = linker + //container.HostConfig.PidMode = linker + return id +} + +func (drv *DockerDriver) unpickPool(poolId string) { + if poolId != "" && drv.pool != nil { + drv.pool.FreePoolId(poolId) + } +} + +func (drv *DockerDriver) pickNetwork(container *docker.CreateContainerOptions) string { + + if len(drv.networks) == 0 || container.HostConfig.NetworkMode != "" { + return "" + } + + var id string + min := uint64(math.MaxUint64) + + drv.networksLock.Lock() + for key, val := range drv.networks { + if val < min { + id = key + min = val + } + } + drv.networks[id]++ + drv.networksLock.Unlock() + + container.HostConfig.NetworkMode = id + return id +} + +func (drv *DockerDriver) unpickNetwork(netId string) { + if netId != "" { + drv.networksLock.Lock() + drv.networks[netId]-- + drv.networksLock.Unlock() + } } func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { @@ -194,7 +242,13 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Context: ctx, } - poolId := drv.tryUsePool(ctx, &container) + poolId := drv.pickPool(ctx, &container) + netId := drv.pickNetwork(&container) + + if container.HostConfig.NetworkMode == "" { + // hostname and container NetworkMode is not compatible. + container.Config.Hostname = drv.hostname + } // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, @@ -229,6 +283,8 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask err := drv.ensureImage(ctx, task) if err != nil { + drv.unpickPool(poolId) + drv.unpickNetwork(netId) return nil, err } @@ -242,25 +298,30 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask }).WithError(err).Error("Could not create container") // NOTE: if the container fails to create we don't really want to show to user since they aren't directly configuring the container + drv.unpickPool(poolId) + drv.unpickNetwork(netId) return nil, err } } // discard removal error - return &cookie{id: task.Id(), task: task, drv: drv, poolId: poolId}, nil + return &cookie{id: task.Id(), task: task, drv: drv, poolId: poolId, netId: netId}, nil } type cookie struct { id string poolId string + netId string task drivers.ContainerTask drv *DockerDriver } func (c *cookie) Close(ctx context.Context) error { - if c.poolId != "" && c.drv.pool != nil { - defer c.drv.pool.FreePoolId(c.poolId) - } + defer func() { + c.drv.unpickPool(c.poolId) + c.drv.unpickNetwork(c.netId) + }() + return c.drv.removeContainer(ctx, c.id) } diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index 5017abf69..86f790536 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -68,6 +68,41 @@ func TestRunnerDocker(t *testing.T) { } } +func TestRunnerDockerNetworks(t *testing.T) { + dkr := NewDocker(drivers.Config{ + DockerNetworks: "test1 test2", + }) + + ctx := context.Background() + var output bytes.Buffer + var errors bytes.Buffer + + task1 := &taskDockerTest{"test-docker1", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors} + task2 := &taskDockerTest{"test-docker2", bytes.NewBufferString(`{"isDebug": true}`), &output, &errors} + + cookie1, err := dkr.Prepare(ctx, task1) + if err != nil { + t.Fatal("Couldn't prepare task1 test") + } + defer cookie1.Close(ctx) + + cookie2, err := dkr.Prepare(ctx, task2) + if err != nil { + t.Fatal("Couldn't prepare task2 test") + } + defer cookie2.Close(ctx) + + c1 := cookie1.(*cookie) + c2 := cookie2.(*cookie) + + if c1.netId != "test1" { + t.Fatalf("cookie1 netId should be %s but it is %s", "test1", c1.netId) + } + if c2.netId != "test2" { + t.Fatalf("cookie2 netId should be %s but it is %s", "test2", c2.netId) + } +} + func TestRunnerDockerVersion(t *testing.T) { dkr := NewDocker(drivers.Config{ diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 4dd327a49..24231247a 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -190,7 +190,8 @@ const ( ) type Config struct { - Docker string `json:"docker"` + Docker string `json:"docker"` + DockerNetworks string `json:"docker_networks"` // TODO CPUShares should likely be on a per container basis CPUShares int64 `json:"cpu_shares"` ServerVersion string `json:"server_version"`