From bb8436c3eee5015bc8914704632977dc0783d026 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 10 Sep 2018 13:35:50 -0700 Subject: [PATCH] fn: docker driver stats/metrics for prometheus (#1197) * fn: docker driver stats/metrics for prometheus --- api/agent/drivers/docker/docker.go | 22 +-- api/agent/drivers/docker/docker_client.go | 162 ++++++++++++++++------ 2 files changed, 127 insertions(+), 57 deletions(-) diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index f97dd039c..441b3d719 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -14,7 +14,6 @@ import ( "sync" "time" - "go.opencensus.io/stats" "go.opencensus.io/trace" "github.com/coreos/go-semver/semver" @@ -368,35 +367,22 @@ func dockerMsg(derr *docker.Error) string { // The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this. func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) { - attachSuccess := make(chan struct{}) mwOut, mwErr := task.Logger() - - waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{ - Success: attachSuccess, + waiter, err := drv.docker.AttachToContainer(ctx, docker.AttachToContainerOptions{ Container: container, + InputStream: task.Input(), OutputStream: mwOut, ErrorStream: mwErr, Stream: true, Stdout: true, Stderr: true, Stdin: true, - InputStream: task.Input()}) - + }) if err != nil && ctx.Err() == nil { // ignore if ctx has errored, rewrite status lay below return nil, err } - // Sync up with NB Attacher above before starting the task - if err == nil { - // WARNING: the I/O below requires docker hijack function to honor - // the contract below, specifically if an error is not returned - // from AttachToContainerNonBlocking, then max blocking time - // here should be what drv.docker dialer/client config was set to. - <-attachSuccess - attachSuccess <- struct{}{} - } - // we want to stop trying to collect stats when the container exits // collectStats will stop when stopSignal is closed or ctx is cancelled stopSignal := make(chan struct{}) @@ -588,6 +574,7 @@ func (w *waitResult) wait(ctx context.Context) (status string, err error) { // just say it was a timeout if we have [fatal] errors talking to docker, etc. // a more prevalent case is calling wait & container already finished, so again ignore err. exitCode, _ := w.drv.docker.WaitContainerWithContext(w.container, ctx) + defer RecordWaitContainerResult(ctx, exitCode) w.waiter.Close() err = w.waiter.Wait() @@ -618,7 +605,6 @@ func (w *waitResult) wait(ctx context.Context) (status string, err error) { case 0: return drivers.StatusSuccess, nil case 137: // OOM - stats.Record(ctx, dockerOOMMeasure.M(1)) common.Logger(ctx).Error("docker oom") err := errors.New("container out of memory, you may want to raise route.memory for this route (default: 128MB)") return drivers.StatusKilled, models.NewAPIError(http.StatusBadGateway, err) diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go index 26391b888..ec3b5e6f3 100644 --- a/api/agent/drivers/docker/docker_client.go +++ b/api/agent/drivers/docker/docker_client.go @@ -4,6 +4,7 @@ package docker import ( "context" + "fmt" "os" "strings" "time" @@ -13,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "go.opencensus.io/stats" "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "go.opencensus.io/trace" ) @@ -29,7 +31,7 @@ const ( type dockerClient interface { // Each of these are github.com/fsouza/go-dockerclient methods - AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) + AttachToContainer(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) WaitContainerWithContext(id string, ctx context.Context) (int, error) StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) error KillContainer(opts docker.KillContainerOptions) error @@ -66,19 +68,82 @@ type dockerWrap struct { } var ( + apiNameKey = common.MakeKey("api_name") + exitStatusKey = common.MakeKey("exit_status") + dockerRetriesMeasure = common.MakeMeasure("docker_api_retries", "docker api retries", "") dockerTimeoutMeasure = common.MakeMeasure("docker_api_timeout", "docker api timeouts", "") dockerErrorMeasure = common.MakeMeasure("docker_api_error", "docker api errors", "") - dockerOOMMeasure = common.MakeMeasure("docker_oom", "docker oom", "") + dockerExitMeasure = common.MakeMeasure("docker_exits", "docker exit counts", "") + + // WARNING: this metric reports total latency per *wrapper* call, which will add up multiple retry latencies per wrapper call. + dockerLatencyMeasure = common.MakeMeasure("docker_api_latency", "Docker wrapper latency", "msecs") ) +// Create a span/tracker with required context tags +func makeTracker(ctx context.Context, name string) (context.Context, func()) { + ctx, err := tag.New(ctx, tag.Upsert(apiNameKey, name)) + if err != nil { + logrus.WithError(err).Fatalf("cannot add tag %v=%v", apiNameKey, name) + } + + // It would have been nice to pull the latency (end-start) elapsed time + // from Spans but this is hidden from us, so we have to call time.Now() + // twice ourselves. + ctx, span := trace.StartSpan(ctx, name) + start := time.Now() + + return ctx, func() { + stats.Record(ctx, dockerLatencyMeasure.M(int64(time.Now().Sub(start)/time.Millisecond))) + span.End() + } +} + +func RecordWaitContainerResult(ctx context.Context, exitCode int) { + + // Tag the metric with error-code or context-cancel/deadline info + exitStr := fmt.Sprintf("exit_%d", exitCode) + if exitCode == 0 && ctx.Err() != nil { + switch ctx.Err() { + case context.DeadlineExceeded: + exitStr = "ctx_deadline" + case context.Canceled: + exitStr = "ctx_canceled" + } + } + + newCtx, err := tag.New(ctx, + tag.Upsert(apiNameKey, "docker_wait_container"), + tag.Upsert(exitStatusKey, exitStr), + ) + if err != nil { + logrus.WithError(err).Fatalf("cannot add tag %v=%v or tag %v=docker_wait_container", exitStatusKey, exitStr, apiNameKey) + } + stats.Record(newCtx, dockerExitMeasure.M(0)) +} + // RegisterViews creates and registers views with provided tag keys -func RegisterViews(tagKeys []string, dist []float64) { +func RegisterViews(tagKeys []string, latencyDist []float64) { + + defaultTags := []tag.Key{apiNameKey} + exitTags := []tag.Key{apiNameKey, exitStatusKey} + + // add extra tags if not already in default tags for req/resp + for _, key := range tagKeys { + if key != "api_name" { + defaultTags = append(defaultTags, common.MakeKey(key)) + } + if key != "api_name" && key != "exit_status" { + exitTags = append(exitTags, common.MakeKey(key)) + } + } + err := view.Register( - common.CreateView(dockerRetriesMeasure, view.Sum(), tagKeys), - common.CreateView(dockerTimeoutMeasure, view.Count(), tagKeys), - common.CreateView(dockerErrorMeasure, view.Count(), tagKeys), - common.CreateView(dockerOOMMeasure, view.Count(), tagKeys), + common.CreateViewWithTags(dockerRetriesMeasure, view.Sum(), defaultTags), + common.CreateViewWithTags(dockerTimeoutMeasure, view.Count(), defaultTags), + common.CreateViewWithTags(dockerErrorMeasure, view.Count(), defaultTags), + common.CreateViewWithTags(dockerExitMeasure, view.Count(), exitTags), + common.CreateViewWithTags(dockerLatencyMeasure, view.Distribution(latencyDist...), defaultTags), ) if err != nil { logrus.WithError(err).Fatal("cannot register view") @@ -95,7 +160,7 @@ func (d *dockerWrap) retry(ctx context.Context, logger logrus.FieldLogger, f fun for ; i < 10; i++ { select { case <-ctx.Done(): - stats.Record(ctx, dockerTimeoutMeasure.M(1)) + stats.Record(ctx, dockerTimeoutMeasure.M(0)) logger.WithError(ctx.Err()).Warnf("docker call timed out") return ctx.Err() default: @@ -108,7 +173,7 @@ func (d *dockerWrap) retry(ctx context.Context, logger logrus.FieldLogger, f fun continue } if err != nil { - stats.Record(ctx, dockerErrorMeasure.M(1)) + stats.Record(ctx, dockerErrorMeasure.M(0)) } return err } @@ -161,8 +226,8 @@ func filterNoSuchContainer(ctx context.Context, err error) error { } func (d *dockerWrap) LoadImages(ctx context.Context, filePath string) error { - ctx, span := trace.StartSpan(ctx, "docker_load_images") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_load_images") + defer closer() file, err := os.Open(filePath) if err != nil { @@ -187,15 +252,34 @@ func (d *dockerWrap) Info(ctx context.Context) (info *docker.DockerInfo, err err return d.docker.Info() } -func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) { - ctx, span := trace.StartSpan(ctx, "docker_attach_container") - defer span.End() - return d.docker.AttachToContainerNonBlocking(opts) +func (d *dockerWrap) AttachToContainer(ctx context.Context, opts docker.AttachToContainerOptions) (docker.CloseWaiter, error) { + ctx, closer := makeTracker(ctx, "docker_attach_container") + defer closer() + + if opts.Success != nil { + logrus.Fatal("BUG: Invalid AttachToContainerOptions, Success channel must not be set") + } + opts.Success = make(chan struct{}) + + // We use non-blocking here since we need the CloseWaiter + waiter, err := d.docker.AttachToContainerNonBlocking(opts) + + // Sync up with NB Attacher above before starting the task + if err == nil { + // WARNING: the I/O below requires docker hijack function to honor + // the contract below, specifically if an error is not returned + // from AttachToContainerNonBlocking, then max blocking time + // here should be what drv.docker dialer/client config was set to. + <-opts.Success + opts.Success <- struct{}{} + } + + return waiter, err } func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (code int, err error) { - ctx, span := trace.StartSpan(ctx, "docker_wait_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_wait_container") + defer closer() logger := common.Logger(ctx).WithField("docker_cmd", "WaitContainer") err = d.retry(ctx, logger, func() error { @@ -206,8 +290,8 @@ func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (c } func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) (err error) { - ctx, span := trace.StartSpan(ctx, "docker_start_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_start_container") + defer closer() logger := common.Logger(ctx).WithField("docker_cmd", "StartContainer") err = d.retry(ctx, logger, func() error { @@ -222,8 +306,8 @@ func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.Hos } func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *docker.Container, err error) { - ctx, span := trace.StartSpan(opts.Context, "docker_create_container") - defer span.End() + ctx, closer := makeTracker(opts.Context, "docker_create_container") + defer closer() logger := common.Logger(ctx).WithField("docker_cmd", "CreateContainer") err = d.retry(ctx, logger, func() error { @@ -234,8 +318,8 @@ func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *doc } func (d *dockerWrap) KillContainer(opts docker.KillContainerOptions) (err error) { - ctx, span := trace.StartSpan(opts.Context, "docker_kill_container") - defer span.End() + ctx, closer := makeTracker(opts.Context, "docker_kill_container") + defer closer() logger := common.Logger(ctx).WithField("docker_cmd", "KillContainer") err = d.retry(ctx, logger, func() error { @@ -246,8 +330,8 @@ func (d *dockerWrap) KillContainer(opts docker.KillContainerOptions) (err error) } func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) (err error) { - ctx, span := trace.StartSpan(opts.Context, "docker_pull_image") - defer span.End() + ctx, closer := makeTracker(opts.Context, "docker_pull_image") + defer closer() logger := common.Logger(ctx).WithField("docker_cmd", "PullImage") err = d.retry(ctx, logger, func() error { @@ -261,8 +345,8 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er // extract the span, but do not keep the context, since the enclosing context // may be timed out, and we still want to remove the container. TODO in caller? who cares? ctx := common.BackgroundContext(opts.Context) - ctx, span := trace.StartSpan(ctx, "docker_remove_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_remove_container") + defer closer() ctx, cancel := context.WithTimeout(ctx, retryTimeout) defer cancel() @@ -276,8 +360,9 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er } func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error) { - _, span := trace.StartSpan(ctx, "docker_pause_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_pause_container") + defer closer() + ctx, cancel := context.WithTimeout(ctx, pauseTimeout) defer cancel() @@ -290,8 +375,9 @@ func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error) } func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error) { - _, span := trace.StartSpan(ctx, "docker_unpause_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_unpause_container") + defer closer() + ctx, cancel := context.WithTimeout(ctx, pauseTimeout) defer cancel() @@ -304,8 +390,9 @@ func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error } func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.Image, err error) { - ctx, span := trace.StartSpan(ctx, "docker_inspect_image") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_inspect_image") + defer closer() + ctx, cancel := context.WithTimeout(ctx, retryTimeout) defer cancel() @@ -318,8 +405,9 @@ func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.I } func (d *dockerWrap) InspectContainerWithContext(container string, ctx context.Context) (c *docker.Container, err error) { - ctx, span := trace.StartSpan(ctx, "docker_inspect_container") - defer span.End() + ctx, closer := makeTracker(ctx, "docker_inspect_container") + defer closer() + ctx, cancel := context.WithTimeout(ctx, retryTimeout) defer cancel() @@ -343,7 +431,3 @@ func (d *dockerWrap) Stats(opts docker.StatsOptions) (err error) { //}) //return err } - -func MakeMeasure(name string, desc string, unit string) *stats.Int64Measure { - return stats.Int64(name, desc, unit) -}