mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
add user syslog writers to app (#970)
* add user syslog writers to app users may specify a syslog url[s] on apps now and all functions under that app will spew their logs out to it. the docs have more information around details there, please review those (swagger and operating/logging.md), tried to implement to spec in some parts and improve others, open to feedback on format though, lots of liberty there. design decision wise, I am looking to the future and ignoring cold containers. the overhead of the connections there will not be worth it, so this feature only works for hot functions, since we're killing cold anyway (even if a user can just straight up exit a hot container). syslog connections will be opened against a container when it starts up, and then the call id that is logged gets swapped out for each call that goes through the container, this cuts down on the cost of opening/closing connections significantly. there are buffers to accumulate logs until we get a `\n` to actually write a syslog line, and a buffer to save some bytes when we're writing the syslog formatting as well. underneath writers re-use the line writer in certain scenarios (swapper). we could likely improve the ease of setting this up, but opening the syslog conns against a container seems worth it, and is a different path than the other func loggers that we create when we make a call object. the Close() stuff is a little tricky, not sure how to make it easier and have the ^ benefits, open to idears. this does add another vector of 'limits' to consider for more strict service operators. one being how many syslog urls can a user add to an app (infinite, atm) and the other being on the order of number of containers per host we could run out of connections in certain scenarios. there may be some utility in having multiple syslog sinks to send to, it could help with debugging at times to send to another destination or if a user is a client w/ someone and both want the function logs, e.g. (have used this for that in the past, specifically). this also doesn't work behind a proxy, which is something i'm open to fixing, but afaict will require a 3rd party dependency (we can pretty much steal what docker does). this is mostly of utility for those of us that work behind a proxy all the time, not really for end users. there are some unit tests. integration tests for this don't sound very fun to maintain. I did test against papertrail with each protocol and it works (and even times out if you're behind a proxy!). closes #337 * add trace to syslog dial
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"log/syslog"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -687,8 +689,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
// buffering overflows (json to a string, http to a buffer, etc)
|
||||
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.maxRespSize, models.ErrFunctionResponseTooBig)
|
||||
|
||||
// get our own syslogger with THIS call id (cheap), using the container's already open syslog conns (expensive)
|
||||
// TODO? we can basically just do this whether there are conns or not, this is relatively cheap (despite appearances)
|
||||
buf1 := bufPool.Get().(*bytes.Buffer)
|
||||
buf2 := bufPool.Get().(*bytes.Buffer)
|
||||
defer bufPool.Put(buf1)
|
||||
defer bufPool.Put(buf2)
|
||||
|
||||
sw := newSyslogWriter(call.ID, call.Path, call.AppID, syslog.LOG_ERR, s.container.syslogConns, buf1)
|
||||
var syslog io.WriteCloser = &nopCloser{sw}
|
||||
syslog = newLineWriterWithBuffer(buf2, syslog)
|
||||
defer syslog.Close() // close syslogger from here, but NOT the call log stderr OR conns
|
||||
stderr := multiWriteCloser{call.stderr, syslog} // use multiWriteCloser for its error ignoring properties
|
||||
|
||||
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
|
||||
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
|
||||
swapBack := s.container.swap(stdinRead, stdoutWrite, stderr, &call.Stats)
|
||||
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
|
||||
|
||||
errApp := make(chan error, 1)
|
||||
@@ -775,7 +790,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
state.UpdateState(ctx, ContainerStateStart, call.slots)
|
||||
defer state.UpdateState(ctx, ContainerStateDone, call.slots)
|
||||
|
||||
container, closer := NewHotContainer(call, &a.cfg)
|
||||
container, closer := NewHotContainer(ctx, call, &a.cfg)
|
||||
defer closer()
|
||||
|
||||
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app_id": call.AppID, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout})
|
||||
@@ -941,17 +956,17 @@ type container struct {
|
||||
fsSize uint64
|
||||
timeout time.Duration // cold only (superfluous, but in case)
|
||||
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
syslogConns io.WriteCloser
|
||||
|
||||
// lock protects the stats swapping
|
||||
statsMu sync.Mutex
|
||||
stats *drivers.Stats
|
||||
// swapMu protects the stats swapping
|
||||
swapMu sync.Mutex
|
||||
stats *drivers.Stats
|
||||
}
|
||||
|
||||
func NewHotContainer(call *call, cfg *AgentConfig) (*container, func()) {
|
||||
|
||||
func NewHotContainer(ctx context.Context, call *call, cfg *AgentConfig) (*container, func()) {
|
||||
// if freezer is enabled, be consistent with freezer behavior and
|
||||
// block stdout and stderr between calls.
|
||||
isBlockIdleIO := MaxDisabledMsecs != cfg.FreezeIdle
|
||||
@@ -962,34 +977,73 @@ func NewHotContainer(call *call, cfg *AgentConfig) (*container, func()) {
|
||||
stderr := common.NewGhostWriter()
|
||||
stdout := common.NewGhostWriter()
|
||||
|
||||
// these are only the conns, this doesn't write the syslog format (since it will change between calls)
|
||||
syslogConns, err := syslogConns(ctx, call.SyslogURL)
|
||||
if err != nil {
|
||||
// TODO we could write this to between stderr but between stderr doesn't go to user either. kill me.
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}).Error("error dialing syslog urls")
|
||||
}
|
||||
|
||||
// for use if no freezer (or we ever make up our minds)
|
||||
var bufs []*bytes.Buffer
|
||||
|
||||
// when not processing a request, do we block IO?
|
||||
if !isBlockIdleIO {
|
||||
// IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed
|
||||
// from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH*
|
||||
// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
|
||||
// reading from the other one without risking head-of-line blocking.
|
||||
stderr.Swap(newLineWriter(&logWriter{
|
||||
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
|
||||
}))
|
||||
stdout.Swap(newLineWriter(&logWriter{
|
||||
|
||||
// wrap the syslog and debug loggers in the same (respective) line writer
|
||||
// syslog complete chain for this (from top):
|
||||
// stderr -> line writer -> syslog -> []conns
|
||||
|
||||
// TODO(reed): I guess this is worth it
|
||||
// TODO(reed): there's a bug here where the between writers could have
|
||||
// bytes in there, get swapped for real stdout/stderr, come back and write
|
||||
// bytes in and the bytes are [really] stale. I played with fixing this
|
||||
// and mostly came to the conclusion that life is meaningless.
|
||||
buf1 := bufPool.Get().(*bytes.Buffer)
|
||||
buf2 := bufPool.Get().(*bytes.Buffer)
|
||||
buf3 := bufPool.Get().(*bytes.Buffer)
|
||||
buf4 := bufPool.Get().(*bytes.Buffer)
|
||||
bufs = []*bytes.Buffer{buf1, buf2, buf3, buf4}
|
||||
|
||||
// stdout = LOG_INFO, stderr = LOG_ERR -- ONLY for the between writers, normal stdout is a response
|
||||
so := newSyslogWriter(call.ID, call.Path, call.AppID, syslog.LOG_INFO, syslogConns, buf1)
|
||||
se := newSyslogWriter(call.ID, call.Path, call.AppID, syslog.LOG_ERR, syslogConns, buf2)
|
||||
|
||||
// use multiWriteCloser since it ignores errors (io.MultiWriter does not)
|
||||
soc := multiWriteCloser{&nopCloser{so}, &nopCloser{&logWriter{
|
||||
logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
|
||||
}))
|
||||
}}}
|
||||
sec := multiWriteCloser{&nopCloser{se}, &nopCloser{&logWriter{
|
||||
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "path": call.Path, "image": call.Image, "container_id": id}),
|
||||
}}}
|
||||
|
||||
stdout.Swap(newLineWriterWithBuffer(buf4, soc))
|
||||
stderr.Swap(newLineWriterWithBuffer(buf3, sec))
|
||||
}
|
||||
|
||||
return &container{
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
id: id, // XXX we could just let docker generate ids...
|
||||
image: call.Image,
|
||||
env: map[string]string(call.Config),
|
||||
memory: call.Memory,
|
||||
cpus: uint64(call.CPUs),
|
||||
fsSize: cfg.MaxFsSize,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
syslogConns: syslogConns,
|
||||
}, func() {
|
||||
stdin.Close()
|
||||
stderr.Close()
|
||||
stdout.Close()
|
||||
for _, b := range bufs {
|
||||
bufPool.Put(b)
|
||||
}
|
||||
syslogConns.Close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -999,18 +1053,18 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.
|
||||
ostdout := c.stdout.(common.GhostWriter).Swap(stdout)
|
||||
ostderr := c.stderr.(common.GhostWriter).Swap(stderr)
|
||||
|
||||
c.statsMu.Lock()
|
||||
c.swapMu.Lock()
|
||||
ocs := c.stats
|
||||
c.stats = cs
|
||||
c.statsMu.Unlock()
|
||||
c.swapMu.Unlock()
|
||||
|
||||
return func() {
|
||||
c.stdin.(common.GhostReader).Swap(ostdin)
|
||||
c.stdout.(common.GhostWriter).Swap(ostdout)
|
||||
c.stderr.(common.GhostWriter).Swap(ostderr)
|
||||
c.statsMu.Lock()
|
||||
c.swapMu.Lock()
|
||||
c.stats = ocs
|
||||
c.statsMu.Unlock()
|
||||
c.swapMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1036,11 +1090,11 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
}
|
||||
}
|
||||
|
||||
c.statsMu.Lock()
|
||||
c.swapMu.Lock()
|
||||
if c.stats != nil {
|
||||
*(c.stats) = append(*(c.stats), stat)
|
||||
}
|
||||
c.statsMu.Unlock()
|
||||
c.swapMu.Unlock()
|
||||
}
|
||||
|
||||
var measures map[string]*stats.Int64Measure
|
||||
|
||||
@@ -336,8 +336,8 @@ func TestAsyncCallHeaders(t *testing.T) {
|
||||
func TestLoggerIsStringerAndWorks(t *testing.T) {
|
||||
// TODO test limit writer, logrus writer, etc etc
|
||||
|
||||
loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true})
|
||||
logger := setupLogger(loggyloo, 1*1024*1024)
|
||||
var call models.Call
|
||||
logger := setupLogger(context.Background(), 1*1024*1024, &call)
|
||||
|
||||
if _, ok := logger.(fmt.Stringer); !ok {
|
||||
// NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization...
|
||||
@@ -360,8 +360,8 @@ func TestLoggerIsStringerAndWorks(t *testing.T) {
|
||||
|
||||
func TestLoggerTooBig(t *testing.T) {
|
||||
|
||||
loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true})
|
||||
logger := setupLogger(loggyloo, 10)
|
||||
var call models.Call
|
||||
logger := setupLogger(context.Background(), 10, &call)
|
||||
|
||||
str := fmt.Sprintf("0 line\n1 l\n-----max log size 10 bytes exceeded, truncating log-----\n")
|
||||
|
||||
|
||||
@@ -103,6 +103,11 @@ func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallO
|
||||
return err
|
||||
}
|
||||
|
||||
var syslogURL string
|
||||
if app.SyslogURL != nil {
|
||||
syslogURL = *app.SyslogURL
|
||||
}
|
||||
|
||||
c.Call = &models.Call{
|
||||
ID: id,
|
||||
Path: route.Path,
|
||||
@@ -123,6 +128,7 @@ func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallO
|
||||
URL: reqURL(req),
|
||||
Method: req.Method,
|
||||
AppID: app.ID,
|
||||
SyslogURL: syslogURL,
|
||||
}
|
||||
|
||||
c.req = req
|
||||
@@ -258,9 +264,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
|
||||
logrus.Fields{"id": c.ID, "app_id": c.AppID, "route": c.Path})
|
||||
c.req = c.req.WithContext(ctx)
|
||||
|
||||
// setup stderr logger separate (don't inherit ctx vars)
|
||||
logger := logrus.WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "path": c.Path, "image": c.Image, "call_id": c.ID})
|
||||
c.stderr = setupLogger(logger, a.cfg.MaxLogSize)
|
||||
c.stderr = setupLogger(ctx, a.cfg.MaxLogSize, c.Call)
|
||||
if c.w == nil {
|
||||
// send STDOUT to logs if no writer given (async...)
|
||||
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
|
||||
|
||||
@@ -2,10 +2,12 @@ package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -14,10 +16,15 @@ var (
|
||||
logPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
|
||||
)
|
||||
|
||||
// setupLogger returns an io.ReadWriteCloser which may write to multiple io.Writer's,
|
||||
// and may be read from the returned io.Reader (singular). After Close is called,
|
||||
// the Reader is not safe to read from, nor the Writer to write to.
|
||||
func setupLogger(logger logrus.FieldLogger, maxSize uint64) io.ReadWriteCloser {
|
||||
// setupLogger returns a ReadWriteCloser that may have:
|
||||
// * [always] writes bytes to a size limited buffer, that can be read from using io.Reader
|
||||
// * [always] writes bytes per line to stderr as DEBUG
|
||||
//
|
||||
// To prevent write failures from failing the call or any other writes,
|
||||
// multiWriteCloser ignores errors. Close will flush the line writers
|
||||
// appropriately. The returned io.ReadWriteCloser is not safe for use after
|
||||
// calling Close.
|
||||
func setupLogger(ctx context.Context, maxSize uint64, c *models.Call) io.ReadWriteCloser {
|
||||
lbuf := bufPool.Get().(*bytes.Buffer)
|
||||
dbuf := logPool.Get().(*bytes.Buffer)
|
||||
|
||||
@@ -30,17 +37,16 @@ func setupLogger(logger logrus.FieldLogger, maxSize uint64) io.ReadWriteCloser {
|
||||
return nil
|
||||
}
|
||||
|
||||
// we don't need to limit the log writer, but we do need it to dispense lines
|
||||
linew := newLineWriterWithBuffer(lbuf, &logWriter{logger})
|
||||
|
||||
// we don't need to log per line to db, but we do need to limit it
|
||||
limitw := &nopCloser{newLimitWriter(int(maxSize), dbuf)}
|
||||
|
||||
// TODO / NOTE: we want linew to be first because limitw may error if limit
|
||||
// is reached but we still want to log. we should probably ignore hitting the
|
||||
// limit error since we really just want to not write too much to db and
|
||||
// that's handled as is. put buffers back last to avoid misuse, if there's
|
||||
// an error they won't get put back and that's really okay too.
|
||||
// accumulate all line writers, wrap in same line writer (to re-use buffer)
|
||||
stderrLogger := logrus.WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "path": c.Path, "image": c.Image, "call_id": c.ID})
|
||||
loggo := &nopCloser{&logWriter{stderrLogger}}
|
||||
|
||||
// we don't need to limit the log writer(s), but we do need it to dispense lines
|
||||
linew := newLineWriterWithBuffer(lbuf, loggo)
|
||||
|
||||
mw := multiWriteCloser{linew, limitw, &fCloser{close}}
|
||||
return &rwc{mw, dbuf}
|
||||
}
|
||||
@@ -78,39 +84,34 @@ type nullReadWriter struct {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
func (n *nullReadWriter) Close() error {
|
||||
func (n nullReadWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
func (n *nullReadWriter) Read(b []byte) (int, error) {
|
||||
func (n nullReadWriter) Read(b []byte) (int, error) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
func (n *nullReadWriter) Write(b []byte) (int, error) {
|
||||
return 0, io.EOF
|
||||
func (n nullReadWriter) Write(b []byte) (int, error) {
|
||||
return len(b), io.EOF
|
||||
}
|
||||
|
||||
// multiWriteCloser returns the first write or close that returns a non-nil
|
||||
// err, if no non-nil err is returned, then the returned bytes written will be
|
||||
// from the last call to write.
|
||||
// multiWriteCloser ignores all errors from inner writers. you say, oh, this is a bad idea?
|
||||
// yes, well, we were going to silence them all individually anyway, so let's not be shy about it.
|
||||
// the main thing we need to ensure is that every close is called, even if another errors.
|
||||
// XXX(reed): maybe we should log it (for syslog, it may help debug, maybe we just log that one)
|
||||
type multiWriteCloser []io.WriteCloser
|
||||
|
||||
func (m multiWriteCloser) Write(b []byte) (n int, err error) {
|
||||
for _, mw := range m {
|
||||
n, err = mw.Write(b)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
mw.Write(b)
|
||||
}
|
||||
return n, err
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (m multiWriteCloser) Close() (err error) {
|
||||
for _, mw := range m {
|
||||
err = mw.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mw.Close()
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// logWriter will log (to real stderr) every call to Write as a line. it should
|
||||
@@ -130,14 +131,14 @@ func (l *logWriter) Write(b []byte) (int, error) {
|
||||
// will be appended in Close if none is present.
|
||||
type lineWriter struct {
|
||||
b *bytes.Buffer
|
||||
w io.Writer
|
||||
w io.WriteCloser
|
||||
}
|
||||
|
||||
func newLineWriter(w io.Writer) io.WriteCloser {
|
||||
func newLineWriter(w io.WriteCloser) io.WriteCloser {
|
||||
return &lineWriter{b: new(bytes.Buffer), w: w}
|
||||
}
|
||||
|
||||
func newLineWriterWithBuffer(b *bytes.Buffer, w io.Writer) io.WriteCloser {
|
||||
func newLineWriterWithBuffer(b *bytes.Buffer, w io.WriteCloser) io.WriteCloser {
|
||||
return &lineWriter{b: b, w: w}
|
||||
}
|
||||
|
||||
@@ -165,6 +166,8 @@ func (li *lineWriter) Write(ogb []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (li *lineWriter) Close() error {
|
||||
defer li.w.Close() // MUST close this (after writing last line)
|
||||
|
||||
// flush the remaining bytes in the buffer to underlying writer, adding a
|
||||
// newline if needed
|
||||
b := li.b.Bytes()
|
||||
|
||||
@@ -287,6 +287,8 @@ func getSlotQueueKey(call *call) string {
|
||||
|
||||
hash.Write(unsafeBytes(call.AppID))
|
||||
hash.Write(unsafeBytes("\x00"))
|
||||
hash.Write(unsafeBytes(call.SyslogURL))
|
||||
hash.Write(unsafeBytes("\x00"))
|
||||
hash.Write(unsafeBytes(call.Path))
|
||||
hash.Write(unsafeBytes("\x00"))
|
||||
hash.Write(unsafeBytes(call.Image))
|
||||
|
||||
145
api/agent/syslog.go
Normal file
145
api/agent/syslog.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/syslog"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// syslogConns may return a non-nil io.WriteCloser and an error simultaneously,
|
||||
// the error containing any errors from connecting to any of the syslog URLs, and the
|
||||
// io.WriteCloser writing to any syslogURLs that were successfully connected to.
|
||||
// the returned io.WriteCloser is a Writer to each conn, it should be wrapped in another
|
||||
// writer that writes syslog formatted messages (by line).
|
||||
func syslogConns(ctx context.Context, syslogURLs string) (io.WriteCloser, error) {
|
||||
// TODO(reed): we should likely add a trace per conn, need to plumb tagging better
|
||||
ctx, span := trace.StartSpan(ctx, "syslog_conns")
|
||||
defer span.End()
|
||||
|
||||
if len(syslogURLs) == 0 {
|
||||
return nullReadWriter{}, nil
|
||||
}
|
||||
|
||||
// gather all the conns, re-use the line we make in the syslogWriter
|
||||
// to write the same bytes to each of the conns.
|
||||
var conns []io.WriteCloser
|
||||
var errs []error
|
||||
|
||||
sinks := strings.Split(syslogURLs, ",")
|
||||
for _, s := range sinks {
|
||||
conn, err := dialSyslog(ctx, strings.TrimSpace(s))
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("failed to setup remote syslog connection to %v: %v", s, err))
|
||||
continue
|
||||
}
|
||||
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
|
||||
// do this before checking length of conns
|
||||
var err error
|
||||
if len(errs) > 0 {
|
||||
for _, e := range errs {
|
||||
err = fmt.Errorf("%v%v, ", err, e)
|
||||
}
|
||||
}
|
||||
|
||||
if len(conns) == 0 {
|
||||
return nullReadWriter{}, err
|
||||
}
|
||||
|
||||
return multiWriteCloser(conns), err
|
||||
}
|
||||
|
||||
func dialSyslog(ctx context.Context, syslogURL string) (io.WriteCloser, error) {
|
||||
url, err := url.Parse(syslogURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
common.Logger(ctx).WithField("syslog_url", url).Debug("dialing syslog url")
|
||||
|
||||
var dialer net.Dialer
|
||||
deadline, ok := ctx.Deadline()
|
||||
if ok {
|
||||
dialer.Deadline = deadline
|
||||
}
|
||||
|
||||
// slice off 'xxx://' and dial it
|
||||
switch url.Scheme {
|
||||
case "udp", "tcp":
|
||||
return dialer.Dial(url.Scheme, syslogURL[6:])
|
||||
case "tls":
|
||||
return tls.DialWithDialer(&dialer, "tcp", syslogURL[6:], nil)
|
||||
default:
|
||||
return nil, fmt.Errorf("Unsupported scheme, please use {tcp|udp|tls}: %s: ", url.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
// syslogWriter prepends a syslog format with call-specific details
|
||||
// for each data segment provided in Write(). This doesn't use
|
||||
// log/syslog pkg because we do not need pid for every line (expensive),
|
||||
// and we have a format that is easier to read than hiding in preamble.
|
||||
// this writes logfmt formatted syslog with values for call, function, and
|
||||
// app, it is up to the user to use logfmt from their functions to get a
|
||||
// fully formatted line out.
|
||||
// TODO not pressing, but we could support json & other formats, too, upon request.
|
||||
type syslogWriter struct {
|
||||
pres []byte
|
||||
post []byte
|
||||
b *bytes.Buffer
|
||||
clock func() time.Time
|
||||
|
||||
// the syslog conns (presumably)
|
||||
io.Writer
|
||||
}
|
||||
|
||||
const severityMask = 0x07
|
||||
const facilityMask = 0xf8
|
||||
|
||||
func newSyslogWriter(call, function, app string, severity syslog.Priority, wc io.Writer, buf *bytes.Buffer) *syslogWriter {
|
||||
// Facility = LOG_USER
|
||||
pr := (syslog.LOG_USER & facilityMask) | (severity & severityMask)
|
||||
|
||||
// <priority>VERSION ISOTIMESTAMP HOSTNAME APPLICATION PID MESSAGEID STRUCTURED-DATA MSG
|
||||
//
|
||||
// and for us:
|
||||
// <22>2 ISOTIMESTAMP fn appID funcName callID - MSG
|
||||
// ex:
|
||||
//<11>2 2018-02-31T07:42:21Z Fn - - - - call_id=123 func_name=rdallman/yodawg app_id=123 loggo hereo
|
||||
|
||||
// TODO we could use json for structured data and do that whole thing. up to whoever.
|
||||
return &syslogWriter{
|
||||
pres: []byte(fmt.Sprintf(`<%d>2`, pr)),
|
||||
post: []byte(fmt.Sprintf(`fn - - - - call_id=%s func_name=%s app_id=%s `, call, function, app)),
|
||||
b: buf,
|
||||
Writer: wc,
|
||||
clock: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
func (sw *syslogWriter) Write(p []byte) (int, error) {
|
||||
// re-use buffer to write in timestamp hodge podge and reduce writes to
|
||||
// the conn by buffering a whole line here before writing to conn.
|
||||
|
||||
buf := sw.b
|
||||
buf.Reset()
|
||||
buf.Write(sw.pres)
|
||||
buf.WriteString(" ")
|
||||
buf.WriteString(sw.clock().UTC().Format(time.RFC3339))
|
||||
buf.WriteString(" ")
|
||||
buf.Write(sw.post)
|
||||
buf.Write(p)
|
||||
n, err := io.Copy(sw.Writer, buf)
|
||||
return int(n), err
|
||||
}
|
||||
29
api/agent/syslog_test.go
Normal file
29
api/agent/syslog_test.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log/syslog"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSyslogFormat(t *testing.T) {
|
||||
var b1 bytes.Buffer
|
||||
var b2 bytes.Buffer
|
||||
|
||||
call := "12345"
|
||||
fn := "yo/dawg"
|
||||
app := "sup"
|
||||
now := time.Date(1982, 6, 25, 12, 0, 0, 0, time.UTC)
|
||||
clock := func() time.Time { return now }
|
||||
|
||||
writer := newSyslogWriter(call, fn, app, syslog.LOG_ERR, &nopCloser{&b1}, &b2)
|
||||
writer.clock = clock
|
||||
writer.Write([]byte("yo"))
|
||||
|
||||
gold := `<11>2 1982-06-25T12:00:00Z fn - - - - call_id=12345 func_name=yo/dawg app_id=sup yo`
|
||||
|
||||
if b1.String() != gold {
|
||||
t.Fatal("syslog was not what we expected: ", b1.String())
|
||||
}
|
||||
}
|
||||
27
api/datastore/sql/migrations/13_add_syslogurl_app.go
Normal file
27
api/datastore/sql/migrations/13_add_syslogurl_app.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/fnproject/fn/api/datastore/sql/migratex"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
func up13(ctx context.Context, tx *sqlx.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, "ALTER TABLE apps ADD syslog_url TEXT;")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func down13(ctx context.Context, tx *sqlx.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, "ALTER TABLE apps DROP COLUMN syslog_url;")
|
||||
return err
|
||||
}
|
||||
|
||||
func init() {
|
||||
Migrations = append(Migrations, &migratex.MigFields{
|
||||
VersionFunc: vfunc(13),
|
||||
UpFunc: up13,
|
||||
DownFunc: down13,
|
||||
})
|
||||
}
|
||||
@@ -62,6 +62,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
|
||||
name varchar(256) NOT NULL PRIMARY KEY,
|
||||
config text NOT NULL,
|
||||
annotations text NOT NULL,
|
||||
syslog_url text,
|
||||
created_at varchar(256),
|
||||
updated_at varchar(256)
|
||||
);`,
|
||||
@@ -89,7 +90,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
|
||||
const (
|
||||
routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, headers, config, annotations, created_at, updated_at FROM routes`
|
||||
callSelector = `SELECT id, created_at, started_at, completed_at, status, app_id, path, stats, error FROM calls`
|
||||
appIDSelector = `SELECT id, name, config, annotations, created_at, updated_at FROM apps WHERE id=?`
|
||||
appIDSelector = `SELECT id, name, config, annotations, syslog_url, created_at, updated_at FROM apps WHERE id=?`
|
||||
ensureAppSelector = `SELECT id FROM apps WHERE name=?`
|
||||
|
||||
EnvDBPingMaxRetries = "FN_DS_DB_PING_MAX_RETRIES"
|
||||
@@ -333,6 +334,7 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App
|
||||
name,
|
||||
config,
|
||||
annotations,
|
||||
syslog_url,
|
||||
created_at,
|
||||
updated_at
|
||||
)
|
||||
@@ -341,6 +343,7 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App
|
||||
:name,
|
||||
:config,
|
||||
:annotations,
|
||||
:syslog_url,
|
||||
:created_at,
|
||||
:updated_at
|
||||
);`)
|
||||
@@ -389,7 +392,7 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
|
||||
return err
|
||||
}
|
||||
|
||||
query = tx.Rebind(`UPDATE apps SET config=:config, annotations=:annotations, updated_at=:updated_at WHERE name=:name`)
|
||||
query = tx.Rebind(`UPDATE apps SET config=:config, annotations=:annotations, syslog_url=:syslog_url, updated_at=:updated_at WHERE name=:name`)
|
||||
res, err := tx.NamedExecContext(ctx, query, app)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -466,7 +469,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config, annotations, created_at, updated_at FROM apps %s", query))
|
||||
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config, annotations, syslog_url, created_at, updated_at FROM apps %s", query))
|
||||
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
@@ -13,6 +17,7 @@ type App struct {
|
||||
Name string `json:"name" db:"name"`
|
||||
Config Config `json:"config,omitempty" db:"config"`
|
||||
Annotations Annotations `json:"annotations,omitempty" db:"annotations"`
|
||||
SyslogURL *string `json:"syslog_url,omitempty" db:"syslog_url"`
|
||||
CreatedAt strfmt.DateTime `json:"created_at,omitempty" db:"created_at"`
|
||||
UpdatedAt strfmt.DateTime `json:"updated_at,omitempty" db:"updated_at"`
|
||||
}
|
||||
@@ -49,6 +54,24 @@ func (a *App) Validate() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.SyslogURL != nil && *a.SyslogURL != "" {
|
||||
sinks := strings.Split(*a.SyslogURL, ",")
|
||||
for _, s := range sinks {
|
||||
url, err := url.Parse(strings.TrimSpace(s))
|
||||
fail := err != nil
|
||||
if !fail {
|
||||
switch url.Scheme {
|
||||
case "udp", "tcp", "tls":
|
||||
default:
|
||||
fail = true
|
||||
}
|
||||
}
|
||||
if fail {
|
||||
return ErrInvalidSyslog(fmt.Sprintf(`invalid syslog url: "%v"`, s))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -100,6 +123,14 @@ func (a *App) Update(patch *App) {
|
||||
}
|
||||
}
|
||||
|
||||
if patch.SyslogURL != nil {
|
||||
if *patch.SyslogURL == "" {
|
||||
a.SyslogURL = nil // hides it from jason
|
||||
} else {
|
||||
a.SyslogURL = patch.SyslogURL
|
||||
}
|
||||
}
|
||||
|
||||
a.Annotations = a.Annotations.MergeChange(patch.Annotations)
|
||||
|
||||
if !a.Equals(original) {
|
||||
@@ -107,6 +138,13 @@ func (a *App) Update(patch *App) {
|
||||
}
|
||||
}
|
||||
|
||||
var _ APIError = ErrInvalidSyslog("")
|
||||
|
||||
type ErrInvalidSyslog string
|
||||
|
||||
func (e ErrInvalidSyslog) Code() int { return http.StatusBadRequest }
|
||||
func (e ErrInvalidSyslog) Error() string { return string(e) }
|
||||
|
||||
// AppFilter is the filter used for querying apps
|
||||
type AppFilter struct {
|
||||
Name string
|
||||
|
||||
@@ -125,6 +125,9 @@ type Call struct {
|
||||
// Headers are headers from the request that created this call
|
||||
Headers http.Header `json:"headers,omitempty" db:"-"`
|
||||
|
||||
// SyslogURL is a syslog URL to send all logs to.
|
||||
SyslogURL string `json:"syslog_url,omitempty" db:"-"`
|
||||
|
||||
// Time when call completed, whether it was successul or failed. Always in UTC.
|
||||
CompletedAt strfmt.DateTime `json:"completed_at,omitempty" db:"completed_at"`
|
||||
|
||||
@@ -140,6 +143,7 @@ type Call struct {
|
||||
// Error is the reason why the call failed, it is only non-empty if
|
||||
// status is equal to "error".
|
||||
Error string `json:"error,omitempty" db:"error"`
|
||||
|
||||
// App this call belongs to.
|
||||
AppID string `json:"app_id" db:"app_id"`
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
@@ -54,9 +55,12 @@ func TestAppCreate(t *testing.T) {
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "&&%@!#$#@$" } }`, http.StatusBadRequest, models.ErrAppsInvalidName},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "app", "annotations" : { "":"val" }}}`, http.StatusBadRequest, models.ErrInvalidAnnotationKey},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "app", "annotations" : { "key":"" }}}`, http.StatusBadRequest, models.ErrInvalidAnnotationValue},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "app", "syslog_url":"yo"}}`, http.StatusBadRequest, errors.New(`invalid syslog url: "yo"`)},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "app", "syslog_url":"yo://sup.com:1"}}`, http.StatusBadRequest, errors.New(`invalid syslog url: "yo://sup.com:1"`)},
|
||||
// success
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" , "annotations": {"k1":"v1", "k2":[]}}}`, http.StatusOK, nil},
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste", "syslog_url":"tcp://example.com:443" } }`, http.StatusOK, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull)
|
||||
@@ -297,6 +301,9 @@ func TestAppUpdate(t *testing.T) {
|
||||
|
||||
// success
|
||||
{ds, logs.NewMock(), "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil},
|
||||
|
||||
// success
|
||||
{ds, logs.NewMock(), "/v1/apps/myapp", `{ "app": { "syslog_url":"tcp://example.com:443" } }`, http.StatusOK, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, ServerTypeFull)
|
||||
|
||||
@@ -24,9 +24,30 @@ Note the easily searchable `call_id=x` format.
|
||||
call_id=477949e2-922c-5da9-8633-0b2887b79f6e
|
||||
```
|
||||
|
||||
## Metrics
|
||||
## Remote syslog for functions
|
||||
|
||||
Metrics are emitted via the logs.
|
||||
You may add a syslog url to any function application and all functions that
|
||||
exist under that application will ship all of their logs to it. You may
|
||||
provide a comma separated list, if desired. Currently, we support `tcp`,
|
||||
`udp`, and `tls`, and this will not work if behind a proxy [yet?] (this is my
|
||||
life now). This feature only works for 'hot' functions.
|
||||
|
||||
See [Metrics](metrics.md) doc for more information.
|
||||
An example syslog url is:
|
||||
|
||||
```
|
||||
tls://logs.papertrailapp.com:1
|
||||
```
|
||||
|
||||
We log in a syslog format, with some variables added in logfmt format. If you
|
||||
find logfmt format offensive, please open an issue and we will consider adding
|
||||
more formats (or open a PR that does it, with tests, and you will receive 1
|
||||
free cookie along with the feature you want). The logs from the functions
|
||||
themselves are not formatted, only our pre-amble, thus, if you'd like a fully
|
||||
logfmt line, you must use a logfmt logger to log from your function.
|
||||
|
||||
* All log lines are sent as level error w/ the current time and `fn` as hostname.
|
||||
* call_id, func_name, and app_id will prefix every log line.
|
||||
|
||||
```
|
||||
<11>2 1982-06-25T12:00:00Z fn - - - - call_id=12345 func_name=yo/yo app_id=54321 this is your log line
|
||||
```
|
||||
|
||||
@@ -559,6 +559,9 @@ definitions:
|
||||
description: Application annotations - this is a map of annotations attached to this app, keys must not exceed 128 bytes and must consist of non-whitespace printable ascii characters, and the seralized representation of individual values must not exeed 512 bytes
|
||||
additionalProperties:
|
||||
type: object
|
||||
syslog_url:
|
||||
type: string
|
||||
description: A comma separated list of syslog urls to send all function logs to. supports tls, udp or tcp. e.g. tls://logs.papertrailapp.com:1
|
||||
created_at:
|
||||
type: string
|
||||
format: date-time
|
||||
|
||||
Reference in New Issue
Block a user