mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
support configuration-based relative dirs (host and agent) for iofs (#1213)
* support configuration-based relative dirs (host and agent) for iofs mounts * Send UDS requests as POST to <UDS>/call
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/trace"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution)
|
||||
@@ -482,9 +483,9 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
|
||||
if tok != nil {
|
||||
tok.Close()
|
||||
}
|
||||
// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
|
||||
// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
|
||||
// idle container.
|
||||
// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
|
||||
// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
|
||||
// idle container.
|
||||
case <-time.After(a.cfg.HotPoll):
|
||||
a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
|
||||
case <-ctx.Done(): // timeout
|
||||
@@ -703,18 +704,40 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
|
||||
// client should respect the request context (right?) so we still need this (right?)
|
||||
errApp := make(chan error, 1)
|
||||
|
||||
req := call.req
|
||||
req.RequestURI = "" // we have to clear this before using it as a client request, see https://golang.org/pkg/net/http/#Request
|
||||
req, err := http.NewRequest("POST", "http://localhost/call", call.req.Body)
|
||||
if err != nil {
|
||||
errApp <- err
|
||||
return errApp
|
||||
}
|
||||
|
||||
req.Header = make(http.Header)
|
||||
for k, vs := range call.req.Header {
|
||||
for _, v := range vs {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
//req.Header.Set("FN_DEADLINE", ci.Deadline().String())
|
||||
// TODO(occ) : fix compatidupes when FDKs are updated
|
||||
req.Header.Set("Fn-Call-Id", call.ID)
|
||||
req.Header.Set("FN_CALL_ID", call.ID)
|
||||
deadline, ok := ctx.Deadline()
|
||||
if ok {
|
||||
deadlineStr := deadline.Format(time.RFC3339)
|
||||
req.Header.Set("Fn-Deadline", deadlineStr)
|
||||
req.Header.Set("FN_DEADLINE", deadlineStr)
|
||||
|
||||
}
|
||||
|
||||
go func() {
|
||||
resp, err := s.udsClient.Do(req)
|
||||
if err != nil {
|
||||
common.Logger(ctx).WithError(err).Debug("Got error from UDS socket")
|
||||
errApp <- err
|
||||
return
|
||||
}
|
||||
common.Logger(ctx).WithField("status", resp.StatusCode).Debug("Got resp from UDS socket")
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
select {
|
||||
@@ -864,14 +887,14 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
if call.Format == models.FormatHTTPStream {
|
||||
// start our listener before starting the container, so we don't miss the pretty things whispered in our ears
|
||||
// XXX(reed): figure out cleaner way to carry around the directory and expose the lsnr.sock file
|
||||
go inotifyUDS(ctx, container.UDSPath(), udsAwait)
|
||||
go inotifyUDS(ctx, container.UDSAgentPath(), udsAwait)
|
||||
|
||||
udsClient = http.Client{
|
||||
Transport: &http.Transport{
|
||||
// XXX(reed): other settings ?
|
||||
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
return d.DialContext(ctx, "unix", container.UDSPath()+"/lsnr.sock") // XXX(reed): hardcoded lsnr.sock
|
||||
return d.DialContext(ctx, "unix", filepath.Join(container.UDSAgentPath(), "lsnr.sock")) // XXX(reed): hardcoded lsnr.sock
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -966,10 +989,16 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
logger.WithError(res.Error()).Info("hot function terminated")
|
||||
}
|
||||
|
||||
func createIOFS(cfg *Config) (string, error) {
|
||||
// Creates an IO directory for container sockets - returns a pair of directories
|
||||
// first is the directory relative to the agent (what I watch and talk to), second is a directory relative to docker (what I ask docker to mount)
|
||||
// e.g. If IOFSAgentPath is set /data/iofs/ and IOFSMountRoot is set to /my/path/to/iofs/ this will return paths like:
|
||||
// /data/iofs/iofs829027/ /my/path/to/iofs/iofs829027/ respectively
|
||||
// If either IOFSAgentPath is unset it will always return paths relative to /tmp/ on the docker host and
|
||||
// if only IOFSMountPath is unset it will return the same directory for both
|
||||
func createIOFS(cfg *Config) (string, string, error) {
|
||||
// XXX(reed): need to ensure these are cleaned up if any of these ops in here fail...
|
||||
|
||||
dir := cfg.IOFSPath
|
||||
dir := cfg.IOFSAgentPath
|
||||
if dir == "" {
|
||||
// /tmp should be a memory backed filesystem, where we can get user perms
|
||||
// on the socket file (fdks must give write permissions to users on sock).
|
||||
@@ -980,24 +1009,27 @@ func createIOFS(cfg *Config) (string, error) {
|
||||
// create a tmpdir
|
||||
iofsDir, err := ioutil.TempDir(dir, "iofs")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot create tmpdir for iofs: %v", err)
|
||||
return "", "", fmt.Errorf("cannot create tmpdir for iofs: %v", err)
|
||||
}
|
||||
|
||||
opts := cfg.IOFSOpts
|
||||
if opts == "" {
|
||||
// opts = "size=1k,nr_inodes=8,mode=0777"
|
||||
}
|
||||
if cfg.IOFSAgentPath != "" && cfg.IOFSMountRoot != "" {
|
||||
return iofsDir, filepath.Join(cfg.IOFSMountRoot, filepath.Base(iofsDir)), nil
|
||||
}
|
||||
return iofsDir, iofsDir, nil
|
||||
|
||||
// under tmpdir, create tmpfs
|
||||
// TODO uh, yea, idk
|
||||
//if cfg.IOFSPath != "" {
|
||||
//if cfg.IOFSAgentPath != "" {
|
||||
//err = syscall.Mount("tmpfs", iofsDir, "tmpfs", uintptr( [>syscall.MS_NOEXEC|syscall.MS_NOSUID|syscall.MS_NODEV<] 0), opts)
|
||||
//if err != nil {
|
||||
//return "", fmt.Errorf("cannot mount/create tmpfs=%s", iofsDir)
|
||||
//}
|
||||
//}
|
||||
|
||||
return iofsDir, nil
|
||||
}
|
||||
|
||||
func inotifyUDS(ctx context.Context, iofsDir string, awaitUDS chan<- error) {
|
||||
@@ -1147,18 +1179,20 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
// and stderr can be swapped out by new calls in the container. input and
|
||||
// output must be copied in and out.
|
||||
type container struct {
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
extensions map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
iofs string
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
logCfg drivers.LoggerConfig
|
||||
close func()
|
||||
id string // contrived
|
||||
image string
|
||||
env map[string]string
|
||||
extensions map[string]string
|
||||
memory uint64
|
||||
cpus uint64
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
iofsAgentPath string
|
||||
iofsDockerPath string
|
||||
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
logCfg drivers.LoggerConfig
|
||||
close func()
|
||||
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
@@ -1215,13 +1249,13 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
|
||||
}
|
||||
|
||||
var iofs string
|
||||
var iofsAgentPath, iofsDockerPath string
|
||||
var err error
|
||||
closer := func() {} // XXX(reed):
|
||||
if call.Format == models.FormatHTTPStream {
|
||||
// XXX(reed): we should also point stdout to stderr, and not have stdin
|
||||
|
||||
iofs, err = createIOFS(cfg)
|
||||
iofsAgentPath, iofsDockerPath, err = createIOFS(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1233,7 +1267,7 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
//common.Logger(ctx).WithError(err).Error("error unmounting iofs")
|
||||
//}
|
||||
|
||||
err = os.RemoveAll(iofs)
|
||||
err = os.RemoveAll(iofsAgentPath)
|
||||
if err != nil {
|
||||
common.Logger(ctx).WithError(err).Error("error removing iofs")
|
||||
}
|
||||
@@ -1241,15 +1275,16 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
}
|
||||
|
||||
return &container{
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
iofs: iofs,
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
iofsAgentPath: iofsAgentPath,
|
||||
iofsDockerPath: iofsDockerPath,
|
||||
logCfg: drivers.LoggerConfig{
|
||||
URL: strings.TrimSpace(call.SyslogURL),
|
||||
Tags: []drivers.LoggerTag{
|
||||
@@ -1309,7 +1344,8 @@ func (c *container) FsSize() uint64 { return c.fsSize }
|
||||
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
|
||||
func (c *container) Extensions() map[string]string { return c.extensions }
|
||||
func (c *container) LoggerConfig() drivers.LoggerConfig { return c.logCfg }
|
||||
func (c *container) UDSPath() string { return c.iofs }
|
||||
func (c *container) UDSAgentPath() string { return c.iofsAgentPath }
|
||||
func (c *container) UDSDockerPath() string { return c.iofsDockerPath }
|
||||
|
||||
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
|
||||
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
|
||||
@@ -33,7 +33,8 @@ type Config struct {
|
||||
DisableReadOnlyRootFs bool `json:"disable_readonly_rootfs"`
|
||||
DisableTini bool `json:"disable_tini"`
|
||||
DisableDebugUserLogs bool `json:"disable_debug_user_logs"`
|
||||
IOFSPath string `json:"iofs_path"`
|
||||
IOFSAgentPath string `json:"iofs_path"`
|
||||
IOFSMountRoot string `json:"iofs_mount_root"`
|
||||
IOFSOpts string `json:"iofs_opts"`
|
||||
}
|
||||
|
||||
@@ -85,8 +86,12 @@ const (
|
||||
EnvDisableTini = "FN_DISABLE_TINI"
|
||||
// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
|
||||
EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"
|
||||
// EnvIOFSPath is the path of a directory to configure for unix socket files for each container
|
||||
|
||||
// EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container
|
||||
EnvIOFSPath = "FN_IOFS_PATH"
|
||||
// EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with
|
||||
EnvIOFSDockerPath = "FN_IOFS_DOCKER_PATH"
|
||||
|
||||
// EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files
|
||||
EnvIOFSOpts = "FN_IOFS_OPTS"
|
||||
|
||||
@@ -129,7 +134,8 @@ func NewConfig() (*Config, error) {
|
||||
err = setEnvStr(err, EnvDockerNetworks, &cfg.DockerNetworks)
|
||||
err = setEnvStr(err, EnvDockerLoadFile, &cfg.DockerLoadFile)
|
||||
err = setEnvUint(err, EnvMaxTmpFsInodes, &cfg.MaxTmpFsInodes)
|
||||
err = setEnvStr(err, EnvIOFSPath, &cfg.IOFSPath)
|
||||
err = setEnvStr(err, EnvIOFSPath, &cfg.IOFSAgentPath)
|
||||
err = setEnvStr(err, EnvIOFSDockerPath, &cfg.IOFSMountRoot)
|
||||
err = setEnvStr(err, EnvIOFSOpts, &cfg.IOFSOpts)
|
||||
|
||||
if err != nil {
|
||||
@@ -167,6 +173,20 @@ func setEnvStr(err error, name string, dst *string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func setEnvBool(err error, name string, dst *bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tmp, ok := os.LookupEnv(name); ok {
|
||||
val, err := strconv.ParseBool(tmp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*dst = val
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setEnvUint(err error, name string, dst *uint64) error {
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -106,7 +106,7 @@ func (c *cookie) configureTmpFs(log logrus.FieldLogger) {
|
||||
}
|
||||
|
||||
func (c *cookie) configureIOFs(log logrus.FieldLogger) {
|
||||
path := c.task.UDSPath()
|
||||
path := c.task.UDSDockerPath()
|
||||
if path == "" {
|
||||
// TODO this should be required soon-ish
|
||||
return
|
||||
|
||||
@@ -73,7 +73,8 @@ func (c *poolTask) TmpFsSize() uint64 { return 0
|
||||
func (c *poolTask) Extensions() map[string]string { return nil }
|
||||
func (c *poolTask) LoggerConfig() drivers.LoggerConfig { return drivers.LoggerConfig{} }
|
||||
func (c *poolTask) WriteStat(ctx context.Context, stat drivers.Stat) {}
|
||||
func (c *poolTask) UDSPath() string { return "" }
|
||||
func (c *poolTask) UDSAgentPath() string { return "" }
|
||||
func (c *poolTask) UDSDockerPath() string { return "" }
|
||||
|
||||
type dockerPoolItem struct {
|
||||
id string
|
||||
|
||||
@@ -38,7 +38,8 @@ func (f *taskDockerTest) Close() {}
|
||||
func (f *taskDockerTest) Input() io.Reader { return f.input }
|
||||
func (f *taskDockerTest) Extensions() map[string]string { return nil }
|
||||
func (f *taskDockerTest) LoggerConfig() drivers.LoggerConfig { return drivers.LoggerConfig{} }
|
||||
func (f *taskDockerTest) UDSPath() string { return "" }
|
||||
func (f *taskDockerTest) UDSAgentPath() string { return "" }
|
||||
func (f *taskDockerTest) UDSDockerPath() string { return "" }
|
||||
|
||||
func TestRunnerDocker(t *testing.T) {
|
||||
dkr := NewDocker(drivers.Config{})
|
||||
|
||||
@@ -162,9 +162,14 @@ type ContainerTask interface {
|
||||
// more specific but it's easier to be lazy.
|
||||
Extensions() map[string]string
|
||||
|
||||
// UDSPath to use to configure the unix domain socket. the drivers
|
||||
// UDSAgentPath to use to configure the unix domain socket.
|
||||
// This is the mount point relative to the agent
|
||||
// abstractions have leaked so bad at this point it's a monsoon.
|
||||
UDSPath() string
|
||||
UDSAgentPath() string
|
||||
|
||||
// UDSDockerPath to use to configure the unix domain socket. the drivers
|
||||
// This is the mount point relative to the docker host.
|
||||
UDSDockerPath() string
|
||||
}
|
||||
|
||||
// Stat is a bucket of stats from a driver at a point in time for a certain task.
|
||||
|
||||
Reference in New Issue
Block a user