From bd5150f1ac550843f5409e1d9305b2d581baac7f Mon Sep 17 00:00:00 2001 From: Peter Jausovec Date: Tue, 12 Jun 2018 09:24:21 -0700 Subject: [PATCH] Extract register view functionality (#1056) * WIP * Create separate Register*Views functions that are called from main. --- api/agent/agent.go | 19 +-- api/agent/drivers/docker/docker_client.go | 74 ++++++---- api/agent/state_trackers.go | 28 ---- api/agent/stats.go | 166 ++++++++++++++++------ api/logs/s3/s3.go | 86 +++++------ cmd/fnserver/main.go | 19 +++ 6 files changed, 229 insertions(+), 163 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 6b9a0957e..685721257 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -20,7 +20,6 @@ import ( "github.com/go-openapi/strfmt" "github.com/sirupsen/logrus" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opencensus.io/trace" ) @@ -1112,7 +1111,7 @@ func (c *container) TmpFsSize() uint64 { return c.tmpFsSize } // WriteStat publishes each metric in the specified Stats structure as a histogram metric func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { for key, value := range stat.Metrics { - if m, ok := measures[key]; ok { + if m, ok := dockerMeasures[key]; ok { stats.Record(ctx, m.M(int64(value))) } } @@ -1124,22 +1123,6 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { c.swapMu.Unlock() } -var measures map[string]*stats.Int64Measure - -func init() { - // TODO this is nasty figure out how to use opencensus to not have to declare these - keys := []string{"net_rx", "net_tx", "mem_limit", "mem_usage", "disk_read", "disk_write", "cpu_user", "cpu_total", "cpu_kernel"} - - measures = make(map[string]*stats.Int64Measure) - for _, key := range keys { - units := "bytes" - if strings.Contains(key, "cpu") { - units = "cpu" - } - measures[key] = makeMeasure("docker_stats_"+key, "docker container stats for "+key, units, view.Distribution()) - } -} - //func (c *container) DockerAuth() (docker.AuthConfiguration, error) { // Implementing the docker.AuthConfiguration interface. // TODO per call could implement this stored somewhere (vs. configured on host) diff --git a/api/agent/drivers/docker/docker_client.go b/api/agent/drivers/docker/docker_client.go index 85b6b5a23..e889554be 100644 --- a/api/agent/drivers/docker/docker_client.go +++ b/api/agent/drivers/docker/docker_client.go @@ -10,12 +10,13 @@ import ( "strings" "time" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "github.com/fnproject/fn/api/common" "github.com/fsouza/go-dockerclient" "github.com/sirupsen/logrus" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opencensus.io/trace" ) @@ -102,10 +103,10 @@ type dockerWrap struct { } func init() { - dockerRetriesMeasure = makeMeasure("docker_api_retries", "docker api retries", "", view.Sum()) - dockerTimeoutMeasure = makeMeasure("docker_api_timeout", "docker api timeouts", "", view.Count()) - dockerErrorMeasure = makeMeasure("docker_api_error", "docker api errors", "", view.Count()) - dockerOOMMeasure = makeMeasure("docker_oom", "docker oom", "", view.Count()) + dockerRetriesMeasure = makeMeasure("docker_api_retries", "docker api retries", "") + dockerTimeoutMeasure = makeMeasure("docker_api_timeout", "docker api timeouts", "") + dockerErrorMeasure = makeMeasure("docker_api_error", "docker api errors", "") + dockerOOMMeasure = makeMeasure("docker_oom", "docker oom", "") } var ( @@ -116,6 +117,41 @@ var ( dockerOOMMeasure *stats.Int64Measure ) +// RegisterViews creates and registers views with provided tag keys +func RegisterViews(tagKeys []string) { + err := view.Register( + createView(dockerRetriesMeasure, view.Sum(), tagKeys), + createView(dockerTimeoutMeasure, view.Count(), tagKeys), + createView(dockerErrorMeasure, view.Count(), tagKeys), + createView(dockerOOMMeasure, view.Count(), tagKeys), + ) + if err != nil { + logrus.WithError(err).Fatal("cannot register view") + } +} + +func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View { + return &view.View{ + Name: measure.Name(), + Description: measure.Description(), + Measure: measure, + TagKeys: makeKeys(tagKeys), + Aggregation: agg, + } +} + +func makeKeys(names []string) []tag.Key { + tagKeys := make([]tag.Key, len(names)) + for i, name := range names { + key, err := tag.NewKey(name) + if err != nil { + logrus.Fatal(err) + } + tagKeys[i] = key + } + return tagKeys +} + func (d *dockerWrap) retry(ctx context.Context, logger logrus.FieldLogger, f func() error) error { var i int var err error @@ -357,28 +393,6 @@ func (d *dockerWrap) Stats(opts docker.StatsOptions) (err error) { //return err } -func makeMeasure(name string, desc string, unit string, agg *view.Aggregation) *stats.Int64Measure { - appKey, err := tag.NewKey("fn_appname") - if err != nil { - logrus.Fatal(err) - } - pathKey, err := tag.NewKey("fn_path") - if err != nil { - logrus.Fatal(err) - } - - measure := stats.Int64(name, desc, unit) - err = view.Register( - &view.View{ - Name: name, - Description: desc, - TagKeys: []tag.Key{appKey, pathKey}, - Measure: measure, - Aggregation: agg, - }, - ) - if err != nil { - logrus.WithError(err).Fatal("cannot create view") - } - return measure +func makeMeasure(name string, desc string, unit string) *stats.Int64Measure { + return stats.Int64(name, desc, unit) } diff --git a/api/agent/state_trackers.go b/api/agent/state_trackers.go index e153050a7..da28fd5aa 100644 --- a/api/agent/state_trackers.go +++ b/api/agent/state_trackers.go @@ -6,7 +6,6 @@ import ( "time" "go.opencensus.io/stats" - "go.opencensus.io/stats/view" ) type RequestStateType int @@ -152,30 +151,3 @@ func (c *containerState) UpdateState(ctx context.Context, newState ContainerStat stats.Record(ctx, containerGaugeMeasures[newState].M(1)) } } - -var ( - containerGaugeMeasures []*stats.Int64Measure - containerTimeMeasures []*stats.Int64Measure -) - -func init() { - // TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated - // or we have to white list them in the view for them to show up? test... - - containerGaugeMeasures = make([]*stats.Int64Measure, len(containerGaugeKeys)) - for i, key := range containerGaugeKeys { - if key == "" { // leave nil intentionally, let it panic - continue - } - containerGaugeMeasures[i] = makeMeasure(key, "containers in state "+key, "", view.Count()) - } - - containerTimeMeasures = make([]*stats.Int64Measure, len(containerTimeKeys)) - - for i, key := range containerTimeKeys { - if key == "" { - continue - } - containerTimeMeasures[i] = makeMeasure(key, "time spent in container state "+key, "ms", view.Distribution()) - } -} diff --git a/api/agent/stats.go b/api/agent/stats.go index 6c2cfc6bd..a4006fd1f 100644 --- a/api/agent/stats.go +++ b/api/agent/stats.go @@ -2,6 +2,7 @@ package agent import ( "context" + "strings" "github.com/sirupsen/logrus" "go.opencensus.io/stats" @@ -68,49 +69,134 @@ const ( ) var ( - queuedMeasure *stats.Int64Measure - callsMeasure *stats.Int64Measure // TODO this is a dupe of sum {complete,failed} ? - runningMeasure *stats.Int64Measure - completedMeasure *stats.Int64Measure - failedMeasure *stats.Int64Measure - timedoutMeasure *stats.Int64Measure - errorsMeasure *stats.Int64Measure - serverBusyMeasure *stats.Int64Measure + queuedMeasure = makeMeasure(queuedMetricName, "calls currently queued against agent", "") + // TODO this is a dupe of sum {complete,failed} ? + callsMeasure = makeMeasure(callsMetricName, "calls created in agent", "") + runningMeasure = makeMeasure(runningMetricName, "calls currently running in agent", "") + completedMeasure = makeMeasure(completedMetricName, "calls completed in agent", "") + failedMeasure = makeMeasure(failedMetricName, "calls failed in agent", "") + timedoutMeasure = makeMeasure(timedoutMetricName, "calls timed out in agent", "") + errorsMeasure = makeMeasure(errorsMetricName, "calls errored in agent", "") + serverBusyMeasure = makeMeasure(serverBusyMetricName, "calls where server was too busy in agent", "") + dockerMeasures = initDockerMeasures() + containerGaugeMeasures = initContainerGaugeMeasures() + containerTimeMeasures = initContainerTimeMeasures() ) -func init() { - queuedMeasure = makeMeasure(queuedMetricName, "calls currently queued against agent", "", view.Sum()) - callsMeasure = makeMeasure(callsMetricName, "calls created in agent", "", view.Sum()) - runningMeasure = makeMeasure(runningMetricName, "calls currently running in agent", "", view.Sum()) - completedMeasure = makeMeasure(completedMetricName, "calls completed in agent", "", view.Sum()) - failedMeasure = makeMeasure(failedMetricName, "calls failed in agent", "", view.Sum()) - timedoutMeasure = makeMeasure(timedoutMetricName, "calls timed out in agent", "", view.Sum()) - errorsMeasure = makeMeasure(errorsMetricName, "calls errored in agent", "", view.Sum()) - serverBusyMeasure = makeMeasure(serverBusyMetricName, "calls where server was too busy in agent", "", view.Sum()) -} - -func makeMeasure(name string, desc string, unit string, agg *view.Aggregation) *stats.Int64Measure { - appKey, err := tag.NewKey("fn_appname") - if err != nil { - logrus.Fatal(err) - } - pathKey, err := tag.NewKey("fn_path") - if err != nil { - logrus.Fatal(err) - } - - measure := stats.Int64(name, desc, unit) - err = view.Register( - &view.View{ - Name: name, - Description: desc, - TagKeys: []tag.Key{appKey, pathKey}, - Measure: measure, - Aggregation: agg, - }, +// RegisterAgentViews creates and registers all agent views +func RegisterAgentViews(tagKeys []string) { + err := view.Register( + createView(queuedMeasure, view.Sum(), tagKeys), + createView(callsMeasure, view.Sum(), tagKeys), + createView(runningMeasure, view.Sum(), tagKeys), + createView(completedMeasure, view.Sum(), tagKeys), + createView(failedMeasure, view.Sum(), tagKeys), + createView(timedoutMeasure, view.Sum(), tagKeys), + createView(errorsMeasure, view.Sum(), tagKeys), + createView(serverBusyMeasure, view.Sum(), tagKeys), ) if err != nil { - logrus.WithError(err).Fatal("cannot create view") + logrus.WithError(err).Fatal("cannot register view") } - return measure +} + +// RegisterDockerViews creates a and registers Docker views with provided tag keys +func RegisterDockerViews(tagKeys []string) { + for _, m := range dockerMeasures { + v := createView(m, view.Distribution(), tagKeys) + if err := view.Register(v); err != nil { + logrus.WithError(err).Fatal("cannot register view") + } + } +} + +// RegisterContainerViews creates and register containers views with provided tag keys +func RegisterContainerViews(tagKeys []string) { + // Create views for container measures + for i, key := range containerGaugeKeys { + if key == "" { + continue + } + v := createView(containerGaugeMeasures[i], view.Count(), tagKeys) + if err := view.Register(v); err != nil { + logrus.WithError(err).Fatal("cannot register view") + } + } + + for i, key := range containerTimeKeys { + if key == "" { + continue + } + v := createView(containerTimeMeasures[i], view.Distribution(), tagKeys) + if err := view.Register(v); err != nil { + logrus.WithError(err).Fatal("cannot register view") + } + } +} + +// initDockerMeasures initializes Docker related measures +func initDockerMeasures() map[string]*stats.Int64Measure { + // TODO this is nasty figure out how to use opencensus to not have to declare these + keys := []string{"net_rx", "net_tx", "mem_limit", "mem_usage", "disk_read", "disk_write", "cpu_user", "cpu_total", "cpu_kernel"} + measures := make(map[string]*stats.Int64Measure, len(keys)) + for _, key := range keys { + units := "bytes" + if strings.Contains(key, "cpu") { + units = "cpu" + } + measures[key] = makeMeasure("docker_stats_"+key, "docker container stats for "+key, units) + } + return measures +} + +func initContainerGaugeMeasures() []*stats.Int64Measure { + gaugeMeasures := make([]*stats.Int64Measure, len(containerGaugeKeys)) + for i, key := range containerGaugeKeys { + if key == "" { // leave nil intentionally, let it panic + continue + } + gaugeMeasures[i] = makeMeasure(key, "containers in state "+key, "") + } + return gaugeMeasures +} + +func initContainerTimeMeasures() []*stats.Int64Measure { + // TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated + // or we have to white list them in the view for them to show up? test... + + timeMeasures := make([]*stats.Int64Measure, len(containerTimeKeys)) + for i, key := range containerTimeKeys { + if key == "" { + continue + } + timeMeasures[i] = makeMeasure(key, "time spent in container state "+key, "ms") + } + + return timeMeasures +} + +func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View { + return &view.View{ + Name: measure.Name(), + Description: measure.Description(), + Measure: measure, + TagKeys: makeKeys(tagKeys), + Aggregation: agg, + } +} + +func makeMeasure(name string, desc string, unit string) *stats.Int64Measure { + return stats.Int64(name, desc, unit) +} + +func makeKeys(names []string) []tag.Key { + tagKeys := make([]tag.Key, len(names)) + for i, name := range names { + key, err := tag.NewKey(name) + if err != nil { + logrus.Fatal(err) + } + tagKeys[i] = key + } + return tagKeys } diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index fe39f8f11..961f33a3d 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -421,59 +421,51 @@ func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mod return calls, nil } -var ( - uploadSizeMeasure *stats.Int64Measure - downloadSizeMeasure *stats.Int64Measure +func (s *store) Close() error { + return nil +} + +const ( + uploadSizeMetricName = "s3_log_upload_size" + downloadSizeMetricName = "s3_log_download_size" ) -func init() { - // TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated - // or we have to white list them in the view for them to show up? test... - var err error - appKey, err := tag.NewKey("fn_appname") - if err != nil { - logrus.Fatal(err) - } - pathKey, err := tag.NewKey("fn_path") - if err != nil { - logrus.Fatal(err) - } +var ( + uploadSizeMeasure = stats.Int64(uploadSizeMetricName, "uploaded log size", "byte") + downloadSizeMeasure = stats.Int64(downloadSizeMetricName, "downloaded log size", "byte") +) - { - uploadSizeMeasure = stats.Int64("s3_log_upload_size", "uploaded log size", "byte") - err = view.Register( - &view.View{ - Name: "s3_log_upload_size", - Description: "uploaded log size", - TagKeys: []tag.Key{appKey, pathKey}, - Measure: uploadSizeMeasure, - Aggregation: view.Distribution(), - }, - ) - if err != nil { - logrus.WithError(err).Fatal("cannot create view") - } - } - - { - downloadSizeMeasure = stats.Int64("s3_log_download_size", "downloaded log size", "byte") - err = view.Register( - &view.View{ - Name: "s3_log_download_size", - Description: "downloaded log size", - TagKeys: []tag.Key{appKey, pathKey}, - Measure: uploadSizeMeasure, - Aggregation: view.Distribution(), - }, - ) - if err != nil { - logrus.WithError(err).Fatal("cannot create view") - } +// RegisterViews registers views for s3 measures +func RegisterViews(tagKeys []string) { + err := view.Register( + createView(uploadSizeMeasure, view.Distribution(), tagKeys), + createView(downloadSizeMeasure, view.Distribution(), tagKeys), + ) + if err != nil { + logrus.WithError(err).Fatal("cannot create view") } } -func (s *store) Close() error { - return nil +func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View { + return &view.View{ + Name: measure.Name(), + Description: measure.Description(), + Measure: measure, + TagKeys: makeKeys(tagKeys), + Aggregation: agg, + } +} + +func makeKeys(names []string) []tag.Key { + tagKeys := make([]tag.Key, len(names)) + for i, name := range names { + key, err := tag.NewKey(name) + if err != nil { + logrus.Fatal(err) + } + tagKeys[i] = key + } + return tagKeys } func init() { diff --git a/cmd/fnserver/main.go b/cmd/fnserver/main.go index 46a3ba467..1dbe8a9eb 100644 --- a/cmd/fnserver/main.go +++ b/cmd/fnserver/main.go @@ -3,6 +3,9 @@ package main import ( "context" + "github.com/fnproject/fn/api/agent" + "github.com/fnproject/fn/api/agent/drivers/docker" + "github.com/fnproject/fn/api/logs/s3" "github.com/fnproject/fn/api/server" // EXTENSIONS: Add extension imports here or use `fn build-server`. Learn more: https://github.com/fnproject/fn/blob/master/docs/operating/extending.md @@ -12,5 +15,21 @@ import ( func main() { ctx := context.Background() funcServer := server.NewFromEnv(ctx) + + registerViews() funcServer.Start(ctx) } + +func registerViews() { + // Register views in agent package + keys := []string{"fn_appname", "fn_path"} + agent.RegisterAgentViews(keys) + agent.RegisterDockerViews(keys) + agent.RegisterContainerViews(keys) + + // Register docker client views + docker.RegisterViews(keys) + + // Register s3 log views + s3.RegisterViews(keys) +}