mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: introducing docker-syslog driver as default logger (#1189)
* fn: introducing docker-syslog driver as default logger With this change, fn-agent prefers RFC2454 docker-syslog driver for logging stdout/stderr from containers. The advantage of this is to offload it to docker itself instead of streaming stderr along with stdout, which gets multiplexed through single connection via docker-API. The change will need support from FDKs in order to log correct call-id and supress '\n' that splits syslog lines.
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"log/syslog"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -696,7 +695,7 @@ type hotSlot struct {
|
||||
done chan struct{} // signal we are done with slot
|
||||
errC <-chan error // container error
|
||||
container *container // TODO mask this
|
||||
maxRespSize uint64 // TODO boo.
|
||||
cfg *Config
|
||||
fatalErr error
|
||||
containerSpan trace.SpanContext
|
||||
}
|
||||
@@ -741,23 +740,10 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
// NOTE: stderr is limited separately (though line writer is vulnerable to attack?)
|
||||
// limit the bytes allowed to be written to the stdout pipe, which handles any
|
||||
// buffering overflows (json to a string, http to a buffer, etc)
|
||||
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.maxRespSize, models.ErrFunctionResponseTooBig)
|
||||
|
||||
// get our own syslogger with THIS call id (cheap), using the container's already open syslog conns (expensive)
|
||||
// TODO? we can basically just do this whether there are conns or not, this is relatively cheap (despite appearances)
|
||||
buf1 := bufPool.Get().(*bytes.Buffer)
|
||||
buf2 := bufPool.Get().(*bytes.Buffer)
|
||||
defer bufPool.Put(buf1)
|
||||
defer bufPool.Put(buf2)
|
||||
|
||||
sw := newSyslogWriter(call.ID, call.Path, call.AppName, syslog.LOG_ERR, s.container.syslogConns, buf1)
|
||||
var syslog io.WriteCloser = &nopCloser{sw}
|
||||
syslog = newLineWriterWithBuffer(buf2, syslog)
|
||||
defer syslog.Close() // close syslogger from here, but NOT the call log stderr OR conns
|
||||
stderr := multiWriteCloser{call.stderr, syslog} // use multiWriteCloser for its error ignoring properties
|
||||
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig)
|
||||
|
||||
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
|
||||
swapBack := s.container.swap(stdinRead, stdoutWrite, stderr, &call.Stats)
|
||||
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
|
||||
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
|
||||
|
||||
errApp := make(chan error, 1)
|
||||
@@ -815,10 +801,17 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: a.cfg.MaxFsSize,
|
||||
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
|
||||
stdin: call.req.Body,
|
||||
stdout: common.NewClampWriter(call.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig),
|
||||
stderr: call.stderr,
|
||||
stats: &call.Stats,
|
||||
logCfg: drivers.LoggerConfig{
|
||||
URL: strings.TrimSpace(call.SyslogURL),
|
||||
Tags: []drivers.LoggerTag{
|
||||
{Name: "app_name", Value: call.AppName},
|
||||
{Name: "func_name", Value: call.Path},
|
||||
},
|
||||
},
|
||||
stdin: call.req.Body,
|
||||
stdout: common.NewClampWriter(call.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig),
|
||||
stderr: call.stderr,
|
||||
stats: &call.Stats,
|
||||
}
|
||||
|
||||
cookie, err := a.driver.CreateCookie(ctx, container)
|
||||
@@ -899,7 +892,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
done: make(chan struct{}),
|
||||
errC: errC,
|
||||
container: container,
|
||||
maxRespSize: a.cfg.MaxResponseSize,
|
||||
cfg: &a.cfg,
|
||||
containerSpan: trace.FromContext(ctx).SpanContext(),
|
||||
}
|
||||
if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
|
||||
@@ -1028,11 +1021,11 @@ type container struct {
|
||||
fsSize uint64
|
||||
tmpFsSize uint64
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
logCfg drivers.LoggerConfig
|
||||
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
syslogConns io.WriteCloser
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
|
||||
// swapMu protects the stats swapping
|
||||
swapMu sync.Mutex
|
||||
@@ -1051,13 +1044,6 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
stderr := common.NewGhostWriter()
|
||||
stdout := common.NewGhostWriter()
|
||||
|
||||
// these are only the conns, this doesn't write the syslog format (since it will change between calls)
|
||||
syslogConns, err := syslogConns(ctx, call.SyslogURL)
|
||||
if err != nil {
|
||||
// TODO we could write this to between stderr but between stderr doesn't go to user either. kill me.
|
||||
common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}).Error("error dialing syslog urls")
|
||||
}
|
||||
|
||||
// for use if no freezer (or we ever make up our minds)
|
||||
var bufs []*bytes.Buffer
|
||||
|
||||
@@ -1070,7 +1056,7 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
|
||||
// wrap the syslog and debug loggers in the same (respective) line writer
|
||||
// syslog complete chain for this (from top):
|
||||
// stderr -> line writer -> syslog -> []conns
|
||||
// stderr -> line writer
|
||||
|
||||
// TODO(reed): I guess this is worth it
|
||||
// TODO(reed): there's a bug here where the between writers could have
|
||||
@@ -1079,39 +1065,38 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
// and mostly came to the conclusion that life is meaningless.
|
||||
buf1 := bufPool.Get().(*bytes.Buffer)
|
||||
buf2 := bufPool.Get().(*bytes.Buffer)
|
||||
buf3 := bufPool.Get().(*bytes.Buffer)
|
||||
buf4 := bufPool.Get().(*bytes.Buffer)
|
||||
bufs = []*bytes.Buffer{buf1, buf2, buf3, buf4}
|
||||
bufs = []*bytes.Buffer{buf1, buf2}
|
||||
|
||||
// stdout = LOG_INFO, stderr = LOG_ERR -- ONLY for the between writers, normal stdout is a response
|
||||
so := newSyslogWriter(call.ID, call.Path, call.AppName, syslog.LOG_INFO, syslogConns, buf1)
|
||||
se := newSyslogWriter(call.ID, call.Path, call.AppName, syslog.LOG_ERR, syslogConns, buf2)
|
||||
|
||||
// use multiWriteCloser since it ignores errors (io.MultiWriter does not)
|
||||
soc := multiWriteCloser{&nopCloser{so}, &nopCloser{&logWriter{
|
||||
soc := &nopCloser{&logWriter{
|
||||
logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
|
||||
}}}
|
||||
sec := multiWriteCloser{&nopCloser{se}, &nopCloser{&logWriter{
|
||||
}}
|
||||
sec := &nopCloser{&logWriter{
|
||||
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
|
||||
}}}
|
||||
}}
|
||||
|
||||
stdout.Swap(newLineWriterWithBuffer(buf4, soc))
|
||||
stderr.Swap(newLineWriterWithBuffer(buf3, sec))
|
||||
stdout.Swap(newLineWriterWithBuffer(buf1, soc))
|
||||
stderr.Swap(newLineWriterWithBuffer(buf2, sec))
|
||||
}
|
||||
|
||||
return &container{
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
syslogConns: syslogConns,
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
extensions: call.extensions,
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
tmpFsSize: uint64(call.TmpFsSize),
|
||||
logCfg: drivers.LoggerConfig{
|
||||
URL: strings.TrimSpace(call.SyslogURL),
|
||||
Tags: []drivers.LoggerTag{
|
||||
{Name: "app_name", Value: call.AppName},
|
||||
{Name: "func_name", Value: call.Path},
|
||||
},
|
||||
},
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
}, func() {
|
||||
stdin.Close()
|
||||
stderr.Close()
|
||||
@@ -1119,7 +1104,6 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config) (*container,
|
||||
for _, b := range bufs {
|
||||
bufPool.Put(b)
|
||||
}
|
||||
syslogConns.Close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1144,21 +1128,22 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.
|
||||
}
|
||||
}
|
||||
|
||||
func (c *container) Id() string { return c.id }
|
||||
func (c *container) Command() string { return "" }
|
||||
func (c *container) Input() io.Reader { return c.stdin }
|
||||
func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr }
|
||||
func (c *container) Volumes() [][2]string { return nil }
|
||||
func (c *container) WorkDir() string { return "" }
|
||||
func (c *container) Close() {}
|
||||
func (c *container) Image() string { return c.image }
|
||||
func (c *container) Timeout() time.Duration { return c.timeout }
|
||||
func (c *container) EnvVars() map[string]string { return c.env }
|
||||
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
|
||||
func (c *container) CPUs() uint64 { return c.cpus }
|
||||
func (c *container) FsSize() uint64 { return c.fsSize }
|
||||
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
|
||||
func (c *container) Extensions() map[string]string { return c.extensions }
|
||||
func (c *container) Id() string { return c.id }
|
||||
func (c *container) Command() string { return "" }
|
||||
func (c *container) Input() io.Reader { return c.stdin }
|
||||
func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr }
|
||||
func (c *container) Volumes() [][2]string { return nil }
|
||||
func (c *container) WorkDir() string { return "" }
|
||||
func (c *container) Close() {}
|
||||
func (c *container) Image() string { return c.image }
|
||||
func (c *container) Timeout() time.Duration { return c.timeout }
|
||||
func (c *container) EnvVars() map[string]string { return c.env }
|
||||
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
|
||||
func (c *container) CPUs() uint64 { return c.cpus }
|
||||
func (c *container) FsSize() uint64 { return c.fsSize }
|
||||
func (c *container) TmpFsSize() uint64 { return c.tmpFsSize }
|
||||
func (c *container) Extensions() map[string]string { return c.extensions }
|
||||
func (c *container) LoggerConfig() drivers.LoggerConfig { return c.logCfg }
|
||||
|
||||
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
|
||||
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
|
||||
Reference in New Issue
Block a user