Changes on function/metric loggers (#343)

* initial fix logger

* dix DefaultFuncLogger

* fix runner and tests

* reverting: sending async task stdout to func logger
This commit is contained in:
Pedro Nasser
2016-11-27 16:36:40 -02:00
committed by GitHub
parent b62a2304b0
commit 867eb4b176
9 changed files with 61 additions and 51 deletions

View File

@@ -42,7 +42,6 @@ func getTask(ctx context.Context, url string) (*models.Task, error) {
func getCfg(task *models.Task) *Config { func getCfg(task *models.Task) *Config {
// TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := NewFuncLogger(task.AppName, task.Path, *task.Image, task.ID) // TODO: missing path here, how do i get that?
if task.Timeout == nil { if task.Timeout == nil {
timeout := int32(30) timeout := int32(30)
task.Timeout = &timeout task.Timeout = &timeout
@@ -52,8 +51,6 @@ func getCfg(task *models.Task) *Config {
Timeout: time.Duration(*task.Timeout) * time.Second, Timeout: time.Duration(*task.Timeout) * time.Second,
ID: task.ID, ID: task.ID,
AppName: task.AppName, AppName: task.AppName,
Stdout: stderr,
Stderr: stderr,
Env: task.EnvVars, Env: task.EnvVars,
} }
return cfg return cfg

View File

@@ -189,7 +189,7 @@ func TestTasksrvURL(t *testing.T) {
} }
func testRunner(t *testing.T) *Runner { func testRunner(t *testing.T) *Runner {
r, err := New(NewMetricLogger()) r, err := New(NewFuncLogger(), NewMetricLogger())
if err != nil { if err != nil {
t.Fatal("Test: failed to create new runner") t.Fatal("Test: failed to create new runner")
} }

View File

@@ -4,22 +4,29 @@ import (
"bufio" "bufio"
"io" "io"
"context"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/iron-io/runner/common"
) )
// FuncLogger reads STDERR output from a container and outputs it in a parseable structured log format, see: https://github.com/iron-io/functions/issues/76 type FuncLogger interface {
type FuncLogger struct { Writer(context.Context, string, string, string, string) io.Writer
r io.Reader
w io.Writer
} }
func NewFuncLogger(appName, path, function, requestID string) io.Writer { // FuncLogger reads STDERR output from a container and outputs it in a parseable structured log format, see: https://github.com/iron-io/functions/issues/76
type DefaultFuncLogger struct {
}
func NewFuncLogger() FuncLogger {
return &DefaultFuncLogger{}
}
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.Writer {
r, w := io.Pipe() r, w := io.Pipe()
funcLogger := &FuncLogger{
r: r, log := common.Logger(ctx)
w: w, log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "image": image, "call_id": reqID})
}
log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "call_id": requestID})
go func(reader io.Reader) { go func(reader io.Reader) {
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
for scanner.Scan() { for scanner.Scan() {
@@ -29,9 +36,6 @@ func NewFuncLogger(appName, path, function, requestID string) io.Writer {
log.WithError(err).Println("There was an error with the scanner in attached container") log.WithError(err).Println("There was an error with the scanner in attached container")
} }
}(r) }(r)
return funcLogger
}
func (l *FuncLogger) Write(p []byte) (n int, err error) { return w
return l.w.Write(p)
} }

View File

@@ -8,7 +8,7 @@ import (
"github.com/iron-io/runner/common" "github.com/iron-io/runner/common"
) )
type Logger interface { type MetricLogger interface {
Log(context.Context, map[string]interface{}) Log(context.Context, map[string]interface{})
LogCount(context.Context, string, int) LogCount(context.Context, string, int)
LogGauge(context.Context, string, int) LogGauge(context.Context, string, int)
@@ -17,18 +17,18 @@ type Logger interface {
type Metric map[string]interface{} type Metric map[string]interface{}
func NewMetricLogger() *MetricLogger { func NewMetricLogger() MetricLogger {
return &MetricLogger{} return &DefaultMetricLogger{}
} }
type MetricLogger struct{} type DefaultMetricLogger struct{}
func (l *MetricLogger) Log(ctx context.Context, metric map[string]interface{}) { func (l *DefaultMetricLogger) Log(ctx context.Context, metric map[string]interface{}) {
log := common.Logger(ctx) log := common.Logger(ctx)
log.WithFields(logrus.Fields(metric)).Info() log.WithFields(logrus.Fields(metric)).Info()
} }
func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) { func (l *DefaultMetricLogger) LogCount(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{ l.Log(ctx, Metric{
"name": name, "name": name,
"value": value, "value": value,
@@ -36,7 +36,7 @@ func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) {
}) })
} }
func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Duration) { func (l *DefaultMetricLogger) LogTime(ctx context.Context, name string, value time.Duration) {
l.Log(ctx, Metric{ l.Log(ctx, Metric{
"name": name, "name": name,
"value": value, "value": value,
@@ -44,7 +44,7 @@ func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Dura
}) })
} }
func (l *MetricLogger) LogGauge(ctx context.Context, name string, value int) { func (l *DefaultMetricLogger) LogGauge(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{ l.Log(ctx, Metric{
"name": name, "name": name,
"value": value, "value": value,

View File

@@ -27,6 +27,7 @@ type Config struct {
Image string Image string
Timeout time.Duration Timeout time.Duration
AppName string AppName string
Path string
Memory uint64 Memory uint64
Env map[string]string Env map[string]string
Stdin io.Reader Stdin io.Reader
@@ -37,7 +38,8 @@ type Config struct {
type Runner struct { type Runner struct {
driver drivers.Driver driver drivers.Driver
taskQueue chan *containerTask taskQueue chan *containerTask
ml Logger mlog MetricLogger
flog FuncLogger
availableMem int64 availableMem int64
usedMem int64 usedMem int64
usedMemMutex sync.RWMutex usedMemMutex sync.RWMutex
@@ -50,7 +52,7 @@ var (
WaitMemoryTimeout = 10 * time.Second WaitMemoryTimeout = 10 * time.Second
) )
func New(metricLogger Logger) (*Runner, error) { func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) {
// TODO: Is this really required for the container drivers? Can we remove it? // TODO: Is this really required for the container drivers? Can we remove it?
env := common.NewEnvironment(func(e *common.Environment) {}) env := common.NewEnvironment(func(e *common.Environment) {})
@@ -63,7 +65,8 @@ func New(metricLogger Logger) (*Runner, error) {
r := &Runner{ r := &Runner{
driver: driver, driver: driver,
taskQueue: make(chan *containerTask, 100), taskQueue: make(chan *containerTask, 100),
ml: metricLogger, flog: flog,
mlog: mlog,
availableMem: getAvailableMemory(), availableMem: getAvailableMemory(),
usedMem: 0, usedMem: 0,
} }
@@ -100,12 +103,12 @@ func (r *Runner) queueHandler() {
} }
metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName)
r.ml.LogTime(task.ctx, metricBaseName+"wait_time", waitTime) r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime)
r.ml.LogTime(task.ctx, "run.wait_time", waitTime) r.mlog.LogTime(task.ctx, "run.wait_time", waitTime)
if timedOut { if timedOut {
// Send to a signal to this task saying it cannot run // Send to a signal to this task saying it cannot run
r.ml.LogCount(task.ctx, metricBaseName+"timeout", 1) r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1)
task.canRun <- false task.canRun <- false
continue continue
} }
@@ -159,6 +162,11 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
cfg.Memory = 128 cfg.Memory = 128
} }
cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
if cfg.Stdout == nil {
cfg.Stdout = cfg.Stderr
}
ctask := &containerTask{ ctask := &containerTask{
ctx: ctx, ctx: ctx,
cfg: cfg, cfg: cfg,
@@ -166,7 +174,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
} }
metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName) metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName)
r.ml.LogCount(ctx, metricBaseName+"requests", 1) r.mlog.LogCount(ctx, metricBaseName+"requests", 1)
// Check if has enough available memory // Check if has enough available memory
// If available, use it // If available, use it
@@ -176,7 +184,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
case r.taskQueue <- ctask: case r.taskQueue <- ctask:
default: default:
// If queue is full, return error // If queue is full, return error
r.ml.LogCount(ctx, "queue.full", 1) r.mlog.LogCount(ctx, "queue.full", 1)
return nil, ErrFullQueue return nil, ErrFullQueue
} }
@@ -186,7 +194,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
return nil, ErrTimeOutNoMemory return nil, ErrTimeOutNoMemory
} }
} else { } else {
r.ml.LogTime(ctx, metricBaseName+"waittime", 0) r.mlog.LogTime(ctx, metricBaseName+"waittime", 0)
} }
defer r.addUsedMem(-1 * int64(cfg.Memory)) defer r.addUsedMem(-1 * int64(cfg.Memory))
@@ -204,14 +212,14 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
} }
if result.Status() == "success" { if result.Status() == "success" {
r.ml.LogCount(ctx, metricBaseName+"succeeded", 1) r.mlog.LogCount(ctx, metricBaseName+"succeeded", 1)
} else { } else {
r.ml.LogCount(ctx, metricBaseName+"error", 1) r.mlog.LogCount(ctx, metricBaseName+"error", 1)
} }
metricElapsed := time.Since(metricStart) metricElapsed := time.Since(metricStart)
r.ml.LogTime(ctx, metricBaseName+"time", metricElapsed) r.mlog.LogTime(ctx, metricBaseName+"time", metricElapsed)
r.ml.LogTime(ctx, "run.exec_time", metricElapsed) r.mlog.LogTime(ctx, "run.exec_time", metricElapsed)
return result, nil return result, nil
} }

