diff --git a/api/datastore/mock.go b/api/datastore/mock.go index c399875a7..7d720b669 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -363,7 +363,6 @@ func (m *mock) GetFns(ctx context.Context, filter *models.FnFilter) (*models.FnL (filter.Name == "" || filter.Name == f.Name) { funcs = append(funcs, f) } - } var nextCursor string diff --git a/api/datastore/sql/migratex/migrate.go b/api/datastore/sql/migratex/migrate.go index a43a16259..702b7f841 100644 --- a/api/datastore/sql/migratex/migrate.go +++ b/api/datastore/sql/migratex/migrate.go @@ -27,6 +27,7 @@ func migrateErr(version int64, up bool, err error) ErrMigration { dir := "up" if !up { dir = "down" + version++ } return ErrMigration(fmt.Sprintf("error running migration. version: %v direction: %v err: %v", version, dir, err)) } @@ -185,7 +186,6 @@ func run(ctx context.Context, tx *sqlx.Tx, m Migration, up bool) error { return fmt.Errorf("non-contiguous migration attempted down: %v != %v", m.Version(), curVersion) } - // TODO is this robust enough? we could check version := m.Version() if !up { version = m.Version() - 1 diff --git a/api/datastore/sql/migrations/18_add_fnid_calls.go b/api/datastore/sql/migrations/18_add_fnid_calls.go new file mode 100644 index 000000000..c4b444dfe --- /dev/null +++ b/api/datastore/sql/migrations/18_add_fnid_calls.go @@ -0,0 +1,46 @@ +package migrations + +import ( + "context" + + "github.com/fnproject/fn/api/datastore/sql/migratex" + "github.com/jmoiron/sqlx" +) + +func up18(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE calls ADD fn_id varchar(256);") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE calls MODIFY app_id varchar(256) NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE calls ALTER COLUMN app_id DROP NOT NULL;") + return err + } + + return err +} + +func down18(ctx context.Context, tx *sqlx.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE calls DROP COLUMN fn_id;") + + switch tx.DriverName() { + case "mysql": + _, err := tx.ExecContext(ctx, "ALTER TABLE calls MODIFY app_id varchar(256) NOT NULL;") + return err + case "postgres", "pgx": + _, err = tx.ExecContext(ctx, "ALTER TABLE calls ALTER COLUMN app_id SET NOT NULL;") + return err + } + + return err +} + +func init() { + Migrations = append(Migrations, &migratex.MigFields{ + VersionFunc: vfunc(18), + UpFunc: up18, + DownFunc: down18, + }) +} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index ce69971d1..225c45bdd 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -72,7 +72,8 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( completed_at varchar(256) NOT NULL, status varchar(256) NOT NULL, id varchar(256) NOT NULL, - app_id varchar(256) NOT NULL, + app_id varchar(256), + fn_id varchar(256), path varchar(256) NOT NULL, stats text, error text, @@ -955,6 +956,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { completed_at, status, app_id, + fn_id, path, stats, error @@ -966,6 +968,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { :completed_at, :status, :app_id, + :fn_id, :path, :stats, :error @@ -975,7 +978,7 @@ func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { return err } -func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +func (ds *SQLStore) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector) query = ds.db.Rebind(query) row := ds.db.QueryRowxContext(ctx, query, callID, appID) @@ -991,7 +994,47 @@ func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models. return &call, nil } -func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (ds *SQLStore) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + query := fmt.Sprintf(`%s WHERE id=? AND fn_id=?`, callSelector) + query = ds.db.Rebind(query) + row := ds.db.QueryRowxContext(ctx, query, callID, fnID) + + var call models.Call + err := row.StructScan(&call) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrCallNotFound + } + return nil, err + } + return &call, nil +} + +func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + if filter.Cursor != "" { + cursor, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + filter.Cursor = string(cursor) + } + + calls, err := ds.GetCalls1(ctx, filter) + if err != nil { + return nil, err + } + + callList := &models.CallList{Items: calls} + + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + callList.NextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return callList, nil +} + +func (ds *SQLStore) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { res := []*models.Call{} query, args := buildFilterCallQuery(filter) query = fmt.Sprintf("%s %s", callSelector, query) @@ -1109,8 +1152,15 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) { if !time.Time(filter.FromTime).IsZero() { args = where(&b, args, "created_at>?", filter.FromTime.String()) } - args = where(&b, args, "app_id=?", filter.AppID) - args = where(&b, args, "path=?", filter.Path) + if filter.AppID != "" { + args = where(&b, args, "app_id=?", filter.AppID) + } + if filter.FnID != "" { + args = where(&b, args, "fn_id=?", filter.FnID) + } + if filter.Path != "" { + args = where(&b, args, "path=?", filter.Path) + } fmt.Fprintf(&b, ` ORDER BY id DESC`) // TODO assert this is indexed fmt.Fprintf(&b, ` LIMIT ?`) diff --git a/api/logs/metrics/metrics.go b/api/logs/metrics/metrics.go index fc6b96df8..08bf2dad5 100644 --- a/api/logs/metrics/metrics.go +++ b/api/logs/metrics/metrics.go @@ -22,13 +22,25 @@ func (m *metricls) InsertCall(ctx context.Context, call *models.Call) error { return m.ls.InsertCall(ctx, call) } -func (m *metricls) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { +func (m *metricls) GetCall1(ctx context.Context, appName, callID string) (*models.Call, error) { ctx, span := trace.StartSpan(ctx, "ls_get_call") defer span.End() - return m.ls.GetCall(ctx, appName, callID) + return m.ls.GetCall1(ctx, appName, callID) } -func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (m *metricls) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_call") + defer span.End() + return m.ls.GetCall(ctx, fnID, callID) +} + +func (m *metricls) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_calls") + defer span.End() + return m.ls.GetCalls1(ctx, filter) +} + +func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { ctx, span := trace.StartSpan(ctx, "ls_get_calls") defer span.End() return m.ls.GetCalls(ctx, filter) diff --git a/api/logs/mock.go b/api/logs/mock.go index 04abb55af..df0191e9e 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -3,6 +3,7 @@ package logs import ( "bytes" "context" + "encoding/base64" "io" "io/ioutil" "sort" @@ -50,7 +51,7 @@ func (m *mock) InsertCall(ctx context.Context, call *models.Call) error { return nil } -func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +func (m *mock) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { for _, t := range m.Calls { if t.ID == callID && t.AppID == appID { return t, nil @@ -60,13 +61,24 @@ func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, return nil, models.ErrCallNotFound } +func (m *mock) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + for _, t := range m.Calls { + if t.ID == callID && + t.FnID == fnID { + return t, nil + } + } + + return nil, models.ErrCallNotFound +} + type sortC []*models.Call func (s sortC) Len() int { return len(s) } func (s sortC) Less(i, j int) bool { return strings.Compare(s[i].ID, s[j].ID) < 0 } func (s sortC) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (m *mock) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) // calls are in DESC order so use sort.Reverse sort.Sort(sort.Reverse(sortC(m.Calls))) @@ -90,6 +102,48 @@ func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mode return calls, nil } +func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) + // calls are in DESC order so use sort.Reverse + sort.Sort(sort.Reverse(sortC(m.Calls))) + + var calls []*models.Call + + var cursor = "" + if filter.Cursor != "" { + s, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + cursor = string(s) + } + + for _, c := range m.Calls { + if filter.PerPage > 0 && len(calls) == filter.PerPage { + break + } + + if (cursor == "" || strings.Compare(cursor, c.ID) > 0) && + (filter.FnID == "" || c.FnID == filter.FnID) && + (time.Time(filter.FromTime).IsZero() || time.Time(filter.FromTime).Before(time.Time(c.CreatedAt))) && + (time.Time(filter.ToTime).IsZero() || time.Time(c.CreatedAt).Before(time.Time(filter.ToTime))) { + + calls = append(calls, c) + } + } + + var nextCursor string + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + nextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return &models.CallList{ + NextCursor: nextCursor, + Items: calls, + }, nil +} + func (m *mock) Close() error { return nil } diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index d8202a7cb..eec3f4695 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -195,6 +195,9 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error { } objectName := callKey(call.AppID, call.ID) + if call.FnID != "" { + objectName = callKey2(call.FnID, call.ID) + } params := &s3manager.UploadInput{ Bucket: aws.String(s.bucket), Key: aws.String(objectName), @@ -213,26 +216,28 @@ func (s *store) InsertCall(ctx context.Context, call *models.Call) error { // see this entry when listing only when specifying a route path. (NOTE: this // behavior will go away if we stop listing by route -> triggers) - objectName = callMarkerKey(call.AppID, call.Path, call.ID) - params = &s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(objectName), - Body: bytes.NewReader([]byte{}), - ContentType: aws.String("text/plain"), - } + if call.FnID == "" { + objectName = callMarkerKey(call.AppID, call.Path, call.ID) + params = &s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(objectName), + Body: bytes.NewReader([]byte{}), + ContentType: aws.String("text/plain"), + } - logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker") - _, err = s.uploader.UploadWithContext(ctx, params) - if err != nil { - // XXX(reed): we could just log this? - return fmt.Errorf("failed to write marker key for log, %v", err) + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker") + _, err = s.uploader.UploadWithContext(ctx, params) + if err != nil { + // XXX(reed): we could just log this? + return fmt.Errorf("failed to write marker key for log, %v", err) + } } return nil } -// GetCall returns a call at a certain id and app name. -func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +// GetCall1 returns a call at a certain id and app name. +func (s *store) GetCall1(ctx context.Context, appID, callID string) (*models.Call, error) { ctx, span := trace.StartSpan(ctx, "s3_get_call") defer span.End() @@ -242,6 +247,17 @@ func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call return s.getCallByKey(ctx, objectName) } +// GetCall returns a call at a certain id +func (s *store) GetCall(ctx context.Context, fnID, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_call") + defer span.End() + + objectName := callKey2(fnID, callID) + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call") + + return s.getCallByKey(ctx, objectName) +} + func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) { // stream the logs to an in-memory buffer var target aws.WriteAtBuffer @@ -292,14 +308,23 @@ func callKeyFlipped(app, id string) string { return callKeyPrefix + app + "/" + id } +func callKey2(fnID, id string) string { + id = flipCursor(id) + return callKeyPrefix + fnID + "/" + id +} + func logKey(appID, callID string) string { return logKeyPrefix + appID + "/" + callID } -// GetCalls returns a list of calls that satisfy the given CallFilter. If no +func logKey2(callID string) string { + return logKeyPrefix + callID +} + +// GetCalls1 returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. // NOTE: this relies on call ids being lexicographically sortable and <= 16 byte -func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (s *store) GetCalls1(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { ctx, span := trace.StartSpan(ctx, "s3_get_calls") defer span.End() @@ -420,6 +445,116 @@ func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mod return calls, nil } +// GetCalls returns a list of calls that satisfy the given CallFilter. If no +// calls exist, an empty list and a nil error are returned. +// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte +func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) (*models.CallList, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_calls") + defer span.End() + + if filter.FnID == "" { + return nil, errors.New("s3 store does not support listing across all fns") + } + + if filter.Cursor != "" { + cursor, err := base64.RawURLEncoding.DecodeString(filter.Cursor) + if err != nil { + return nil, err + } + filter.Cursor = string(cursor) + } + + // NOTE we need marker keys to support (fn is REQUIRED): + // 1) quick iteration per path + // 2) sorted by id across all path + // key: s: {fn} : {id} + // + // also s3 api returns sorted in lexicographic order, we need the reverse of this. + var key string + + // filter.Cursor is a call id, translate to our key format. if a path is + // provided, we list keys from markers instead. + if filter.Cursor != "" { + key = callKey2(filter.FnID, filter.Cursor) + } else if t := time.Time(filter.ToTime); !t.IsZero() { + // get a fake id that has the most significant bits set to the to_time (first 48 bits) + fako := id.NewWithTime(t) + //var buf [id.EncodedSize]byte + //fakoId.MarshalTextTo(buf) + //mid := string(buf[:10]) + mid := fako.String() + key = callKey(filter.FnID, mid) + } + + // prefix prevents leaving bounds of app or path marker keys + prefix := callKey2(filter.FnID, "") + + input := &s3.ListObjectsInput{ + Bucket: aws.String(s.bucket), + MaxKeys: aws.Int64(int64(filter.PerPage)), + Marker: aws.String(key), + Prefix: aws.String(prefix), + } + + result, err := s.client.ListObjects(input) + if err != nil { + return nil, fmt.Errorf("failed to list logs: %v", err) + } + + calls := make([]*models.Call, 0, len(result.Contents)) + + for _, obj := range result.Contents { + if len(calls) == filter.PerPage { + break + } + + // extract the app and id from the key to lookup the object, this also + // validates we aren't reading strangely keyed objects from the bucket. + var fnID, id string + + fields := strings.Split(*obj.Key, "/") + if len(fields) != 3 { + return nil, fmt.Errorf("invalid key in calls: %v", *obj.Key) + } + fnID = fields[1] + id = fields[2] + + // the id here is already reverse encoded, keep it that way. + objectName := callKeyFlipped(fnID, id) + + // NOTE: s3 doesn't have a way to get multiple objects so just use GetCall + // TODO we should reuse the buffer to decode these + call, err := s.getCallByKey(ctx, objectName) + if err != nil { + common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"fnID": fnID, "id": id}).Error("error filling call object") + continue + } + + // ensure: from_time < created_at < to_time + fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond) + if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) { + // NOTE could break, ids and created_at aren't necessarily in perfect order + continue + } + + toTime := time.Time(filter.ToTime).Truncate(time.Millisecond) + if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) { + continue + } + + calls = append(calls, call) + } + + callList := &models.CallList{Items: calls} + + if len(calls) > 0 && len(calls) == filter.PerPage { + last := []byte(calls[len(calls)-1].ID) + callList.NextCursor = base64.RawURLEncoding.EncodeToString(last) + } + + return callList, nil +} + func (s *store) Close() error { return nil } diff --git a/api/logs/testing/test.go b/api/logs/testing/test.go index 9911107bf..b04a9beaa 100644 --- a/api/logs/testing/test.go +++ b/api/logs/testing/test.go @@ -3,6 +3,7 @@ package testing import ( "bytes" "context" + "encoding/base64" "io" "strings" "testing" @@ -18,10 +19,9 @@ var testApp = &models.App{ ID: id.New().String(), } -var testRoute = &models.Route{ - Path: "/test", +var testFn = &models.Fn{ + ID: id.New().String(), Image: "fnproject/fn-test-utils", - Type: "sync", Format: "http", } @@ -32,7 +32,7 @@ func SetupTestCall(t *testing.T, ctx context.Context, ls models.LogStore) *model call.Status = "success" call.StartedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now()) - call.Path = testRoute.Path + call.FnID = testFn.ID return &call } @@ -44,7 +44,7 @@ func Test(t *testing.T, fnl models.LogStore) { // test list first, the rest are point lookup tests t.Run("calls-get", func(t *testing.T) { - filter := &models.CallFilter{AppID: call.AppID, Path: call.Path, PerPage: 100} + filter := &models.CallFilter{FnID: call.FnID, PerPage: 100} now := time.Now() call.CreatedAt = common.DateTime(now) call.ID = id.New().String() @@ -54,10 +54,10 @@ func Test(t *testing.T, fnl models.LogStore) { } calls, err := fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls2(ctx, filter): one call, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls2(ctx, filter): one call, unexpected length 1 != `%v`", len(calls.Items)) } c2 := *call @@ -80,55 +80,47 @@ func Test(t *testing.T, fnl models.LogStore) { } // test that no filter works too - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, PerPage: 100}) + calls, err = fnl.GetCalls(ctx, &models.CallFilter{FnID: testFn.ID, PerPage: 100}) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls2(ctx, filter): three calls, unexpected error `%v`", err) } - if len(calls) != 3 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 3 { + t.Fatalf("Test GetCalls2(ctx, filter): three calls, unexpected length 3 != `%v`", len(calls.Items)) } // test that pagination stuff works. id, descending filter.PerPage = 1 calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): per page 1, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c3.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c3.ID) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): per page 1, unexpected length 1 != `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c3.ID { + t.Fatalf("Test GetCalls: per page 1, call ids not in expected order: %v %v", calls.Items[0].ID, c3.ID) } filter.PerPage = 100 - filter.Cursor = calls[0].ID + filter.Cursor = base64.RawURLEncoding.EncodeToString([]byte(calls.Items[0].ID)) calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): cursor set, unexpected error `%v`", err) } - if len(calls) != 2 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c2.ID) - } else if calls[1].ID != call.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[1].ID, call.ID) + if len(calls.Items) != 2 { + t.Fatalf("Test GetCalls(ctx, filter): cursor set, unexpected length 2 != `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c2.ID { + t.Fatalf("Test GetCalls: cursor set, call ids not in expected order: %v %v", calls.Items[0].ID, c2.ID) + } else if calls.Items[1].ID != call.ID { + t.Fatalf("Test GetCalls: cursor set, call ids not in expected order: %v %v", calls.Items[1].ID, call.ID) } // test that filters actually applied - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: "wrongappname", PerPage: 100}) + calls, err = fnl.GetCalls(ctx, &models.CallFilter{FnID: "wrongfnID", PerPage: 100}) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): bad fnID, unexpected error `%v`", err) } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, Path: "wrongpath", PerPage: 100}) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + if len(calls.Items) != 0 { + t.Fatalf("Test GetCalls(ctx, filter): bad fnID, unexpected length `%v`", len(calls.Items)) } // make sure from_time and to_time work @@ -136,16 +128,16 @@ func Test(t *testing.T, fnl models.LogStore) { PerPage: 100, FromTime: call.CreatedAt, ToTime: c3.CreatedAt, - AppID: call.AppID, + FnID: call.FnID, } calls, err = fnl.GetCalls(ctx, filter) if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + t.Fatalf("Test GetCalls(ctx, filter): time filter, unexpected error `%v`", err) } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call id not expected %s vs %s", calls[0].ID, c2.ID) + if len(calls.Items) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): time filter, unexpected length `%v`", len(calls.Items)) + } else if calls.Items[0].ID != c2.ID { + t.Fatalf("Test GetCalls: time filter, call id not expected %s vs %s", calls.Items[0].ID, c2.ID) } }) @@ -181,7 +173,7 @@ func Test(t *testing.T, fnl models.LogStore) { call.StartedAt = common.DateTime(time.Now()) call.CompletedAt = common.DateTime(time.Now()) call.AppID = testApp.Name - call.Path = testRoute.Path + call.FnID = testFn.ID t.Run("call-insert", func(t *testing.T) { call.ID = id.New().String() @@ -197,7 +189,7 @@ func Test(t *testing.T, fnl models.LogStore) { if err != nil { t.Fatalf("Test GetCall: unexpected error `%v`", err) } - newCall, err := fnl.GetCall(ctx, call.AppID, call.ID) + newCall, err := fnl.GetCall(ctx, call.FnID, call.ID) if err != nil { t.Fatalf("Test GetCall: unexpected error `%v`", err) } diff --git a/api/models/call.go b/api/models/call.go index 8f62b0650..d571fc65c 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -163,8 +163,14 @@ type Call struct { type CallFilter struct { Path string // match AppID string // match + FnID string //match FromTime common.DateTime ToTime common.DateTime Cursor string PerPage int } + +type CallList struct { + NextCursor string `json:"next_cursor,omitempty"` + Items []*Call `json:"items"` +} diff --git a/api/models/logs.go b/api/models/logs.go index 4786c4d6b..35a748934 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -25,11 +25,18 @@ type LogStore interface { InsertCall(ctx context.Context, call *Call) error // GetCall returns a call at a certain id and app name. - GetCall(ctx context.Context, appId, callID string) (*Call, error) + GetCall1(ctx context.Context, appId, callID string) (*Call, error) + + // GetCall2 returns a call at a certain id + GetCall(ctx context.Context, fnID, callID string) (*Call, error) // GetCalls returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. - GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error) + GetCalls1(ctx context.Context, filter *CallFilter) ([]*Call, error) + + // GetCalls returns a list of calls that satisfy the given CallFilter. If no + // calls exist, an empty list and a nil error are returned. + GetCalls(ctx context.Context, filter *CallFilter) (*CallList, error) // Close will close any underlying connections as needed. // Close is not safe to be called from multiple threads. diff --git a/api/server/call_get.go b/api/server/call_get.go index bb6ed24ac..31f1bdfdd 100644 --- a/api/server/call_get.go +++ b/api/server/call_get.go @@ -7,13 +7,28 @@ import ( "github.com/gin-gonic/gin" ) -func (s *Server) handleCallGet(c *gin.Context) { +func (s *Server) handleCallGet1(c *gin.Context) { ctx := c.Request.Context() callID := c.Param(api.ParamCallID) appID := c.MustGet(api.AppID).(string) - callObj, err := s.logstore.GetCall(ctx, appID, callID) + callObj, err := s.logstore.GetCall1(ctx, appID, callID) + if err != nil { + handleV1ErrorResponse(c, err) + return + } + + c.JSON(http.StatusOK, callResponse{"Successfully loaded call", callObj}) +} + +func (s *Server) handleCallGet(c *gin.Context) { + ctx := c.Request.Context() + + fnID := c.Param(api.ParamFnID) + callID := c.Param(api.ParamCallID) + + callObj, err := s.logstore.GetCall(ctx, fnID, callID) if err != nil { handleV1ErrorResponse(c, err) return diff --git a/api/server/call_list.go b/api/server/call_list.go index 3063bafa0..db3956aed 100644 --- a/api/server/call_list.go +++ b/api/server/call_list.go @@ -11,7 +11,7 @@ import ( "github.com/gin-gonic/gin" ) -func (s *Server) handleCallList(c *gin.Context) { +func (s *Server) handleCallList1(c *gin.Context) { ctx := c.Request.Context() var err error @@ -26,7 +26,7 @@ func (s *Server) handleCallList(c *gin.Context) { return } - calls, err := s.logstore.GetCalls(ctx, &filter) + calls, err := s.logstore.GetCalls1(ctx, &filter) var nextCursor string if len(calls) > 0 && len(calls) == filter.PerPage { @@ -41,6 +41,30 @@ func (s *Server) handleCallList(c *gin.Context) { }) } +func (s *Server) handleCallList(c *gin.Context) { + ctx := c.Request.Context() + var err error + + fnID := c.MustGet(api.ParamFnID).(string) + // TODO api.ParamRouteName needs to be escaped probably, since it has '/' a lot + filter := models.CallFilter{FnID: fnID} + filter.Cursor, filter.PerPage = pageParams(c, false) // ids are url safe + + filter.FromTime, filter.ToTime, err = timeParams(c) + if err != nil { + handleV1ErrorResponse(c, err) + return + } + + calls, err := s.logstore.GetCalls(ctx, &filter) + + if err != nil { + handleErrorResponse(c, err) + } + + c.JSON(http.StatusOK, calls) +} + // "" gets parsed to a zero time, which is fine (ignored in query) func timeParams(c *gin.Context) (fromTime, toTime common.DateTime, err error) { fromStr := c.Query("from_time") diff --git a/api/server/server.go b/api/server/server.go index b8f2cc455..f338abfce 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -197,6 +197,7 @@ type Server struct { noHTTTPTriggerEndpoint bool noHybridAPI bool noFnInvokeEndpoint bool + noCallEndpoints bool appListeners *appListeners routeListeners *routeListeners fnListeners *fnListeners @@ -747,6 +748,14 @@ func WithoutHybridAPI() Option { } } +// WithoutCallEndpoints unconditionally disables the call resources in the api +func WithoutCallEndpoints() Option { + return func(ctx context.Context, s *Server) error { + s.noCallEndpoints = true + return nil + } +} + // WithJaeger maps EnvJaegerURL func WithJaeger(jaegerURL string) Option { return func(ctx context.Context, s *Server) error { @@ -1039,6 +1048,10 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { } } +func (s *Server) notImplementedResponse(c *gin.Context) { + c.Status(http.StatusNotImplemented) +} + func (s *Server) bindHandlers(ctx context.Context) { engine := s.Router admin := s.AdminRouter @@ -1082,9 +1095,9 @@ func (s *Server) bindHandlers(ctx context.Context) { withAppCheck.GET("/routes/:route", s.handleRouteGetAPI) withAppCheck.PATCH("/routes/*route", s.handleRoutesPatch) withAppCheck.DELETE("/routes/*route", s.handleRouteDelete) - withAppCheck.GET("/calls/:call", s.handleCallGet) + withAppCheck.GET("/calls/:call", s.handleCallGet1) withAppCheck.GET("/calls/:call/log", s.handleCallLogGet) - withAppCheck.GET("/calls", s.handleCallList) + withAppCheck.GET("/calls", s.handleCallList1) } apps.POST("/routes", s.handleRoutesPostPut) @@ -1115,6 +1128,14 @@ func (s *Server) bindHandlers(ctx context.Context) { v2.DELETE("/triggers/:triggerID", s.handleTriggerDelete) } + if !s.noCallEndpoints { + v2.GET("/fns/:fnID/calls", s.handleCallList) + v2.GET("/fns/:fnID/calls/:callID", s.handleCallGet) + } else { + v2.GET("/fns/:fnID/calls", s.notImplementedResponse) + v2.GET("/fns/:fnID/calls/:callID", s.notImplementedResponse) + } + if !s.noHybridAPI { // Hybrid API - this should only be enabled on API servers runner := cleanv2.Group("/runner") runner.PUT("/async", s.handleRunnerEnqueue) diff --git a/docs/swagger_v2.yml b/docs/swagger_v2.yml index 99a65ba56..8d4fb7db8 100644 --- a/docs/swagger_v2.yml +++ b/docs/swagger_v2.yml @@ -404,6 +404,77 @@ paths: schema: $ref: '#/definitions/Error' + /fns/{fnID}/calls: + get: + summary: Get a fns calls. + description: Get a functions calls, results returned in created_at, descending order (newest first). + tags: + - Call + parameters: + - $ref: '#/parameters/FnID' + - $ref: '#/parameters/cursor' + - $ref: '#/parameters/perPage' + - name: from_time + description: Unix timestamp in seconds, of call.created_at to begin the results at, default 0. + required: false + type: integer + in: query + - name: to_time + description: Unix timestamp in seconds, of call.created_at to end the results at, defaults to latest. + required: false + type: integer + in: query + responses: + 200: + description: "List of Calls" + schema: + $ref: '#/definitions/CallList' + 404: + description: "Calls not found" + schema: + $ref: '#/definitions/Error' + + /fns/{fnID}/calls/{callID}: + get: + summary: Get call information + description: Get call information + tags: + - Call + parameters: + - $ref: '#/parameters/FnID' + - $ref: '#/parameters/CallID' + responses: + 200: + description: Call found. + schema: + $ref: '#/definitions/Call' + 404: + description: Call not found. + schema: + $ref: '#/definitions/Error' + + # Implementing next + # /fns/{fnID}/calls/{callID}/log: + # get: + # operationId: "GetCallLogs" + # summary: "Get logs for a call." + # description: "Get logs for a call." + # tags: + # - Call + # - Log + # parameters: + # - $ref: '#/parameters/FnIDQuery' + # - $ref: '#/parameters/CallID' + # responses: + # 200: + # description: Log found. + # schema: + # $ref: '#/definitions/Log' + # 404: + # description: Log not found. + # schema: + # $ref: '#/definitions/Error' + definitions: App: type: object @@ -591,6 +662,102 @@ definitions: type: string readOnly: true + Call: + type: object + properties: + id: + type: string + description: Call ID. + readOnly: true + status: + type: string + description: Call execution status. + readOnly: true + error: + type: string + description: Call execution error, if status is 'error'. + readOnly: true + app_id: + type: string + description: App ID of fn that executed this call. + readOnly: true + fn_id: + type: string + description: Fn ID of fn that executed this call. + readOnly: true + created_at: + type: string + format: date-time + description: Time when call was submitted. Always in UTC. + readOnly: true + started_at: + type: string + format: date-time + description: Time when call started execution. Always in UTC. + readOnly: true + completed_at: + type: string + format: date-time + description: Time when call completed, whether it was successul or failed. Always in UTC. + readOnly: true + stats: + type: array + items: + $ref: '#/definitions/Stat' + description: A histogram of stats for a call, each is a snapshot of a calls state at the timestamp. + readOnly: true + + CallList: + type: object + required: + - items + properties: + next_cursor: + type: string + description: "Cursor to send with subsequent request to recieve next page, if non-empty." + readOnly: true + items: + type: array + items: + $ref: '#/definitions/Call' + + Stat: + type: object + properties: + timestamp: + type: string + format: date-time + metrics: + type: object + properties: + net_rx: + type: integer + format: int64 + net_tx: + type: integer + format: int64 + mem_limit: + type: integer + format: int64 + mem_usage: + type: integer + format: int64 + disk_read: + type: integer + format: int64 + disk_write: + type: integer + format: int64 + cpu_user: + type: integer + format: int64 + cpu_total: + type: integer + format: int64 + cpu_kernel: + type: integer + format: int64 + parameters: cursor: name: cursor @@ -623,6 +790,12 @@ parameters: description: "Opaque, unique Trigger ID." required: true type: string + CallID: + name: callID + in: path + description: "Opaque, unique Call ID." + required: true + type: string FnIDQuery: name: fn_id