From 867eb4b176d74a00328064ce9e25c83f7bc1bee6 Mon Sep 17 00:00:00 2001 From: Pedro Nasser Date: Sun, 27 Nov 2016 16:36:40 -0200 Subject: [PATCH] Changes on function/metric loggers (#343) * initial fix logger * dix DefaultFuncLogger * fix runner and tests * reverting: sending async task stdout to func logger --- api/runner/async_runner.go | 3 -- api/runner/async_runner_test.go | 2 +- api/runner/{logger.go => func_logger.go} | 32 ++++++++++--------- api/runner/{metrics.go => metric_logger.go} | 16 +++++----- api/runner/runner.go | 34 +++++++++++++-------- api/runner/runner_test.go | 4 +-- api/server/runner.go | 13 ++++---- api/server/runner_test.go | 2 +- main.go | 6 ++-- 9 files changed, 61 insertions(+), 51 deletions(-) rename api/runner/{logger.go => func_logger.go} (50%) rename api/runner/{metrics.go => metric_logger.go} (57%) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 6aa02d576..1e3257f9d 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -42,7 +42,6 @@ func getTask(ctx context.Context, url string) (*models.Task, error) { func getCfg(task *models.Task) *Config { // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader - stderr := NewFuncLogger(task.AppName, task.Path, *task.Image, task.ID) // TODO: missing path here, how do i get that? if task.Timeout == nil { timeout := int32(30) task.Timeout = &timeout @@ -52,8 +51,6 @@ func getCfg(task *models.Task) *Config { Timeout: time.Duration(*task.Timeout) * time.Second, ID: task.ID, AppName: task.AppName, - Stdout: stderr, - Stderr: stderr, Env: task.EnvVars, } return cfg diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 9b7584553..3ea936db9 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -189,7 +189,7 @@ func TestTasksrvURL(t *testing.T) { } func testRunner(t *testing.T) *Runner { - r, err := New(NewMetricLogger()) + r, err := New(NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatal("Test: failed to create new runner") } diff --git a/api/runner/logger.go b/api/runner/func_logger.go similarity index 50% rename from api/runner/logger.go rename to api/runner/func_logger.go index 52a39deee..88602b877 100644 --- a/api/runner/logger.go +++ b/api/runner/func_logger.go @@ -4,22 +4,29 @@ import ( "bufio" "io" + "context" "github.com/Sirupsen/logrus" + "github.com/iron-io/runner/common" ) -// FuncLogger reads STDERR output from a container and outputs it in a parseable structured log format, see: https://github.com/iron-io/functions/issues/76 -type FuncLogger struct { - r io.Reader - w io.Writer +type FuncLogger interface { + Writer(context.Context, string, string, string, string) io.Writer } -func NewFuncLogger(appName, path, function, requestID string) io.Writer { +// FuncLogger reads STDERR output from a container and outputs it in a parseable structured log format, see: https://github.com/iron-io/functions/issues/76 +type DefaultFuncLogger struct { +} + +func NewFuncLogger() FuncLogger { + return &DefaultFuncLogger{} +} + +func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.Writer { r, w := io.Pipe() - funcLogger := &FuncLogger{ - r: r, - w: w, - } - log := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "function": function, "call_id": requestID}) + + log := common.Logger(ctx) + log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "image": image, "call_id": reqID}) + go func(reader io.Reader) { scanner := bufio.NewScanner(reader) for scanner.Scan() { @@ -29,9 +36,6 @@ func NewFuncLogger(appName, path, function, requestID string) io.Writer { log.WithError(err).Println("There was an error with the scanner in attached container") } }(r) - return funcLogger -} -func (l *FuncLogger) Write(p []byte) (n int, err error) { - return l.w.Write(p) + return w } diff --git a/api/runner/metrics.go b/api/runner/metric_logger.go similarity index 57% rename from api/runner/metrics.go rename to api/runner/metric_logger.go index dd9b7949f..75d7cb08a 100644 --- a/api/runner/metrics.go +++ b/api/runner/metric_logger.go @@ -8,7 +8,7 @@ import ( "github.com/iron-io/runner/common" ) -type Logger interface { +type MetricLogger interface { Log(context.Context, map[string]interface{}) LogCount(context.Context, string, int) LogGauge(context.Context, string, int) @@ -17,18 +17,18 @@ type Logger interface { type Metric map[string]interface{} -func NewMetricLogger() *MetricLogger { - return &MetricLogger{} +func NewMetricLogger() MetricLogger { + return &DefaultMetricLogger{} } -type MetricLogger struct{} +type DefaultMetricLogger struct{} -func (l *MetricLogger) Log(ctx context.Context, metric map[string]interface{}) { +func (l *DefaultMetricLogger) Log(ctx context.Context, metric map[string]interface{}) { log := common.Logger(ctx) log.WithFields(logrus.Fields(metric)).Info() } -func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) { +func (l *DefaultMetricLogger) LogCount(ctx context.Context, name string, value int) { l.Log(ctx, Metric{ "name": name, "value": value, @@ -36,7 +36,7 @@ func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) { }) } -func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Duration) { +func (l *DefaultMetricLogger) LogTime(ctx context.Context, name string, value time.Duration) { l.Log(ctx, Metric{ "name": name, "value": value, @@ -44,7 +44,7 @@ func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Dura }) } -func (l *MetricLogger) LogGauge(ctx context.Context, name string, value int) { +func (l *DefaultMetricLogger) LogGauge(ctx context.Context, name string, value int) { l.Log(ctx, Metric{ "name": name, "value": value, diff --git a/api/runner/runner.go b/api/runner/runner.go index aa88f04ad..48571e618 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -27,6 +27,7 @@ type Config struct { Image string Timeout time.Duration AppName string + Path string Memory uint64 Env map[string]string Stdin io.Reader @@ -37,7 +38,8 @@ type Config struct { type Runner struct { driver drivers.Driver taskQueue chan *containerTask - ml Logger + mlog MetricLogger + flog FuncLogger availableMem int64 usedMem int64 usedMemMutex sync.RWMutex @@ -50,7 +52,7 @@ var ( WaitMemoryTimeout = 10 * time.Second ) -func New(metricLogger Logger) (*Runner, error) { +func New(flog FuncLogger, mlog MetricLogger) (*Runner, error) { // TODO: Is this really required for the container drivers? Can we remove it? env := common.NewEnvironment(func(e *common.Environment) {}) @@ -63,7 +65,8 @@ func New(metricLogger Logger) (*Runner, error) { r := &Runner{ driver: driver, taskQueue: make(chan *containerTask, 100), - ml: metricLogger, + flog: flog, + mlog: mlog, availableMem: getAvailableMemory(), usedMem: 0, } @@ -100,12 +103,12 @@ func (r *Runner) queueHandler() { } metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) - r.ml.LogTime(task.ctx, metricBaseName+"wait_time", waitTime) - r.ml.LogTime(task.ctx, "run.wait_time", waitTime) + r.mlog.LogTime(task.ctx, metricBaseName+"wait_time", waitTime) + r.mlog.LogTime(task.ctx, "run.wait_time", waitTime) if timedOut { // Send to a signal to this task saying it cannot run - r.ml.LogCount(task.ctx, metricBaseName+"timeout", 1) + r.mlog.LogCount(task.ctx, metricBaseName+"timeout", 1) task.canRun <- false continue } @@ -159,6 +162,11 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error cfg.Memory = 128 } + cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID) + if cfg.Stdout == nil { + cfg.Stdout = cfg.Stderr + } + ctask := &containerTask{ ctx: ctx, cfg: cfg, @@ -166,7 +174,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error } metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName) - r.ml.LogCount(ctx, metricBaseName+"requests", 1) + r.mlog.LogCount(ctx, metricBaseName+"requests", 1) // Check if has enough available memory // If available, use it @@ -176,7 +184,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error case r.taskQueue <- ctask: default: // If queue is full, return error - r.ml.LogCount(ctx, "queue.full", 1) + r.mlog.LogCount(ctx, "queue.full", 1) return nil, ErrFullQueue } @@ -186,7 +194,7 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error return nil, ErrTimeOutNoMemory } } else { - r.ml.LogTime(ctx, metricBaseName+"waittime", 0) + r.mlog.LogTime(ctx, metricBaseName+"waittime", 0) } defer r.addUsedMem(-1 * int64(cfg.Memory)) @@ -204,14 +212,14 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error } if result.Status() == "success" { - r.ml.LogCount(ctx, metricBaseName+"succeeded", 1) + r.mlog.LogCount(ctx, metricBaseName+"succeeded", 1) } else { - r.ml.LogCount(ctx, metricBaseName+"error", 1) + r.mlog.LogCount(ctx, metricBaseName+"error", 1) } metricElapsed := time.Since(metricStart) - r.ml.LogTime(ctx, metricBaseName+"time", metricElapsed) - r.ml.LogTime(ctx, "run.exec_time", metricElapsed) + r.mlog.LogTime(ctx, metricBaseName+"time", metricElapsed) + r.mlog.LogTime(ctx, "run.exec_time", metricElapsed) return result, nil } diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index dedc05bcb..2483e874c 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -13,7 +13,7 @@ import ( func TestRunnerHello(t *testing.T) { buf := setLogBuffer() - runner, err := New(NewMetricLogger()) + runner, err := New(NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } @@ -66,7 +66,7 @@ func TestRunnerHello(t *testing.T) { func TestRunnerError(t *testing.T) { t.Skip() buf := setLogBuffer() - runner, err := New(NewMetricLogger()) + runner, err := New(NewFuncLogger(), NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } diff --git a/api/server/runner.go b/api/server/runner.go index 0707c68b4..1583a1229 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -90,7 +90,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) { log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on LRU cache") route, ok := s.cacheget(appName, path) - if ok && s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) { + if ok && s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) { s.refreshcache(appName, route) return } @@ -113,7 +113,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) { route = routes[0] log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image}) - if s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) { + if s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) { s.refreshcache(appName, route) return } @@ -132,8 +132,9 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]* return resp.([]*models.Route), err } -func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { - log = log.WithFields(logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) +// TODO: Should remove *gin.Context from these functions, should use only context.Context +func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) params, match := matchRoute(found.Path, route) if !match { @@ -141,7 +142,6 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f } var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader - stderr := runner.NewFuncLogger(appName, route, found.Image, reqID) envVars := map[string]string{ "METHOD": c.Request.Method, @@ -173,7 +173,6 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f ID: reqID, AppName: appName, Stdout: &stdout, - Stderr: stderr, Env: envVars, Memory: found.Memory, Stdin: payload, @@ -205,7 +204,7 @@ func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, f c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) default: - result, err := runner.RunTask(s.tasks, c, cfg) + result, err := runner.RunTask(s.tasks, ctx, cfg) if err != nil { break } diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 1878716aa..db262f605 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -14,7 +14,7 @@ import ( ) func testRunner(t *testing.T) *runner.Runner { - r, err := runner.New(runner.NewMetricLogger()) + r, err := runner.New(runner.NewFuncLogger(), runner.NewMetricLogger()) if err != nil { t.Fatal("Test: failed to create new runner") } diff --git a/main.go b/main.go index 25cb9fd09..bc1694e56 100644 --- a/main.go +++ b/main.go @@ -68,9 +68,11 @@ func main() { if err != nil { log.WithError(err).Fatal("Error on init MQ") } - metricLogger := runner.NewMetricLogger() - rnr, err := runner.New(metricLogger) + metricLogger := runner.NewMetricLogger() + funcLogger := runner.NewFuncLogger() + + rnr, err := runner.New(funcLogger, metricLogger) if err != nil { log.WithError(err).Fatalln("Failed to create a runner") }