diff --git a/api/agent/agent.go b/api/agent/agent.go index f5d787f35..3508bd31d 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -19,6 +19,7 @@ import ( "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" ) @@ -537,7 +538,7 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T } go func() { - err := a.runHot(slots, call, tok) + err := a.runHot(ctx, slots, call, tok) if err != nil { ch <- err } @@ -574,8 +575,14 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok return nil } -// TODO add ctx back but be careful to only use for logs/spans -func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error { +func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, tok Token) error { + // We must be careful to only use ctxArg for logs/spans + + // create a span from ctxArg but ignore the new Context + // instead we will create a new Context below and explicitly set its span + span, _ := opentracing.StartSpanFromContext(ctxArg, "docker_run_hot") + defer span.Finish() + if tok == nil { // TODO we should panic, probably ;) return errors.New("no token provided, not giving you a slot") @@ -596,6 +603,9 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error { ctx, shutdownContainer := context.WithCancel(context.Background()) defer shutdownContainer() // close this if our waiter returns + // add the span we created above to the new Context + ctx = opentracing.ContextWithSpan(ctx, span) + cid := id.New().String() // set up the stderr for the first one to capture any logs before the slot is @@ -709,11 +719,22 @@ func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr func (c *container) Volumes() [][2]string { return nil } func (c *container) WorkDir() string { return "" } func (c *container) Close() {} -func (c *container) WriteStat(drivers.Stat) {} func (c *container) Image() string { return c.image } func (c *container) Timeout() time.Duration { return c.timeout } func (c *container) EnvVars() map[string]string { return c.env } func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB + +// Log the specified stats to a tracing span. +// Spans are not processed by the collector until the span ends, so to prevent any delay +// in processing the stats when the function is long-lived we create a new span for every call +func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { + span, ctx := opentracing.StartSpanFromContext(ctx, "docker_stats") + defer span.Finish() + for key, value := range stat.Metrics { + span.LogFields(log.Uint64("fn_"+key, value)) + } +} + //func (c *container) DockerAuth() (docker.AuthConfiguration, error) { // Implementing the docker.AuthConfiguration interface. // TODO per call could implement this stored somewhere (vs. configured on host) diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 210cfa154..7009bc0e1 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -270,17 +270,6 @@ func dockerMsg(derr *docker.Error) string { // Run executes the docker container. If task runs, drivers.RunResult will be returned. If something fails outside the task (ie: Docker), it will return error. 
// The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this. func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) { - timeout := task.Timeout() - - var cancel context.CancelFunc - if timeout <= 0 { - ctx, cancel = context.WithCancel(ctx) - } else { - ctx, cancel = context.WithTimeout(ctx, timeout) - } - defer cancel() // do this so that after Run exits, collect stops - go drv.collectStats(ctx, container, task) - mwOut, mwErr := task.Logger() waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{ @@ -292,6 +281,11 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers return nil, err } + // we want to stop trying to collect stats when the container exits + // collectStats will stop when stopSignal is closed or ctx is cancelled + stopSignal := make(chan struct{}) + go drv.collectStats(ctx, stopSignal, container, task) + err = drv.startTask(ctx, container) if err != nil && ctx.Err() == nil { // if there's just a timeout making the docker calls, drv.wait below will rewrite it to timeout @@ -302,22 +296,27 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers container: container, waiter: waiter, drv: drv, + done: stopSignal, }, nil } -// implements drivers.WaitResult +// waitResult implements drivers.WaitResult type waitResult struct { container string waiter docker.CloseWaiter drv *DockerDriver + done chan struct{} } +// Wait waits for the container to exit, then closes done to stop stats collection func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) { defer func() { w.waiter.Close() - w.waiter.Wait() // make sure we gather all logs + w.waiter.Wait() // wait for Close() to finish processing, to make sure we gather all logs + close(w.done) }() + // wait until container is stopped (or ctx is cancelled if sooner) status, err := w.drv.wait(ctx, w.container) return &runResult{ status: status, @@ -325,10 +324,17 @@ func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) { }, nil } -func (drv *DockerDriver) collectStats(ctx context.Context, container string, task drivers.ContainerTask) { +// Repeatedly collect stats from the specified docker container until the stopSignal is closed or the context is cancelled +func (drv *DockerDriver) collectStats(ctx context.Context, stopSignal <-chan struct{}, container string, task drivers.ContainerTask) { + span, ctx := opentracing.StartSpanFromContext(ctx, "docker_collect_stats") + defer span.Finish() + log := common.Logger(ctx) - done := make(chan bool) - defer close(done) + + // dockerCallDone is used to cancel the call to drv.docker.Stats when this method exits + dockerCallDone := make(chan bool) + defer close(dockerCallDone) + dstats := make(chan *docker.Stats, 1) go func() { // NOTE: docker automatically streams every 1s. we can skip or avg samples if we'd like but @@ -339,7 +345,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas ID: container, Stats: dstats, Stream: true, - Done: done, // A flag that enables stopping the stats operation + Done: dockerCallDone, // A flag that enables stopping the stats operation }) if err != nil && err != io.ErrClosedPipe { @@ -347,15 +353,22 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas } }() + // collect stats until ctx is cancelled or stopSignal is closed (i.e. 
until the container is terminated) for { select { case <-ctx.Done(): return + case <-stopSignal: + return case ds, ok := <-dstats: if !ok { return } - task.WriteStat(cherryPick(ds)) + stats := cherryPick(ds) + if !stats.Timestamp.IsZero() { + task.WriteStat(ctx, stats) + } + } } } diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index d29e4fa0e..10b083db5 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -22,18 +22,18 @@ func (f *taskDockerTest) Command() string { return "" } func (f *taskDockerTest) EnvVars() map[string]string { return map[string]string{} } -func (f *taskDockerTest) Labels() map[string]string { return nil } -func (f *taskDockerTest) Id() string { return f.id } -func (f *taskDockerTest) Group() string { return "" } -func (f *taskDockerTest) Image() string { return "fnproject/hello" } -func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.Second } -func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil } -func (f *taskDockerTest) WriteStat(drivers.Stat) { /* TODO */ } -func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} } -func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 } -func (f *taskDockerTest) WorkDir() string { return "" } -func (f *taskDockerTest) Close() {} -func (f *taskDockerTest) Input() io.Reader { return f.input } +func (f *taskDockerTest) Labels() map[string]string { return nil } +func (f *taskDockerTest) Id() string { return f.id } +func (f *taskDockerTest) Group() string { return "" } +func (f *taskDockerTest) Image() string { return "fnproject/hello" } +func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.Second } +func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil } +func (f *taskDockerTest) WriteStat(context.Context, drivers.Stat) { /* TODO */ } +func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} } +func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 } +func (f *taskDockerTest) WorkDir() string { return "" } +func (f *taskDockerTest) Close() {} +func (f *taskDockerTest) Input() io.Reader { return f.input } func TestRunnerDocker(t *testing.T) { dkr := NewDocker(drivers.Config{}) diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 15a289377..a2c337b7e 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -87,7 +87,7 @@ type ContainerTask interface { Logger() (stdout, stderr io.Writer) // WriteStat writes a single Stat, implementation need not be thread safe. - WriteStat(Stat) + WriteStat(context.Context, Stat) // Volumes returns an array of 2-element tuples indicating storage volume mounts. // The first element is the path on the host, and the second element is the diff --git a/api/server/prom_zip_collector.go b/api/server/prom_zip_collector.go index 7128feb2d..17e0a78bb 100644 --- a/api/server/prom_zip_collector.go +++ b/api/server/prom_zip_collector.go @@ -4,6 +4,7 @@ import ( "github.com/openzipkin/zipkin-go-opentracing" "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" "github.com/prometheus/client_golang/prometheus" + "strconv" "strings" "time" ) @@ -32,24 +33,52 @@ func NewPrometheusCollector() (zipkintracer.Collector, error) { // PrometheusCollector implements Collector. 
func (pc PrometheusCollector) Collect(span *zipkincore.Span) error { - var labelValuesToUse map[string]string + + spanName := span.GetName() // extract any label values from the span labelKeysFromSpan, labelValuesFromSpan := getLabels(span) // get the HistogramVec for this span name - histogramVec, found := pc.histogramVecMap[span.GetName()] + histogramVec, labelValuesToUse := pc.getHistogramVec( + ("fn_span_" + spanName + "_duration_seconds"), ("Span " + spanName + " duration, by span name"), labelKeysFromSpan, labelValuesFromSpan) + + // now report the span duration value + histogramVec.With(labelValuesToUse).Observe((time.Duration(span.GetDuration()) * time.Microsecond).Seconds()) + + // now extract any logged metric values from the span + for key, value := range getLoggedMetrics(span) { + + // get the HistogramVec for this metric + thisMetricHistogramVec, labelValuesToUse := pc.getHistogramVec( + ("fn_" + spanName + "_" + key), (spanName + " metric " + key), labelKeysFromSpan, labelValuesFromSpan) + + // now report the metric value + thisMetricHistogramVec.With(labelValuesToUse).Observe(float64(value)) + } + + return nil +} + +// Return (and create, if necessary) a HistogramVec for the specified Prometheus metric +func (pc PrometheusCollector) getHistogramVec( + metricName string, metricHelp string, labelKeysFromSpan []string, labelValuesFromSpan map[string]string) ( + *prometheus.HistogramVec, map[string]string) { + + var labelValuesToUse map[string]string + + histogramVec, found := pc.histogramVecMap[metricName] if !found { // create a new HistogramVec histogramVec = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: "fn_span_" + span.GetName() + "_duration_seconds", - Help: "Span " + span.GetName() + " duration, by span name", + Name: metricName, + Help: metricHelp, }, labelKeysFromSpan, ) - pc.histogramVecMap[span.GetName()] = histogramVec - pc.registeredLabelKeysMap[span.GetName()] = labelKeysFromSpan + pc.histogramVecMap[metricName] = histogramVec + pc.registeredLabelKeysMap[metricName] = labelKeysFromSpan prometheus.MustRegister(histogramVec) labelValuesToUse = labelValuesFromSpan } else { @@ -59,7 +88,7 @@ func (pc PrometheusCollector) Collect(span *zipkincore.Span) error { // that's why we saved the original label keys in the registeredLabelKeysMap map // so we can use that to construct a map of label key/value pairs to set on the metric labelValuesToUse = make(map[string]string) - for _, thisRegisteredLabelKey := range pc.registeredLabelKeysMap[span.GetName()] { + for _, thisRegisteredLabelKey := range pc.registeredLabelKeysMap[metricName] { if value, found := labelValuesFromSpan[thisRegisteredLabelKey]; found { labelValuesToUse[thisRegisteredLabelKey] = value } else { @@ -67,11 +96,7 @@ func (pc PrometheusCollector) Collect(span *zipkincore.Span) error { } } } - - // now report the metric value - histogramVec.With(labelValuesToUse).Observe((time.Duration(span.GetDuration()) * time.Microsecond).Seconds()) - - return nil + return histogramVec, labelValuesToUse } // extract from the specified span the key/value pairs that we want to add as labels to the Prometheus metric for this span @@ -95,5 +120,27 @@ return keys, labelMap } +// extract from the span the logged metric values, which we assume are uint64 values +func getLoggedMetrics(span *zipkincore.Span) map[string]uint64 { + + keyValueMap := make(map[string]uint64) + + // extract any annotations whose Value starts with "fn_" + annotations := 
span.GetAnnotations() + for _, thisAnnotation := range annotations { + if strings.HasPrefix(thisAnnotation.GetValue(), "fn_") { + keyvalue := strings.Split(thisAnnotation.GetValue(), "=") + if len(keyvalue) == 2 { + if value, err := strconv.ParseUint(keyvalue[1], 10, 64); err == nil { + key := strings.TrimSpace(keyvalue[0]) + key = key[3:] // strip off leading fn_ + keyValueMap[key] = value + } + } + } + } + return keyValueMap +} + // PrometheusCollector implements Collector. func (PrometheusCollector) Close() error { return nil } diff --git a/docs/assets/GrafanaDashboard3.png b/docs/assets/GrafanaDashboard3.png new file mode 100755 index 000000000..ec05d597e Binary files /dev/null and b/docs/assets/GrafanaDashboard3.png differ diff --git a/examples/grafana/README.md b/examples/grafana/README.md index 327d07f60..ec129d39a 100644 --- a/examples/grafana/README.md +++ b/examples/grafana/README.md @@ -109,4 +109,27 @@ A second example dashboard `fn_grafana_dashboard2.json` in this example's direct In the following screenshot, the "Choose spans to display rates" dropdown has been used to select `agent_submit` and `serve_http`, and the "Choose spans to display durations" dropdown, has been used to select `agent_cold_exec`, `agent_get_slot`, `agent_submit`, `docker_create_container`, `docker_start_container` and `docker_wait_container`. + +## Docker statistics + +During the execution of the docker container, a selection of statistics from docker is made available as Prometheus metrics. The available metrics are listed in the following table: + +| Prometheus metric name | | ------------- | | `fn_docker_stats_cpu_kernel` | | `fn_docker_stats_cpu_total` | | `fn_docker_stats_cpu_user` | | `fn_docker_stats_disk_read` | | `fn_docker_stats_disk_write` | | `fn_docker_stats_mem_limit` | | `fn_docker_stats_mem_usage` | | `fn_docker_stats_net_rx` | | `fn_docker_stats_net_tx` | + + Note that if the container runs for a very short length of time, there may be insufficient time to obtain statistics before the container terminates. + +An example dashboard `fn_grafana_dashboard3.json` in this example's directory displays the available docker statistics. Use the dropdown lists at the top of the dashboard to choose which metrics to examine. 
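+ +For example, the dashboard panels chart each selected statistic as a one-minute rolling average; the equivalent Prometheus query for a single metric (shown here with `mem_usage` substituted for the dashboard's templated metric name; any metric name from the table above can be used instead) would be something like: + + ``` + rate(fn_docker_stats_mem_usage_sum[1m]) / rate(fn_docker_stats_mem_usage_count[1m]) + ``` 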
+ + + diff --git a/examples/grafana/fn_grafana_dashboard3.json b/examples/grafana/fn_grafana_dashboard3.json new file mode 100644 index 000000000..d306348d5 --- /dev/null +++ b/examples/grafana/fn_grafana_dashboard3.json @@ -0,0 +1,204 @@ +{ + "__inputs": [ + { + "name": "DS_PROMDS", + "label": "PromDS", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "4.6.1" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph", + "version": "" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "hideControls": false, + "id": null, + "links": [], + "refresh": "30s", + "rows": [ + { + "collapse": false, + "height": 250, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMDS}", + "fill": 1, + "id": 46, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "minSpan": 3, + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "repeat": "docker_metric_name", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(fn_docker_stats_[[docker_metric_name]]_sum[1m]) / rate(fn_docker_stats_[[docker_metric_name]]_count[1m])\n\n\n\n", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{fn_path}} ({{fn_appname}})", + "refId": "A", + "step": 1 + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "[[docker_metric_name]] (1min rolling avg)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": "", + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + } + ], + "repeat": null, + "repeatIteration": null, + "repeatRowId": null, + "showTitle": false, + "title": "Docker stats", + "titleSize": "h6" + } + ], + "schemaVersion": 14, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMDS}", + "hide": 0, + "includeAll": false, + "label": "Choose docker metrics to display", + "multi": true, + "name": "docker_metric_name", + "options": [], + "query": "metrics(fn_.*)", + "refresh": 2, + "regex": "fn_docker_stats_(.*)_count", + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + } + ] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "timezone": 
"", + "title": "Fn docker stats", + "version": 2 +} \ No newline at end of file