Docker stats to Prometheus (#486)

* Docker stats to Prometheus

* Fix compilation error in docker_test

* Refactor docker driver Run function to wait for the container to have stopped before stopping the collection of statistics

* Fix go fmt errors

* Updates to sending docker stats to Prometheus

* Remove new test TestWritResultImpl because the changes to support multiple waiters have been removed

* Update docker.Run to use channels, not contexts, to shut down the stats collector (a simplified sketch of this pattern follows the change summary below)
Nigel Deakin
2017-11-16 19:02:33 +00:00
committed by Reed Allman
parent 83145db6ba
commit 910612d0b1
8 changed files with 355 additions and 47 deletions
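
The last bullet above describes replacing the context-based shutdown of the stats collector with a dedicated channel. A minimal, self-contained sketch of that pattern (simplified; not the driver's actual code) shows a collector goroutine that runs until either its context is cancelled or a stop channel is closed by whoever waited for the container to exit:

package main

import (
	"context"
	"fmt"
	"time"
)

// collect samples once per second until the caller's context is cancelled
// or the stop channel is closed.
func collect(ctx context.Context, stop <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done(): // caller gave up
			return
		case <-stop: // container exited; the waiter closed the channel
			return
		case t := <-ticker.C:
			fmt.Println("sample collected at", t)
		}
	}
}

func main() {
	stop := make(chan struct{})
	go collect(context.Background(), stop)
	time.Sleep(3 * time.Second) // pretend the container ran for a while
	close(stop)                 // container stopped; stop collecting stats
	time.Sleep(100 * time.Millisecond)
}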

@@ -19,6 +19,7 @@ import (
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/models"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
@@ -537,7 +538,7 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T
}
go func() {
err := a.runHot(slots, call, tok)
err := a.runHot(ctx, slots, call, tok)
if err != nil {
ch <- err
}
@@ -574,8 +575,14 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
return nil
}
// TODO add ctx back but be careful to only use for logs/spans
func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error {
func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error {
// We must be careful to only use ctxArg for logs/spans
// create a span from ctxArg but ignore the new Context
// instead we will create a new Context below and explicitly set its span
span, _ := opentracing.StartSpanFromContext(ctxArg, "docker_run_hot")
defer span.Finish()
if tok == nil {
// TODO we should panic, probably ;)
return errors.New("no token provided, not giving you a slot")
@@ -596,6 +603,9 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error {
ctx, shutdownContainer := context.WithCancel(context.Background())
defer shutdownContainer() // close this if our waiter returns
// add the span we created above to the new Context
ctx = opentracing.ContextWithSpan(ctx, span)
cid := id.New().String()
// set up the stderr for the first one to capture any logs before the slot is
@@ -709,11 +719,22 @@ func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr
func (c *container) Volumes() [][2]string { return nil }
func (c *container) WorkDir() string { return "" }
func (c *container) Close() {}
func (c *container) WriteStat(drivers.Stat) {}
func (c *container) Image() string { return c.image }
func (c *container) Timeout() time.Duration { return c.timeout }
func (c *container) EnvVars() map[string]string { return c.env }
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
// Log the specified stats to a tracing span.
// Spans are not processed by the collector until the span ends, so to prevent any delay
// in processing the stats when the function is long-lived we create a new span for every call
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_stats")
defer span.Finish()
for key, value := range stat.Metrics {
span.LogFields(log.Uint64("fn_"+key, value))
}
}
//func (c *container) DockerAuth() (docker.AuthConfiguration, error) {
// Implementing the docker.AuthConfiguration interface.
// TODO per call could implement this stored somewhere (vs. configured on host)
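
The new WriteStat above starts and finishes a fresh tracing span for every stats sample, because a span's log fields are only exported once the span finishes; a single long-lived span would hold back the metrics of a long-running hot function until it exits. A rough sketch of that pattern in isolation (reportSample is a hypothetical helper, not part of the agent):

package main

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/log"
)

// reportSample logs one batch of metrics as fields on a short-lived span,
// mirroring the per-call span created by container.WriteStat above.
func reportSample(ctx context.Context, metrics map[string]uint64) {
	span, _ := opentracing.StartSpanFromContext(ctx, "docker_stats")
	defer span.Finish() // finishing promptly lets the tracer export the fields

	for key, value := range metrics {
		span.LogFields(log.Uint64("fn_"+key, value))
	}
}

func main() {
	reportSample(context.Background(), map[string]uint64{"cpu_user": 42, "mem_usage": 1024})
}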

@@ -270,17 +270,6 @@ func dockerMsg(derr *docker.Error) string {
// Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error.
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this.
func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) {
timeout := task.Timeout()
var cancel context.CancelFunc
if timeout <= 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
defer cancel() // do this so that after Run exits, collect stops
go drv.collectStats(ctx, container, task)
mwOut, mwErr := task.Logger()
waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{
@@ -292,6 +281,11 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers
return nil, err
}
// we want to stop trying to collect stats when the container exits
// collectStats will stop when stopSignal is closed or ctx is cancelled
stopSignal := make(chan struct{})
go drv.collectStats(ctx, stopSignal, container, task)
err = drv.startTask(ctx, container)
if err != nil && ctx.Err() == nil {
// if there's just a timeout making the docker calls, drv.wait below will rewrite it to timeout
@@ -302,22 +296,27 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers
container: container,
waiter: waiter,
drv: drv,
done: stopSignal,
}, nil
}
// implements drivers.WaitResult
// waitResult implements drivers.WaitResult
type waitResult struct {
container string
waiter docker.CloseWaiter
drv *DockerDriver
done chan struct{}
}
// waitResult implements drivers.WaitResult
func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) {
defer func() {
w.waiter.Close()
w.waiter.Wait() // make sure we gather all logs
w.waiter.Wait() // wait for Close() to finish processing, to make sure we gather all logs
close(w.done)
}()
// wait until container is stopped (or ctx is cancelled if sooner)
status, err := w.drv.wait(ctx, w.container)
return &runResult{
status: status,
@@ -325,10 +324,17 @@ func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) {
}, nil
}
func (drv *DockerDriver) collectStats(ctx context.Context, container string, task drivers.ContainerTask) {
// Repeatedly collect stats from the specified docker container until the stopSignal is closed or the context is cancelled
func (drv *DockerDriver) collectStats(ctx context.Context, stopSignal <-chan struct{}, container string, task drivers.ContainerTask) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_collect_stats")
defer span.Finish()
log := common.Logger(ctx)
done := make(chan bool)
defer close(done)
// dockerCallDone is used to cancel the call to drv.docker.Stats when this method exits
dockerCallDone := make(chan bool)
defer close(dockerCallDone)
dstats := make(chan *docker.Stats, 1)
go func() {
// NOTE: docker automatically streams every 1s. we can skip or avg samples if we'd like but
@@ -339,7 +345,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas
ID: container,
Stats: dstats,
Stream: true,
Done: done, // A flag that enables stopping the stats operation
Done: dockerCallDone, // A flag that enables stopping the stats operation
})
if err != nil && err != io.ErrClosedPipe {
@@ -347,15 +353,22 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas
}
}()
// collect stats until context is done (i.e. until the container is terminated)
for {
select {
case <-ctx.Done():
return
case <-stopSignal:
return
case ds, ok := <-dstats:
if !ok {
return
}
task.WriteStat(cherryPick(ds))
stats := cherryPick(ds)
if !stats.Timestamp.IsZero() {
task.WriteStat(ctx, stats)
}
}
}
}
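
The collector above drives go-dockerclient's streaming Stats call, which pushes roughly one sample per second onto the Stats channel until the container exits or the Done channel is closed. A stripped-down sketch of that call against the raw client (the socket path and container ID are placeholders, and error handling is trimmed; the driver goes through its own wrapper rather than the client directly):

package main

import (
	"fmt"

	docker "github.com/fsouza/go-dockerclient"
)

func main() {
	client, err := docker.NewClient("unix:///var/run/docker.sock") // assumed local daemon
	if err != nil {
		panic(err)
	}

	stats := make(chan *docker.Stats, 1)
	done := make(chan bool) // closing this tells the client to stop streaming

	go func() {
		// Stats blocks, streaming samples into the channel, until the
		// container exits or done is closed.
		_ = client.Stats(docker.StatsOptions{
			ID:     "my-container", // placeholder container ID
			Stats:  stats,
			Stream: true,
			Done:   done,
		})
	}()

	for i := 0; i < 3; i++ {
		s, ok := <-stats
		if !ok {
			break
		}
		fmt.Println("memory usage:", s.MemoryStats.Usage)
	}
	close(done) // stop streaming once we have what we need
}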

@@ -22,18 +22,18 @@ func (f *taskDockerTest) Command() string { return "" }
func (f *taskDockerTest) EnvVars() map[string]string {
return map[string]string{}
}
func (f *taskDockerTest) Labels() map[string]string { return nil }
func (f *taskDockerTest) Id() string { return f.id }
func (f *taskDockerTest) Group() string { return "" }
func (f *taskDockerTest) Image() string { return "fnproject/hello" }
func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.Second }
func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil }
func (f *taskDockerTest) WriteStat(drivers.Stat) { /* TODO */ }
func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} }
func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 }
func (f *taskDockerTest) WorkDir() string { return "" }
func (f *taskDockerTest) Close() {}
func (f *taskDockerTest) Input() io.Reader { return f.input }
func (f *taskDockerTest) Labels() map[string]string { return nil }
func (f *taskDockerTest) Id() string { return f.id }
func (f *taskDockerTest) Group() string { return "" }
func (f *taskDockerTest) Image() string { return "fnproject/hello" }
func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.Second }
func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil }
func (f *taskDockerTest) WriteStat(context.Context, drivers.Stat) { /* TODO */ }
func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} }
func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 }
func (f *taskDockerTest) WorkDir() string { return "" }
func (f *taskDockerTest) Close() {}
func (f *taskDockerTest) Input() io.Reader { return f.input }
func TestRunnerDocker(t *testing.T) {
dkr := NewDocker(drivers.Config{})

@@ -87,7 +87,7 @@ type ContainerTask interface {
Logger() (stdout, stderr io.Writer)
// WriteStat writes a single Stat, implementation need not be thread safe.
WriteStat(Stat)
WriteStat(context.Context, Stat)
// Volumes returns an array of 2-element tuples indicating storage volume mounts.
// The first element is the path on the host, and the second element is the