mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Add tmpfs IOFS (#1212)
* Define an interface for IOFS handling. Add no-op and temporary directory implementations. * Move IOFS stuff out into separate file, add basic tmpfs implementation for linux only * Switch between directory and tmpfs based on platform and config * Respect FN_IOFS_OPTS * Make directory iofs default on all platforms * At least try to clean up a bit on failure * Add backout if IOFS creation fails * Add comment about iofs.Close
This commit is contained in:
committed by
Reed Allman
parent
822fa71fd2
commit
493790dbd2
@@ -6,10 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -834,6 +832,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: a.cfg.MaxFsSize,
|
||||
iofs: &noopIOFS{},
|
||||
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
|
||||
logCfg: drivers.LoggerConfig{
|
||||
URL: strings.TrimSpace(call.SyslogURL),
|
||||
@@ -1007,49 +1006,6 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
logger.WithError(res.Error()).Info("hot function terminated")
|
||||
}
|
||||
|
||||
// Creates an IO directory for container sockets - returns a pair of directories
|
||||
// first is the directory relative to the agent (what I watch and talk to), second is a directory relative to docker (what I ask docker to mount)
|
||||
// e.g. If IOFSAgentPath is set /data/iofs/ and IOFSMountRoot is set to /my/path/to/iofs/ this will return paths like:
|
||||
// /data/iofs/iofs829027/ /my/path/to/iofs/iofs829027/ respectively
|
||||
// If either IOFSAgentPath is unset it will always return paths relative to /tmp/ on the docker host and
|
||||
// if only IOFSMountPath is unset it will return the same directory for both
|
||||
func createIOFS(cfg *Config) (string, string, error) {
|
||||
// XXX(reed): need to ensure these are cleaned up if any of these ops in here fail...
|
||||
|
||||
dir := cfg.IOFSAgentPath
|
||||
if dir == "" {
|
||||
// /tmp should be a memory backed filesystem, where we can get user perms
|
||||
// on the socket file (fdks must give write permissions to users on sock).
|
||||
// /var/run is root only, hence this...
|
||||
dir = "/tmp"
|
||||
}
|
||||
|
||||
// create a tmpdir
|
||||
iofsDir, err := ioutil.TempDir(dir, "iofs")
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("cannot create tmpdir for iofs: %v", err)
|
||||
}
|
||||
|
||||
opts := cfg.IOFSOpts
|
||||
if opts == "" {
|
||||
// opts = "size=1k,nr_inodes=8,mode=0777"
|
||||
}
|
||||
if cfg.IOFSAgentPath != "" && cfg.IOFSMountRoot != "" {
|
||||
return iofsDir, filepath.Join(cfg.IOFSMountRoot, filepath.Base(iofsDir)), nil
|
||||
}
|
||||
return iofsDir, iofsDir, nil
|
||||
|
||||
// under tmpdir, create tmpfs
|
||||
// TODO uh, yea, idk
|
||||
//if cfg.IOFSAgentPath != "" {
|
||||
//err = syscall.Mount("tmpfs", iofsDir, "tmpfs", uintptr( [>syscall.MS_NOEXEC|syscall.MS_NOSUID|syscall.MS_NODEV<] 0), opts)
|
||||
//if err != nil {
|
||||
//return "", fmt.Errorf("cannot mount/create tmpfs=%s", iofsDir)
|
||||
//}
|
||||
//}
|
||||
|
||||
}
|
||||
|
||||
func inotifyUDS(ctx context.Context, iofsDir string, awaitUDS chan<- error) {
|
||||
// XXX(reed): I forgot how to plumb channels temporarily forgive me for this sin (inotify will timeout, this is just bad programming)
|
||||
err := inotifyAwait(ctx, iofsDir)
|
||||
@@ -1197,20 +1153,18 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
// and stderr can be swapped out by new calls in the container. input and
|
||||
// output must be copied in and out.
|
||||
type container struct {
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
extensions map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
iofsAgentPath string
|
||||
iofsDockerPath string
|
||||
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
logCfg drivers.LoggerConfig
|
||||
close func()
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
extensions map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
iofs iofs
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
logCfg drivers.LoggerConfig
|
||||
close func()
|
||||
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
@@ -1267,42 +1221,33 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
|
||||
}
|
||||
|
||||
var iofsAgentPath, iofsDockerPath string
|
||||
var iofs iofs
|
||||
var err error
|
||||
closer := func() {} // XXX(reed):
|
||||
if call.Format == models.FormatHTTPStream {
|
||||
// XXX(reed): we should also point stdout to stderr, and not have stdin
|
||||
if cfg.IOFSEnableTmpfs {
|
||||
iofs, err = newTmpfsIOFS(ctx, cfg)
|
||||
} else {
|
||||
iofs, err = newDirectoryIOFS(ctx, cfg)
|
||||
}
|
||||
|
||||
iofsAgentPath, iofsDockerPath, err = createIOFS(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// XXX(reed): futz with this, we have to make sure shit gets cleaned up properly
|
||||
closer = func() {
|
||||
//err := syscall.Unmount(iofs, 0)
|
||||
//if err != nil {
|
||||
//common.Logger(ctx).WithError(err).Error("error unmounting iofs")
|
||||
//}
|
||||
|
||||
err = os.RemoveAll(iofsAgentPath)
|
||||
if err != nil {
|
||||
common.Logger(ctx).WithError(err).Error("error removing iofs")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
iofs = &noopIOFS{}
|
||||
}
|
||||
|
||||
return &container{
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
iofsAgentPath: iofsAgentPath,
|
||||
iofsDockerPath: iofsDockerPath,
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
iofs: iofs,
|
||||
logCfg: drivers.LoggerConfig{
|
||||
URL: strings.TrimSpace(call.SyslogURL),
|
||||
Tags: []drivers.LoggerTag{
|
||||
@@ -1320,7 +1265,11 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
for _, b := range bufs {
|
||||
bufPool.Put(b)
|
||||
}
|
||||
closer() // XXX(reed): clean up
|
||||
// iofs.Close MUST be called here or we will leak directories and/or tmpfs mounts!
|
||||
if err = iofs.Close(); err != nil {
|
||||
// Note: This is logged with the context of the container creation
|
||||
common.Logger(ctx).WithError(err).Error("Error closing IOFS")
|
||||
}
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@@ -1362,8 +1311,8 @@ func (c *container) FsSize() uint64 { return c.fsSize }
|
||||
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
|
||||
func (c *container) Extensions() map[string]string { return c.extensions }
|
||||
func (c *container) LoggerConfig() drivers.LoggerConfig { return c.logCfg }
|
||||
func (c *container) UDSAgentPath() string { return c.iofsAgentPath }
|
||||
func (c *container) UDSDockerPath() string { return c.iofsDockerPath }
|
||||
func (c *container) UDSAgentPath() string { return c.iofs.AgentPath() }
|
||||
func (c *container) UDSDockerPath() string { return c.iofs.DockerPath() }
|
||||
|
||||
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
|
||||
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
|
||||
Reference in New Issue
Block a user