mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Change basic stats to use opentracing rather than Prometheus API (#671)
* Change basic stats to use opentracing rather than Prometheus API directly * Just ran gofmt * Extract opentracing access for metrics to common/metrics.go * Replace quoted strings with constants where possible
This commit is contained in:
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/fnext"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/log"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -174,11 +173,11 @@ func transformTimeout(e error, isRetriable bool) error {
|
||||
|
||||
// handleStatsDequeue handles stats for dequeuing for early exit (getSlot or Start)
|
||||
// cases. Only timeouts can be a simple dequeue while other cases are actual errors.
|
||||
func (a *agent) handleStatsDequeue(err error, callI Call) {
|
||||
func (a *agent) handleStatsDequeue(ctx context.Context, err error, callI Call) {
|
||||
if err == context.DeadlineExceeded {
|
||||
a.stats.Dequeue(callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.Dequeue(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
} else {
|
||||
a.stats.DequeueAndFail(callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.DequeueAndFail(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,9 +191,6 @@ func (a *agent) Submit(callI Call) error {
|
||||
default:
|
||||
}
|
||||
|
||||
// increment queued count
|
||||
a.stats.Enqueue(callI.Model().AppName, callI.Model().Path)
|
||||
|
||||
call := callI.(*call)
|
||||
ctx := call.req.Context()
|
||||
|
||||
@@ -219,9 +215,14 @@ func (a *agent) Submit(callI Call) error {
|
||||
call.req = call.req.WithContext(ctxSlotWait)
|
||||
defer cancelSlotWait()
|
||||
|
||||
// increment queued count
|
||||
// this is done after setting "fn_appname" and "fn_path"
|
||||
a.stats.Enqueue(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
|
||||
slot, err := a.getSlot(ctxSlotWait, call) // find ram available / running
|
||||
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(err, call)
|
||||
a.handleStatsDequeue(ctx, err, call)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
// TODO if the call times out & container is created, we need
|
||||
@@ -230,7 +231,7 @@ func (a *agent) Submit(callI Call) error {
|
||||
|
||||
err = call.Start(ctxSlotWait)
|
||||
if err != nil {
|
||||
a.handleStatsDequeue(err, call)
|
||||
a.handleStatsDequeue(ctx, err, call)
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
@@ -241,7 +242,7 @@ func (a *agent) Submit(callI Call) error {
|
||||
defer cancelExec()
|
||||
|
||||
// decrement queued count, increment running count
|
||||
a.stats.DequeueAndStart(callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.DequeueAndStart(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
|
||||
err = slot.exec(ctxExec, call)
|
||||
// pass this error (nil or otherwise) to end directly, to store status, etc
|
||||
@@ -249,10 +250,10 @@ func (a *agent) Submit(callI Call) error {
|
||||
|
||||
if err == nil {
|
||||
// decrement running count, increment completed count
|
||||
a.stats.Complete(callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.Complete(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
} else {
|
||||
// decrement running count, increment failed count
|
||||
a.stats.Failed(callI.Model().AppName, callI.Model().Path)
|
||||
a.stats.Failed(ctx, callI.Model().AppName, callI.Model().Path)
|
||||
}
|
||||
|
||||
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
|
||||
@@ -726,16 +727,19 @@ func (c *container) Timeout() time.Duration { return c.timeout }
|
||||
func (c *container) EnvVars() map[string]string { return c.env }
|
||||
func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB
|
||||
|
||||
// Log the specified stats to a tracing span.
|
||||
// Spans are not processed by the collector until the span ends, so to prevent any delay
|
||||
// in processing the stats when the function is long-lived we create a new span for every call
|
||||
// WriteStat publishes each metric in the specified Stat structure as a histogram metric.
|
||||
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_stats")
|
||||
defer span.Finish()
|
||||
|
||||
// Convert each metric value from uint64 to float64
|
||||
// and, for backward compatibility reasons, prepend each metric name with "docker_stats_fn_"
|
||||
// (if we don't care about compatibility then we can remove that)
|
||||
var metrics = make(map[string]float64)
|
||||
for key, value := range stat.Metrics {
|
||||
span.LogFields(log.Uint64("fn_"+key, value))
|
||||
metrics["docker_stats_fn_"+key] = float64(value)
|
||||
}
|
||||
|
||||
common.PublishHistograms(ctx, metrics)
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.stats != nil {
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// TODO this should expose:
|
||||
@@ -30,8 +30,9 @@ type functionStats struct {
|
||||
failed uint64
|
||||
}
|
||||
|
||||
// Stats holds the statistics for all functions combined
|
||||
// and the statistics for each individual function
|
||||
type Stats struct {
|
||||
// statistics for all functions combined
|
||||
Queue uint64
|
||||
Running uint64
|
||||
Complete uint64
|
||||
@@ -40,7 +41,7 @@ type Stats struct {
|
||||
FunctionStatsMap map[string]*FunctionStats
|
||||
}
|
||||
|
||||
// statistics for an individual function
|
||||
// FunctionStats holds the statistics for an individual function
|
||||
type FunctionStats struct {
|
||||
Queue uint64
|
||||
Running uint64
|
||||
@@ -48,52 +49,6 @@ type FunctionStats struct {
|
||||
Failed uint64
|
||||
}
|
||||
|
||||
var (
|
||||
fnCalls = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "fn_api_calls",
|
||||
Help: "Function calls by app and path",
|
||||
},
|
||||
[](string){"app", "path"},
|
||||
)
|
||||
fnQueued = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "fn_api_queued",
|
||||
Help: "Queued requests by app and path",
|
||||
},
|
||||
[](string){"app", "path"},
|
||||
)
|
||||
fnRunning = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "fn_api_running",
|
||||
Help: "Running requests by app and path",
|
||||
},
|
||||
[](string){"app", "path"},
|
||||
)
|
||||
fnCompleted = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "fn_api_completed",
|
||||
Help: "Completed requests by app and path",
|
||||
},
|
||||
[](string){"app", "path"},
|
||||
)
|
||||
fnFailed = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "fn_api_failed",
|
||||
Help: "Failed requests by path",
|
||||
},
|
||||
[](string){"app", "path"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(fnCalls)
|
||||
prometheus.MustRegister(fnQueued)
|
||||
prometheus.MustRegister(fnRunning)
|
||||
prometheus.MustRegister(fnFailed)
|
||||
prometheus.MustRegister(fnCompleted)
|
||||
}
|
||||
|
||||
func (s *stats) getStatsForFunction(path string) *functionStats {
|
||||
if s.functionStatsMap == nil {
|
||||
s.functionStatsMap = make(map[string]*functionStats)
|
||||
@@ -107,80 +62,81 @@ func (s *stats) getStatsForFunction(path string) *functionStats {
|
||||
return thisFunctionStats
|
||||
}
|
||||
|
||||
func (s *stats) Enqueue(app string, path string) {
|
||||
func (s *stats) Enqueue(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.queue++
|
||||
s.getStatsForFunction(path).queue++
|
||||
fnQueued.WithLabelValues(app, path).Inc()
|
||||
fnCalls.WithLabelValues(app, path).Inc()
|
||||
common.IncrementGauge(ctx, queuedMetricName)
|
||||
|
||||
common.IncrementCounter(ctx, callsMetricName)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
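// Dequeue is called when a function has been queued but could not be started because it timed out waiting; it decrements the queued count without recording a failure (see DequeueAndFail for actual errors).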
|
||||
func (s *stats) Dequeue(app string, path string) {
|
||||
func (s *stats) Dequeue(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.queue--
|
||||
s.getStatsForFunction(path).queue--
|
||||
fnQueued.WithLabelValues(app, path).Dec()
|
||||
common.DecrementGauge(ctx, queuedMetricName)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *stats) DequeueAndStart(app string, path string) {
|
||||
func (s *stats) DequeueAndStart(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.queue--
|
||||
s.getStatsForFunction(path).queue--
|
||||
fnQueued.WithLabelValues(app, path).Dec()
|
||||
common.DecrementGauge(ctx, queuedMetricName)
|
||||
|
||||
s.running++
|
||||
s.getStatsForFunction(path).running++
|
||||
fnRunning.WithLabelValues(app, path).Inc()
|
||||
common.IncrementGauge(ctx, runningSuffix)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *stats) Complete(app string, path string) {
|
||||
func (s *stats) Complete(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.running--
|
||||
s.getStatsForFunction(path).running--
|
||||
fnRunning.WithLabelValues(app, path).Dec()
|
||||
common.DecrementGauge(ctx, runningSuffix)
|
||||
|
||||
s.complete++
|
||||
s.getStatsForFunction(path).complete++
|
||||
fnCompleted.WithLabelValues(app, path).Inc()
|
||||
common.IncrementCounter(ctx, completedMetricName)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *stats) Failed(app string, path string) {
|
||||
func (s *stats) Failed(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.running--
|
||||
s.getStatsForFunction(path).running--
|
||||
fnRunning.WithLabelValues(app, path).Dec()
|
||||
common.DecrementGauge(ctx, runningSuffix)
|
||||
|
||||
s.failed++
|
||||
s.getStatsForFunction(path).failed++
|
||||
fnFailed.WithLabelValues(app, path).Inc()
|
||||
common.IncrementCounter(ctx, failedMetricName)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *stats) DequeueAndFail(app string, path string) {
|
||||
func (s *stats) DequeueAndFail(ctx context.Context, app string, path string) {
|
||||
s.mu.Lock()
|
||||
|
||||
s.queue--
|
||||
s.getStatsForFunction(path).queue--
|
||||
fnQueued.WithLabelValues(app, path).Dec()
|
||||
common.DecrementGauge(ctx, queuedMetricName)
|
||||
|
||||
s.failed++
|
||||
s.getStatsForFunction(path).failed++
|
||||
fnFailed.WithLabelValues(app, path).Inc()
|
||||
common.IncrementCounter(ctx, failedMetricName)
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
@@ -200,3 +156,11 @@ func (s *stats) Stats() Stats {
|
||||
s.mu.Unlock()
|
||||
return stats
|
||||
}
|
||||
|
||||
const (
|
||||
queuedMetricName = "queued"
|
||||
callsMetricName = "calls"
|
||||
runningSuffix = "running"
|
||||
completedMetricName = "completed"
|
||||
failedMetricName = "failed"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user