mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'fix-logs2' into 'master'
Fix logs2 - continues from !72 See merge request !73
This commit is contained in:
@@ -1,18 +1,18 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"context"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/common"
|
||||
)
|
||||
|
||||
type FuncLogger interface {
|
||||
Writer(ctx context.Context, appName, path, image, reqID string) io.Writer
|
||||
Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser
|
||||
}
|
||||
|
||||
// FuncLogger reads STDERR output from a container and outputs it in a parsed structured log format, see: https://github.com/treeder/functions/issues/76
|
||||
@@ -24,30 +24,81 @@ func NewFuncLogger(logDB models.FnLog) FuncLogger {
|
||||
return &DefaultFuncLogger{logDB}
|
||||
}
|
||||
|
||||
func (l *DefaultFuncLogger) persistLog(ctx context.Context, log logrus.FieldLogger, reqID, logText string) {
|
||||
err := l.logDB.InsertLog(ctx, reqID, logText)
|
||||
if err != nil {
|
||||
log.WithError(err).Println(fmt.Sprintf(
|
||||
"Unable to persist log for call %v. Error: %v", reqID, err))
|
||||
type writer struct {
|
||||
bytes.Buffer
|
||||
|
||||
stderr bytes.Buffer // for logging to stderr
|
||||
db models.FnLog
|
||||
ctx context.Context
|
||||
reqID string
|
||||
appName string
|
||||
image string
|
||||
path string
|
||||
}
|
||||
|
||||
func (w *writer) Close() error {
|
||||
w.flush()
|
||||
return w.db.InsertLog(context.TODO(), w.reqID, w.String())
|
||||
}
|
||||
|
||||
func (w *writer) Write(b []byte) (int, error) {
|
||||
n, err := w.Buffer.Write(b)
|
||||
|
||||
// temp or should move to another FuncLogger implementation
|
||||
w.writeStdErr(b)
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *writer) writeStdErr(b []byte) {
|
||||
// for now, also write to stderr so we can debug quick ;)
|
||||
// TODO this should be a separate FuncLogger but time is running short !
|
||||
endLine := bytes.IndexByte(b, '\n')
|
||||
if endLine < 0 {
|
||||
w.stderr.Write(b)
|
||||
return
|
||||
}
|
||||
// we have a new line, so:
|
||||
w.stderr.Write(b[0:endLine])
|
||||
w.flush()
|
||||
w.writeStdErr(b[endLine+1:])
|
||||
|
||||
}
|
||||
|
||||
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.Writer {
|
||||
r, w := io.Pipe()
|
||||
|
||||
go func(reader io.Reader) {
|
||||
log := common.Logger(ctx)
|
||||
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName,
|
||||
"path": path, "image": image, "call_id": reqID})
|
||||
|
||||
var res string
|
||||
errMsg := "-------Unable to get full log, it's too big-------"
|
||||
fmt.Fscanf(reader, "%v", &res)
|
||||
if len(res) >= bufio.MaxScanTokenSize {
|
||||
res = res[0:bufio.MaxScanTokenSize - len(errMsg)] + errMsg
|
||||
}
|
||||
|
||||
l.persistLog(ctx, log, reqID, res)
|
||||
}(r)
|
||||
return w
|
||||
func (w *writer) flush() {
|
||||
log := common.Logger(w.ctx)
|
||||
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": w.appName, "path": w.path, "image": w.image, "call_id": w.reqID})
|
||||
log.Println(w.stderr.String())
|
||||
w.stderr.Reset()
|
||||
}
|
||||
|
||||
// overrides Write, keeps Close
|
||||
type limitWriter struct {
|
||||
n, max int
|
||||
io.WriteCloser
|
||||
}
|
||||
|
||||
func newLimitWriter(max int, w io.WriteCloser) io.WriteCloser {
|
||||
return &limitWriter{max: max, WriteCloser: w}
|
||||
}
|
||||
|
||||
func (l *limitWriter) Write(b []byte) (int, error) {
|
||||
if l.n > l.max {
|
||||
return 0, errors.New("max log size exceeded, truncating log")
|
||||
}
|
||||
n, err := l.WriteCloser.Write(b)
|
||||
l.n += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, reqID string) io.WriteCloser {
|
||||
const MB = 1 * 1024 * 1024
|
||||
return newLimitWriter(MB, &writer{
|
||||
db: l.logDB,
|
||||
ctx: ctx,
|
||||
appName: appName,
|
||||
path: path,
|
||||
image: image,
|
||||
reqID: reqID,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult,
|
||||
}
|
||||
|
||||
cfg.Stderr = r.flog.Writer(ctx, cfg.AppName, cfg.Path, cfg.Image, cfg.ID)
|
||||
defer cfg.Stderr.Close() // TODO we should prob log this err but hey
|
||||
if cfg.Stdout == nil {
|
||||
cfg.Stdout = cfg.Stderr
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitlab-odx.oracle.com/odx/functions/api/id"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/datastore"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/runner/task"
|
||||
@@ -21,7 +22,8 @@ func TestRunnerHello(t *testing.T) {
|
||||
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
runner, err := New(ctx, NewFuncLogger(fnl), NewMetricLogger(), ds)
|
||||
fLogger := NewFuncLogger(fnl)
|
||||
runner, err := New(ctx, fLogger, NewMetricLogger(), ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
@@ -33,19 +35,21 @@ func TestRunnerHello(t *testing.T) {
|
||||
expectedStatus string
|
||||
expectedOut string
|
||||
expectedErr string
|
||||
taskID string
|
||||
}{
|
||||
{&models.Route{Image: "funcy/hello"}, ``, "success", "Hello World!", ""},
|
||||
{&models.Route{Image: "funcy/hello"}, `{"name": "test"}`, "success", "Hello test!", ""},
|
||||
{&models.Route{Image: "funcy/hello"}, ``, "success", "Hello World!", "", id.New().String()},
|
||||
{&models.Route{Image: "funcy/hello"}, `{"name": "test"}`, "success", "Hello test!", "", id.New().String()},
|
||||
} {
|
||||
var stdout, stderr bytes.Buffer
|
||||
cfg := &task.Config{
|
||||
ID: fmt.Sprintf("hello-%d-%d", i, time.Now().Unix()),
|
||||
ID: test.taskID,
|
||||
Image: test.route.Image,
|
||||
Timeout: 10 * time.Second,
|
||||
Ready: make(chan struct{}),
|
||||
Stdin: strings.NewReader(test.payload),
|
||||
AppName: test.route.AppName,
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID),
|
||||
}
|
||||
|
||||
result, err := runner.run(ctx, cfg)
|
||||
@@ -78,7 +82,8 @@ func TestRunnerError(t *testing.T) {
|
||||
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
runner, err := New(ctx, NewFuncLogger(fnl), NewMetricLogger(), ds)
|
||||
fLogger := NewFuncLogger(fnl)
|
||||
runner, err := New(ctx, fLogger, NewMetricLogger(), ds)
|
||||
if err != nil {
|
||||
t.Fatalf("Test error during New() - %s", err)
|
||||
}
|
||||
@@ -89,9 +94,10 @@ func TestRunnerError(t *testing.T) {
|
||||
expectedStatus string
|
||||
expectedOut string
|
||||
expectedErr string
|
||||
taskID string
|
||||
}{
|
||||
{&models.Route{Image: "funcy/error"}, ``, "error", "", ""},
|
||||
{&models.Route{Image: "funcy/error"}, `{"name": "test"}`, "error", "", ""},
|
||||
{&models.Route{Image: "funcy/error"}, ``, "error", "", "", id.New().String()},
|
||||
{&models.Route{Image: "funcy/error"}, `{"name": "test"}`, "error", "", "", id.New().String()},
|
||||
} {
|
||||
var stdout, stderr bytes.Buffer
|
||||
cfg := &task.Config{
|
||||
@@ -101,7 +107,7 @@ func TestRunnerError(t *testing.T) {
|
||||
Ready: make(chan struct{}),
|
||||
Stdin: strings.NewReader(test.payload),
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
Stderr: fLogger.Writer(ctx, test.route.AppName, test.route.AppName, test.route.Image, test.taskID),
|
||||
}
|
||||
|
||||
result, err := runner.run(ctx, cfg)
|
||||
|
||||
@@ -24,7 +24,7 @@ type Config struct {
|
||||
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
Stderr io.WriteCloser // closer for flushy poo
|
||||
}
|
||||
|
||||
// Request stores the task to be executed, It holds in itself the channel to
|
||||
|
||||
1
examples/error/.gitignore
vendored
1
examples/error/.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
bundle/
|
||||
.bundle/
|
||||
func.yaml
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
FROM funcy/ruby:dev
|
||||
|
||||
WORKDIR /worker
|
||||
ADD Gemfile* /worker/
|
||||
RUN bundle install
|
||||
|
||||
ADD . /worker/
|
||||
|
||||
ENTRYPOINT ["ruby", "error.rb"]
|
||||
@@ -1,77 +1,3 @@
|
||||
# Error Function Image
|
||||
|
||||
This images compares the payload info with the header.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Oracle Functions API
|
||||
|
||||
## Development
|
||||
|
||||
### Building image locally
|
||||
|
||||
```
|
||||
# SET BELOW TO YOUR DOCKER HUB USERNAME
|
||||
USERNAME=YOUR_DOCKER_HUB_USERNAME
|
||||
|
||||
# build it
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Publishing to DockerHub
|
||||
|
||||
```
|
||||
# tagging
|
||||
docker run --rm -v "$PWD":/app treeder/bump patch
|
||||
docker tag $USERNAME/func-error:latest $USERNAME/func-error:`cat VERSION`
|
||||
|
||||
# pushing to docker hub
|
||||
docker push $USERNAME/func-error
|
||||
```
|
||||
|
||||
### Testing image
|
||||
|
||||
```
|
||||
./test.sh
|
||||
```
|
||||
|
||||
## Running it on Oracle Functions
|
||||
|
||||
### Let's define some environment variables
|
||||
|
||||
```
|
||||
# Set your Function server address
|
||||
# Eg. 127.0.0.1:8080
|
||||
FUNCAPI=YOUR_FUNCTIONS_ADDRESS
|
||||
```
|
||||
|
||||
### Running with Oracle Functions
|
||||
|
||||
With this command we are going to create an application with name `error`.
|
||||
|
||||
```
|
||||
curl -X POST --data '{
|
||||
"app": {
|
||||
"name": "error",
|
||||
}
|
||||
}' http://$FUNCAPI/v1/apps
|
||||
```
|
||||
|
||||
Now, we can create our route
|
||||
|
||||
```
|
||||
curl -X POST --data '{
|
||||
"route": {
|
||||
"image": "'$USERNAME'/func-error",
|
||||
"path": "/error",
|
||||
}
|
||||
}' http://$FUNCAPI/v1/apps/error/routes
|
||||
```
|
||||
|
||||
#### Testing function
|
||||
|
||||
Now that we created our Oracle Functions route, let's test our new route
|
||||
|
||||
```
|
||||
curl -X POST --data '{"input": "yoooo"}' http://$FUNCAPI/r/error/error
|
||||
```
|
||||
Raises an error.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
set -ex
|
||||
|
||||
docker build -t username/func-error .
|
||||
docker build -t funcy/error .
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
name: username/func-error
|
||||
build:
|
||||
- ./build.sh
|
||||
name: funcy/error
|
||||
version: 0.0.2
|
||||
runtime: ruby
|
||||
entrypoint: ruby func.rb
|
||||
path: /error
|
||||
|
||||
@@ -17,4 +17,8 @@ func main() {
|
||||
fmt.Printf("Hello %v!\n", p.Name)
|
||||
|
||||
log.Println("---> stderr goes to the server logs.")
|
||||
log.Println("---> LINE 2")
|
||||
log.Println("---> LINE 3 with a break right here\nand LINE 4")
|
||||
log.Println("---> LINE 5 with a double line break\n")
|
||||
log.Println("---> LINE 6")
|
||||
}
|
||||
|
||||
@@ -146,7 +146,6 @@ func (p *deploycmd) deploy(c *cli.Context, funcFilePath string) error {
|
||||
|
||||
func (p *deploycmd) route(c *cli.Context, ff *funcfile) error {
|
||||
fmt.Printf("Updating route %s using image %s...\n", ff.Path, ff.FullName())
|
||||
fmt.Printf("%+v\ntype: %v\n", ff, *ff.Type)
|
||||
if err := resetBasePath(p.Configuration); err != nil {
|
||||
return fmt.Errorf("error setting endpoint: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user