Mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
fn: size restricted tmpfs /tmp and read-only / support (#1012)
* fn: size restricted tmpfs /tmp and read-only / support
  *) read-only root fs support
  *) removed CPUShares from docker API. This was unused.
  *) docker.Prepare() refactoring
  *) added docker.configureTmpFs() for size limited tmpfs on /tmp
  *) tmpfs size support in routes and resource tracker
  *) fix fn-test-utils to handle sparse files better in create file
* test typo fix
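The gist of the change, as a standalone sketch (not code from this commit; the helper name and the fsouza/go-dockerclient import path are assumptions): the per-route tmpfs size, the agent-wide inode cap, and the read-only rootfs flag all end up as Docker HostConfig settings.

package main

import (
	"fmt"

	docker "github.com/fsouza/go-dockerclient"
)

// tmpfsHostConfig is a hypothetical helper showing how a per-route tmpfs
// size (MB), an inode cap, and the read-only rootfs flag map onto the
// go-dockerclient HostConfig fields this commit sets.
func tmpfsHostConfig(tmpFsSizeMB, maxInodes uint64, readOnlyRoot bool) *docker.HostConfig {
	hc := &docker.HostConfig{ReadonlyRootfs: readOnlyRoot}
	if tmpFsSizeMB != 0 {
		opt := fmt.Sprintf("size=%dm", tmpFsSizeMB)
		if maxInodes != 0 {
			opt = fmt.Sprintf("%s,nr_inodes=%d", opt, maxInodes)
		}
		// tmpfs mounts are expressed as mount point -> mount options
		hc.Tmpfs = map[string]string{"/tmp": opt}
	}
	return hc
}

func main() {
	// e.g. a 1 MB /tmp with at most 1024 inodes and a read-only root filesystem
	fmt.Printf("%+v\n", tmpfsHostConfig(1, 1024, true))
}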
@@ -162,13 +162,15 @@ func createAgent(da DataAccess, options ...AgentOption) Agent {
    // TODO: Create drivers.New(runnerConfig)
    a.driver = docker.NewDocker(drivers.Config{
-       DockerNetworks: a.cfg.DockerNetworks,
-       ServerVersion: a.cfg.MinDockerVersion,
-       PreForkPoolSize: a.cfg.PreForkPoolSize,
-       PreForkImage: a.cfg.PreForkImage,
-       PreForkCmd: a.cfg.PreForkCmd,
-       PreForkUseOnce: a.cfg.PreForkUseOnce,
-       PreForkNetworks: a.cfg.PreForkNetworks,
+       DockerNetworks: a.cfg.DockerNetworks,
+       ServerVersion: a.cfg.MinDockerVersion,
+       PreForkPoolSize: a.cfg.PreForkPoolSize,
+       PreForkImage: a.cfg.PreForkImage,
+       PreForkCmd: a.cfg.PreForkCmd,
+       PreForkUseOnce: a.cfg.PreForkUseOnce,
+       PreForkNetworks: a.cfg.PreForkNetworks,
+       MaxTmpFsInodes: a.cfg.MaxTmpFsInodes,
+       EnableReadOnlyRootFs: a.cfg.EnableReadOnlyRootFs,
    })

    a.da = da
@@ -478,6 +480,8 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
    common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")

+   mem := call.Memory + uint64(call.TmpFsSize)
+
    // WARNING: Tricky flow below. We are here because: isNeeded is set,
    // in other words, we need to launch a new container at this time due to high load.
    //
@@ -496,7 +500,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
    // Non-blocking mode only applies to cpu+mem, and if isNeeded decided that we do not
    // need to start a new container, then waiters will wait.
    select {
-   case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
+   case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
        if tok != nil && tok.Error() != nil {
            tryNotify(notifyChan, tok.Error())
        } else if a.shutWg.AddSession(1) {
@@ -577,8 +581,10 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
    call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)

+   mem := call.Memory + uint64(call.TmpFsSize)
+
    select {
-   case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
+   case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
        if tok.Error() != nil {
            return nil, tok.Error()
        }
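A quick aside on the two resource-token changes above (not part of the diff): tmpfs is RAM-backed, so the agent now reserves the route's tmpfs size together with its memory limit before launching a container. A minimal illustration of the arithmetic, with hypothetical route values:

package main

import "fmt"

func main() {
	// hypothetical route settings: Memory and TmpFsSize are both in MB
	var memory uint64 = 64
	var tmpFsSize uint32 = 1

	// mirrors mem := call.Memory + uint64(call.TmpFsSize) in the hunks above
	mem := memory + uint64(tmpFsSize)
	fmt.Println("MB reserved from the resource tracker:", mem) // 65
}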
@@ -955,13 +961,14 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
// and stderr can be swapped out by new calls in the container. input and
// output must be copied in and out.
type container struct {
-   id string // contrived
-   image string
-   env map[string]string
-   memory uint64
-   cpus uint64
-   fsSize uint64
-   timeout time.Duration // cold only (superfluous, but in case)
+   id string // contrived
+   image string
+   env map[string]string
+   memory uint64
+   cpus uint64
+   fsSize uint64
+   tmpFsSize uint64
+   timeout time.Duration // cold only (superfluous, but in case)

    stdin io.Reader
    stdout io.Writer
@@ -1039,6 +1046,7 @@ func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*contai
        memory: call.Memory,
        cpus: uint64(call.CPUs),
        fsSize: cfg.MaxFsSize,
+       tmpFsSize: uint64(call.TmpFsSize),
        stdin: stdin,
        stdout: stdout,
        stderr: stderr,
@@ -1088,6 +1096,7 @@ func (c *container) EnvVars() map[string]string { return c.env }
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
func (c *container) CPUs() uint64 { return c.cpus }
func (c *container) FsSize() uint64 { return c.fsSize }
+func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }

// WriteStat publishes each metric in the specified Stats structure as a histogram metric
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
@@ -632,6 +632,113 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) {
    }
}

func TestTmpFsSize(t *testing.T) {
    // TODO it may be a good idea to mock out the http server and use a real
    // response writer with sync, and also test that this works with async + log

    appName := "myapp"
    path := "/hello"
    url := "http://127.0.0.1:8080/r/" + appName + path

    app := &models.App{Name: appName}
    app.SetDefaults()
    // we need to load in app & route so that FromRequest works
    ds := datastore.NewMockInit(
        []*models.App{app},
        []*models.Route{
            {
                Path: path,
                AppID: app.ID,
                Image: "fnproject/fn-test-utils",
                Type: "sync",
                Format: "http", // this _is_ the test
                Timeout: 5,
                IdleTimeout: 10,
                Memory: 64,
                TmpFsSize: 1,
            },
        },
    )

    cfg, err := NewAgentConfig()
    if err != nil {
        t.Fatal(err)
    }

    cfg.MaxTmpFsInodes = 1024
    cfg.EnableReadOnlyRootFs = true

    a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
    defer checkClose(t, a)

    // Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB
    bodOne := `{"readFile":"/proc/mounts", "createFile":"/tmp/salsa", "createFileSize": 4194304, "isDebug": true}`

    req, err := http.NewRequest("GET", url, &dummyReader{Reader: strings.NewReader(bodOne)})
    if err != nil {
        t.Fatal("unexpected error building request", err)
    }

    var out bytes.Buffer
    callI, err := a.GetCall(FromRequest(a, app, path, req), WithWriter(&out))
    if err != nil {
        t.Fatal(err)
    }

    err = a.Submit(callI)
    if err != nil {
        t.Error("submit should not error:", err)
    }

    // we're using http format so this will have written a whole http request
    res, err := http.ReadResponse(bufio.NewReader(&out), nil)
    if err != nil {
        t.Fatal(err)
    }
    defer res.Body.Close()

    // Let's fetch read output and write results. See fn-test-utils AppResponse struct (data field)
    var resp struct {
        R struct {
            MountsRead string `json:"/proc/mounts.read_output"`
            CreateFile string `json:"/tmp/salsa.create_error"`
        } `json:"data"`
    }

    json.NewDecoder(res.Body).Decode(&resp)

    // Let's check what mounts are on...
    mounts := strings.Split(resp.R.MountsRead, "\n")
    isFound := false
    isRootFound := false
    for _, mnt := range mounts {
        tokens := strings.Split(mnt, " ")
        // /proc/mounts lines are "device mountpoint fstype options ..."; we need
        // at least 4 fields before indexing tokens[3] below.
        if len(tokens) < 4 {
            continue
        }

        point := tokens[1]
        opts := tokens[3]

        if point == "/tmp" && opts == "rw,nosuid,nodev,noexec,relatime,size=1024k,nr_inodes=1024" {
            // good
            isFound = true
        } else if point == "/" && strings.HasPrefix(opts, "ro,") {
            // Read-only root, good...
            isRootFound = true
        }
    }

    if !isFound || !isRootFound {
        t.Fatal("didn't get proper mounts for /tmp or /, got /proc/mounts content of:\n", resp.R.MountsRead)
    }

    // write file should have failed...
    if !strings.Contains(resp.R.CreateFile, "no space left on device") {
        t.Fatal(`limited tmpfs should generate fs full error, but got output: `, resp.R.CreateFile)
    }
}

// return a model with all fields filled in with fnproject/fn-test-utils:latest image, change as needed
func testCall() *models.Call {
    appName := "myapp"
@@ -119,6 +119,7 @@ func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallO
        Priority: new(int32), // TODO this is crucial, apparently
        Timeout: route.Timeout,
        IdleTimeout: route.IdleTimeout,
+       TmpFsSize: route.TmpFsSize,
        Memory: route.Memory,
        CPUs: route.CPUs,
        Config: buildConfig(app, route),
@@ -252,7 +253,9 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
        return nil, errors.New("no model or request provided for call")
    }

-   if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) {
+   mem := c.Memory + uint64(c.TmpFsSize)
+
+   if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) {
        // if we're not going to be able to run this call on this machine, bail here.
        return nil, models.ErrCallTimeoutServerBusy
    }
@@ -30,6 +30,8 @@ type AgentConfig struct {
    PreForkUseOnce uint64 `json:"pre_fork_use_once"`
    PreForkNetworks string `json:"pre_fork_networks"`
    EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"`
+   MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"`
+   EnableReadOnlyRootFs bool `json:"enable_readonly_rootfs"`
}

const (
@@ -53,6 +55,8 @@ const (
    EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
    EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
    EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
+   EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES"
+   EnvEnableReadOnlyRootFs = "FN_ENABLE_READONLY_ROOTFS"

    MaxDisabledMsecs = time.Duration(math.MaxInt64)
@@ -93,6 +97,7 @@ func NewAgentConfig() (*AgentConfig, error) {
    err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce)
    err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks)
    err = setEnvStr(err, EnvDockerNetworks, &cfg.DockerNetworks)
+   err = setEnvUint(err, EnvMaxTmpFsInodes, &cfg.MaxTmpFsInodes)

    if err != nil {
        return cfg, err
@@ -102,6 +107,10 @@ func NewAgentConfig() (*AgentConfig, error) {
        cfg.EnableNBResourceTracker = true
    }

+   if _, ok := os.LookupEnv(EnvEnableReadOnlyRootFs); ok {
+       cfg.EnableReadOnlyRootFs = true
+   }
+
    if cfg.EjectIdle == time.Duration(0) {
        return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
    }
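For reference, a sketch of how the two new knobs might be enabled through the environment. The variable names come from the config hunk above; the agent import path and the assumption that NewAgentConfig succeeds with otherwise-default settings are mine, not the commit's.

package main

import (
	"fmt"
	"os"

	"github.com/fnproject/fn/api/agent"
)

func main() {
	// cap tmpfs inodes and force a read-only root filesystem; the read-only
	// flag only needs the variable to be present, any value will do
	os.Setenv("FN_MAX_TMPFS_INODES", "1024")
	os.Setenv("FN_ENABLE_READONLY_ROOTFS", "1")

	cfg, err := agent.NewAgentConfig()
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.MaxTmpFsInodes, cfg.EnableReadOnlyRootFs)
}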
@@ -202,31 +202,124 @@ func (drv *DockerDriver) unpickNetwork(netId string) {
    }
}

-func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
-   ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})
-   var cmd []string
-   if task.Command() != "" {
-       // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
-       cmd = strings.Fields(task.Command())
-       log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
+func (drv *DockerDriver) configureFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if task.FsSize() != 0 {
+       // If defined, impose file system size limit. In MB units.
+       if container.HostConfig.StorageOpt == nil {
+           container.HostConfig.StorageOpt = make(map[string]string)
+       }
+
+       opt := fmt.Sprintf("%vM", task.FsSize())
+
+       log.WithFields(logrus.Fields{"size": opt, "call_id": task.Id()}).Debug("setting storage option")
+       container.HostConfig.StorageOpt["size"] = opt
+   }
+
+   container.HostConfig.ReadonlyRootfs = drv.conf.EnableReadOnlyRootFs
+}
+
+func (drv *DockerDriver) configureTmpFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if task.TmpFsSize() == 0 {
+       return
+   }
+
+   if container.HostConfig.Tmpfs == nil {
+       container.HostConfig.Tmpfs = make(map[string]string)
+   }
+
+   var tmpFsOption string
+   if drv.conf.MaxTmpFsInodes != 0 {
+       tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", task.TmpFsSize(), drv.conf.MaxTmpFsInodes)
+   } else {
+       tmpFsOption = fmt.Sprintf("size=%dm", task.TmpFsSize())
+   }
+   target := "/tmp"
+
+   log.WithFields(logrus.Fields{"target": target, "options": tmpFsOption, "call_id": task.Id()}).Debug("setting tmpfs")
+   container.HostConfig.Tmpfs[target] = tmpFsOption
+}
+
+func (drv *DockerDriver) configureVolumes(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if len(task.Volumes()) == 0 {
+       return
+   }
+
+   if container.Config.Volumes == nil {
+       container.Config.Volumes = map[string]struct{}{}
+   }
+
+   for _, mapping := range task.Volumes() {
+       hostDir := mapping[0]
+       containerDir := mapping[1]
+       container.Config.Volumes[containerDir] = struct{}{}
+       mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
+       container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
+       log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
+   }
+}
+
+func (drv *DockerDriver) configureCPU(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
+   // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,
+   // which is approx 8 CPUS in CFS world.
+   // Also see docker run options --cpu-quota and --cpu-period
+   if task.CPUs() == 0 {
+       return
+   }
+
+   quota := int64(task.CPUs() * 100)
+   period := int64(100000)
+
+   log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": task.Id()}).Debug("setting CPU")
+   container.HostConfig.CPUQuota = quota
+   container.HostConfig.CPUPeriod = period
+}
+
+func (drv *DockerDriver) configureWorkDir(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if wd := task.WorkDir(); wd != "" {
+       log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
+       container.Config.WorkingDir = wd
+   }
+}
+
+func (drv *DockerDriver) configureHostname(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if container.HostConfig.NetworkMode == "" {
+       // hostname and container NetworkMode is not compatible.
+       log.WithFields(logrus.Fields{"hostname": drv.hostname, "call_id": task.Id()}).Debug("setting hostname")
+       container.Config.Hostname = drv.hostname
+   }
+}
+
+func (drv *DockerDriver) configureCmd(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   if task.Command() == "" {
+       return
+   }
+
+   // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
+   cmd := strings.Fields(task.Command())
+   log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
+   container.Config.Cmd = cmd
+}
+
+func (drv *DockerDriver) configureEnv(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) {
+   envvars := make([]string, 0, len(task.EnvVars()))
+   for name, val := range task.EnvVars() {
+       envvars = append(envvars, name+"="+val)
+   }
+
+   container.Config.Env = envvars
+}
+
+func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) {
+   ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"})

    container := docker.CreateContainerOptions{
        Name: task.Id(),
        Config: &docker.Config{
-           Env: envvars,
-           Cmd: cmd,
            Memory: int64(task.Memory()),
            MemorySwap: int64(task.Memory()), // disables swap
            KernelMemory: int64(task.Memory()),
-           CPUShares: drv.conf.CPUShares,
            Image: task.Image(),
            Volumes: map[string]struct{}{},
            OpenStdin: true,
            AttachStdout: true,
            AttachStderr: true,
@@ -242,44 +335,18 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
        Context: ctx,
    }

+   drv.configureCmd(log, &container, task)
+   drv.configureEnv(log, &container, task)
+   drv.configureCPU(log, &container, task)
+   drv.configureFs(log, &container, task)
+   drv.configureTmpFs(log, &container, task)
+   drv.configureVolumes(log, &container, task)
+   drv.configureWorkDir(log, &container, task)

    poolId := drv.pickPool(ctx, &container)
    netId := drv.pickNetwork(&container)

-   if container.HostConfig.NetworkMode == "" {
-       // hostname and container NetworkMode is not compatible.
-       container.Config.Hostname = drv.hostname
-   }
-
-   // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation)
-   // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period,
-   // which is approx 8 CPUS in CFS world.
-   // Also see docker run options --cpu-quota and --cpu-period
-   if task.CPUs() != 0 {
-       container.HostConfig.CPUQuota = int64(task.CPUs() * 100)
-       container.HostConfig.CPUPeriod = 100000
-   }
-
-   // If defined, impose file system size limit. In MB units.
-   if task.FsSize() != 0 {
-       container.HostConfig.StorageOpt = make(map[string]string)
-       sizeOption := fmt.Sprintf("%vM", task.FsSize())
-       container.HostConfig.StorageOpt["size"] = sizeOption
-   }
-
-   volumes := task.Volumes()
-   for _, mapping := range volumes {
-       hostDir := mapping[0]
-       containerDir := mapping[1]
-       container.Config.Volumes[containerDir] = struct{}{}
-       mapn := fmt.Sprintf("%s:%s", hostDir, containerDir)
-       container.HostConfig.Binds = append(container.HostConfig.Binds, mapn)
-       log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes")
-   }
-
-   if wd := task.WorkDir(); wd != "" {
-       log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir")
-       container.Config.WorkingDir = wd
-   }
+   drv.configureHostname(log, &container, task)

    err := drv.ensureImage(ctx, task)
    if err != nil {
@@ -293,7 +360,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
    // since we retry under the hood, if the container gets created and retry fails, we can just ignore error
    if err != docker.ErrContainerAlreadyExists {
        log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory,
-           "cpu_shares": container.Config.CPUShares, "cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name,
+           "cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name,
            "image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name,
        }).WithError(err).Error("Could not create container")
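A worked example of the CFS translation that configureCPU performs above (the milli-CPU value is hypothetical; the quota/period arithmetic follows the comment in that hunk):

package main

import "fmt"

func main() {
	milliCPUs := uint64(500) // e.g. a route limited to 0.5 CPU

	quota := int64(milliCPUs * 100) // 50000 usec of CPU time...
	period := int64(100000)         // ...per 100000 usec period

	fmt.Printf("CPUQuota=%d CPUPeriod=%d (~%.2f CPUs)\n",
		quota, period, float64(quota)/float64(period))
}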
@@ -69,6 +69,7 @@ func (c *poolTask) EnvVars() map[string]string { return ni
func (c *poolTask) Memory() uint64 { return 0 }
func (c *poolTask) CPUs() uint64 { return 0 }
func (c *poolTask) FsSize() uint64 { return 0 }
+func (c *poolTask) TmpFsSize() uint64 { return 0 }
func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {}

type dockerPoolItem struct {
@@ -34,6 +34,7 @@ func (f *taskDockerTest) Volumes() [][2]string { return [][2]
func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 }
func (f *taskDockerTest) CPUs() uint64 { return 0 }
func (f *taskDockerTest) FsSize() uint64 { return 0 }
+func (f *taskDockerTest) TmpFsSize() uint64 { return 0 }
func (f *taskDockerTest) WorkDir() string { return "" }
func (f *taskDockerTest) Close() {}
func (f *taskDockerTest) Input() io.Reader { return f.input }
@@ -119,6 +119,9 @@ type ContainerTask interface {
    // Filesystem size limit for the container, in megabytes.
    FsSize() uint64

+   // Tmpfs Filesystem size limit for the container, in megabytes.
+   TmpFsSize() uint64
+
    // WorkDir returns the working directory to use for the task. Empty string
    // leaves it unset.
    WorkDir() string
@@ -190,16 +193,16 @@ const (
)

type Config struct {
-   Docker string `json:"docker"`
-   DockerNetworks string `json:"docker_networks"`
-   // TODO CPUShares should likely be on a per container basis
-   CPUShares int64 `json:"cpu_shares"`
-   ServerVersion string `json:"server_version"`
-   PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
-   PreForkImage string `json:"pre_fork_image"`
-   PreForkCmd string `json:"pre_fork_cmd"`
-   PreForkUseOnce uint64 `json:"pre_fork_use_once"`
-   PreForkNetworks string `json:"pre_fork_networks"`
+   Docker string `json:"docker"`
+   DockerNetworks string `json:"docker_networks"`
+   ServerVersion string `json:"server_version"`
+   PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
+   PreForkImage string `json:"pre_fork_image"`
+   PreForkCmd string `json:"pre_fork_cmd"`
+   PreForkUseOnce uint64 `json:"pre_fork_use_once"`
+   PreForkNetworks string `json:"pre_fork_networks"`
+   MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"`
+   EnableReadOnlyRootFs bool `json:"enable_readonly_rootfs"`
}

func average(samples []Stat) (Stat, bool) {
@@ -304,6 +304,9 @@ func getSlotQueueKey(call *call) string {
    binary.LittleEndian.PutUint32(byt[:4], uint32(call.IdleTimeout))
    hash.Write(byt[:4])

+   binary.LittleEndian.PutUint32(byt[:4], uint32(call.TmpFsSize))
+   hash.Write(byt[:4])
+
    binary.LittleEndian.PutUint64(byt[:], call.Memory)
    hash.Write(byt[:])
api/datastore/sql/migrations/14_add_tmpfs_size_route.go (new file, 27 lines)
@@ -0,0 +1,27 @@
package migrations

import (
    "context"

    "github.com/fnproject/fn/api/datastore/sql/migratex"
    "github.com/jmoiron/sqlx"
)

func up14(ctx context.Context, tx *sqlx.Tx) error {
    _, err := tx.ExecContext(ctx, "ALTER TABLE routes ADD tmpfs_size int;")

    return err
}

func down14(ctx context.Context, tx *sqlx.Tx) error {
    _, err := tx.ExecContext(ctx, "ALTER TABLE routes DROP COLUMN tmpfs_size;")
    return err
}

func init() {
    Migrations = append(Migrations, &migratex.MigFields{
        VersionFunc: vfunc(14),
        UpFunc: up14,
        DownFunc: down14,
    })
}
@@ -48,6 +48,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
    cpus int,
    timeout int NOT NULL,
    idle_timeout int NOT NULL,
+   tmpfs_size int,
    type varchar(16) NOT NULL,
    headers text NOT NULL,
    config text NOT NULL,
@@ -88,7 +89,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
}

const (
-   routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, headers, config, annotations, created_at, updated_at FROM routes`
+   routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, tmpfs_size, headers, config, annotations, created_at, updated_at FROM routes`
    callSelector = `SELECT id, created_at, started_at, completed_at, status, app_id, path, stats, error FROM calls`
    appIDSelector = `SELECT id, name, config, annotations, syslog_url, created_at, updated_at FROM apps WHERE id=?`
    ensureAppSelector = `SELECT id FROM apps WHERE name=?`
@@ -523,6 +524,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
        type,
        timeout,
        idle_timeout,
+       tmpfs_size,
        headers,
        config,
        annotations,
@@ -539,6 +541,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
        :type,
        :timeout,
        :idle_timeout,
+       :tmpfs_size,
        :headers,
        :config,
        :annotations,
@@ -581,6 +584,7 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
        type = :type,
        timeout = :timeout,
        idle_timeout = :idle_timeout,
+       tmpfs_size = :tmpfs_size,
        headers = :headers,
        config = :config,
        annotations = :annotations,
@@ -108,6 +108,9 @@ type Call struct {
    // Hot function idle timeout in seconds before termination.
    IdleTimeout int32 `json:"idle_timeout,omitempty" db:"-"`

+   // Tmpfs size in megabytes.
+   TmpFsSize uint32 `json:"tmpfs_size,omitempty" db:"-"`
+
    // Memory is the amount of RAM this call is allocated.
    Memory uint64 `json:"memory,omitempty" db:"-"`
@@ -35,6 +35,7 @@ type Route struct {
    Format string `json:"format" db:"format"`
    Timeout int32 `json:"timeout" db:"timeout"`
    IdleTimeout int32 `json:"idle_timeout" db:"idle_timeout"`
+   TmpFsSize uint32 `json:"tmpfs_size" db:"tmpfs_size"`
    Config Config `json:"config,omitempty" db:"config"`
    Annotations Annotations `json:"annotations,omitempty" db:"annotations"`
    CreatedAt strfmt.DateTime `json:"created_at,omitempty" db:"created_at"`
@@ -174,6 +175,7 @@ func (r1 *Route) Equals(r2 *Route) bool {
    eq = eq && r1.Format == r2.Format
    eq = eq && r1.Timeout == r2.Timeout
    eq = eq && r1.IdleTimeout == r2.IdleTimeout
+   eq = eq && r1.TmpFsSize == r2.TmpFsSize
    eq = eq && r1.Config.Equals(r2.Config)
    eq = eq && r1.Annotations.Equals(r2.Annotations)
    // NOTE: datastore tests are not very fun to write with timestamp checks,
@@ -207,6 +209,9 @@ func (r *Route) Update(patch *Route) {
    if patch.IdleTimeout != 0 {
        r.IdleTimeout = patch.IdleTimeout
    }
+   if patch.TmpFsSize != 0 {
+       r.TmpFsSize = patch.TmpFsSize
+   }
    if patch.Format != "" {
        r.Format = patch.Format
    }
@@ -17,6 +17,7 @@ func TestRouteSimple(t *testing.T) {
        Format: "http",
        Timeout: 10,
        IdleTimeout: 10,
+       TmpFsSize: 10,
    }

    err := route1.Validate()