From c3537399f1965bba4946758abe2cd9191eeac097 Mon Sep 17 00:00:00 2001 From: Tom Coupland Date: Wed, 12 Sep 2018 15:45:53 +0100 Subject: [PATCH] The V2 Calls api endpoints have been added beneath fns: (#1203) /fns/{fnID}/calls /fns/{fnID}/calls/{callID} The S3 implementation forces our hand as we if we want to list Calls under a Fn, we have to use the FnID as a prefix on the object names, which mean we need it to look up any Call. It also makes sense in terms of resource hierarchy. These endpoints can optionally be disabled (as other endpoints), if a service provider needs to provide this functionality via other means. The 'calls' test has been fully migrated to fn calls. This has been done to reduce the copy pasta a bit, and on balance is ok as the routes calls will be removed soon. --- api/datastore/mock.go | 1 - api/datastore/sql/migratex/migrate.go | 2 +- .../sql/migrations/18_add_fnid_calls.go | 46 +++++ api/datastore/sql/sql.go | 60 +++++- api/logs/metrics/metrics.go | 18 +- api/logs/mock.go | 58 +++++- api/logs/s3/s3.go | 167 +++++++++++++++-- api/logs/testing/test.go | 82 ++++----- api/models/call.go | 6 + api/models/logs.go | 11 +- api/server/call_get.go | 19 +- api/server/call_list.go | 28 ++- api/server/server.go | 25 ++- docs/swagger_v2.yml | 173 ++++++++++++++++++ 14 files changed, 615 insertions(+), 81 deletions(-) create mode 100644 api/datastore/sql/migrations/18_add_fnid_calls.go diff --git a/api/datastore/mock.go b/api/datastore/mock.go index c399875a7..7d720b669 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -363,7 +363,6 @@ func (m *mock) GetFns(ctx context.Context, filter *models.FnFilter) (*models.FnL (filter.Name == "" || filter.Name == f.Name) { funcs = append(funcs, f) } - } var nextCursor string diff --git a/api/datastore/sql/migratex/migrate.go b/api/datastore/sql/migratex/migrate.go index a43a16259..702b7f841 100644 --- a/api/datastore/sql/migratex/migrate.go +++ b/api/datastore/sql/migratex/migrate.go @@ -27,6 +27,7 @@ func migrateErr(version int64, up bool, err error) ErrMigration { dir := "up" if !up { dir = "down" + version++ } return ErrMigration(fmt.Sprintf("error running migration. version: %v direction: %v err: %v", version, dir, err)) } @@ -185,7 +186,6 @@ func run(ctx context.Context, tx *sqlx.Tx, m Migration, up bool) error { return fmt.Errorf("non-contiguous migration attempted down: %v != %v", m.Version(), curVersion) } - // TODO is this robust enough? we could check version := m.Version() if !up { version = m.Version() - 1 diff --git a/api/datastore/sql/migrations/18_add_fnid_calls.go b/api/datastore/sql/migrations/18_add_fnid_calls.go new file mode 100644 index 000000000..c4b444dfe --- /dev/null +++ b/api/datastore/sql/migrations/18_add_fnid_calls.go @@ -0,0 +1,46 @@ +package migrations + +import ( + "context" + + "github.com/fnproject/fn/api/datastore/sql/migratex" + "github.com/jmoiron/sqlx" +) + +func up18(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE calls ADD fn_id varchar(256);") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE calls MODIFY app_id varchar(256) NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE calls ALTER COLUMN app_id DROP NOT NULL;") + return err + } + + return err +} + +func down18(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE calls DROP COLUMN fn_id;") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE calls MODIFY app_id varchar(256) NOT NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE calls ALTER COLUMN app_id SET NOT NULL;") + return err + } + + return err +} + +func init() { + Migrations = append(Migrations, &migratex.MigFields{ + VersionFunc: vfunc(18), + UpFunc: up18, + DownFunc: down18, + }) +} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index ce69971d1..225c45bdd 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -72,7 +72,8 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( completed_at varchar(256) NOT NULL, status varchar(256) NOT NULL, id varchar(256) NOT NULL, - app_id varchar(256) NOT NULL, + app_id varchar(256), + fn_id varchar(256), path varchar(256) NOT NULL, stats text, error text, @@ -955,6 +956,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { completed_at, status, app_id, + fn_id, path, stats, error @@ -966,6 +968,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { :completed_at, :status, :app_id, + :fn_id, :path, :stats, :error @@ -975,7 +978,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { return err } -func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +func (ds *SQLStore) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector) query = ds.db.Rebind(query) row := ds.db.QueryRowxContext(ctx, query, callID, appID) @@ -991,7 +994,47 @@ func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models. return &call, nil } -func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (ds *SQLStore) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + query := fmt.Sprintf(`%s WHERE id=? AND fn_id=?`, callSelector) + query = ds.db.Rebind(query) + row := ds.db.QueryRowxContext(ctx, query, callID, fnID) + + var call models.Call + err := row.StructScan(&call) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrCallNotFound + } + return nil, err + } + return &call, nil +} + +func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + if filter.Cursor != "" { + cursor, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + filter.Cursor = string(cursor) + } + + calls, err := ds.GetCalls1(ctx, filter) + if err != nil { + return nil, err + } + + callList := &models.CallList{Items: calls} + + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + callList.NextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return callList, nil +} + +func (ds *SQLStore) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { res := []*models.Call{} query, args := buildFilterCallQuery(filter) query = fmt.Sprintf("%s %s", callSelector, query) @@ -1109,8 +1152,15 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) { if !time.Time(filter.FromTime).IsZero() { args = where(&b, args, "created_at>?", filter.FromTime.String()) } - args = where(&b, args, "app_id=?", filter.AppID) - args = where(&b, args, "path=?", filter.Path) + if filter.AppID != "" { + args = where(&b, args, "app_id=?", filter.AppID) + } + if filter.FnID != "" { + args = where(&b, args, "fn_id=?", filter.FnID) + } + if filter.Path != "" { + args = where(&b, args, "path=?", filter.Path) + } fmt.Fprintf(&b, ` ORDER BY id DESC`) // TODO assert this is indexed fmt.Fprintf(&b, ` LIMIT ?`) diff --git a/api/logs/metrics/metrics.go b/api/logs/metrics/metrics.go index fc6b96df8..08bf2dad5 100644 --- a/api/logs/metrics/metrics.go +++ b/api/logs/metrics/metrics.go @@ -22,13 +22,25 @@ func (m *metricls) InsertCall(ctx context.Context, call *models.Call) error { return m.ls.InsertCall(ctx, call) } -func (m *metricls) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { +func (m *metricls) GetCall1(ctx context.Context, appName, callID string) (*models.Call, error) { ctx, span := trace.StartSpan(ctx, "ls_get_call") defer span.End() - return m.ls.GetCall(ctx, appName, callID) + return m.ls.GetCall1(ctx, appName, callID) } -func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (m *metricls) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_call") + defer span.End() + return m.ls.GetCall(ctx, fnID, callID) +} + +func (m *metricls) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_calls") + defer span.End() + return m.ls.GetCalls1(ctx, filter) +} + +func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { ctx, span := trace.StartSpan(ctx, "ls_get_calls") defer span.End() return m.ls.GetCalls(ctx, filter) diff --git a/api/logs/mock.go b/api/logs/mock.go index 04abb55af..df0191e9e 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -3,6 +3,7 @@ package logs import ( "bytes" "context" + "encoding/base64" "io" "io/ioutil" "sort" @@ -50,7 +51,7 @@ func (m *mock) InsertCall(ctx context.Context, call *models.Call) error { return nil } -func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +func (m *mock) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { for _, t := range m.Calls { if t.ID == callID && t.AppID == appID { return t, nil @@ -60,13 +61,24 @@ func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, return nil, models.ErrCallNotFound } +func (m *mock) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + for _, t := range m.Calls { + if t.ID == callID && + t.FnID == fnID { + return t, nil + } + } + + return nil, models.ErrCallNotFound +} + type sortC []*models.Call func (s sortC) Len() int { return len(s) } func (s sortC) Less(i, j int) bool { return strings.Compare(s[i].ID, s[j].ID) < 0 } func (s sortC) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (m *mock) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) // calls are in DESC order so use sort.Reverse sort.Sort(sort.Reverse(sortC(m.Calls))) @@ -90,6 +102,48 @@ func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mode return calls, nil } +func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) + // calls are in DESC order so use sort.Reverse + sort.Sort(sort.Reverse(sortC(m.Calls))) + + var calls []*models.Call + + var cursor = "" + if filter.Cursor != "" { + s, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + cursor = string(s) + } + + for _, c := range m.Calls { + if filter.PerPage > 0 && len(calls) == filter.PerPage { + break + } + + if (cursor == "" || strings.Compare(cursor, c.ID) > 0) && + (filter.FnID == "" || c.FnID == filter.FnID) && + (time.Time(filter.FromTime).IsZero() || time.Time(filter.FromTime).Before(time.Time(c.CreatedAt))) && + (time.Time(filter.ToTime).IsZero() || time.Time(c.CreatedAt).Before(time.Time(filter.ToTime))) { + + calls = append(calls, c) + } + } + + var nextCursor string + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + nextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return &models.CallList{ + NextCursor: nextCursor, + Items: calls, + }, nil +} + func (m *mock) Close() error { return nil } diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index d8202a7cb..eec3f4695 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -195,6 +195,9 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error { } objectName := callKey(call.AppID, call.ID) + if call.FnID != "" { + objectName = callKey2(call.FnID, call.ID) + } params := &s3manager.UploadInput{ Bucket: aws.String(s.bucket), Key: aws.String(objectName), @@ -213,26 +216,28 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error { // see this entry when listing only when specifying a route path. (NOTE: this // behavior will go away if we stop listing by route -> triggers) - objectName = callMarkerKey(call.AppID, call.Path, call.ID) - params = &s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(objectName), - Body: bytes.NewReader([]byte{}), - ContentType: aws.String("text/plain"), - } + if call.FnID == "" { + objectName = callMarkerKey(call.AppID, call.Path, call.ID) + params = &s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(objectName), + Body: bytes.NewReader([]byte{}), + ContentType: aws.String("text/plain"), + } - logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker") - _, err = s.uploader.UploadWithContext(ctx, params) - if err != nil { - // XXX(reed): we could just log this? - return fmt.Errorf("failed to write marker key for log, %v", err) + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker") + _, err = s.uploader.UploadWithContext(ctx, params) + if err != nil { + // XXX(reed): we could just log this? + return fmt.Errorf("failed to write marker key for log, %v", err) + } } return nil } -// GetCall returns a call at a certain id and app name. -func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +// GetCall1 returns a call at a certain id and app name. +func (s *store) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { ctx, span := trace.StartSpan(ctx, "s3_get_call") defer span.End() @@ -242,6 +247,17 @@ func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call return s.getCallByKey(ctx, objectName) } +// GetCall returns a call at a certain id +func (s *store) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_call") + defer span.End() + + objectName := callKey2(fnID, callID) + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call") + + return s.getCallByKey(ctx, objectName) +} + func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) { // stream the logs to an in-memory buffer var target aws.WriteAtBuffer @@ -292,14 +308,23 @@ func callKeyFlipped(app, id string) string { return callKeyPrefix + app + "/" + id } +func callKey2(fnID, id string) string { + id = flipCursor(id) + return callKeyPrefix + fnID + "/" + id +} + func logKey(appID, callID string) string { return logKeyPrefix + appID + "/" + callID } -// GetCalls returns a list of calls that satisfy the given CallFilter. If no +func logKey2(callID string) string { + return logKeyPrefix + callID +} + +// GetCalls1 returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. // NOTE: this relies on call ids being lexicographically sortable and <= 16 byte -func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (s *store) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { ctx, span := trace.StartSpan(ctx, "s3_get_calls") defer span.End() @@ -420,6 +445,116 @@ func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mod return calls, nil } +// GetCalls returns a list of calls that satisfy the given CallFilter. If no +// calls exist, an empty list and a nil error are returned. +// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte +func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_calls") + defer span.End() + + if filter.FnID == "" { + return nil, errors.New("s3 store does not support listing across all fns") + } + + if filter.Cursor != "" { + cursor, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + filter.Cursor = string(cursor) + } + + // NOTE we need marker keys to support (fn is REQUIRED): + // 1) quick iteration per path + // 2) sorted by id across all path + // key: s: {fn} : {id} + // + // also s3 api returns sorted in lexicographic order, we need the reverse of this. + var key string + + // filter.Cursor is a call id, translate to our key format. if a path is + // provided, we list keys from markers instead. + if filter.Cursor != "" { + key = callKey2(filter.FnID, filter.Cursor) + } else if t := time.Time(filter.ToTime); !t.IsZero() { + // get a fake id that has the most significant bits set to the to_time (first 48 bits) + fako := id.NewWithTime(t) + //var buf [id.EncodedSize]byte + //fakoId.MarshalTextTo(buf) + //mid := string(buf[:10]) + mid := fako.String() + key = callKey(filter.FnID, mid) + } + + // prefix prevents leaving bounds of app or path marker keys + prefix := callKey2(filter.FnID, "") + + input := &s3.ListObjectsInput{ + Bucket: aws.String(s.bucket), + MaxKeys: aws.Int64(int64(filter.PerPage)), + Marker: aws.String(key), + Prefix: aws.String(prefix), + } + + result, err := s.client.ListObjects(input) + if err != nil { + return nil, fmt.Errorf("failed to list logs: %v", err) + } + + calls := make([]*models.Call, 0, len(result.Contents)) + + for _, obj := range result.Contents { + if len(calls) == filter.PerPage { + break + } + + // extract the app and id from the key to lookup the object, this also + // validates we aren't reading strangely keyed objects from the bucket. + var fnID, id string + + fields := strings.Split(*obj.Key, "/") + if len(fields) != 3 { + return nil, fmt.Errorf("invalid key in calls: %v", *obj.Key) + } + fnID = fields[1] + id = fields[2] + + // the id here is already reverse encoded, keep it that way. + objectName := callKeyFlipped(fnID, id) + + // NOTE: s3 doesn't have a way to get multiple objects so just use GetCall + // TODO we should reuse the buffer to decode these + call, err := s.getCallByKey(ctx, objectName) + if err != nil { + common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"fnID": fnID, "id": id}).Error("error filling call object") + continue + } + + // ensure: from_time < created_at < to_time + fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond) + if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) { + // NOTE could break, ids and created_at aren't necessarily in perfect order + continue + } + + toTime := time.Time(filter.ToTime).Truncate(time.Millisecond) + if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) { + continue + } + + calls = append(calls, call) + } + + callList := &models.CallList{Items: calls} + + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + callList.NextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return callList, nil +} + func (s *store) Close() error { return nil } diff --git a/api/logs/testing/test.go b/api/logs/testing/test.go index 9911107bf..b04a9beaa 100644 --- a/api/logs/testing/test.go +++ b/api/logs/testing/test.go @@ -3,6 +3,7 @@ package testing import ( "bytes" "context" + "encoding/base64" "io" "strings" "testing" @@ -18,10 +19,9 @@ var testApp = &models.App{ ID: id.New().String(), } -var testRoute = &models.Route{ - Path: "/test", +var testFn = &models.Fn{ + ID: id.New().String(), Image: "fnproject/fn-test-utils", - Type: "sync", Format: "http", } @@ -32,7 +32,7 @@ func SetupTestCall(t *testing.T, ctx context.Context, ls models.LogStore) *model call.Status = "success" call.StartedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now()) - call.Path = testRoute.Path + call.FnID = testFn.ID return &call } @@ -44,7 +44,7 @@ func Test(t *testing.T, fnl models.LogStore) { // test list first, the rest are point lookup tests t.Run("calls-get", func(t *testing.T) { - filter := &models.CallFilter{AppID: call.AppID, Path: call.Path, PerPage: 100} + filter := &models.CallFilter{FnID: call.FnID, PerPage: 100} now := time.Now() call.CreatedAt = common.DateTime(now) call.ID = id.New().String() @@ -54,10 +54,10 @@ func Test(t *testing.T, fnl models.LogStore) { } calls, err := fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls2(ctx, filter): one call, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls2(ctx, filter): one call, unexpected length 1 != `%v`", len(calls.Items)) } c2 := *call @@ -80,55 +80,47 @@ func Test(t *testing.T, fnl models.LogStore) { } // test that no filter works too - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, PerPage: 100}) + calls, err = fnl.GetCalls(ctx, &models.CallFilter{FnID: testFn.ID, PerPage: 100}) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls2(ctx, filter): three calls, unexpected error `%v`", err) } - if len(calls) != 3 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 3 { + t.Fatalf("Test GetCalls2(ctx, filter): three calls, unexpected length 3 != `%v`", len(calls.Items)) } // test that pagination stuff works. id, descending filter.PerPage = 1 calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): per page 1, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c3.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c3.ID) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): per page 1, unexpected length 1 != `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c3.ID { + t.Fatalf("Test GetCalls: per page 1, call ids not in expected order: %v %v", calls.Items[0].ID, c3.ID) } filter.PerPage = 100 - filter.Cursor = calls[0].ID + filter.Cursor = base64.RawURLEncoding.EncodeToString([]byte(calls.Items[0].ID)) calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): cursor set, unexpected error `%v`", err) } - if len(calls) != 2 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c2.ID) - } else if calls[1].ID != call.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[1].ID, call.ID) + if len(calls.Items) != 2 { + t.Fatalf("Test GetCalls(ctx, filter): cursor set, unexpected length 2 != `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c2.ID { + t.Fatalf("Test GetCalls: cursor set, call ids not in expected order: %v %v", calls.Items[0].ID, c2.ID) + } else if calls.Items[1].ID != call.ID { + t.Fatalf("Test GetCalls: cursor set, call ids not in expected order: %v %v", calls.Items[1].ID, call.ID) } // test that filters actually applied - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: "wrongappname", PerPage: 100}) + calls, err = fnl.GetCalls(ctx, &models.CallFilter{FnID: "wrongfnID", PerPage: 100}) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): bad fnID, unexpected error `%v`", err) } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, Path: "wrongpath", PerPage: 100}) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 0 { + t.Fatalf("Test GetCalls(ctx, filter): bad fnID, unexpected length `%v`", len(calls.Items)) } // make sure from_time and to_time work @@ -136,16 +128,16 @@ func Test(t *testing.T, fnl models.LogStore) { PerPage: 100, FromTime: call.CreatedAt, ToTime: c3.CreatedAt, - AppID: call.AppID, + FnID: call.FnID, } calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): time filter, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call id not expected %s vs %s", calls[0].ID, c2.ID) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): time filter, unexpected length `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c2.ID { + t.Fatalf("Test GetCalls: time filter, call id not expected %s vs %s", calls.Items[0].ID, c2.ID) } }) @@ -181,7 +173,7 @@ func Test(t *testing.T, fnl models.LogStore) { call.StartedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now()) call.AppID = testApp.Name - call.Path = testRoute.Path + call.FnID = testFn.ID t.Run("call-insert", func(t *testing.T) { call.ID = id.New().String() @@ -197,7 +189,7 @@ func Test(t *testing.T, fnl models.LogStore) { if err != nil { t.Fatalf("Test GetCall: unexpected error `%v`", err) } - newCall, err := fnl.GetCall(ctx, call.AppID, call.ID) + newCall, err := fnl.GetCall(ctx, call.FnID, call.ID) if err != nil { t.Fatalf("Test GetCall: unexpected error `%v`", err) } diff --git a/api/models/call.go b/api/models/call.go index 8f62b0650..d571fc65c 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -163,8 +163,14 @@ type Call struct { type CallFilter struct { Path string // match AppID string // match + FnID string //match FromTime common.DateTime ToTime common.DateTime Cursor string PerPage int } + +type CallList struct { + NextCursor string `json:"next_cursor,omitempty"` + Items []*Call `json:"items"` +} diff --git a/api/models/logs.go b/api/models/logs.go index 4786c4d6b..35a748934 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -25,11 +25,18 @@ type LogStore interface { InsertCall(ctx context.Context, call *Call) error // GetCall returns a call at a certain id and app name. - GetCall(ctx context.Context, appId, callID string) (*Call, error) + GetCall1(ctx context.Context, appId, callID string) (*Call, error) + + // GetCall2 returns a call at a certain id + GetCall(ctx context.Context, fnID, callID string) (*Call, error) // GetCalls returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. - GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error) + GetCalls1(ctx context.Context, filter *CallFilter) ([]*Call, error) + + // GetCalls returns a list of calls that satisfy the given CallFilter. If no + // calls exist, an empty list and a nil error are returned. + GetCalls(ctx context.Context, filter *CallFilter) (*CallList, error) // Close will close any underlying connections as needed. // Close is not safe to be called from multiple threads. diff --git a/api/server/call_get.go b/api/server/call_get.go index bb6ed24ac..31f1bdfdd 100644 --- a/api/server/call_get.go +++ b/api/server/call_get.go @@ -7,13 +7,28 @@ import ( "github.com/gin-gonic/gin" ) -func (s *Server) handleCallGet(c *gin.Context) { +func (s *Server) handleCallGet1(c *gin.Context) { ctx := c.Request.Context() callID := c.Param(api.ParamCallID) appID := c.MustGet(api.AppID).(string) - callObj, err := s.logstore.GetCall(ctx, appID, callID) + callObj, err := s.logstore.GetCall1(ctx, appID, callID) + if err != nil { + handleV1ErrorResponse(c, err) + return + } + + c.JSON(http.StatusOK, callResponse{"Successfully loaded call", callObj}) +} + +func (s *Server) handleCallGet(c *gin.Context) { + ctx := c.Request.Context() + + fnID := c.Param(api.ParamFnID) + callID := c.Param(api.ParamCallID) + + callObj, err := s.logstore.GetCall(ctx, fnID, callID) if err != nil { handleV1ErrorResponse(c, err) return diff --git a/api/server/call_list.go b/api/server/call_list.go index 3063bafa0..db3956aed 100644 --- a/api/server/call_list.go +++ b/api/server/call_list.go @@ -11,7 +11,7 @@ import ( "github.com/gin-gonic/gin" ) -func (s *Server) handleCallList(c *gin.Context) { +func (s *Server) handleCallList1(c *gin.Context) { ctx := c.Request.Context() var err error @@ -26,7 +26,7 @@ func (s *Server) handleCallList(c *gin.Context) { return } - calls, err := s.logstore.GetCalls(ctx, &filter) + calls, err := s.logstore.GetCalls1(ctx, &filter) var nextCursor string if len(calls) > 0 && len(calls) == filter.PerPage { @@ -41,6 +41,30 @@ func (s *Server) handleCallList(c *gin.Context) { }) } +func (s *Server) handleCallList(c *gin.Context) { + ctx := c.Request.Context() + var err error + + fnID := c.MustGet(api.ParamFnID).(string) + // TODO api.ParamRouteName needs to be escaped probably, since it has '/' a lot + filter := models.CallFilter{FnID: fnID} + filter.Cursor, filter.PerPage = pageParams(c, false) // ids are url safe + + filter.FromTime, filter.ToTime, err = timeParams(c) + if err != nil { + handleV1ErrorResponse(c, err) + return + } + + calls, err := s.logstore.GetCalls(ctx, &filter) + + if err != nil { + handleErrorResponse(c, err) + } + + c.JSON(http.StatusOK, calls) +} + // "" gets parsed to a zero time, which is fine (ignored in query) func timeParams(c *gin.Context) (fromTime, toTime common.DateTime, err error) { fromStr := c.Query("from_time") diff --git a/api/server/server.go b/api/server/server.go index b8f2cc455..f338abfce 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -197,6 +197,7 @@ type Server struct { noHTTTPTriggerEndpoint bool noHybridAPI bool noFnInvokeEndpoint bool + noCallEndpoints bool appListeners *appListeners routeListeners *routeListeners fnListeners *fnListeners @@ -747,6 +748,14 @@ func WithoutHybridAPI() Option { } } +// WithoutCallEndpoints unconditionally disables the call resources in the api +func WithoutCallEndpoints() Option { + return func(ctx context.Context, s *Server) error { + s.noCallEndpoints = true + return nil + } +} + // WithJaeger maps EnvJaegerURL func WithJaeger(jaegerURL string) Option { return func(ctx context.Context, s *Server) error { @@ -1039,6 +1048,10 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { } } +func (s *Server) notImplementedResponse(c *gin.Context) { + c.Status(http.StatusNotImplemented) +} + func (s *Server) bindHandlers(ctx context.Context) { engine := s.Router admin := s.AdminRouter @@ -1082,9 +1095,9 @@ func (s *Server) bindHandlers(ctx context.Context) { withAppCheck.GET("/routes/:route", s.handleRouteGetAPI) withAppCheck.PATCH("/routes/*route", s.handleRoutesPatch) withAppCheck.DELETE("/routes/*route", s.handleRouteDelete) - withAppCheck.GET("/calls/:call", s.handleCallGet) + withAppCheck.GET("/calls/:call", s.handleCallGet1) withAppCheck.GET("/calls/:call/log", s.handleCallLogGet) - withAppCheck.GET("/calls", s.handleCallList) + withAppCheck.GET("/calls", s.handleCallList1) } apps.POST("/routes", s.handleRoutesPostPut) @@ -1115,6 +1128,14 @@ func (s *Server) bindHandlers(ctx context.Context) { v2.DELETE("/triggers/:triggerID", s.handleTriggerDelete) } + if !s.noCallEndpoints { + v2.GET("/fns/:fnID/calls", s.handleCallList) + v2.GET("/fns/:fnID/calls/:callID", s.handleCallGet) + } else { + v2.GET("/fns/:fnID/calls", s.notImplementedResponse) + v2.GET("/fns/:fnID/calls/:callID", s.notImplementedResponse) + } + if !s.noHybridAPI { // Hybrid API - this should only be enabled on API servers runner := cleanv2.Group("/runner") runner.PUT("/async", s.handleRunnerEnqueue) diff --git a/docs/swagger_v2.yml b/docs/swagger_v2.yml index 99a65ba56..8d4fb7db8 100644 --- a/docs/swagger_v2.yml +++ b/docs/swagger_v2.yml @@ -404,6 +404,77 @@ paths: schema: $ref: '#/definitions/Error' + /fns/{fnID}/calls: + get: + summary: Get a fns calls. + description: Get a functions calls, results returned in created_at, descending order (newest first). + tags: + - Call + parameters: + - $ref: '#/parameters/FnID' + - $ref: '#/parameters/cursor' + - $ref: '#/parameters/perPage' + - name: from_time + description: Unix timestamp in seconds, of call.created_at to begin the results at, default 0. + required: false + type: integer + in: query + - name: to_time + description: Unix timestamp in seconds, of call.created_at to end the results at, defaults to latest. + required: false + type: integer + in: query + responses: + 200: + description: "List of Calls" + schema: + $ref: '#/definitions/CallList' + 404: + description: "Calls not found" + schema: + $ref: '#/definitions/Error' + + /fns/{fnID}/calls/{callID}: + get: + summary: Get call information + description: Get call information + tags: + - Call + parameters: + - $ref: '#/parameters/FnID' + - $ref: '#/parameters/CallID' + responses: + 200: + description: Call found. + schema: + $ref: '#/definitions/Call' + 404: + description: Call not found. + schema: + $ref: '#/definitions/Error' + + # Implementing next + # /fns/{fnID}/calls/{callID}/log: + # get: + # operationId: "GetCallLogs" + # summary: "Get logs for a call." + # description: "Get logs for a call." + # tags: + # - Call + # - Log + # parameters: + # - $ref: '#/parameters/FnIDQuery' + # - $ref: '#/parameters/CallID' + # responses: + # 200: + # description: Log found. + # schema: + # $ref: '#/definitions/Log' + # 404: + # description: Log not found. + # schema: + # $ref: '#/definitions/Error' + definitions: App: type: object @@ -591,6 +662,102 @@ definitions: type: string readOnly: true + Call: + type: object + properties: + id: + type: string + description: Call ID. + readOnly: true + status: + type: string + description: Call execution status. + readOnly: true + error: + type: string + description: Call execution error, if status is 'error'. + readOnly: true + app_id: + type: string + description: App ID of fn that executed this call. + readOnly: true + fn_id: + type: string + description: Fn ID of fn that executed this call. + readOnly: true + created_at: + type: string + format: date-time + description: Time when call was submitted. Always in UTC. + readOnly: true + started_at: + type: string + format: date-time + description: Time when call started execution. Always in UTC. + readOnly: true + completed_at: + type: string + format: date-time + description: Time when call completed, whether it was successul or failed. Always in UTC. + readOnly: true + stats: + type: array + items: + $ref: '#/definitions/Stat' + description: A histogram of stats for a call, each is a snapshot of a calls state at the timestamp. + readOnly: true + + CallList: + type: object + required: + - items + properties: + next_cursor: + type: string + description: "Cursor to send with subsequent request to recieve next page, if non-empty." + readOnly: true + items: + type: array + items: + $ref: '#/definitions/Call' + + Stat: + type: object + properties: + timestamp: + type: string + format: date-time + metrics: + type: object + properties: + net_rx: + type: integer + format: int64 + net_tx: + type: integer + format: int64 + mem_limit: + type: integer + format: int64 + mem_usage: + type: integer + format: int64 + disk_read: + type: integer + format: int64 + disk_write: + type: integer + format: int64 + cpu_user: + type: integer + format: int64 + cpu_total: + type: integer + format: int64 + cpu_kernel: + type: integer + format: int64 + parameters: cursor: name: cursor @@ -623,6 +790,12 @@ parameters: description: "Opaque, unique Trigger ID." required: true type: string + CallID: + name: callID + in: path + description: "Opaque, unique Call ID." + required: true + type: string FnIDQuery: name: fn_id