From 95846431422d4d8c9d69b008d5f73083dca6a050 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 25 May 2018 14:12:29 -0700 Subject: [PATCH] fn: size restricted tmpfs /tmp and read-only / support (#1012) * fn: size restricted tmpfs /tmp and read-only / support *) read-only Root Fs Support *) removed CPUShares from docker API. This was unused. *) docker.Prepare() refactoring *) added docker.configureTmpFs() for size limited tmpfs on /tmp *) tmpfs size support in routes and resource tracker *) fix fn-test-utils to handle sparse files better in create file * test typo fix --- api/agent/agent.go | 41 +++-- api/agent/agent_test.go | 107 ++++++++++++ api/agent/call.go | 5 +- api/agent/config.go | 9 + api/agent/drivers/docker/docker.go | 161 +++++++++++++----- api/agent/drivers/docker/docker_pool.go | 1 + api/agent/drivers/docker/docker_test.go | 1 + api/agent/drivers/driver.go | 23 +-- api/agent/slots.go | 3 + .../sql/migrations/14_add_tmpfs_size_route.go | 27 +++ api/datastore/sql/sql.go | 6 +- api/models/call.go | 3 + api/models/route.go | 5 + api/models/route_test.go | 1 + images/fn-test-utils/fn-test-utils.go | 24 ++- 15 files changed, 339 insertions(+), 78 deletions(-) create mode 100644 api/datastore/sql/migrations/14_add_tmpfs_size_route.go diff --git a/api/agent/agent.go b/api/agent/agent.go index e57829c1c..65538ff0c 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -162,13 +162,15 @@ func createAgent(da DataAccess, options ...AgentOption) Agent { // TODO: Create drivers.New(runnerConfig) a.driver = docker.NewDocker(drivers.Config{ - DockerNetworks: a.cfg.DockerNetworks, - ServerVersion: a.cfg.MinDockerVersion, - PreForkPoolSize: a.cfg.PreForkPoolSize, - PreForkImage: a.cfg.PreForkImage, - PreForkCmd: a.cfg.PreForkCmd, - PreForkUseOnce: a.cfg.PreForkUseOnce, - PreForkNetworks: a.cfg.PreForkNetworks, + DockerNetworks: a.cfg.DockerNetworks, + ServerVersion: a.cfg.MinDockerVersion, + PreForkPoolSize: a.cfg.PreForkPoolSize, + PreForkImage: a.cfg.PreForkImage, + PreForkCmd: a.cfg.PreForkCmd, + PreForkUseOnce: a.cfg.PreForkUseOnce, + PreForkNetworks: a.cfg.PreForkNetworks, + MaxTmpFsInodes: a.cfg.MaxTmpFsInodes, + EnableReadOnlyRootFs: a.cfg.EnableReadOnlyRootFs, }) a.da = da @@ -478,6 +480,8 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container") + mem := call.Memory + uint64(call.TmpFsSize) + // WARNING: Tricky flow below. We are here because: isNeeded is set, // in other words, we need to launch a new container at this time due to high load. // @@ -496,7 +500,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err // Non-blocking mode only applies to cpu+mem, and if isNeeded decided that we do not // need to start a new container, then waiters will wait. select { - case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB): if tok != nil && tok.Error() != nil { tryNotify(notifyChan, tok.Error()) } else if a.shutWg.AddSession(1) { @@ -577,8 +581,10 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { call.containerState.UpdateState(ctx, ContainerStateWait, call.slots) + mem := call.Memory + uint64(call.TmpFsSize) + select { - case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB): if tok.Error() != nil { return nil, tok.Error() } @@ -955,13 +961,14 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, // and stderr can be swapped out by new calls in the container. input and // output must be copied in and out. type container struct { - id string // contrived - image string - env map[string]string - memory uint64 - cpus uint64 - fsSize uint64 - timeout time.Duration // cold only (superfluous, but in case) + id string // contrived + image string + env map[string]string + memory uint64 + cpus uint64 + fsSize uint64 + tmpFsSize uint64 + timeout time.Duration // cold only (superfluous, but in case) stdin io.Reader stdout io.Writer @@ -1039,6 +1046,7 @@ func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*contai memory: call.Memory, cpus: uint64(call.CPUs), fsSize: cfg.MaxFsSize, + tmpFsSize: uint64(call.TmpFsSize), stdin: stdin, stdout: stdout, stderr: stderr, @@ -1088,6 +1096,7 @@ func (c *container) EnvVars() map[string]string { return c.env } func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB func (c *container) CPUs() uint64 { return c.cpus } func (c *container) FsSize() uint64 { return c.fsSize } +func (c *container) TmpFsSize() uint64 { return c.tmpFsSize } // WriteStat publishes each metric in the specified Stats structure as a histogram metric func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index f3f172d75..e5e01fb2c 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -632,6 +632,113 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { } } +func TestTmpFsSize(t *testing.T) { + // TODO it may be a good idea to mock out the http server and use a real + // response writer with sync, and also test that this works with async + log + + appName := "myapp" + path := "/hello" + url := "http://127.0.0.1:8080/r/" + appName + path + + app := &models.App{Name: appName} + app.SetDefaults() + // we need to load in app & route so that FromRequest works + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Route{ + { + Path: path, + AppID: app.ID, + Image: "fnproject/fn-test-utils", + Type: "sync", + Format: "http", // this _is_ the test + Timeout: 5, + IdleTimeout: 10, + Memory: 64, + TmpFsSize: 1, + }, + }, + ) + + cfg, err := NewAgentConfig() + if err != nil { + t.Fatal(err) + } + + cfg.MaxTmpFsInodes = 1024 + cfg.EnableReadOnlyRootFs = true + + a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) + defer checkClose(t, a) + + // Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB + bodOne := `{"readFile":"/proc/mounts", "createFile":"/tmp/salsa", "createFileSize": 4194304, "isDebug": true}` + + req, err := http.NewRequest("GET", url, &dummyReader{Reader: strings.NewReader(bodOne)}) + if err != nil { + t.Fatal("unexpected error building request", err) + } + + var out bytes.Buffer + callI, err := a.GetCall(FromRequest(a, app, path, req), WithWriter(&out)) + if err != nil { + t.Fatal(err) + } + + err = a.Submit(callI) + if err != nil { + t.Error("submit should not error:", err) + } + + // we're using http format so this will have written a whole http request + res, err := http.ReadResponse(bufio.NewReader(&out), nil) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + + // Let's fetch read output and write results. See fn-test-utils AppResponse struct (data field) + var resp struct { + R struct { + MountsRead string `json:"/proc/mounts.read_output"` + CreateFile string `json:"/tmp/salsa.create_error"` + } `json:"data"` + } + + json.NewDecoder(res.Body).Decode(&resp) + + // Let's check what mounts are on... + mounts := strings.Split(resp.R.MountsRead, "\n") + isFound := false + isRootFound := false + for _, mnt := range mounts { + tokens := strings.Split(mnt, " ") + if len(tokens) < 3 { + continue + } + + point := tokens[1] + opts := tokens[3] + + if point == "/tmp" && opts == "rw,nosuid,nodev,noexec,relatime,size=1024k,nr_inodes=1024" { + // good + isFound = true + } else if point == "/" && strings.HasPrefix(opts, "ro,") { + // Read-only root, good... + isRootFound = true + } + } + + if !isFound || !isRootFound { + t.Fatal(`didn't get proper mounts for /tmp or /, got /proc/mounts content of:\n`, resp.R.MountsRead) + } + + // write file should have failed... + if !strings.Contains(resp.R.CreateFile, "no space left on device") { + t.Fatal(`limited tmpfs should generate fs full error, but got output: `, resp.R.CreateFile) + } +} + // return a model with all fields filled in with fnproject/fn-test-utils:latest image, change as needed func testCall() *models.Call { appName := "myapp" diff --git a/api/agent/call.go b/api/agent/call.go index 8c53a1f80..fb674cddb 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -119,6 +119,7 @@ func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallO Priority: new(int32), // TODO this is crucial, apparently Timeout: route.Timeout, IdleTimeout: route.IdleTimeout, + TmpFsSize: route.TmpFsSize, Memory: route.Memory, CPUs: route.CPUs, Config: buildConfig(app, route), @@ -252,7 +253,9 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { return nil, errors.New("no model or request provided for call") } - if !a.resources.IsResourcePossible(c.Memory, uint64(c.CPUs), c.Type == models.TypeAsync) { + mem := c.Memory + uint64(c.TmpFsSize) + + if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) { // if we're not going to be able to run this call on this machine, bail here. return nil, models.ErrCallTimeoutServerBusy } diff --git a/api/agent/config.go b/api/agent/config.go index fb0a7f6e1..2742abb35 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -30,6 +30,8 @@ type AgentConfig struct { PreForkUseOnce uint64 `json:"pre_fork_use_once"` PreForkNetworks string `json:"pre_fork_networks"` EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"` + MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"` + EnableReadOnlyRootFs bool `json:"enable_readonly_rootfs"` } const ( @@ -53,6 +55,8 @@ const ( EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER" + EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES" + EnvEnableReadOnlyRootFs = "FN_ENABLE_READONLY_ROOTFS" MaxDisabledMsecs = time.Duration(math.MaxInt64) @@ -93,6 +97,7 @@ func NewAgentConfig() (*AgentConfig, error) { err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce) err = setEnvStr(err, EnvPreForkNetworks, &cfg.PreForkNetworks) err = setEnvStr(err, EnvDockerNetworks, &cfg.DockerNetworks) + err = setEnvUint(err, EnvMaxTmpFsInodes, &cfg.MaxTmpFsInodes) if err != nil { return cfg, err @@ -102,6 +107,10 @@ func NewAgentConfig() (*AgentConfig, error) { cfg.EnableNBResourceTracker = true } + if _, ok := os.LookupEnv(EnvEnableReadOnlyRootFs); ok { + cfg.EnableReadOnlyRootFs = true + } + if cfg.EjectIdle == time.Duration(0) { return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle) } diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index d68952236..8c484de38 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -202,31 +202,124 @@ func (drv *DockerDriver) unpickNetwork(netId string) { } } -func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { - ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"}) - var cmd []string - if task.Command() != "" { - // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests - cmd = strings.Fields(task.Command()) - log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") +func (drv *DockerDriver) configureFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if task.FsSize() != 0 { + // If defined, impose file system size limit. In MB units. + if container.HostConfig.StorageOpt == nil { + container.HostConfig.StorageOpt = make(map[string]string) + } + + opt := fmt.Sprintf("%vM", task.FsSize()) + + log.WithFields(logrus.Fields{"size": opt, "call_id": task.Id()}).Debug("setting storage option") + container.HostConfig.StorageOpt["size"] = opt } + container.HostConfig.ReadonlyRootfs = drv.conf.EnableReadOnlyRootFs +} + +func (drv *DockerDriver) configureTmpFs(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if task.TmpFsSize() == 0 { + return + } + + if container.HostConfig.Tmpfs == nil { + container.HostConfig.Tmpfs = make(map[string]string) + } + + var tmpFsOption string + if drv.conf.MaxTmpFsInodes != 0 { + tmpFsOption = fmt.Sprintf("size=%dm,nr_inodes=%d", task.TmpFsSize(), drv.conf.MaxTmpFsInodes) + } else { + tmpFsOption = fmt.Sprintf("size=%dm", task.TmpFsSize()) + } + target := "/tmp" + + log.WithFields(logrus.Fields{"target": target, "options": tmpFsOption, "call_id": task.Id()}).Debug("setting tmpfs") + container.HostConfig.Tmpfs[target] = tmpFsOption +} + +func (drv *DockerDriver) configureVolumes(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if len(task.Volumes()) == 0 { + return + } + + if container.Config.Volumes == nil { + container.Config.Volumes = map[string]struct{}{} + } + + for _, mapping := range task.Volumes() { + hostDir := mapping[0] + containerDir := mapping[1] + container.Config.Volumes[containerDir] = struct{}{} + mapn := fmt.Sprintf("%s:%s", hostDir, containerDir) + container.HostConfig.Binds = append(container.HostConfig.Binds, mapn) + log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes") + } +} + +func (drv *DockerDriver) configureCPU(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) + // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, + // which is approx 8 CPUS in CFS world. + // Also see docker run options --cpu-quota and --cpu-period + if task.CPUs() == 0 { + return + } + + quota := int64(task.CPUs() * 100) + period := int64(100000) + + log.WithFields(logrus.Fields{"quota": quota, "period": period, "call_id": task.Id()}).Debug("setting CPU") + container.HostConfig.CPUQuota = quota + container.HostConfig.CPUPeriod = period +} + +func (drv *DockerDriver) configureWorkDir(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if wd := task.WorkDir(); wd != "" { + log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir") + container.Config.WorkingDir = wd + } +} + +func (drv *DockerDriver) configureHostname(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if container.HostConfig.NetworkMode == "" { + // hostname and container NetworkMode is not compatible. + log.WithFields(logrus.Fields{"hostname": drv.hostname, "call_id": task.Id()}).Debug("setting hostname") + container.Config.Hostname = drv.hostname + } +} + +func (drv *DockerDriver) configureCmd(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { + if task.Command() == "" { + return + } + + // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests + cmd := strings.Fields(task.Command()) + log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") + container.Config.Cmd = cmd +} + +func (drv *DockerDriver) configureEnv(log logrus.FieldLogger, container *docker.CreateContainerOptions, task drivers.ContainerTask) { envvars := make([]string, 0, len(task.EnvVars())) for name, val := range task.EnvVars() { envvars = append(envvars, name+"="+val) } + container.Config.Env = envvars +} + +func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask) (drivers.Cookie, error) { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "Prepare"}) + container := docker.CreateContainerOptions{ Name: task.Id(), Config: &docker.Config{ - Env: envvars, - Cmd: cmd, Memory: int64(task.Memory()), MemorySwap: int64(task.Memory()), // disables swap KernelMemory: int64(task.Memory()), - CPUShares: drv.conf.CPUShares, Image: task.Image(), - Volumes: map[string]struct{}{}, OpenStdin: true, AttachStdout: true, AttachStderr: true, @@ -242,44 +335,18 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Context: ctx, } + drv.configureCmd(log, &container, task) + drv.configureEnv(log, &container, task) + drv.configureCPU(log, &container, task) + drv.configureFs(log, &container, task) + drv.configureTmpFs(log, &container, task) + drv.configureVolumes(log, &container, task) + drv.configureWorkDir(log, &container, task) + poolId := drv.pickPool(ctx, &container) netId := drv.pickNetwork(&container) - if container.HostConfig.NetworkMode == "" { - // hostname and container NetworkMode is not compatible. - container.Config.Hostname = drv.hostname - } - - // Translate milli cpus into CPUQuota & CPUPeriod (see Linux cGroups CFS cgroup v1 documentation) - // eg: task.CPUQuota() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, - // which is approx 8 CPUS in CFS world. - // Also see docker run options --cpu-quota and --cpu-period - if task.CPUs() != 0 { - container.HostConfig.CPUQuota = int64(task.CPUs() * 100) - container.HostConfig.CPUPeriod = 100000 - } - - // If defined, impose file system size limit. In MB units. - if task.FsSize() != 0 { - container.HostConfig.StorageOpt = make(map[string]string) - sizeOption := fmt.Sprintf("%vM", task.FsSize()) - container.HostConfig.StorageOpt["size"] = sizeOption - } - - volumes := task.Volumes() - for _, mapping := range volumes { - hostDir := mapping[0] - containerDir := mapping[1] - container.Config.Volumes[containerDir] = struct{}{} - mapn := fmt.Sprintf("%s:%s", hostDir, containerDir) - container.HostConfig.Binds = append(container.HostConfig.Binds, mapn) - log.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes") - } - - if wd := task.WorkDir(); wd != "" { - log.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir") - container.Config.WorkingDir = wd - } + drv.configureHostname(log, &container, task) err := drv.ensureImage(ctx, task) if err != nil { @@ -293,7 +360,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask // since we retry under the hood, if the container gets created and retry fails, we can just ignore error if err != docker.ErrContainerAlreadyExists { log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory, - "cpu_shares": container.Config.CPUShares, "cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name, + "cpu_quota": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name, "image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name, }).WithError(err).Error("Could not create container") diff --git a/api/agent/drivers/docker/docker_pool.go b/api/agent/drivers/docker/docker_pool.go index 6cad9e551..788085d89 100644 --- a/api/agent/drivers/docker/docker_pool.go +++ b/api/agent/drivers/docker/docker_pool.go @@ -69,6 +69,7 @@ func (c *poolTask) EnvVars() map[string]string { return ni func (c *poolTask) Memory() uint64 { return 0 } func (c *poolTask) CPUs() uint64 { return 0 } func (c *poolTask) FsSize() uint64 { return 0 } +func (c *poolTask) TmpFsSize() uint64 { return 0 } func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {} type dockerPoolItem struct { diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index afa7f11e9..a5581991d 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -34,6 +34,7 @@ func (f *taskDockerTest) Volumes() [][2]string { return [][2] func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 } func (f *taskDockerTest) CPUs() uint64 { return 0 } func (f *taskDockerTest) FsSize() uint64 { return 0 } +func (f *taskDockerTest) TmpFsSize() uint64 { return 0 } func (f *taskDockerTest) WorkDir() string { return "" } func (f *taskDockerTest) Close() {} func (f *taskDockerTest) Input() io.Reader { return f.input } diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 24231247a..a25ca6507 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -119,6 +119,9 @@ type ContainerTask interface { // Filesystem size limit for the container, in megabytes. FsSize() uint64 + // Tmpfs Filesystem size limit for the container, in megabytes. + TmpFsSize() uint64 + // WorkDir returns the working directory to use for the task. Empty string // leaves it unset. WorkDir() string @@ -190,16 +193,16 @@ const ( ) type Config struct { - Docker string `json:"docker"` - DockerNetworks string `json:"docker_networks"` - // TODO CPUShares should likely be on a per container basis - CPUShares int64 `json:"cpu_shares"` - ServerVersion string `json:"server_version"` - PreForkPoolSize uint64 `json:"pre_fork_pool_size"` - PreForkImage string `json:"pre_fork_image"` - PreForkCmd string `json:"pre_fork_cmd"` - PreForkUseOnce uint64 `json:"pre_fork_use_once"` - PreForkNetworks string `json:"pre_fork_networks"` + Docker string `json:"docker"` + DockerNetworks string `json:"docker_networks"` + ServerVersion string `json:"server_version"` + PreForkPoolSize uint64 `json:"pre_fork_pool_size"` + PreForkImage string `json:"pre_fork_image"` + PreForkCmd string `json:"pre_fork_cmd"` + PreForkUseOnce uint64 `json:"pre_fork_use_once"` + PreForkNetworks string `json:"pre_fork_networks"` + MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"` + EnableReadOnlyRootFs bool `json:"enable_readonly_rootfs"` } func average(samples []Stat) (Stat, bool) { diff --git a/api/agent/slots.go b/api/agent/slots.go index fa5fb4f9c..9d7e2e694 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -304,6 +304,9 @@ func getSlotQueueKey(call *call) string { binary.LittleEndian.PutUint32(byt[:4], uint32(call.IdleTimeout)) hash.Write(byt[:4]) + binary.LittleEndian.PutUint32(byt[:4], uint32(call.TmpFsSize)) + hash.Write(byt[:4]) + binary.LittleEndian.PutUint64(byt[:], call.Memory) hash.Write(byt[:]) diff --git a/api/datastore/sql/migrations/14_add_tmpfs_size_route.go b/api/datastore/sql/migrations/14_add_tmpfs_size_route.go new file mode 100644 index 000000000..e3b34e5a2 --- /dev/null +++ b/api/datastore/sql/migrations/14_add_tmpfs_size_route.go @@ -0,0 +1,27 @@ +package migrations + +import ( + "context" + + "github.com/fnproject/fn/api/datastore/sql/migratex" + "github.com/jmoiron/sqlx" +) + +func up14(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE routes ADD tmpfs_size int;") + + return err +} + +func down14(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE routes DROP COLUMN tmpfs_size;") + return err +} + +func init() { + Migrations = append(Migrations, &migratex.MigFields{ + VersionFunc: vfunc(14), + UpFunc: up14, + DownFunc: down14, + }) +} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 9c17f2f0d..f80a9540e 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -48,6 +48,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( cpus int, timeout int NOT NULL, idle_timeout int NOT NULL, + tmpfs_size int, type varchar(16) NOT NULL, headers text NOT NULL, config text NOT NULL, @@ -88,7 +89,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( } const ( - routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, headers, config, annotations, created_at, updated_at FROM routes` + routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, tmpfs_size, headers, config, annotations, created_at, updated_at FROM routes` callSelector = `SELECT id, created_at, started_at, completed_at, status, app_id, path, stats, error FROM calls` appIDSelector = `SELECT id, name, config, annotations, syslog_url, created_at, updated_at FROM apps WHERE id=?` ensureAppSelector = `SELECT id FROM apps WHERE name=?` @@ -523,6 +524,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode type, timeout, idle_timeout, + tmpfs_size, headers, config, annotations, @@ -539,6 +541,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode :type, :timeout, :idle_timeout, + :tmpfs_size, :headers, :config, :annotations, @@ -581,6 +584,7 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m type = :type, timeout = :timeout, idle_timeout = :idle_timeout, + tmpfs_size = :tmpfs_size, headers = :headers, config = :config, annotations = :annotations, diff --git a/api/models/call.go b/api/models/call.go index c7407c7b6..def3f6e0a 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -108,6 +108,9 @@ type Call struct { // Hot function idle timeout in seconds before termination. IdleTimeout int32 `json:"idle_timeout,omitempty" db:"-"` + // Tmpfs size in megabytes. + TmpFsSize uint32 `json:"tmpfs_size,omitempty" db:"-"` + // Memory is the amount of RAM this call is allocated. Memory uint64 `json:"memory,omitempty" db:"-"` diff --git a/api/models/route.go b/api/models/route.go index c616fa4bc..2558666be 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -35,6 +35,7 @@ type Route struct { Format string `json:"format" db:"format"` Timeout int32 `json:"timeout" db:"timeout"` IdleTimeout int32 `json:"idle_timeout" db:"idle_timeout"` + TmpFsSize uint32 `json:"tmpfs_size" db:"tmpfs_size"` Config Config `json:"config,omitempty" db:"config"` Annotations Annotations `json:"annotations,omitempty" db:"annotations"` CreatedAt strfmt.DateTime `json:"created_at,omitempty" db:"created_at"` @@ -174,6 +175,7 @@ func (r1 *Route) Equals(r2 *Route) bool { eq = eq && r1.Format == r2.Format eq = eq && r1.Timeout == r2.Timeout eq = eq && r1.IdleTimeout == r2.IdleTimeout + eq = eq && r1.TmpFsSize == r2.TmpFsSize eq = eq && r1.Config.Equals(r2.Config) eq = eq && r1.Annotations.Equals(r2.Annotations) // NOTE: datastore tests are not very fun to write with timestamp checks, @@ -207,6 +209,9 @@ func (r *Route) Update(patch *Route) { if patch.IdleTimeout != 0 { r.IdleTimeout = patch.IdleTimeout } + if patch.TmpFsSize != 0 { + r.TmpFsSize = patch.TmpFsSize + } if patch.Format != "" { r.Format = patch.Format } diff --git a/api/models/route_test.go b/api/models/route_test.go index 798c512a6..3b624f724 100644 --- a/api/models/route_test.go +++ b/api/models/route_test.go @@ -17,6 +17,7 @@ func TestRouteSimple(t *testing.T) { Format: "http", Timeout: 10, IdleTimeout: 10, + TmpFsSize: 10, } err := route1.Validate() diff --git a/images/fn-test-utils/fn-test-utils.go b/images/fn-test-utils/fn-test-utils.go index c61619f12..5b154bdd5 100644 --- a/images/fn-test-utils/fn-test-utils.go +++ b/images/fn-test-utils/fn-test-utils.go @@ -477,9 +477,27 @@ func createFile(name string, size int) error { } if size > 0 { - err := f.Truncate(int64(size)) - if err != nil { - return err + // create a 1K block (keep this buffer small to keep + // memory usage small) + chunk := make([]byte, 1024) + for i := 0; i < 1024; i++ { + chunk[i] = byte(i) + } + + for size > 0 { + dlen := size + if dlen > 1024 { + dlen = 1024 + } + + _, err := f.Write(chunk[:dlen]) + if err != nil { + return err + } + + // slightly modify the chunk to avoid any sparse file possibility + chunk[0]++ + size = size - dlen } } return nil