diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 56b60d460..eddadbb1a 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -89,7 +89,7 @@ func TestCallConfigurationRequest(t *testing.T) { IdleTimeout: idleTimeout, Memory: memory, }, - }, nil, + }, ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) @@ -233,7 +233,7 @@ func TestCallConfigurationModel(t *testing.T) { } // FromModel doesn't need a datastore, for now... - ds := datastore.NewMockInit(nil, nil, nil) + ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() @@ -304,7 +304,7 @@ func TestAsyncCallHeaders(t *testing.T) { } // FromModel doesn't need a datastore, for now... - ds := datastore.NewMockInit(nil, nil, nil) + ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() @@ -434,7 +434,7 @@ func TestSubmitError(t *testing.T) { } // FromModel doesn't need a datastore, for now... - ds := datastore.NewMockInit(nil, nil, nil) + ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() @@ -489,7 +489,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { IdleTimeout: 10, Memory: 128, }, - }, nil, + }, ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) @@ -553,7 +553,7 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { } // FromModel doesn't need a datastore, for now... - ds := datastore.NewMockInit(nil, nil, nil) + ds := datastore.NewMockInit() a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) defer a.Close() @@ -658,7 +658,7 @@ func TestPipesAreClear(t *testing.T) { IdleTimeout: ca.IdleTimeout, Memory: ca.Memory, }, - }, nil, + }, ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) @@ -808,7 +808,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { IdleTimeout: call.IdleTimeout, Memory: call.Memory, }, - }, nil, + }, ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) diff --git a/api/agent/data_access.go b/api/agent/data_access.go index ac8e62507..d4821746d 100644 --- a/api/agent/data_access.go +++ b/api/agent/data_access.go @@ -160,7 +160,7 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde // this means that we could potentially store an error / timeout status for a // call that ran successfully [by a user's perspective] // TODO: this should be update, really - if err := da.ds.InsertCall(ctx, mCall); err != nil { + if err := da.ls.InsertCall(ctx, mCall); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") // note: Not returning err here since the job could have already finished successfully. } diff --git a/api/datastore/internal/datastoretest/test.go b/api/datastore/internal/datastoretest/test.go index c6bf172a7..831df0491 100644 --- a/api/datastore/internal/datastoretest/test.go +++ b/api/datastore/internal/datastoretest/test.go @@ -5,12 +5,10 @@ import ( "context" "log" "testing" - "time" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/gin-gonic/gin" - "github.com/go-openapi/strfmt" "github.com/sirupsen/logrus" ) @@ -37,243 +35,6 @@ func Test(t *testing.T, dsf func(t *testing.T) models.Datastore) { ctx := context.Background() - call := new(models.Call) - call.CreatedAt = strfmt.DateTime(time.Now()) - call.Status = "error" - call.Error = "ya dun goofed" - call.StartedAt = strfmt.DateTime(time.Now()) - call.CompletedAt = strfmt.DateTime(time.Now()) - call.AppID = testApp.ID - call.Path = testRoute.Path - - t.Run("call-insert", func(t *testing.T) { - ds := dsf(t) - call.ID = id.New().String() - err := ds.InsertCall(ctx, call) - if err != nil { - t.Fatalf("Test InsertCall(ctx, &call): unexpected error `%v`", err) - } - }) - - t.Run("call-atomic-update", func(t *testing.T) { - ds := dsf(t) - call.ID = id.New().String() - err := ds.InsertCall(ctx, call) - if err != nil { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - newCall := new(models.Call) - *newCall = *call - newCall.Status = "success" - newCall.Error = "" - err = ds.UpdateCall(ctx, call, newCall) - if err != nil { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - dbCall, err := ds.GetCall(ctx, call.AppID, call.ID) - if err != nil { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - if dbCall.ID != newCall.ID { - t.Fatalf("Test GetCall: id mismatch `%v` `%v`", call.ID, newCall.ID) - } - if dbCall.Status != newCall.Status { - t.Fatalf("Test GetCall: status mismatch `%v` `%v`", call.Status, newCall.Status) - } - if dbCall.Error != newCall.Error { - t.Fatalf("Test GetCall: error mismatch `%v` `%v`", call.Error, newCall.Error) - } - if time.Time(dbCall.CreatedAt).Unix() != time.Time(newCall.CreatedAt).Unix() { - t.Fatalf("Test GetCall: created_at mismatch `%v` `%v`", call.CreatedAt, newCall.CreatedAt) - } - if time.Time(dbCall.StartedAt).Unix() != time.Time(newCall.StartedAt).Unix() { - t.Fatalf("Test GetCall: started_at mismatch `%v` `%v`", call.StartedAt, newCall.StartedAt) - } - if time.Time(dbCall.CompletedAt).Unix() != time.Time(newCall.CompletedAt).Unix() { - t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) - } - if dbCall.AppID != newCall.AppID { - t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppID, newCall.AppID) - } - if dbCall.Path != newCall.Path { - t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) - } - }) - - t.Run("call-atomic-update-no-existing-call", func(t *testing.T) { - ds := dsf(t) - call.ID = id.New().String() - // Do NOT insert the call - newCall := new(models.Call) - *newCall = *call - newCall.Status = "success" - newCall.Error = "" - err := ds.UpdateCall(ctx, call, newCall) - if err != models.ErrCallNotFound { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - }) - - t.Run("call-atomic-update-unexpected-existing-call", func(t *testing.T) { - ds := dsf(t) - call.ID = id.New().String() - err := ds.InsertCall(ctx, call) - if err != nil { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - // Now change the 'from' call so it becomes different from the db - badFrom := new(models.Call) - *badFrom = *call - badFrom.Status = "running" - newCall := new(models.Call) - *newCall = *call - newCall.Status = "success" - newCall.Error = "" - err = ds.UpdateCall(ctx, badFrom, newCall) - if err != models.ErrDatastoreCannotUpdateCall { - t.Fatalf("Test UpdateCall: unexpected error `%v`", err) - } - }) - - t.Run("call-get", func(t *testing.T) { - ds := dsf(t) - call.ID = id.New().String() - err := ds.InsertCall(ctx, call) - if err != nil { - t.Fatalf("Test GetCall: unexpected error `%v`", err) - } - newCall, err := ds.GetCall(ctx, call.AppID, call.ID) - if err != nil { - t.Fatalf("Test GetCall: unexpected error `%v`", err) - } - if call.ID != newCall.ID { - t.Fatalf("Test GetCall: id mismatch `%v` `%v`", call.ID, newCall.ID) - } - if call.Status != newCall.Status { - t.Fatalf("Test GetCall: status mismatch `%v` `%v`", call.Status, newCall.Status) - } - if call.Error != newCall.Error { - t.Fatalf("Test GetCall: error mismatch `%v` `%v`", call.Error, newCall.Error) - } - if time.Time(call.CreatedAt).Unix() != time.Time(newCall.CreatedAt).Unix() { - t.Fatalf("Test GetCall: created_at mismatch `%v` `%v`", call.CreatedAt, newCall.CreatedAt) - } - if time.Time(call.StartedAt).Unix() != time.Time(newCall.StartedAt).Unix() { - t.Fatalf("Test GetCall: started_at mismatch `%v` `%v`", call.StartedAt, newCall.StartedAt) - } - if time.Time(call.CompletedAt).Unix() != time.Time(newCall.CompletedAt).Unix() { - t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) - } - if call.AppID != newCall.AppID { - t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppID, newCall.AppID) - } - if call.Path != newCall.Path { - t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) - } - }) - - t.Run("calls-get", func(t *testing.T) { - ds := dsf(t) - filter := &models.CallFilter{AppID: call.AppID, Path: call.Path, PerPage: 100} - call.ID = id.New().String() - call.CreatedAt = strfmt.DateTime(time.Now()) - err := ds.InsertCall(ctx, call) - if err != nil { - t.Fatal(err) - } - calls, err := ds.GetCalls(ctx, filter) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - c2 := *call - c3 := *call - c2.ID = id.New().String() - c2.CreatedAt = strfmt.DateTime(time.Now().Add(100 * time.Millisecond)) // add ms cuz db uses it for sort - c3.ID = id.New().String() - c3.CreatedAt = strfmt.DateTime(time.Now().Add(200 * time.Millisecond)) - - err = ds.InsertCall(ctx, &c2) - if err != nil { - t.Fatal(err) - } - err = ds.InsertCall(ctx, &c3) - if err != nil { - t.Fatal(err) - } - - // test that no filter works too - calls, err = ds.GetCalls(ctx, &models.CallFilter{PerPage: 100}) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 3 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - // test that pagination stuff works. id, descending - filter.PerPage = 1 - calls, err = ds.GetCalls(ctx, filter) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c3.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c3.ID) - } - - filter.PerPage = 100 - filter.Cursor = calls[0].ID - calls, err = ds.GetCalls(ctx, filter) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 2 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c2.ID) - } else if calls[1].ID != call.ID { - t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[1].ID, call.ID) - } - - // test that filters actually applied - calls, err = ds.GetCalls(ctx, &models.CallFilter{AppID: "wrongappname", PerPage: 100}) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - calls, err = ds.GetCalls(ctx, &models.CallFilter{Path: "wrongpath", PerPage: 100}) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 0 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } - - // make sure from_time and to_time work - filter = &models.CallFilter{ - PerPage: 100, - FromTime: call.CreatedAt, - ToTime: c3.CreatedAt, - } - calls, err = ds.GetCalls(ctx, filter) - if err != nil { - t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) - } - if len(calls) != 1 { - t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) - } else if calls[0].ID != c2.ID { - t.Fatalf("Test GetCalls: call id not expected %s vs %s", calls[0].ID, c2.ID) - } - }) - t.Run("apps", func(t *testing.T) { ds := dsf(t) // Testing insert app diff --git a/api/datastore/internal/datastoreutil/metrics.go b/api/datastore/internal/datastoreutil/metrics.go index 7a62dfcdb..9b26aade4 100644 --- a/api/datastore/internal/datastoreutil/metrics.go +++ b/api/datastore/internal/datastoreutil/metrics.go @@ -90,12 +90,6 @@ func (m *metricds) InsertCall(ctx context.Context, call *models.Call) error { return m.ds.InsertCall(ctx, call) } -func (m *metricds) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { - ctx, span := trace.StartSpan(ctx, "ds_update_call") - defer span.End() - return m.ds.UpdateCall(ctx, from, to) -} - func (m *metricds) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { ctx, span := trace.StartSpan(ctx, "ds_get_call") defer span.End() diff --git a/api/datastore/mock.go b/api/datastore/mock.go index 1699d2c8f..e2c67c2c2 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -4,7 +4,6 @@ import ( "context" "sort" "strings" - "time" "github.com/fnproject/fn/api/datastore/internal/datastoreutil" "github.com/fnproject/fn/api/logs" @@ -15,18 +14,29 @@ import ( type mock struct { Apps []*models.App Routes []*models.Route - Calls []*models.Call - data map[string][]byte models.LogStore } func NewMock() models.Datastore { - return NewMockInit(nil, nil, nil) + return NewMockInit() } -func NewMockInit(apps []*models.App, routes []*models.Route, calls []*models.Call) models.Datastore { - return datastoreutil.NewValidator(&mock{apps, routes, calls, make(map[string][]byte), logs.NewMock()}) +// args helps break tests less if we change stuff +func NewMockInit(args ...interface{}) models.Datastore { + var mocker mock + for _, a := range args { + switch x := a.(type) { + case []*models.App: + mocker.Apps = x + case []*models.Route: + mocker.Routes = x + default: + panic("not accounted for data type sent to mock init. add it") + } + } + mocker.LogStore = logs.NewMock() + return datastoreutil.NewValidator(&mocker) } func (m *mock) GetAppID(ctx context.Context, appName string) (string, error) { @@ -91,7 +101,6 @@ func (m *mock) UpdateApp(ctx context.Context, app *models.App) (*models.App, err } func (m *mock) RemoveApp(ctx context.Context, appID string) error { - m.batchDeleteCalls(ctx, appID) m.batchDeleteRoutes(ctx, appID) for i, a := range m.Apps { if a.ID == appID { @@ -174,104 +183,6 @@ func (m *mock) RemoveRoute(ctx context.Context, appID, routePath string) error { return models.ErrRoutesNotFound } -func (m *mock) Put(ctx context.Context, key, value []byte) error { - if len(value) == 0 { - delete(m.data, string(key)) - } else { - m.data[string(key)] = value - } - return nil -} - -func (m *mock) Get(ctx context.Context, key []byte) ([]byte, error) { - return m.data[string(key)], nil -} - -func (m *mock) InsertCall(ctx context.Context, call *models.Call) error { - m.Calls = append(m.Calls, call) - return nil -} - -// This equivalence only makes sense in the context of the datastore, so it's -// not in the model. -func equivalentCalls(expected *models.Call, actual *models.Call) bool { - equivalentFields := expected.ID == actual.ID && - time.Time(expected.CreatedAt).Unix() == time.Time(actual.CreatedAt).Unix() && - time.Time(expected.StartedAt).Unix() == time.Time(actual.StartedAt).Unix() && - time.Time(expected.CompletedAt).Unix() == time.Time(actual.CompletedAt).Unix() && - expected.Status == actual.Status && - expected.AppID == actual.AppID && - expected.Path == actual.Path && - expected.Error == actual.Error && - len(expected.Stats) == len(actual.Stats) - // TODO: We don't do comparisons of individual Stats. We probably should. - return equivalentFields -} - -func (m *mock) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { - for _, t := range m.Calls { - if t.ID == from.ID && t.AppID == from.AppID { - if equivalentCalls(from, t) { - *t = *to - return nil - } - return models.ErrDatastoreCannotUpdateCall - } - } - return models.ErrCallNotFound -} - -func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { - for _, t := range m.Calls { - if t.ID == callID && t.AppID == appID { - return t, nil - } - } - - return nil, models.ErrCallNotFound -} - -type sortC []*models.Call - -func (s sortC) Len() int { return len(s) } -func (s sortC) Less(i, j int) bool { return strings.Compare(s[i].ID, s[j].ID) < 0 } -func (s sortC) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { - // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) - // calls are in DESC order so use sort.Reverse - sort.Sort(sort.Reverse(sortC(m.Calls))) - - var calls []*models.Call - for _, c := range m.Calls { - if len(calls) == filter.PerPage { - break - } - - if (filter.AppID == "" || c.AppID == filter.AppID) && - (filter.Path == "" || filter.Path == c.Path) && - (time.Time(filter.FromTime).IsZero() || time.Time(filter.FromTime).Before(time.Time(c.CreatedAt))) && - (time.Time(filter.ToTime).IsZero() || time.Time(c.CreatedAt).Before(time.Time(filter.ToTime))) && - (filter.Cursor == "" || strings.Compare(filter.Cursor, c.ID) > 0) { - - calls = append(calls, c) - } - } - - return calls, nil -} - -func (m *mock) batchDeleteCalls(ctx context.Context, appID string) error { - newCalls := []*models.Call{} - for _, c := range m.Calls { - if c.AppID != appID || c.ID != appID { - newCalls = append(newCalls, c) - } - } - m.Calls = newCalls - return nil -} - func (m *mock) batchDeleteRoutes(ctx context.Context, appID string) error { newRoutes := []*models.Route{} for _, c := range m.Routes { diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 9f2118ba4..9736550f9 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -662,81 +662,6 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { return err } -// This equivalence only makes sense in the context of the datastore, so it's -// not in the model. -func equivalentCalls(expected *models.Call, actual *models.Call) bool { - equivalentFields := expected.ID == actual.ID && - time.Time(expected.CreatedAt).Unix() == time.Time(actual.CreatedAt).Unix() && - time.Time(expected.StartedAt).Unix() == time.Time(actual.StartedAt).Unix() && - time.Time(expected.CompletedAt).Unix() == time.Time(actual.CompletedAt).Unix() && - expected.Status == actual.Status && - expected.Path == actual.Path && - expected.Error == actual.Error && - len(expected.Stats) == len(actual.Stats) && - expected.AppID == actual.AppID - // TODO: We don't do comparisons of individual Stats. We probably should. - return equivalentFields -} - -func (ds *sqlStore) UpdateCall(ctx context.Context, from *models.Call, to *models.Call) error { - // Assert that from and to are supposed to be the same call - if from.ID != to.ID || from.AppID != to.AppID { - return errors.New("assertion error: 'from' and 'to' calls refer to different app/ID") - } - - // Atomic update - err := ds.Tx(func(tx *sqlx.Tx) error { - var call models.Call - query := tx.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector)) - row := tx.QueryRowxContext(ctx, query, from.ID, from.AppID) - - err := row.StructScan(&call) - if err == sql.ErrNoRows { - return models.ErrCallNotFound - } else if err != nil { - return err - } - - // Only do the update if the existing call is exactly what we expect. - // If something has modified it in the meantime, we must fail the - // transaction. - if !equivalentCalls(from, &call) { - return models.ErrDatastoreCannotUpdateCall - } - - query = tx.Rebind(`UPDATE calls SET - id = :id, - created_at = :created_at, - started_at = :started_at, - completed_at = :completed_at, - status = :status, - app_id = :app_id, - path = :path, - stats = :stats, - error = :error - WHERE id=:id AND app_id=:app_id;`) - - res, err := tx.NamedExecContext(ctx, query, to) - if err != nil { - return err - } - - if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - // inside of the transaction, we are querying for the row, so we know that it exists - return nil - } - - return nil - }) - - if err != nil { - return err - } - return nil -} - func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector) query = ds.db.Rebind(query) diff --git a/api/datastore/sql/sql_test.go b/api/datastore/sql/sql_test.go index 9bd262c63..5f15b3dba 100644 --- a/api/datastore/sql/sql_test.go +++ b/api/datastore/sql/sql_test.go @@ -10,6 +10,7 @@ import ( "github.com/fnproject/fn/api/datastore/internal/datastoreutil" "github.com/fnproject/fn/api/datastore/sql/migratex" "github.com/fnproject/fn/api/datastore/sql/migrations" + logstoretest "github.com/fnproject/fn/api/logs/testing" "github.com/fnproject/fn/api/models" ) @@ -56,6 +57,9 @@ func TestDatastore(t *testing.T) { } datastoretest.Test(t, f) + // also logs + logstoretest.Test(t, f(t)) + // NOTE: sqlite3 does not like ALTER TABLE DROP COLUMN so do not run // migration tests against it, only pg and mysql -- should prove UP migrations // will likely work for sqlite3, but may need separate testing by devs :( @@ -80,6 +84,9 @@ func TestDatastore(t *testing.T) { // test fresh w/o migrations datastoretest.Test(t, f) + // also test sql implements logstore + logstoretest.Test(t, f(t)) + f = func(t *testing.T) models.Datastore { t.Log("with migrations now!") ds, err := newWithMigrations(ctx, u) @@ -95,6 +102,9 @@ func TestDatastore(t *testing.T) { // test that migrations work & things work with them datastoretest.Test(t, f) + + // also test sql implements logstore + logstoretest.Test(t, f(t)) } if pg := os.Getenv("POSTGRES_URL"); pg != "" { diff --git a/api/id/id.go b/api/id/id.go index dc71ec3e6..2b190970d 100644 --- a/api/id/id.go +++ b/api/id/id.go @@ -3,6 +3,7 @@ package id import ( "errors" "net" + "strings" "sync/atomic" "time" ) @@ -45,7 +46,13 @@ func SetMachineIdHost(addr net.IP, port uint16) { // Ids are sortable within (not between, thanks to clocks) each machine, with // a modified base32 encoding exposed for convenience in API usage. func New() Id { - t := time.Now() + // NewWithTime will be inlined + return NewWithTime(time.Now()) +} + +// NewWithTime returns an id that uses the milliseconds from the given time. +// New is identical to NewWithTime(time.Now()) +func NewWithTime(t time.Time) Id { // NOTE compiler optimizes out division by constant for us ms := uint64(t.Unix())*1000 + uint64(t.Nanosecond()/int(time.Millisecond)) count := atomic.AddUint32(&counter, 1) @@ -240,3 +247,42 @@ func (id *Id) UnmarshalText(v []byte) error { return nil } + +// reverse encoding useful for sorting, descending +var rEncoding = reverseString(Encoding) + +func reverseString(input string) string { + // rsc: http://groups.google.com/group/golang-nuts/browse_thread/thread/a0fb81698275eede + + // Get Unicode code points. + n := 0 + rune := make([]rune, len(input)) + for _, r := range input { + rune[n] = r + n++ + } + rune = rune[0:n] + // Reverse + for i := 0; i < n/2; i++ { + rune[i], rune[n-1-i] = rune[n-1-i], rune[i] + } + + // Convert back to UTF-8. + return string(rune) +} + +// EncodeDescending returns a lexicographically sortable descending encoding +// of a given id, e.g. 000 -> ZZZ, which allows reversing the sort order when stored +// contiguously since ids are lexicographically sortable. The returned string will +// be of len(src), and assumes src is from the base32 crockford alphabet, otherwise +// using 0xFF. +func EncodeDescending(src string) string { + var buf [EncodedSize]byte + copy(buf[:], src) + for i, s := range buf[:len(src)] { + // XXX(reed): optimize as dec is + j := strings.Index(Encoding, string(s)) + buf[i] = rEncoding[j] + } + return string(buf[:len(src)]) +} diff --git a/api/id/id_test.go b/api/id/id_test.go index 136b7ceb9..0f2555500 100644 --- a/api/id/id_test.go +++ b/api/id/id_test.go @@ -2,7 +2,6 @@ package id import ( "encoding/binary" - "fmt" "math" "net" "testing" @@ -41,7 +40,6 @@ func TestIdRaw(t *testing.T) { ms := uint64(ts.Unix())*1000 + uint64(ts.Nanosecond()/int(time.Millisecond)) count := uint32(math.MaxUint32) id := newID(ms, machineID, count) - fmt.Println(len(id), id) var buf [8]byte copy(buf[2:], id[:6]) @@ -61,3 +59,19 @@ func TestIdRaw(t *testing.T) { t.Fatal("count mismatch", idCount, count) } } + +func TestDescending(t *testing.T) { + id := "0123WXYZ" + + flip := EncodeDescending(id) + + if len(flip) != len(id) { + t.Fatal("flipped string has different length:", len(flip), len(id)) + } + + for i := range flip { + if flip[i] != id[len(id)-1-i] { + t.Fatalf("flipped encoding not working. got: %v, want: %v", flip[i], id[len(id)-1-i]) + } + } +} diff --git a/api/logs/log_test.go b/api/logs/log_test.go deleted file mode 100644 index 81b9e08f7..000000000 --- a/api/logs/log_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package logs - -import ( - logTesting "github.com/fnproject/fn/api/logs/testing" - "testing" -) - -func TestDatastore(t *testing.T) { - ds := logTesting.SetupSQLiteDS(t) - logTesting.Test(t, ds, ds) -} diff --git a/api/logs/mock.go b/api/logs/mock.go index b88b1d1af..73557e76a 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -5,16 +5,30 @@ import ( "context" "io" "io/ioutil" + "sort" + "strings" + "time" "github.com/fnproject/fn/api/models" ) type mock struct { - Logs map[string][]byte + Logs map[string][]byte + Calls []*models.Call } -func NewMock() models.LogStore { - return &mock{make(map[string][]byte)} +func NewMock(args ...interface{}) models.LogStore { + var mocker mock + for _, a := range args { + switch x := a.(type) { + case []*models.Call: + mocker.Calls = x + default: + panic("unknown type handed to mocker. add me") + } + } + mocker.Logs = make(map[string][]byte) + return &mocker } func (m *mock) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { @@ -30,3 +44,48 @@ func (m *mock) GetLog(ctx context.Context, appID, callID string) (io.Reader, err } return bytes.NewReader(logEntry), nil } + +func (m *mock) InsertCall(ctx context.Context, call *models.Call) error { + m.Calls = append(m.Calls, call) + return nil +} + +func (m *mock) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { + for _, t := range m.Calls { + if t.ID == callID && t.AppID == appID { + return t, nil + } + } + + return nil, models.ErrCallNotFound +} + +type sortC []*models.Call + +func (s sortC) Len() int { return len(s) } +func (s sortC) Less(i, j int) bool { return strings.Compare(s[i].ID, s[j].ID) < 0 } +func (s sortC) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { + // sort them all first for cursoring (this is for testing, n is small & mock is not concurrent..) + // calls are in DESC order so use sort.Reverse + sort.Sort(sort.Reverse(sortC(m.Calls))) + + var calls []*models.Call + for _, c := range m.Calls { + if len(calls) == filter.PerPage { + break + } + + if (filter.AppID == "" || c.AppID == filter.AppID) && + (filter.Path == "" || filter.Path == c.Path) && + (time.Time(filter.FromTime).IsZero() || time.Time(filter.FromTime).Before(time.Time(c.CreatedAt))) && + (time.Time(filter.ToTime).IsZero() || time.Time(c.CreatedAt).Before(time.Time(filter.ToTime))) && + (filter.Cursor == "" || strings.Compare(filter.Cursor, c.ID) > 0) { + + calls = append(calls, c) + } + } + + return calls, nil +} diff --git a/api/logs/mock_test.go b/api/logs/mock_test.go new file mode 100644 index 000000000..705be84dc --- /dev/null +++ b/api/logs/mock_test.go @@ -0,0 +1,11 @@ +package logs + +import ( + logTesting "github.com/fnproject/fn/api/logs/testing" + "testing" +) + +func TestMock(t *testing.T) { + ls := NewMock() + logTesting.Test(t, ls) +} diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index 2bfc6076b..7f22f7a0f 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -1,15 +1,17 @@ -// package s3 implements an s3 api compatible log store +// Package s3 implements an s3 api compatible log store package s3 import ( "bytes" "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" "net/url" "strings" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -17,6 +19,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" "go.opencensus.io/stats" @@ -32,7 +36,10 @@ import ( // TODO do we need to use the v2 API? can't find BMC object store docs :/ const ( - contentType = "text/plain" + // key prefixes + callKeyPrefix = "c/" + callMarkerPrefix = "m/" + logKeyPrefix = "l/" ) type store struct { @@ -73,7 +80,8 @@ func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey stri } } -// s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true +// New returns an s3 api compatible log store. +// url format: s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true // Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters! func New(u *url.URL) (models.LogStore, error) { endpoint := u.Host @@ -118,24 +126,18 @@ func New(u *url.URL) (models.LogStore, error) { return store, nil } -func path(appName, callID string) string { - // raw url encode, b/c s3 does not like: & $ @ = : ; + , ? - appName = base64.RawURLEncoding.EncodeToString([]byte(appName)) // TODO optimize.. - return appName + "/" + callID -} - func (s *store) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { ctx, span := trace.StartSpan(ctx, "s3_insert_log") defer span.End() // wrap original reader in a decorator to keep track of read bytes without buffering cr := &countingReader{r: callLog} - objectName := path(appID, callID) + objectName := logKey(appID, callID) params := &s3manager.UploadInput{ Bucket: aws.String(s.bucket), Key: aws.String(objectName), Body: cr, - ContentType: aws.String(contentType), + ContentType: aws.String("text/plain"), } logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log") @@ -152,7 +154,7 @@ func (s *store) GetLog(ctx context.Context, appID, callID string) (io.Reader, er ctx, span := trace.StartSpan(ctx, "s3_get_log") defer span.End() - objectName := path(appID, callID) + objectName := logKey(appID, callID) logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log") // stream the logs to an in-memory buffer @@ -173,6 +175,241 @@ func (s *store) GetLog(ctx context.Context, appID, callID string) (io.Reader, er return bytes.NewReader(target.Bytes()), nil } +func (s *store) InsertCall(ctx context.Context, call *models.Call) error { + ctx, span := trace.StartSpan(ctx, "s3_insert_call") + defer span.End() + + byts, err := json.Marshal(call) + if err != nil { + return err + } + + objectName := callKey(call.AppID, call.ID) + params := &s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(objectName), + Body: bytes.NewReader(byts), + ContentType: aws.String("text/plain"), + } + + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call") + _, err = s.uploader.UploadWithContext(ctx, params) + if err != nil { + return fmt.Errorf("failed to insert call, %v", err) + } + + // at this point, they can point lookup the log and it will work. now, we can try to upload + // the marker key. if the marker key upload fails, the user will simply not + // see this entry when listing only when specifying a route path. (NOTE: this + // behavior will go away if we stop listing by route -> triggers) + + objectName = callMarkerKey(call.AppID, call.Path, call.ID) + params = &s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(objectName), + Body: bytes.NewReader([]byte{}), + ContentType: aws.String("text/plain"), + } + + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading call marker") + _, err = s.uploader.UploadWithContext(ctx, params) + if err != nil { + // XXX(reed): we could just log this? + return fmt.Errorf("failed to write marker key for log, %v", err) + } + + return nil +} + +// GetCall returns a call at a certain id and app name. +func (s *store) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_call") + defer span.End() + + objectName := callKey(appID, callID) + logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading call") + + return s.getCallByKey(ctx, objectName) +} + +func (s *store) getCallByKey(ctx context.Context, key string) (*models.Call, error) { + // stream the logs to an in-memory buffer + var target aws.WriteAtBuffer + _, err := s.downloader.DownloadWithContext(ctx, &target, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + }) + if err != nil { + aerr, ok := err.(awserr.Error) + if ok && aerr.Code() == s3.ErrCodeNoSuchKey { + return nil, models.ErrCallNotFound + } + return nil, fmt.Errorf("failed to read log, %v", err) + } + + var call models.Call + err = json.Unmarshal(target.Bytes(), &call) + if err != nil { + return nil, err + } + + return &call, nil +} + +func flipCursor(oid string) string { + if oid == "" { + return "" + } + + return id.EncodeDescending(oid) +} + +func callMarkerKey(app, path, id string) string { + id = flipCursor(id) + // s3 urls use / and are url, we need to encode this since paths have / in them + // NOTE: s3 urls are max of 1024 chars. path is the only non-fixed sized object in here + // but it is fixed to 256 chars in sql (by chance, mostly). further validation may be needed if weirdness ensues. + path = base64.RawURLEncoding.EncodeToString([]byte(path)) + return callMarkerPrefix + app + "/" + path + "/" + id +} + +func callKey(app, id string) string { + id = flipCursor(id) + return callKeyFlipped(app, id) +} + +func callKeyFlipped(app, id string) string { + return callKeyPrefix + app + "/" + id +} + +func logKey(appID, callID string) string { + return logKeyPrefix + appID + "/" + callID +} + +// GetCalls returns a list of calls that satisfy the given CallFilter. If no +// calls exist, an empty list and a nil error are returned. +// NOTE: this relies on call ids being lexicographically sortable and <= 16 byte +func (s *store) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "s3_get_calls") + defer span.End() + + if filter.AppID == "" { + return nil, errors.New("s3 store does not support listing across all apps") + } + + // NOTE: + // if filter.Path != "" + // find marker from marker keys, start there, list keys, get next marker from there + // else + // use marker for keys + + // NOTE we need marker keys to support (app is REQUIRED): + // 1) quick iteration per path + // 2) sorted by id across all path + // marker key: m : {app} : {path} : {id} + // key: s: {app} : {id} + // + // also s3 api returns sorted in lexicographic order, we need the reverse of this. + + // marker is either a provided marker, or a key we create based on parameters + // that contains app_id, may be a marker key if path is provided, and may + // have a time guesstimate if to time is provided. + + var marker string + + // filter.Cursor is a call id, translate to our key format. if a path is + // provided, we list keys from markers instead. + if filter.Cursor != "" { + marker = callKey(filter.AppID, filter.Cursor) + if filter.Path != "" { + marker = callMarkerKey(filter.AppID, filter.Path, filter.Cursor) + } + } else if t := time.Time(filter.ToTime); !t.IsZero() { + // get a fake id that has the most significant bits set to the to_time (first 48 bits) + fako := id.NewWithTime(t) + //var buf [id.EncodedSize]byte + //fakoId.MarshalTextTo(buf) + //mid := string(buf[:10]) + mid := fako.String() + marker = callKey(filter.AppID, mid) + if filter.Path != "" { + marker = callMarkerKey(filter.AppID, filter.Path, mid) + } + } + + // prefix prevents leaving bounds of app or path marker keys + prefix := callKey(filter.AppID, "") + if filter.Path != "" { + prefix = callMarkerKey(filter.AppID, filter.Path, "") + } + + input := &s3.ListObjectsInput{ + Bucket: aws.String(s.bucket), + MaxKeys: aws.Int64(int64(filter.PerPage)), + Marker: aws.String(marker), + Prefix: aws.String(prefix), + } + + result, err := s.client.ListObjects(input) + if err != nil { + return nil, fmt.Errorf("failed to list logs: %v", err) + } + + calls := make([]*models.Call, 0, len(result.Contents)) + + for _, obj := range result.Contents { + if len(calls) == filter.PerPage { + break + } + + // extract the app and id from the key to lookup the object, this also + // validates we aren't reading strangely keyed objects from the bucket. + var app, id string + if filter.Path != "" { + fields := strings.Split(*obj.Key, "/") + if len(fields) != 4 { + return calls, fmt.Errorf("invalid key in call markers: %v", *obj.Key) + } + app = fields[1] + id = fields[3] + } else { + fields := strings.Split(*obj.Key, "/") + if len(fields) != 3 { + return calls, fmt.Errorf("invalid key in calls: %v", *obj.Key) + } + app = fields[1] + id = fields[2] + } + + // the id here is already reverse encoded, keep it that way. + objectName := callKeyFlipped(app, id) + + // NOTE: s3 doesn't have a way to get multiple objects so just use GetCall + // TODO we should reuse the buffer to decode these + call, err := s.getCallByKey(ctx, objectName) + if err != nil { + common.Logger(ctx).WithError(err).WithFields(logrus.Fields{"app": app, "id": id}).Error("error filling call object") + continue + } + + // ensure: from_time < created_at < to_time + fromTime := time.Time(filter.FromTime).Truncate(time.Millisecond) + if !fromTime.IsZero() && !fromTime.Before(time.Time(call.CreatedAt)) { + // NOTE could break, ids and created_at aren't necessarily in perfect order + continue + } + + toTime := time.Time(filter.ToTime).Truncate(time.Millisecond) + if !toTime.IsZero() && !time.Time(call.CreatedAt).Before(toTime) { + continue + } + + calls = append(calls, call) + } + + return calls, nil +} + var ( uploadSizeMeasure *stats.Int64Measure downloadSizeMeasure *stats.Int64Measure diff --git a/api/logs/s3/s3_test.go b/api/logs/s3/s3_test.go index 19116292a..1335af104 100644 --- a/api/logs/s3/s3_test.go +++ b/api/logs/s3/s3_test.go @@ -24,5 +24,5 @@ func TestS3(t *testing.T) { if err != nil { t.Fatalf("failed to create s3 datastore: %v", err) } - logTesting.Test(t, nil, ls) + logTesting.Test(t, ls) } diff --git a/api/logs/testing/test.go b/api/logs/testing/test.go index 0c8535a71..90d58817e 100644 --- a/api/logs/testing/test.go +++ b/api/logs/testing/test.go @@ -8,12 +8,9 @@ import ( "testing" "time" - "github.com/fnproject/fn/api/datastore/sql" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/go-openapi/strfmt" - "net/url" - "os" ) var testApp = &models.App{ @@ -28,55 +25,131 @@ var testRoute = &models.Route{ Format: "http", } -func SetupTestCall(t *testing.T, ctx context.Context, ds models.Datastore) *models.Call { +func SetupTestCall(t *testing.T, ctx context.Context, ls models.LogStore) *models.Call { testApp.SetDefaults() - _, err := ds.InsertApp(ctx, testApp) - if err != nil { - t.Log(err.Error()) - t.Fatalf("Test InsertLog(ctx, call.ID, logText): unable to insert app, err: `%v`", err) - } - testRoute.AppID = testApp.ID - _, err = ds.InsertRoute(ctx, testRoute) - if err != nil { - t.Log(err.Error()) - t.Fatalf("Test InsertLog(ctx, call.ID, logText): unable to insert app route, err: `%v`", err) - } - var call models.Call call.AppID = testApp.ID call.CreatedAt = strfmt.DateTime(time.Now()) call.Status = "success" call.StartedAt = strfmt.DateTime(time.Now()) call.CompletedAt = strfmt.DateTime(time.Now()) - call.AppID = testApp.ID call.Path = testRoute.Path return &call } const tmpLogDb = "/tmp/func_test_log.db" -func SetupSQLiteDS(t *testing.T) models.Datastore { - os.Remove(tmpLogDb) +func Test(t *testing.T, fnl models.LogStore) { ctx := context.Background() - uLog, err := url.Parse("sqlite3://" + tmpLogDb) - if err != nil { - t.Fatalf("failed to parse url: %v", err) - } + call := SetupTestCall(t, ctx, fnl) - ds, err := sql.New(ctx, uLog) - if err != nil { - t.Fatalf("failed to create sqlite3 datastore: %v", err) - } - return ds -} + // test list first, the rest are point lookup tests + t.Run("calls-get", func(t *testing.T) { + filter := &models.CallFilter{AppID: call.AppID, Path: call.Path, PerPage: 100} + now := time.Now() + call.CreatedAt = strfmt.DateTime(now) + call.ID = id.New().String() + err := fnl.InsertCall(ctx, call) + if err != nil { + t.Fatal(err) + } + calls, err := fnl.GetCalls(ctx, filter) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } -func Test(t *testing.T, ds models.Datastore, fnl models.LogStore) { - ctx := context.Background() - if ds == nil { - ds = SetupSQLiteDS(t) - } - call := SetupTestCall(t, ctx, ds) + c2 := *call + c3 := *call + now = time.Now().Add(100 * time.Millisecond) + c2.CreatedAt = strfmt.DateTime(now) // add ms cuz db uses it for sort + c2.ID = id.New().String() + + now = time.Now().Add(200 * time.Millisecond) + c3.CreatedAt = strfmt.DateTime(now) + c3.ID = id.New().String() + + err = fnl.InsertCall(ctx, &c2) + if err != nil { + t.Fatal(err) + } + err = fnl.InsertCall(ctx, &c3) + if err != nil { + t.Fatal(err) + } + + // test that no filter works too + calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, PerPage: 100}) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 3 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } + + // test that pagination stuff works. id, descending + filter.PerPage = 1 + calls, err = fnl.GetCalls(ctx, filter) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } else if calls[0].ID != c3.ID { + t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c3.ID) + } + + filter.PerPage = 100 + filter.Cursor = calls[0].ID + calls, err = fnl.GetCalls(ctx, filter) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 2 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } else if calls[0].ID != c2.ID { + t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[0].ID, c2.ID) + } else if calls[1].ID != call.ID { + t.Fatalf("Test GetCalls: call ids not in expected order: %v %v", calls[1].ID, call.ID) + } + + // test that filters actually applied + calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: "wrongappname", PerPage: 100}) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 0 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } + + calls, err = fnl.GetCalls(ctx, &models.CallFilter{AppID: call.AppID, Path: "wrongpath", PerPage: 100}) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 0 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } + + // make sure from_time and to_time work + filter = &models.CallFilter{ + PerPage: 100, + FromTime: call.CreatedAt, + ToTime: c3.CreatedAt, + AppID: call.AppID, + } + calls, err = fnl.GetCalls(ctx, filter) + if err != nil { + t.Fatalf("Test GetCalls(ctx, filter): unexpected error `%v`", err) + } + if len(calls) != 1 { + t.Fatalf("Test GetCalls(ctx, filter): unexpected length `%v`", len(calls)) + } else if calls[0].ID != c2.ID { + t.Fatalf("Test GetCalls: call id not expected %s vs %s", calls[0].ID, c2.ID) + } + }) t.Run("call-log-insert-get", func(t *testing.T) { call.ID = id.New().String() @@ -86,10 +159,7 @@ func Test(t *testing.T, ds models.Datastore, fnl models.LogStore) { if err != nil { t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during inserting log `%v`", err) } - logEntry, err := fnl.GetLog(ctx, testApp.ID, call.ID) - if err != nil { - t.Fatalf("Test InsertLog(ctx, call.ID, logText): unexpected error during log get `%v`", err) - } + logEntry, err := fnl.GetLog(ctx, call.AppID, call.ID) var b bytes.Buffer io.Copy(&b, logEntry) if !strings.Contains(b.String(), logText) { @@ -105,4 +175,57 @@ func Test(t *testing.T, ds models.Datastore, fnl models.LogStore) { t.Fatal("GetLog should return not found, but got:", err) } }) + + call = new(models.Call) + call.CreatedAt = strfmt.DateTime(time.Now()) + call.Status = "error" + call.Error = "ya dun goofed" + call.StartedAt = strfmt.DateTime(time.Now()) + call.CompletedAt = strfmt.DateTime(time.Now()) + call.AppID = testApp.Name + call.Path = testRoute.Path + + t.Run("call-insert", func(t *testing.T) { + call.ID = id.New().String() + err := fnl.InsertCall(ctx, call) + if err != nil { + t.Fatalf("Test InsertCall(ctx, &call): unexpected error `%v`", err) + } + }) + + t.Run("call-get", func(t *testing.T) { + call.ID = id.New().String() + err := fnl.InsertCall(ctx, call) + if err != nil { + t.Fatalf("Test GetCall: unexpected error `%v`", err) + } + newCall, err := fnl.GetCall(ctx, call.AppID, call.ID) + if err != nil { + t.Fatalf("Test GetCall: unexpected error `%v`", err) + } + if call.ID != newCall.ID { + t.Fatalf("Test GetCall: id mismatch `%v` `%v`", call.ID, newCall.ID) + } + if call.Status != newCall.Status { + t.Fatalf("Test GetCall: status mismatch `%v` `%v`", call.Status, newCall.Status) + } + if call.Error != newCall.Error { + t.Fatalf("Test GetCall: error mismatch `%v` `%v`", call.Error, newCall.Error) + } + if time.Time(call.CreatedAt).Unix() != time.Time(newCall.CreatedAt).Unix() { + t.Fatalf("Test GetCall: created_at mismatch `%v` `%v`", call.CreatedAt, newCall.CreatedAt) + } + if time.Time(call.StartedAt).Unix() != time.Time(newCall.StartedAt).Unix() { + t.Fatalf("Test GetCall: started_at mismatch `%v` `%v`", call.StartedAt, newCall.StartedAt) + } + if time.Time(call.CompletedAt).Unix() != time.Time(newCall.CompletedAt).Unix() { + t.Fatalf("Test GetCall: completed_at mismatch `%v` `%v`", call.CompletedAt, newCall.CompletedAt) + } + if call.AppID != newCall.AppID { + t.Fatalf("Test GetCall: app_name mismatch `%v` `%v`", call.AppID, newCall.AppID) + } + if call.Path != newCall.Path { + t.Fatalf("Test GetCall: path mismatch `%v` `%v`", call.Path, newCall.Path) + } + }) } diff --git a/api/models/datastore.go b/api/models/datastore.go index 533c6cfa8..79ea24736 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -58,22 +58,6 @@ type Datastore interface { // ErrDatastoreEmptyRoutePath when routePath is empty. Returns ErrRoutesNotFound when no route exists. RemoveRoute(ctx context.Context, appID, routePath string) error - // InsertCall inserts a call into the datastore, it will error if the call already - // exists. - InsertCall(ctx context.Context, call *Call) error - - // UpdateCall atomically updates a call into the datastore to the "to" value if it finds an existing call equivalent - // to "from", otherwise it will error. ErrCallNotFound is returned if the call was not found, and - // ErrDatastoreCannotUpdateCall is returned if a call with the right AppName/ID exists but is different from "from". - UpdateCall(ctx context.Context, from *Call, to *Call) error - - // GetCall returns a call at a certain id and app name. - GetCall(ctx context.Context, appID, callID string) (*Call, error) - - // GetCalls returns a list of calls that satisfy the given CallFilter. If no - // calls exist, an empty list and a nil error are returned. - GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error) - // Implement LogStore methods for convenience LogStore diff --git a/api/models/error.go b/api/models/error.go index 77c5411ee..0b3da6f00 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -76,10 +76,6 @@ var ( code: http.StatusBadRequest, error: errors.New("Missing call ID"), } - ErrDatastoreCannotUpdateCall = err{ - code: http.StatusConflict, - error: errors.New("Call to be updated is different from expected"), - } ErrInvalidPayload = err{ code: http.StatusBadRequest, error: errors.New("Invalid payload"), diff --git a/api/models/logs.go b/api/models/logs.go index cd7dad07b..f035d5e46 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -19,4 +19,15 @@ type LogStore interface { // * route gets nuked // * app gets nuked // * call+logs getting cleaned up periodically + + // InsertCall inserts a call into the datastore, it will error if the call already + // exists. + InsertCall(ctx context.Context, call *Call) error + + // GetCall returns a call at a certain id and app name. + GetCall(ctx context.Context, appName, callID string) (*Call, error) + + // GetCalls returns a list of calls that satisfy the given CallFilter. If no + // calls exist, an empty list and a nil error are returned. + GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error) } diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 710258a71..9a9668bdb 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -116,9 +116,7 @@ func TestAppDelete(t *testing.T) { Name: "myapp", } app.SetDefaults() - ds := datastore.NewMockInit( - []*models.App{app}, nil, nil, - ) + ds := datastore.NewMockInit([]*models.App{app}) for i, test := range []struct { ds models.Datastore logDB models.LogStore @@ -168,8 +166,6 @@ func TestAppList(t *testing.T) { {Name: "myapp2"}, {Name: "myapp3"}, }, - nil, // no routes - nil, // no calls ) fnl := logs.NewMock() srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) @@ -277,9 +273,7 @@ func TestAppUpdate(t *testing.T) { Name: "myapp", } app.SetDefaults() - ds := datastore.NewMockInit( - []*models.App{app}, nil, nil, - ) + ds := datastore.NewMockInit([]*models.App{app}) for i, test := range []struct { mock models.Datastore diff --git a/api/server/call_get.go b/api/server/call_get.go index e2b1c0bf3..5f8855577 100644 --- a/api/server/call_get.go +++ b/api/server/call_get.go @@ -13,7 +13,7 @@ func (s *Server) handleCallGet(c *gin.Context) { callID := c.Param(api.Call) appID := c.MustGet(api.AppID).(string) - callObj, err := s.datastore.GetCall(ctx, appID, callID) + callObj, err := s.logstore.GetCall(ctx, appID, callID) if err != nil { handleErrorResponse(c, err) return diff --git a/api/server/call_list.go b/api/server/call_list.go index 0b5b4e285..f43c9e726 100644 --- a/api/server/call_list.go +++ b/api/server/call_list.go @@ -26,7 +26,7 @@ func (s *Server) handleCallList(c *gin.Context) { return } - calls, err := s.datastore.GetCalls(ctx, &filter) + calls, err := s.logstore.GetCalls(ctx, &filter) var nextCursor string if len(calls) > 0 && len(calls) == filter.PerPage { diff --git a/api/server/calls_test.go b/api/server/calls_test.go index 92e50a5d4..bfd4bcd6b 100644 --- a/api/server/calls_test.go +++ b/api/server/calls_test.go @@ -18,6 +18,11 @@ import ( func TestCallGet(t *testing.T) { buf := setLogBuffer() + defer func() { + if t.Failed() { + t.Log(buf.String()) + } + }() app := &models.App{Name: "myapp"} app.SetDefaults() @@ -43,10 +48,8 @@ func TestCallGet(t *testing.T) { defer cancel() ds := datastore.NewMockInit( []*models.App{app}, - nil, - []*models.Call{call}, ) - fnl := logs.NewMock() + fnl := logs.NewMock([]*models.Call{call}) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { @@ -64,7 +67,6 @@ func TestCallGet(t *testing.T) { _, rec := routerRequest(t, srv.Router, "GET", test.path, nil) if rec.Code != test.expectedCode { - t.Log(buf.String()) t.Log(rec.Body.String()) t.Errorf("Test %d: Expected status code to be %d but was %d", i, test.expectedCode, rec.Code) @@ -74,7 +76,6 @@ func TestCallGet(t *testing.T) { resp := getErrorResponse(t, rec) if !strings.Contains(resp.Error.Message, test.expectedError.Error()) { - t.Log(buf.String()) t.Log(resp.Error.Message) t.Log(rec.Body.String()) t.Errorf("Test %d: Expected error message to have `%s`", @@ -87,6 +88,11 @@ func TestCallGet(t *testing.T) { func TestCallList(t *testing.T) { buf := setLogBuffer() + defer func() { + if t.Failed() { + t.Log(buf.String()) + } + }() app := &models.App{Name: "myapp"} app.SetDefaults() @@ -110,21 +116,19 @@ func TestCallList(t *testing.T) { } c2 := *call c3 := *call - c2.ID = id.New().String() c2.CreatedAt = strfmt.DateTime(time.Now().Add(100 * time.Second)) + c2.ID = id.New().String() c2.Path = "test2" - c3.ID = id.New().String() c3.CreatedAt = strfmt.DateTime(time.Now().Add(200 * time.Second)) + c3.ID = id.New().String() c3.Path = "/test3" rnr, cancel := testRunner(t) defer cancel() ds := datastore.NewMockInit( []*models.App{app}, - nil, - []*models.Call{call, &c2, &c3}, ) - fnl := logs.NewMock() + fnl := logs.NewMock([]*models.Call{call, &c2, &c3}) srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) // add / sub 1 second b/c unix time will lop off millis and mess up our comparisons @@ -159,7 +163,6 @@ func TestCallList(t *testing.T) { _, rec := routerRequest(t, srv.Router, "GET", test.path, nil) if rec.Code != test.expectedCode { - t.Log(buf.String()) t.Errorf("Test %d: Expected status code to be %d but was %d", i, test.expectedCode, rec.Code) } @@ -168,7 +171,6 @@ func TestCallList(t *testing.T) { resp := getErrorResponse(t, rec) if resp.Error == nil || !strings.Contains(resp.Error.Message, test.expectedError.Error()) { - t.Log(buf.String()) t.Errorf("Test %d: Expected error message to have `%s`, got: `%s`", i, test.expectedError.Error(), resp.Error) } diff --git a/api/server/middleware_test.go b/api/server/middleware_test.go index 6a6de597f..f603bba96 100644 --- a/api/server/middleware_test.go +++ b/api/server/middleware_test.go @@ -2,7 +2,6 @@ package server import ( "context" - "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -79,7 +78,7 @@ func TestRootMiddleware(t *testing.T) { {Path: "/app2func", AppID: app2.ID, Image: "fnproject/hello", Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: map[string][]string{"X-Function": {"Test"}}, Config: map[string]string{"NAME": "johnny"}, }, - }, nil, + }, ) rnr, cancelrnr := testRunner(t, ds) @@ -91,7 +90,7 @@ func TestRootMiddleware(t *testing.T) { // this one will override a call to the API based on a header return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Header.Get("funcit") != "" { - fmt.Fprintf(os.Stderr, "breaker breaker!\n") + t.Log("breaker breaker!") ctx := r.Context() // TODO: this is a little dicey, should have some functions to set these in case the context keys change or something. ctx = context.WithValue(ctx, "app", "myapp2") @@ -106,7 +105,7 @@ func TestRootMiddleware(t *testing.T) { }) srv.AddRootMiddlewareFunc(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(os.Stderr, "middle log\n") + t.Log("middle log") next.ServeHTTP(w, r) }) }) @@ -132,7 +131,7 @@ func TestRootMiddleware(t *testing.T) { for k, v := range test.headers { req.Header.Add(k, v[0]) } - fmt.Println("TESTING:", req.URL.String()) + t.Log("TESTING:", req.URL.String()) _, rec := routerRequest2(t, srv.Router, req) // t.Log("REC: %+v\n", rec) diff --git a/api/server/routes_test.go b/api/server/routes_test.go index d3112d952..26c8715c9 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -100,7 +100,7 @@ func TestRouteCreate(t *testing.T) { a := &models.App{Name: "a"} a.SetDefaults() - commonDS := datastore.NewMockInit([]*models.App{a}, nil, nil) + commonDS := datastore.NewMockInit([]*models.App{a}) for i, test := range []routeTestCase{ // errors {commonDS, logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", ``, http.StatusBadRequest, models.ErrInvalidJSON}, @@ -118,7 +118,7 @@ func TestRouteCreate(t *testing.T) { AppID: a.ID, Path: "/myroute", }, - }, nil, + }, ), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/fn-test-utils", "path": "/myroute", "type": "sync" } }`, http.StatusConflict, models.ErrRoutesAlreadyExists}, // success @@ -135,7 +135,7 @@ func TestRoutePut(t *testing.T) { a := &models.App{Name: "a"} a.SetDefaults() - commonDS := datastore.NewMockInit([]*models.App{a}, nil, nil) + commonDS := datastore.NewMockInit([]*models.App{a}) for i, test := range []routeTestCase{ // errors (NOTE: this route doesn't exist yet) @@ -163,7 +163,7 @@ func TestRouteDelete(t *testing.T) { a := &models.App{Name: "a"} a.SetDefaults() routes := []*models.Route{{AppID: a.ID, Path: "/myroute"}} - commonDS := datastore.NewMockInit([]*models.App{a}, routes, nil) + commonDS := datastore.NewMockInit([]*models.App{a}, routes) for i, test := range []struct { ds models.Datastore @@ -225,7 +225,6 @@ func TestRouteList(t *testing.T) { AppID: app.ID, }, }, - nil, // no calls ) fnl := logs.NewMock() @@ -329,7 +328,7 @@ func TestRouteGet(t *testing.T) { func TestRouteUpdate(t *testing.T) { buf := setLogBuffer() - ds := datastore.NewMockInit(nil, nil, nil) + ds := datastore.NewMockInit() for i, test := range []routeTestCase{ // success diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 9ee4fa5b5..05920e88b 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -45,7 +45,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { {Type: "async", Path: "/myroute", AppID: app.ID, Image: "fnproject/hello", Config: map[string]string{"test": "true"}, Memory: 128, CPUs: 200, Timeout: 30, IdleTimeout: 30}, {Type: "async", Path: "/myerror", AppID: app.ID, Image: "fnproject/error", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 30, IdleTimeout: 30}, {Type: "async", Path: "/myroute/:param", AppID: app.ID, Image: "fnproject/hello", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 30, IdleTimeout: 30}, - }, nil, + }, ) mq := &mqs.Mock{} diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 94e587e3c..8ee500ade 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -61,7 +61,7 @@ func TestRouteRunnerGet(t *testing.T) { app := &models.App{Name: "myapp", Config: models.Config{}} app.SetDefaults() ds := datastore.NewMockInit( - []*models.App{app}, nil, nil, + []*models.App{app}, ) rnr, cancel := testRunner(t, ds) @@ -105,7 +105,7 @@ func TestRouteRunnerPost(t *testing.T) { app := &models.App{Name: "myapp", Config: models.Config{}} app.SetDefaults() ds := datastore.NewMockInit( - []*models.App{app}, nil, nil, + []*models.App{app}, ) rnr, cancel := testRunner(t, ds) @@ -177,7 +177,7 @@ func TestRouteRunnerIOPipes(t *testing.T) { []*models.Route{ {Path: "/json", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 30, IdleTimeout: 30, Config: rCfg}, {Path: "/http", AppID: app.ID, Image: rImg, Type: "sync", Format: "http", Memory: 64, Timeout: 30, IdleTimeout: 30, Config: rCfg}, - }, nil, + }, ) rnr, cancelrnr := testRunner(t, ds) @@ -346,7 +346,7 @@ func TestRouteRunnerExecution(t *testing.T) { {Path: "/mybigoutputcold", AppID: app.ID, Image: rImg, Type: "sync", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg}, {Path: "/mybigoutputhttp", AppID: app.ID, Image: rImg, Type: "sync", Format: "http", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg}, {Path: "/mybigoutputjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg}, - }, nil, + }, ) rnr, cancelrnr := testRunner(t, ds) @@ -538,7 +538,7 @@ func TestFailedEnqueue(t *testing.T) { []*models.App{app}, []*models.Route{ {Path: "/dummy", Image: "dummy/dummy", Type: "async", Memory: 128, Timeout: 30, IdleTimeout: 30, AppID: app.ID}, - }, nil, + }, ) err := errors.New("Unable to push task to queue") mq := &errorMQ{err, http.StatusInternalServerError} @@ -591,7 +591,7 @@ func TestRouteRunnerTimeout(t *testing.T) { {Path: "/hot-json", Image: "fnproject/fn-test-utils", Type: "sync", Format: "json", Memory: 128, Timeout: 4, IdleTimeout: 30, AppID: app.ID}, {Path: "/bigmem-cold", Image: "fnproject/fn-test-utils", Type: "sync", Memory: hugeMem, Timeout: 1, IdleTimeout: 30, AppID: app.ID}, {Path: "/bigmem-hot", Image: "fnproject/fn-test-utils", Type: "sync", Format: "http", Memory: hugeMem, Timeout: 1, IdleTimeout: 30, AppID: app.ID}, - }, nil, + }, ) rnr, cancelrnr := testRunner(t, ds) @@ -661,7 +661,7 @@ func TestRouteRunnerMinimalConcurrentHotSync(t *testing.T) { []*models.App{app}, []*models.Route{ {Path: "/hot", AppID: app.ID, Image: "fnproject/fn-test-utils", Type: "sync", Format: "http", Memory: 128, Timeout: 30, IdleTimeout: 5}, - }, nil, + }, ) rnr, cancelrnr := testRunner(t, ds) diff --git a/api/server/server_test.go b/api/server/server_test.go index a1d405d4d..10bfd536f 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -266,7 +266,7 @@ func TestHybridEndpoints(t *testing.T) { []*models.Route{{ AppID: app.ID, Path: "yodawg", - }}, nil, + }}, ) logDB := logs.NewMock()