fn: adding docker events to stats (#1262)

Streaming docker events is useful as we can record/capture some
asynchronous containers events such as out-of-memory. For now,
we record these in opencensus/prometheus stats.
This commit is contained in:
Tolga Ceylan
2018-10-04 18:54:09 -07:00
committed by GitHub
parent ec2f9539f2
commit 29dcf0a791
2 changed files with 74 additions and 6 deletions

View File

@@ -53,6 +53,7 @@ func (r *runResult) Error() error { return r.err }
func (r *runResult) Status() string { return r.status }
type DockerDriver struct {
cancel func()
conf drivers.Config
docker dockerClient // retries on *docker.Client, restricts ad hoc *docker.Client usage / retries
hostname string
@@ -75,9 +76,11 @@ func NewDocker(conf drivers.Config) *DockerDriver {
logrus.WithError(err).Fatal("couldn't initialize registry")
}
ctx, cancel := context.WithCancel(context.Background())
driver := &DockerDriver{
cancel: cancel,
conf: conf,
docker: newClient(),
docker: newClient(ctx),
hostname: hostname,
auths: auths,
}
@@ -145,6 +148,9 @@ func (drv *DockerDriver) Close() error {
if drv.pool != nil {
err = drv.pool.Close()
}
if drv.cancel != nil {
drv.cancel()
}
return err
}

View File

@@ -4,6 +4,7 @@ package docker
import (
"context"
"errors"
"fmt"
"os"
"strings"
@@ -19,8 +20,9 @@ import (
)
const (
retryTimeout = 10 * time.Minute
pauseTimeout = 5 * time.Second
retryTimeout = 10 * time.Minute
pauseTimeout = 5 * time.Second
eventRetryDelay = 1 * time.Second
)
// wrap docker client calls so we can retry 500s, kind of sucks but fsouza doesn't
@@ -48,7 +50,7 @@ type dockerClient interface {
}
// TODO: switch to github.com/docker/engine-api
func newClient() dockerClient {
func newClient(ctx context.Context) dockerClient {
// TODO this was much easier, don't need special settings at the moment
// docker, err := docker.NewClient(conf.Docker)
client, err := docker.NewClientFromEnv()
@@ -60,6 +62,7 @@ func newClient() dockerClient {
logrus.WithError(err).Fatal("couldn't connect to docker daemon")
}
go listenEventLoop(ctx, client)
return &dockerWrap{client}
}
@@ -68,8 +71,10 @@ type dockerWrap struct {
}
var (
apiNameKey = common.MakeKey("api_name")
exitStatusKey = common.MakeKey("exit_status")
apiNameKey = common.MakeKey("api_name")
exitStatusKey = common.MakeKey("exit_status")
eventActionKey = common.MakeKey("event_action")
eventTypeKey = common.MakeKey("event_type")
dockerRetriesMeasure = common.MakeMeasure("docker_api_retries", "docker api retries", "")
dockerTimeoutMeasure = common.MakeMeasure("docker_api_timeout", "docker api timeouts", "")
@@ -78,8 +83,63 @@ var (
// WARNING: this metric reports total latency per *wrapper* call, which will add up multiple retry latencies per wrapper call.
dockerLatencyMeasure = common.MakeMeasure("docker_api_latency", "Docker wrapper latency", "msecs")
dockerEventsMeasure = common.MakeMeasure("docker_events", "docker events", "")
)
// listenEventLoop listens for docker events and reconnects if necessary
func listenEventLoop(ctx context.Context, client *docker.Client) {
for ctx.Err() == nil {
err := listenEvents(ctx, client)
if err != nil {
logrus.WithError(err).Error("listenEvents failed, will retry...")
// slow down reconnects. yes we will miss events during this time.
select {
case <-time.After(eventRetryDelay):
case <-ctx.Done():
return
}
}
}
}
// listenEvents registers an event listener to docker to stream docker events
// and records these in stats.
func listenEvents(ctx context.Context, client *docker.Client) error {
listener := make(chan *docker.APIEvents)
err := client.AddEventListener(listener)
if err != nil {
return err
}
defer client.RemoveEventListener(listener)
for {
select {
case ev := <-listener:
if ev == nil {
return errors.New("event listener closed")
}
ctx, err := tag.New(context.Background(),
tag.Upsert(eventActionKey, ev.Action),
tag.Upsert(eventTypeKey, ev.Type),
)
if err != nil {
logrus.WithError(err).Fatalf("cannot add event tags %v=%v %v=%v",
eventActionKey, ev.Action,
eventTypeKey, ev.Type,
)
}
stats.Record(ctx, dockerEventsMeasure.M(0))
case <-ctx.Done():
return nil
}
}
}
// Create a span/tracker with required context tags
func makeTracker(ctx context.Context, name string) (context.Context, func()) {
ctx, err := tag.New(ctx, tag.Upsert(apiNameKey, name))
@@ -127,6 +187,7 @@ func RegisterViews(tagKeys []string, latencyDist []float64) {
defaultTags := []tag.Key{apiNameKey}
exitTags := []tag.Key{apiNameKey, exitStatusKey}
eventTags := []tag.Key{eventActionKey, eventTypeKey}
// add extra tags if not already in default tags for req/resp
for _, key := range tagKeys {
@@ -144,6 +205,7 @@ func RegisterViews(tagKeys []string, latencyDist []float64) {
common.CreateViewWithTags(dockerErrorMeasure, view.Count(), defaultTags),
common.CreateViewWithTags(dockerExitMeasure, view.Count(), exitTags),
common.CreateViewWithTags(dockerLatencyMeasure, view.Distribution(latencyDist...), defaultTags),
common.CreateViewWithTags(dockerEventsMeasure, view.Count(), eventTags),
)
if err != nil {
logrus.WithError(err).Fatal("cannot register view")