From c9198b852577a3bca25fa0953e2ebe4dbec9f87e Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Mon, 27 Nov 2017 08:52:53 -0600 Subject: [PATCH] add per call stats field as histogram (#528) * add per call stats field as histogram this will add a histogram of up to 240 data points of call data, produced every second, stored at the end of a call invocation in the db. the same metrics are also still shipped to prometheus (prometheus has the not-potentially-reduced version). for the API reference, see the updates to the swagger spec, this is just added onto the get call endpoint. this does not add any extra db calls and the field for stats in call is a json blob, which is easily modified to add / omit future fields. this is just tacked on to the call we're making to InsertCall, and expect this to add very little overhead; we are bounding the set to be relatively small, planning to clean out the db of calls periodically, functions will generally be short, and the same code used at a previous firm did not cause a notable db size increase with production workload that is worse, wrt histogram size (I checked). the code changes are really small aside from changing to strfmt.DateTime, adding a migration and implementing sql.Valuer; needed to slightly modify the swap function so that we can safely read `call.Stats` field to upload at end. with the full histogram in hand, we can compute max/min/average/median/growth rate/bernoulli distributions/whatever very easily in a UI or tooling. in particular, this data is easily chartable [for a UI], which is beneficial. * adds swagger spec of api update to calls endpoint * adds migration for call.stats field * adds call.stats field to sql queries * change swapping of hot logger to exec, so we know that call.Stats is no longer being modified after `exec` [in call.End] * throws out docker stats between function invocations in hot functions (no call to store them on, we could change this later for debug; they're in prom) * tested in tests and API closes #19 * add format of ints to swag --- api/agent/agent.go | 35 +++++++--- api/agent/call.go | 4 ++ api/agent/drivers/docker/docker.go | 6 +- api/agent/drivers/driver.go | 64 ++++++++++++++++--- api/agent/drivers/driver_test.go | 15 +++-- .../sql/migrations/2_add_call_stats.down.sql | 1 + .../sql/migrations/2_add_call_stats.up.sql | 1 + api/datastore/sql/migrations/index.go | 2 +- api/datastore/sql/migrations/migrations.go | 50 ++++++++++++++- api/datastore/sql/sql.go | 9 ++- api/models/call.go | 4 ++ docs/swagger.yml | 43 +++++++++++++ 12 files changed, 202 insertions(+), 32 deletions(-) create mode 100644 api/datastore/sql/migrations/2_add_call_stats.down.sql create mode 100644 api/datastore/sql/migrations/2_add_call_stats.up.sql diff --git a/api/agent/agent.go b/api/agent/agent.go index 04699baff..4e22395a0 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -428,8 +428,9 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { // link the container id and id in the logs [for us!] common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call") - // swap in the new stderr logger - s.container.swap(call.stderr) + // swap in the new stderr logger & stat accumulator + oldStderr := s.container.swap(call.stderr, &call.Stats) + defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races errApp := make(chan error, 1) go func() { @@ -442,8 +443,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { select { case err := <-s.errC: // error from container return err - case err := <-errApp: - // would be great to be able to decipher what error is returning from here so we can show better messages + case err := <-errApp: // from dispatch return err case <-ctx.Done(): // call timeout return ctx.Err() @@ -488,6 +488,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok stdin: call.req.Body, stdout: call.w, stderr: call.stderr, + stats: &call.Stats, } // pull & create container before we return a slot, so as to be friendly @@ -605,7 +606,6 @@ func (a *agent) runHot(ctxArg context.Context, slots chan<- slot, call *call, to // wait for this call to finish // NOTE do NOT select with shutdown / other channels. slot handles this. <-done - container.swap(stderr) // log between tasks } }() @@ -634,14 +634,25 @@ type container struct { stdin io.Reader stdout io.Writer stderr io.Writer + + // lock protects the swap and any fields that need to be swapped + sync.Mutex + stats *drivers.Stats } -func (c *container) swap(stderr io.Writer) { +func (c *container) swap(stderr io.Writer, cs *drivers.Stats) (old io.Writer) { + c.Lock() + defer c.Unlock() + // TODO meh, maybe shouldn't bury this + old = c.stderr gw, ok := c.stderr.(*ghostWriter) if ok { - gw.swap(stderr) + old = gw.swap(stderr) } + + c.stats = cs + return old } func (c *container) Id() string { return c.id } @@ -665,6 +676,12 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { for key, value := range stat.Metrics { span.LogFields(log.Uint64("fn_"+key, value)) } + + c.Lock() + defer c.Unlock() + if c.stats != nil { + *(c.stats) = append(*(c.stats), stat) + } } //func (c *container) DockerAuth() (docker.AuthConfiguration, error) { @@ -679,10 +696,12 @@ type ghostWriter struct { inner io.Writer } -func (g *ghostWriter) swap(w io.Writer) { +func (g *ghostWriter) swap(w io.Writer) (old io.Writer) { g.Lock() + old = g.inner g.inner = w g.Unlock() + return old } func (g *ghostWriter) Write(b []byte) (int, error) { diff --git a/api/agent/call.go b/api/agent/call.go index a08ed6cf4..34474bb08 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" @@ -347,6 +348,9 @@ func (c *call) End(ctx context.Context, errIn error, t callTrigger) error { // XXX (reed): delete MQ message, eventually } + // ensure stats histogram is reasonably bounded + c.Call.Stats = drivers.Decimate(240, c.Call.Stats) + // this means that we could potentially store an error / timeout status for a // call that ran successfully [by a user's perspective] // TODO: this should be update, really diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 0c3a00f84..e660ffe49 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -16,6 +16,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/fsouza/go-dockerclient" + "github.com/go-openapi/strfmt" "github.com/opentracing/opentracing-go" "github.com/sirupsen/logrus" ) @@ -366,10 +367,9 @@ func (drv *DockerDriver) collectStats(ctx context.Context, stopSignal <-chan str return } stats := cherryPick(ds) - if !stats.Timestamp.IsZero() { + if !time.Time(stats.Timestamp).IsZero() { task.WriteStat(ctx, stats) } - } } } @@ -405,7 +405,7 @@ func cherryPick(ds *docker.Stats) drivers.Stat { } return drivers.Stat{ - Timestamp: ds.Read, + Timestamp: strfmt.DateTime(ds.Read), Metrics: map[string]uint64{ // source: https://godoc.org/github.com/fsouza/go-dockerclient#Stats // ex (for future expansion): {"read":"2016-08-03T18:08:05Z","pids_stats":{},"network":{},"networks":{"eth0":{"rx_bytes":508,"tx_packets":6,"rx_packets":6,"tx_bytes":508}},"memory_stats":{"stats":{"cache":16384,"pgpgout":281,"rss":8826880,"pgpgin":2440,"total_rss":8826880,"hierarchical_memory_limit":536870912,"total_pgfault":3809,"active_anon":8843264,"total_active_anon":8843264,"total_pgpgout":281,"total_cache":16384,"pgfault":3809,"total_pgpgin":2440},"max_usage":8953856,"usage":8953856,"limit":536870912},"blkio_stats":{"io_service_bytes_recursive":[{"major":202,"op":"Read"},{"major":202,"op":"Write"},{"major":202,"op":"Sync"},{"major":202,"op":"Async"},{"major":202,"op":"Total"}],"io_serviced_recursive":[{"major":202,"op":"Read"},{"major":202,"op":"Write"},{"major":202,"op":"Sync"},{"major":202,"op":"Async"},{"major":202,"op":"Total"}]},"cpu_stats":{"cpu_usage":{"percpu_usage":[47641874],"usage_in_usermode":30000000,"total_usage":47641874},"system_cpu_usage":8880800500000000,"throttling_data":{}},"precpu_stats":{"cpu_usage":{"percpu_usage":[44946186],"usage_in_usermode":30000000,"total_usage":44946186},"system_cpu_usage":8880799510000000,"throttling_data":{}}} diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index a2c337b7e..f711eaa23 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -3,10 +3,16 @@ package drivers import ( + "bytes" "context" + "database/sql/driver" + "encoding/json" + "fmt" "io" "strings" "time" + + "github.com/go-openapi/strfmt" ) // A DriverCookie identifies a unique request to run a task. @@ -109,8 +115,50 @@ type ContainerTask interface { // Stat is a bucket of stats from a driver at a point in time for a certain task. type Stat struct { - Timestamp time.Time - Metrics map[string]uint64 + Timestamp strfmt.DateTime `json:"timestamp"` + Metrics map[string]uint64 `json:"metrics"` +} + +// Stats is a list of Stat, notably implements sql.Valuer +type Stats []Stat + +// implements sql.Valuer, returning a string +func (s Stats) Value() (driver.Value, error) { + if len(s) < 1 { + return driver.Value(string("")), nil + } + var b bytes.Buffer + err := json.NewEncoder(&b).Encode(s) + // return a string type + return driver.Value(b.String()), err +} + +// implements sql.Scanner +func (s *Stats) Scan(value interface{}) error { + if value == nil { + *s = nil + return nil + } + bv, err := driver.String.ConvertValue(value) + if err == nil { + var b []byte + switch x := bv.(type) { + case []byte: + b = x + case string: + b = []byte(x) + } + + if len(b) > 0 { + return json.Unmarshal(b, s) + } + + *s = nil + return nil + } + + // otherwise, return an error + return fmt.Errorf("stats invalid db format: %T %T value, err: %v", value, bv, err) } // TODO: ensure some type is applied to these statuses. @@ -149,15 +197,15 @@ func average(samples []Stat) (Stat, bool) { s := Stat{ Metrics: samples[0].Metrics, // Recycle Metrics map from first sample } - t := samples[0].Timestamp.UnixNano() / int64(l) + t := time.Time(samples[0].Timestamp).UnixNano() / int64(l) for _, sample := range samples[1:] { - t += sample.Timestamp.UnixNano() / int64(l) + t += time.Time(sample.Timestamp).UnixNano() / int64(l) for k, v := range sample.Metrics { s.Metrics[k] += v } } - s.Timestamp = time.Unix(0, t) + s.Timestamp = strfmt.DateTime(time.Unix(0, t)) for k, v := range s.Metrics { s.Metrics[k] = v / uint64(l) } @@ -183,8 +231,8 @@ func Decimate(maxSamples int, stats []Stat) []Stat { return nil } - start := stats[0].Timestamp - window := stats[len(stats)-1].Timestamp.Sub(start) / time.Duration(maxSamples) + start := time.Time(stats[0].Timestamp) + window := time.Time(stats[len(stats)-1].Timestamp).Sub(start) / time.Duration(maxSamples) nextEntry, current := 0, start // nextEntry is the index tracking next Stats record location for x := 0; x < len(stats); { @@ -192,7 +240,7 @@ func Decimate(maxSamples int, stats []Stat) []Stat { var samples []Stat for offset := 0; x+offset < len(stats); offset++ { // Iterate through samples until out of window - if !isLastEntry && stats[x+offset].Timestamp.After(current.Add(window)) { + if !isLastEntry && time.Time(stats[x+offset].Timestamp).After(current.Add(window)) { break } samples = stats[x : x+offset+1] diff --git a/api/agent/drivers/driver_test.go b/api/agent/drivers/driver_test.go index f309f8536..f068ae224 100644 --- a/api/agent/drivers/driver_test.go +++ b/api/agent/drivers/driver_test.go @@ -3,6 +3,8 @@ package drivers import ( "testing" "time" + + "github.com/go-openapi/strfmt" ) func TestAverage(t *testing.T) { @@ -10,7 +12,7 @@ func TestAverage(t *testing.T) { stats := make([]Stat, 10) for i := 0; i < len(stats); i++ { stats[i] = Stat{ - Timestamp: start.Add(time.Duration(i) * time.Minute), + Timestamp: strfmt.DateTime(start.Add(time.Duration(i) * time.Minute)), Metrics: map[string]uint64{"x": uint64(i)}, } } @@ -26,7 +28,7 @@ func TestAverage(t *testing.T) { } expectedT := time.Unix(1470873870, 0) - if res.Timestamp != expectedT { + if time.Time(res.Timestamp) != expectedT { t.Error("Actual average didn't match expected", "actual", res.Timestamp, "expected", expectedT) } } @@ -36,10 +38,9 @@ func TestDecimate(t *testing.T) { stats := make([]Stat, 480) for i := range stats { stats[i] = Stat{ - Timestamp: start.Add(time.Duration(i) * time.Second), + Timestamp: strfmt.DateTime(start.Add(time.Duration(i) * time.Second)), Metrics: map[string]uint64{"x": uint64(i)}, } - // t.Log(stats[i]) } stats = Decimate(240, stats) @@ -54,7 +55,7 @@ func TestDecimate(t *testing.T) { stats = make([]Stat, 700) for i := range stats { stats[i] = Stat{ - Timestamp: start.Add(time.Duration(i) * time.Second), + Timestamp: strfmt.DateTime(start.Add(time.Duration(i) * time.Second)), Metrics: map[string]uint64{"x": uint64(i)}, } } @@ -66,7 +67,7 @@ func TestDecimate(t *testing.T) { stats = make([]Stat, 300) for i := range stats { stats[i] = Stat{ - Timestamp: start.Add(time.Duration(i) * time.Second), + Timestamp: strfmt.DateTime(start.Add(time.Duration(i) * time.Second)), Metrics: map[string]uint64{"x": uint64(i)}, } } @@ -82,7 +83,7 @@ func TestDecimate(t *testing.T) { start = start.Add(20 * time.Minute) } stats[i] = Stat{ - Timestamp: start.Add(time.Duration(i) * time.Second), + Timestamp: strfmt.DateTime(start.Add(time.Duration(i) * time.Second)), Metrics: map[string]uint64{"x": uint64(i)}, } } diff --git a/api/datastore/sql/migrations/2_add_call_stats.down.sql b/api/datastore/sql/migrations/2_add_call_stats.down.sql new file mode 100644 index 000000000..04d25653a --- /dev/null +++ b/api/datastore/sql/migrations/2_add_call_stats.down.sql @@ -0,0 +1 @@ +ALTER TABLE calls DROP COLUMN stats; diff --git a/api/datastore/sql/migrations/2_add_call_stats.up.sql b/api/datastore/sql/migrations/2_add_call_stats.up.sql new file mode 100644 index 000000000..a1aa83e9f --- /dev/null +++ b/api/datastore/sql/migrations/2_add_call_stats.up.sql @@ -0,0 +1 @@ +ALTER TABLE calls ADD stats text; diff --git a/api/datastore/sql/migrations/index.go b/api/datastore/sql/migrations/index.go index 40c7150d1..1b186aea7 100644 --- a/api/datastore/sql/migrations/index.go +++ b/api/datastore/sql/migrations/index.go @@ -1,6 +1,6 @@ package migrations -//go:generate go-bindata -ignore migrations.go -ignore index.go -o migrations.go -pkg migrations . +//go:generate go-bindata -ignore README.md -ignore migrations.go -ignore index.go -o migrations.go -pkg migrations . // migrations are generated from this cwd with go generate. // install https://github.com/jteeuwen/go-bindata for go generate diff --git a/api/datastore/sql/migrations/migrations.go b/api/datastore/sql/migrations/migrations.go index fca5eeec3..98503061f 100644 --- a/api/datastore/sql/migrations/migrations.go +++ b/api/datastore/sql/migrations/migrations.go @@ -2,6 +2,8 @@ // sources: // 1_add_route_created_at.down.sql // 1_add_route_created_at.up.sql +// 2_add_call_stats.down.sql +// 2_add_call_stats.up.sql // DO NOT EDIT! package migrations @@ -84,7 +86,7 @@ func _1_add_route_created_atDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_add_route_created_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1508386173, 0)} + info := bindataFileInfo{name: "1_add_route_created_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1510786558, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -104,7 +106,47 @@ func _1_add_route_created_atUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_add_route_created_at.up.sql", size: 40, mode: os.FileMode(420), modTime: time.Unix(1508360377, 0)} + info := bindataFileInfo{name: "1_add_route_created_at.up.sql", size: 40, mode: os.FileMode(420), modTime: time.Unix(1510786558, 0)} + a := &asset{bytes: bytes, info: info} + return a, nil +} + +var __2_add_call_statsDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x48\x4e\xcc\xc9\x29\x56\x70\x09\xf2\x0f\x50\x70\xf6\xf7\x09\xf5\xf5\x53\x28\x2e\x49\x2c\x29\xb6\xe6\x02\x04\x00\x00\xff\xff\xd3\x09\xeb\x22\x25\x00\x00\x00") + +func _2_add_call_statsDownSqlBytes() ([]byte, error) { + return bindataRead( + __2_add_call_statsDownSql, + "2_add_call_stats.down.sql", + ) +} + +func _2_add_call_statsDownSql() (*asset, error) { + bytes, err := _2_add_call_statsDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_add_call_stats.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1511225799, 0)} + a := &asset{bytes: bytes, info: info} + return a, nil +} + +var __2_add_call_statsUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x48\x4e\xcc\xc9\x29\x56\x70\x74\x71\x51\x28\x2e\x49\x2c\x29\x56\x28\x49\xad\x28\xb1\xe6\x02\x04\x00\x00\xff\xff\x29\xde\x11\xe8\x22\x00\x00\x00") + +func _2_add_call_statsUpSqlBytes() ([]byte, error) { + return bindataRead( + __2_add_call_statsUpSql, + "2_add_call_stats.up.sql", + ) +} + +func _2_add_call_statsUpSql() (*asset, error) { + bytes, err := _2_add_call_statsUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_add_call_stats.up.sql", size: 34, mode: os.FileMode(420), modTime: time.Unix(1511225651, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -163,6 +205,8 @@ func AssetNames() []string { var _bindata = map[string]func() (*asset, error){ "1_add_route_created_at.down.sql": _1_add_route_created_atDownSql, "1_add_route_created_at.up.sql": _1_add_route_created_atUpSql, + "2_add_call_stats.down.sql": _2_add_call_statsDownSql, + "2_add_call_stats.up.sql": _2_add_call_statsUpSql, } // AssetDir returns the file names below a certain @@ -208,6 +252,8 @@ type bintree struct { var _bintree = &bintree{nil, map[string]*bintree{ "1_add_route_created_at.down.sql": &bintree{_1_add_route_created_atDownSql, map[string]*bintree{}}, "1_add_route_created_at.up.sql": &bintree{_1_add_route_created_atUpSql, map[string]*bintree{}}, + "2_add_call_stats.down.sql": &bintree{_2_add_call_statsDownSql, map[string]*bintree{}}, + "2_add_call_stats.up.sql": &bintree{_2_add_call_statsUpSql, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 177848571..8a5b7ba65 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -65,6 +65,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( id varchar(256) NOT NULL, app_name varchar(256) NOT NULL, path varchar(256) NOT NULL, + stats text, PRIMARY KEY (id) );`, @@ -77,7 +78,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( const ( routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config, created_at FROM routes` - callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` + callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path, stats FROM calls` ) type sqlStore struct { @@ -585,7 +586,8 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { completed_at, status, app_name, - path + path, + stats ) VALUES ( :id, @@ -594,7 +596,8 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { :completed_at, :status, :app_name, - :path + :path, + :stats );`) _, err := ds.db.NamedExecContext(ctx, query, call) diff --git a/api/models/call.go b/api/models/call.go index 9aa078781..c5de8a0dd 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -1,6 +1,7 @@ package models import ( + "github.com/fnproject/fn/api/agent/drivers" "github.com/go-openapi/strfmt" ) @@ -129,6 +130,9 @@ type Call struct { // Time when call started execution. Always in UTC. StartedAt strfmt.DateTime `json:"started_at,omitempty" db:"started_at"` + + // Stats is a list of metrics from this call's execution, possibly empty. + Stats drivers.Stats `json:"stats,omitempty" db:"stats"` } type CallFilter struct { diff --git a/docs/swagger.yml b/docs/swagger.yml index f47acc180..3b4167840 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -669,6 +669,49 @@ definitions: format: date-time description: Time when call completed, whether it was successul or failed. Always in UTC. readOnly: true + stats: + type: array + items: + $ref: '#/definitions/Stat' + description: A histogram of stats for a call, each is a snapshot of a calls state at the timestamp. + readOnly: true + + Stat: + type: object + properties: + timestamp: + type: string + format: date-time + metrics: + type: object + properties: + net_rx: + type: integer + format: int64 + net_tx: + type: integer + format: int64 + mem_limit: + type: integer + format: int64 + mem_usage: + type: integer + format: int64 + disk_read: + type: integer + format: int64 + disk_write: + type: integer + format: int64 + cpu_user: + type: integer + format: int64 + cpu_total: + type: integer + format: int64 + cpu_kernel: + type: integer + format: int64 ErrorBody: type: object