mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'func_logs2' into 'master'
Func logs feature See merge request !66
This commit is contained in:
@@ -135,7 +135,7 @@ func startAsyncRunners(ctx context.Context, url string, rnr *Runner, ds models.D
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// Process Task
|
||||
_, err := rnr.RunTrackedTask(task, ctx, getCfg(task), ds)
|
||||
_, err := rnr.RunTrackedTask(task, ctx, getCfg(task))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Cannot run task")
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"gitlab-odx.oracle.com/odx/functions/api/mqs"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/logs"
|
||||
)
|
||||
|
||||
func setLogBuffer() *bytes.Buffer {
|
||||
@@ -193,7 +194,9 @@ func TestTasksrvURL(t *testing.T) {
|
||||
|
||||
func testRunner(t *testing.T) (*Runner, context.CancelFunc) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r, err := New(ctx, NewFuncLogger(), NewMetricLogger())
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
r, err := New(ctx, NewFuncLogger(fnl), NewMetricLogger(), ds)
|
||||
if err != nil {
|
||||
t.Fatal("Test: failed to create new runner")
|
||||
}
|
||||
|
||||
@@ -2,40 +2,52 @@ package runner
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"context"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
|
||||
)
|
||||
|
||||
type FuncLogger interface {
|
||||
Writer(context.Context, string, string, string, string) io.Writer
|
||||
Writer(ctx context.Context, appName, path, image, reqID string) io.Writer
|
||||
}
|
||||
|
||||
// FuncLogger reads STDERR output from a container and outputs it in a parseable structured log format, see: https://github.com/treeder/functions/issues/76
|
||||
// FuncLogger reads STDERR output from a container and outputs it in a parsed structured log format, see: https://github.com/treeder/functions/issues/76
|
||||
type DefaultFuncLogger struct {
|
||||
logDB models.FnLog
|
||||
}
|
||||
|
||||
func NewFuncLogger() FuncLogger {
|
||||
return &DefaultFuncLogger{}
|
||||
func NewFuncLogger(logDB models.FnLog) FuncLogger {
|
||||
return &DefaultFuncLogger{logDB}
|
||||
}
|
||||
|
||||
func (l *DefaultFuncLogger) persistLog(ctx context.Context, log logrus.FieldLogger, reqID, logText string) {
|
||||
err := l.logDB.InsertLog(ctx, reqID, logText)
|
||||
if err != nil {
|
||||
log.WithError(err).Println(fmt.Sprintf(
|
||||
"Unable to persist log for call %v. Error: %v", reqID, err))
|
||||
}
|
||||
}
|
||||
|
||||
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.Writer {
|
||||
r, w := io.Pipe()
|
||||
|
||||
log := common.Logger(ctx)
|
||||
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "image": image, "call_id": reqID})
|
||||
|
||||
go func(reader io.Reader) {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
for scanner.Scan() {
|
||||
log.Println(scanner.Text())
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
log.WithError(err).Println("There was an error with the scanner in attached container")
|
||||
}
|
||||
}(r)
|
||||
log := common.Logger(ctx)
|
||||
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName,
|
||||
"path": path, "image": image, "call_id": reqID})
|
||||
|
||||
var res string
|
||||
errMsg := "-------Unable to get full log, it's too big-------"
|
||||
fmt.Fscanf(reader, "%v", &res)
|
||||
if len(res) >= bufio.MaxScanTokenSize {
|
||||
res = res[0:bufio.MaxScanTokenSize - len(errMsg)] + errMsg
|
||||
}
|
||||
|
||||
l.persistLog(ctx, log, reqID, res)
|
||||
}(r)
|
||||
return w
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/drivers/docker"
|
||||
@@ -33,6 +34,7 @@ type Runner struct {
|
||||
usedMem int64
|
||||
usedMemMutex sync.RWMutex
|
||||
hcmgr htfnmgr
|
||||
datastore models.Datastore
|
||||
|
||||
stats
|
||||
}
|
||||
@@ -48,7 +50,7 @@ const (
|
||||
DefaultIdleTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, error) {
|
||||
func New(ctx context.Context, flog FuncLogger, mlog MetricLogger, ds models.Datastore) (*Runner, error) {
|
||||
// TODO: Is this really required for the container drivers? Can we remove it?
|
||||
env := common.NewEnvironment(func(e *common.Environment) {})
|
||||
|
||||
@@ -65,6 +67,7 @@ func New(ctx context.Context, flog FuncLogger, mlog MetricLogger) (*Runner, erro
|
||||
mlog: mlog,
|
||||
availableMem: getAvailableMemory(),
|
||||
usedMem: 0,
|
||||
datastore: ds,
|
||||
}
|
||||
|
||||
go r.queueHandler(ctx)
|
||||
|
||||
@@ -8,8 +8,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitlab-odx.oracle.com/odx/functions/api/datastore"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/logs"
|
||||
)
|
||||
|
||||
func TestRunnerHello(t *testing.T) {
|
||||
@@ -17,11 +19,14 @@ func TestRunnerHello(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
runner, err := New(ctx, NewFuncLogger(), NewMetricLogger())
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
runner, err := New(ctx, NewFuncLogger(fnl), NewMetricLogger(), ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
|
||||
|
||||
for i, test := range []struct {
|
||||
route *models.Route
|
||||
payload string
|
||||
@@ -71,7 +76,9 @@ func TestRunnerError(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
runner, err := New(ctx, NewFuncLogger(), NewMetricLogger())
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
runner, err := New(ctx, NewFuncLogger(fnl), NewMetricLogger(), ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ import (
|
||||
// (internal clock)
|
||||
|
||||
// RunTrackedTask is just a wrapper for shared logic for async/sync runners
|
||||
func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) {
|
||||
func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) {
|
||||
startedAt := strfmt.DateTime(time.Now())
|
||||
newTask.StartedAt = startedAt
|
||||
|
||||
@@ -78,7 +78,7 @@ func (rnr *Runner) RunTrackedTask(newTask *models.Task, ctx context.Context, cfg
|
||||
newTask.CompletedAt = completedAt
|
||||
newTask.Status = status
|
||||
|
||||
if err := ds.InsertTask(ctx, newTask); err != nil {
|
||||
if err := rnr.datastore.InsertTask(ctx, newTask); err != nil {
|
||||
// TODO we should just log this error not return it to user? just issue storing task status but task is run
|
||||
logrus.WithError(err).Error("error inserting task into datastore")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user