View File

@@ -13,7 +13,7 @@ import (
func TestRunnerHello(t *testing.T) { func TestRunnerHello(t *testing.T) {
buf := setLogBuffer() buf := setLogBuffer()
runner, err := New(NewMetricLogger()) runner, err := New(NewFuncLogger(), NewMetricLogger())
if err != nil { if err != nil {
t.Fatalf("Test error during New() - %s", err) t.Fatalf("Test error during New() - %s", err)
} }
@@ -66,7 +66,7 @@ func TestRunnerHello(t *testing.T) {
func TestRunnerError(t *testing.T) { func TestRunnerError(t *testing.T) {
t.Skip() t.Skip()
buf := setLogBuffer() buf := setLogBuffer()
runner, err := New(NewMetricLogger()) runner, err := New(NewFuncLogger(), NewMetricLogger())
if err != nil { if err != nil {
t.Fatalf("Test error during New() - %s", err) t.Fatalf("Test error during New() - %s", err)
} }

View File

@@ -90,7 +90,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on LRU cache") log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on LRU cache")
route, ok := s.cacheget(appName, path) route, ok := s.cacheget(appName, path)
if ok && s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) { if ok && s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) {
s.refreshcache(appName, route) s.refreshcache(appName, route)
return return
} }
@@ -113,7 +113,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
route = routes[0] route = routes[0]
log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image}) log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image})
if s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) { if s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) {
s.refreshcache(appName, route) s.refreshcache(appName, route)
return return
} }
@@ -132,8 +132,9 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]*
return resp.([]*models.Route), err return resp.([]*models.Route), err
} }
func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { // TODO: Should remove *gin.Context from these functions, should use only context.Context
log = log.WithFields(logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image})
params, match := matchRoute(found.Path, route) params, match := matchRoute(found.Path, route)
if !match { if !match {
@@ -141,7 +142,6 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f
} }
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := runner.NewFuncLogger(appName, route, found.Image, reqID)
envVars := map[string]string{ envVars := map[string]string{
"METHOD": c.Request.Method, "METHOD": c.Request.Method,
@@ -173,7 +173,6 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f
ID: reqID, ID: reqID,
AppName: appName, AppName: appName,
Stdout: &stdout, Stdout: &stdout,
Stderr: stderr,
Env: envVars, Env: envVars,
Memory: found.Memory, Memory: found.Memory,
Stdin: payload, Stdin: payload,
@@ -205,7 +204,7 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f
c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID})
default: default:
result, err := runner.RunTask(s.tasks, c, cfg) result, err := runner.RunTask(s.tasks, ctx, cfg)
if err != nil { if err != nil {
break break
} }

View File

@@ -14,7 +14,7 @@ import (
) )
func testRunner(t *testing.T) *runner.Runner { func testRunner(t *testing.T) *runner.Runner {
r, err := runner.New(runner.NewMetricLogger()) r, err := runner.New(runner.NewFuncLogger(), runner.NewMetricLogger())
if err != nil { if err != nil {
t.Fatal("Test: failed to create new runner") t.Fatal("Test: failed to create new runner")
} }

View File

@@ -68,9 +68,11 @@ func main() {
if err != nil { if err != nil {
log.WithError(err).Fatal("Error on init MQ") log.WithError(err).Fatal("Error on init MQ")
} }
metricLogger := runner.NewMetricLogger()
rnr, err := runner.New(metricLogger) metricLogger := runner.NewMetricLogger()
funcLogger := runner.NewFuncLogger()
rnr, err := runner.New(funcLogger, metricLogger)
if err != nil { if err != nil {
log.WithError(err).Fatalln("Failed to create a runner") log.WithError(err).Fatalln("Failed to create a runner")
} }