diff --git a/.circleci/config.yml b/.circleci/config.yml index 921b9eea9..90d92371b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -49,7 +49,7 @@ jobs: # Rebuild fnserver if necessary - run: | if [[ -n "$FN_NEEDED" ]]; then - make full-test -j $(nproc) + make test -j $(nproc) fi - deploy: diff --git a/Makefile b/Makefile index 08930f346..10f1855c8 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ test-extensions: test-basic test-basic: checkfmt pull-images fn-test-utils ./test.sh -test: checkfmt pull-images test-basic test-middleware test-extensions +test: checkfmt pull-images test-basic test-middleware test-extensions test-api test-system test-api: test-basic ./api_test.sh sqlite3 4 @@ -55,8 +55,6 @@ test-system: test-basic ./system_test.sh mysql 4 0 ./system_test.sh postgres 4 0 -full-test: test test-api test-system - img-busybox: docker pull busybox img-hello: diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 6d58018a8..96df8572d 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -19,6 +19,7 @@ import ( "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/id" + "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/mqs" "github.com/sirupsen/logrus" @@ -99,8 +100,9 @@ func TestCallConfigurationRequest(t *testing.T) { }, }, ) + ls := logs.NewMock() - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) w := httptest.NewRecorder() @@ -242,8 +244,9 @@ func TestCallConfigurationModel(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit() + ls := logs.NewMock() - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) callI, err := a.GetCall(FromModel(cm)) @@ -313,8 +316,9 @@ func TestAsyncCallHeaders(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit() + ls := logs.NewMock() - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) callI, err := a.GetCall(FromModel(cm)) @@ -439,8 +443,9 @@ func TestReqTooLarge(t *testing.T) { } cfg.MaxRequestSize = 5 + ls := logs.NewMock() - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg)) defer checkClose(t, a) _, err = a.GetCall(FromModel(cm)) @@ -492,8 +497,9 @@ func TestSubmitError(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit() + ls := logs.NewMock() - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) var wg sync.WaitGroup @@ -560,7 +566,8 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { }, ) - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) bodOne := `{"echoContent":"yodawg"}` @@ -623,7 +630,8 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { // FromModel doesn't need a datastore, for now... ds := datastore.NewMockInit() - a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) + ls := logs.NewMock() + a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ls, new(mqs.Mock)))) defer checkClose(t, a) _, err := a.GetCall(FromModel(call)) @@ -659,7 +667,8 @@ func TestTmpFsRW(t *testing.T) { }, ) - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) // Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB @@ -763,7 +772,8 @@ func TestTmpFsSize(t *testing.T) { cfg.MaxTmpFsInodes = 1024 - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg)) defer checkClose(t, a) // Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB @@ -932,7 +942,8 @@ func TestPipesAreClear(t *testing.T) { }, ) - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) // test read this body after 5s (after call times out) and make sure we don't get yodawg @@ -1082,7 +1093,8 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { }, ) - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock))) defer checkClose(t, a) bodOne := `{"echoContent":"yodawg"}` @@ -1188,7 +1200,8 @@ func TestNBIOResourceTracker(t *testing.T) { cfg.MaxTotalMemory = 280 * 1024 * 1024 cfg.HotPoll = 20 * time.Millisecond - a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) + ls := logs.NewMock() + a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg)) defer checkClose(t, a) reqCount := 20 @@ -1248,8 +1261,9 @@ type closingDataAccess struct { func newClosingDataAccess(closeReturn error) *closingDataAccess { ds := datastore.NewMockInit() + ls := logs.NewMock() return &closingDataAccess{ - DataAccess: NewDirectDataAccess(ds, ds, new(mqs.Mock)), + DataAccess: NewDirectDataAccess(ds, ls, new(mqs.Mock)), closed: make(chan struct{}), closeReturn: closeReturn, } diff --git a/api/agent/data_access.go b/api/agent/data_access.go index 7cf15eccf..ab35215fa 100644 --- a/api/agent/data_access.go +++ b/api/agent/data_access.go @@ -192,7 +192,7 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde // and Datastore are different, it will call Close on the Logstore as well. func (da *directDataAccess) Close() error { err := da.ds.Close() - if da.ds != da.ls { + if ls, ok := da.ds.(models.LogStore); ok && ls != da.ls { if daErr := da.ls.Close(); daErr != nil { err = daErr } diff --git a/api/datastore/datastore.go b/api/datastore/datastore.go index b50148ed2..895014d53 100644 --- a/api/datastore/datastore.go +++ b/api/datastore/datastore.go @@ -13,12 +13,7 @@ import ( ) func New(ctx context.Context, dbURL string) (models.Datastore, error) { - ds, err := newds(ctx, dbURL) // teehee - if err != nil { - return nil, err - } - - return Wrap(ds), nil + return newds(ctx, dbURL) // teehee } func Wrap(ds models.Datastore) models.Datastore { diff --git a/api/datastore/internal/datastoreutil/metrics.go b/api/datastore/internal/datastoreutil/metrics.go index da168dd27..ab57226bd 100644 --- a/api/datastore/internal/datastoreutil/metrics.go +++ b/api/datastore/internal/datastoreutil/metrics.go @@ -2,7 +2,6 @@ package datastoreutil import ( "context" - "io" "go.opencensus.io/trace" @@ -84,36 +83,6 @@ func (m *metricds) RemoveRoute(ctx context.Context, appID string, routePath stri return m.ds.RemoveRoute(ctx, appID, routePath) } -func (m *metricds) InsertCall(ctx context.Context, call *models.Call) error { - ctx, span := trace.StartSpan(ctx, "ds_insert_call") - defer span.End() - return m.ds.InsertCall(ctx, call) -} - -func (m *metricds) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { - ctx, span := trace.StartSpan(ctx, "ds_get_call") - defer span.End() - return m.ds.GetCall(ctx, appName, callID) -} - -func (m *metricds) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { - ctx, span := trace.StartSpan(ctx, "ds_get_calls") - defer span.End() - return m.ds.GetCalls(ctx, filter) -} - -func (m *metricds) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error { - ctx, span := trace.StartSpan(ctx, "ds_insert_log") - defer span.End() - return m.ds.InsertLog(ctx, appName, callID, callLog) -} - -func (m *metricds) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) { - ctx, span := trace.StartSpan(ctx, "ds_get_log") - defer span.End() - return m.ds.GetLog(ctx, appName, callID) -} - // instant & no context ;) func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() } diff --git a/api/datastore/internal/datastoreutil/validator.go b/api/datastore/internal/datastoreutil/validator.go index 4648dce8a..090eefefa 100644 --- a/api/datastore/internal/datastoreutil/validator.go +++ b/api/datastore/internal/datastoreutil/validator.go @@ -132,14 +132,6 @@ func (v *validator) RemoveRoute(ctx context.Context, appID string, routePath str return v.Datastore.RemoveRoute(ctx, appID, routePath) } -// callID will never be empty. -func (v *validator) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { - if callID == "" { - return nil, models.ErrDatastoreEmptyCallID - } - return v.Datastore.GetCall(ctx, appName, callID) -} - // GetDatabase returns the underlying sqlx database implementation func (v *validator) GetDatabase() *sqlx.DB { return v.Datastore.GetDatabase() diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index f80a9540e..90a275aa3 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -97,18 +97,23 @@ const ( EnvDBPingMaxRetries = "FN_DS_DB_PING_MAX_RETRIES" ) -type sqlStore struct { +var ( // compiler will yell nice things about our upbringing as a child + _ models.Datastore = new(SQLStore) + _ models.LogStore = new(SQLStore) +) + +type SQLStore struct { db *sqlx.DB } // New will open the db specified by url, create any tables necessary // and return a models.Datastore safe for concurrent usage. -func New(ctx context.Context, url *url.URL) (models.Datastore, error) { +func New(ctx context.Context, url *url.URL) (*SQLStore, error) { return newDS(ctx, url) } // for test methods, return concrete type, but don't expose -func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) { +func newDS(ctx context.Context, url *url.URL) (*SQLStore, error) { driver := url.Scheme log := common.Logger(ctx) @@ -158,7 +163,7 @@ func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) { db.SetMaxOpenConns(1) } - sdb := &sqlStore{db: db} + sdb := &SQLStore{db: db} // NOTE: runMigrations happens before we create all the tables, so that it // can detect whether the db did not exist and insert the latest version of @@ -259,7 +264,7 @@ func checkExistence(tx *sqlx.Tx) (bool, error) { // check if the db already existed, if the db is brand new then we can skip // over all the migrations BUT we must be sure to set the right migration // number so that only current migrations are skipped, not any future ones. -func (ds *sqlStore) runMigrations(ctx context.Context, tx *sqlx.Tx, migrations []migratex.Migration) error { +func (ds *SQLStore) runMigrations(ctx context.Context, tx *sqlx.Tx, migrations []migratex.Migration) error { dbExists, err := checkExistence(tx) if err != nil { return err @@ -287,7 +292,7 @@ func latestVersion(migs []migratex.Migration) int64 { } // clear is for tests only, be careful, it deletes all records. -func (ds *sqlStore) clear() error { +func (ds *SQLStore) clear() error { return ds.Tx(func(tx *sqlx.Tx) error { query := tx.Rebind(`DELETE FROM routes`) _, err := tx.Exec(query) @@ -313,7 +318,7 @@ func (ds *sqlStore) clear() error { }) } -func (ds *sqlStore) GetAppID(ctx context.Context, appName string) (string, error) { +func (ds *SQLStore) GetAppID(ctx context.Context, appName string) (string, error) { var app models.App query := ds.db.Rebind(ensureAppSelector) row := ds.db.QueryRowxContext(ctx, query, appName) @@ -329,7 +334,7 @@ func (ds *sqlStore) GetAppID(ctx context.Context, appName string) (string, error return app.ID, nil } -func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { +func (ds *SQLStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { query := ds.db.Rebind(`INSERT INTO apps ( id, name, @@ -370,7 +375,7 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App return app, nil } -func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { +func (ds *SQLStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { var app models.App err := ds.Tx(func(tx *sqlx.Tx) error { // NOTE: must query whole object since we're returning app, Update logic @@ -415,7 +420,7 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models. return &app, nil } -func (ds *sqlStore) RemoveApp(ctx context.Context, appID string) error { +func (ds *SQLStore) RemoveApp(ctx context.Context, appID string) error { return ds.Tx(func(tx *sqlx.Tx) error { res, err := tx.ExecContext(ctx, tx.Rebind(`DELETE FROM apps WHERE id=?`), appID) if err != nil { @@ -445,7 +450,7 @@ func (ds *sqlStore) RemoveApp(ctx context.Context, appID string) error { }) } -func (ds *sqlStore) GetAppByID(ctx context.Context, appID string) (*models.App, error) { +func (ds *SQLStore) GetAppByID(ctx context.Context, appID string) (*models.App, error) { var app models.App query := ds.db.Rebind(appIDSelector) row := ds.db.QueryRowxContext(ctx, query, appID) @@ -461,7 +466,7 @@ func (ds *sqlStore) GetAppByID(ctx context.Context, appID string) (*models.App, } // GetApps retrieves an array of apps according to a specific filter. -func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { +func (ds *SQLStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { res := []*models.App{} if filter.NameIn != nil && len(filter.NameIn) == 0 { // this basically makes sure it doesn't return ALL apps return res, nil @@ -495,7 +500,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m return res, nil } -func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { +func (ds *SQLStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { err := ds.Tx(func(tx *sqlx.Tx) error { query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`) r := tx.QueryRowContext(ctx, query, route.AppID) @@ -557,7 +562,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode return route, err } -func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { +func (ds *SQLStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { var route models.Route err := ds.Tx(func(tx *sqlx.Tx) error { query := tx.Rebind(fmt.Sprintf("%s WHERE app_id=? AND path=?", routeSelector)) @@ -612,7 +617,7 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m return &route, nil } -func (ds *sqlStore) RemoveRoute(ctx context.Context, appID string, routePath string) error { +func (ds *SQLStore) RemoveRoute(ctx context.Context, appID string, routePath string) error { query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_id = ?`) res, err := ds.db.ExecContext(ctx, query, routePath, appID) if err != nil { @@ -631,7 +636,7 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appID string, routePath str return nil } -func (ds *sqlStore) GetRoute(ctx context.Context, appID, routePath string) (*models.Route, error) { +func (ds *SQLStore) GetRoute(ctx context.Context, appID, routePath string) (*models.Route, error) { rSelectCondition := "%s WHERE app_id=? AND path=?" query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector)) row := ds.db.QueryRowxContext(ctx, query, appID, routePath) @@ -647,7 +652,7 @@ func (ds *sqlStore) GetRoute(ctx context.Context, appID, routePath string) (*mod } // GetRoutesByApp retrieves a route with a specific app name. -func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appID string, filter *models.RouteFilter) ([]*models.Route, error) { +func (ds *SQLStore) GetRoutesByApp(ctx context.Context, appID string, filter *models.RouteFilter) ([]*models.Route, error) { res := []*models.Route{} if filter == nil { filter = new(models.RouteFilter) @@ -684,7 +689,7 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appID string, filter *mo return res, nil } -func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error { +func (ds *SQLStore) Tx(f func(*sqlx.Tx) error) error { tx, err := ds.db.Beginx() if err != nil { return err @@ -697,7 +702,7 @@ func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error { return tx.Commit() } -func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { +func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error { query := ds.db.Rebind(`INSERT INTO calls ( id, created_at, @@ -725,7 +730,7 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { return err } -func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { +func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector) query = ds.db.Rebind(query) row := ds.db.QueryRowxContext(ctx, query, callID, appID) @@ -741,7 +746,7 @@ func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models. return &call, nil } -func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { +func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { res := []*models.Call{} query, args := buildFilterCallQuery(filter) query = fmt.Sprintf("%s %s", callSelector, query) @@ -766,7 +771,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([] return res, nil } -func (ds *sqlStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error { +func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error { // coerce this into a string for sql var log string if stringer, ok := logR.(fmt.Stringer); ok { @@ -785,7 +790,7 @@ func (ds *sqlStore) InsertLog(ctx context.Context, appID, callID string, logR io return err } -func (ds *sqlStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { +func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`) row := ds.db.QueryRowContext(ctx, query, callID, appID) @@ -911,11 +916,11 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) { } // GetDatabase returns the underlying sqlx database implementation -func (ds *sqlStore) GetDatabase() *sqlx.DB { +func (ds *SQLStore) GetDatabase() *sqlx.DB { return ds.db } // Close closes the database, releasing any open resources. -func (ds *sqlStore) Close() error { +func (ds *SQLStore) Close() error { return ds.db.Close() } diff --git a/api/datastore/sql/sql_test.go b/api/datastore/sql/sql_test.go index ed6e147da..84d29e2ea 100644 --- a/api/datastore/sql/sql_test.go +++ b/api/datastore/sql/sql_test.go @@ -20,7 +20,7 @@ import ( // * run all down migrations // * run all up migrations // [ then run tests against that db ] -func newWithMigrations(ctx context.Context, url *url.URL) (*sqlStore, error) { +func newWithMigrations(ctx context.Context, url *url.URL) (*SQLStore, error) { ds, err := newDS(ctx, url) if err != nil { return nil, err @@ -49,16 +49,20 @@ func TestDatastore(t *testing.T) { if err != nil { t.Fatal(err) } - f := func(t *testing.T) models.Datastore { + f := func(t *testing.T) *SQLStore { os.RemoveAll("sqlite_test_dir") ds, err := newDS(ctx, u) if err != nil { t.Fatal(err) } // we don't want to test the validator, really + return ds + } + f2 := func(t *testing.T) models.Datastore { + ds := f(t) return datastoreutil.NewValidator(ds) } - datastoretest.Test(t, f) + datastoretest.Test(t, f2) // also logs logstoretest.Test(t, f(t)) @@ -72,7 +76,7 @@ func TestDatastore(t *testing.T) { // will down migrate all migrations, up migrate, and run tests again. both := func(u *url.URL) { - f := func(t *testing.T) models.Datastore { + f := func(t *testing.T) *SQLStore { ds, err := newDS(ctx, u) if err != nil { t.Fatal(err) @@ -81,16 +85,20 @@ func TestDatastore(t *testing.T) { if err != nil { t.Fatal(err) } + return ds + } + f2 := func(t *testing.T) models.Datastore { + ds := f(t) return datastoreutil.NewValidator(ds) } // test fresh w/o migrations - datastoretest.Test(t, f) + datastoretest.Test(t, f2) // also test sql implements logstore logstoretest.Test(t, f(t)) - f = func(t *testing.T) models.Datastore { + f = func(t *testing.T) *SQLStore { t.Log("with migrations now!") ds, err := newWithMigrations(ctx, u) if err != nil { @@ -100,11 +108,15 @@ func TestDatastore(t *testing.T) { if err != nil { t.Fatal(err) } + return ds + } + f2 = func(t *testing.T) models.Datastore { + ds := f(t) return datastoreutil.NewValidator(ds) } // test that migrations work & things work with them - datastoretest.Test(t, f) + datastoretest.Test(t, f2) // also test sql implements logstore logstoretest.Test(t, f(t)) diff --git a/api/logs/log.go b/api/logs/log.go index 293f556bf..3ded597c6 100644 --- a/api/logs/log.go +++ b/api/logs/log.go @@ -8,7 +8,9 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore/sql" + "github.com/fnproject/fn/api/logs/metrics" "github.com/fnproject/fn/api/logs/s3" + "github.com/fnproject/fn/api/logs/validator" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" ) @@ -20,12 +22,19 @@ func New(ctx context.Context, dbURL string) (models.LogStore, error) { log.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") } log.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store") + var ls models.LogStore switch u.Scheme { case "sqlite3", "postgres", "mysql": - return sql.New(ctx, u) + ls, err = sql.New(ctx, u) case "s3": - return s3.New(u) + ls, err = s3.New(u) default: - return nil, fmt.Errorf("db type not supported %v", u.Scheme) + err = fmt.Errorf("db type not supported %v", u.Scheme) } + + return ls, err +} + +func Wrap(ls models.LogStore) models.LogStore { + return validator.NewValidator(metrics.NewLogstore(ls)) } diff --git a/api/logs/metrics/metrics.go b/api/logs/metrics/metrics.go new file mode 100644 index 000000000..fc6b96df8 --- /dev/null +++ b/api/logs/metrics/metrics.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "context" + "io" + + "github.com/fnproject/fn/api/models" + "go.opencensus.io/trace" +) + +func NewLogstore(ls models.LogStore) models.LogStore { + return &metricls{ls} +} + +type metricls struct { + ls models.LogStore +} + +func (m *metricls) InsertCall(ctx context.Context, call *models.Call) error { + ctx, span := trace.StartSpan(ctx, "ls_insert_call") + defer span.End() + return m.ls.InsertCall(ctx, call) +} + +func (m *metricls) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_call") + defer span.End() + return m.ls.GetCall(ctx, appName, callID) +} + +func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_calls") + defer span.End() + return m.ls.GetCalls(ctx, filter) +} + +func (m *metricls) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error { + ctx, span := trace.StartSpan(ctx, "ls_insert_log") + defer span.End() + return m.ls.InsertLog(ctx, appName, callID, callLog) +} + +func (m *metricls) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) { + ctx, span := trace.StartSpan(ctx, "ls_get_log") + defer span.End() + return m.ls.GetLog(ctx, appName, callID) +} + +func (m *metricls) Close() error { + return m.ls.Close() +} diff --git a/api/logs/validator/validator.go b/api/logs/validator/validator.go new file mode 100644 index 000000000..523f7f52d --- /dev/null +++ b/api/logs/validator/validator.go @@ -0,0 +1,60 @@ +package validator + +import ( + "context" + "io" + + "github.com/fnproject/fn/api/models" +) + +func NewValidator(ls models.LogStore) models.LogStore { + return &validator{ls} +} + +type validator struct { + models.LogStore +} + +// callID or appID will never be empty. +func (v *validator) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error { + if callID == "" { + return models.ErrDatastoreEmptyCallID + } + if appID == "" { + return models.ErrDatastoreEmptyAppID + } + return v.LogStore.InsertLog(ctx, appID, callID, callLog) +} + +// callID or appID will never be empty. +func (v *validator) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) { + if callID == "" { + return nil, models.ErrDatastoreEmptyCallID + } + if appID == "" { + return nil, models.ErrDatastoreEmptyAppID + } + return v.LogStore.GetLog(ctx, appID, callID) +} + +// callID or appID will never be empty. +func (v *validator) InsertCall(ctx context.Context, call *models.Call) error { + if call.ID == "" { + return models.ErrDatastoreEmptyCallID + } + if call.AppID == "" { + return models.ErrDatastoreEmptyAppID + } + return v.LogStore.InsertCall(ctx, call) +} + +// callID or appID will never be empty. +func (v *validator) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) { + if callID == "" { + return nil, models.ErrDatastoreEmptyCallID + } + if appID == "" { + return nil, models.ErrDatastoreEmptyAppID + } + return v.LogStore.GetCall(ctx, appID, callID) +} diff --git a/api/models/datastore.go b/api/models/datastore.go index 79ea24736..3a77bde86 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -2,6 +2,7 @@ package models import ( "context" + "io" "github.com/jmoiron/sqlx" ) @@ -58,9 +59,9 @@ type Datastore interface { // ErrDatastoreEmptyRoutePath when routePath is empty. Returns ErrRoutesNotFound when no route exists. RemoveRoute(ctx context.Context, appID, routePath string) error - // Implement LogStore methods for convenience - LogStore - // GetDatabase returns the underlying sqlx database implementation GetDatabase() *sqlx.DB + + // implements io.Closer to shutdown + io.Closer } diff --git a/api/server/hybrid.go b/api/server/hybrid.go index e25465d0b..e8bbabee4 100644 --- a/api/server/hybrid.go +++ b/api/server/hybrid.go @@ -165,7 +165,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { // TODO this needs UpdateCall functionality to work for async and should only work if: // running->error|timeout|success // TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted* - if err := s.datastore.InsertCall(ctx, &call); err != nil { + if err := s.logstore.InsertCall(ctx, &call); err != nil { common.Logger(ctx).WithError(err).Error("error inserting call into datastore") // note: Not returning err here since the job could have already finished successfully. } diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 8aab40afa..f2feea9f2 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -42,6 +42,7 @@ func envTweaker(name, value string) func() { func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelFunc) { ds := datastore.NewMock() + ls := logs.NewMock() var mq models.MessageQueue = &mqs.Mock{} for _, a := range args { switch arg := a.(type) { @@ -49,9 +50,11 @@ func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelF ds = arg case models.MessageQueue: mq = arg + case models.LogStore: + ls = arg } } - r := agent.New(agent.NewDirectDataAccess(ds, ds, mq)) + r := agent.New(agent.NewDirectDataAccess(ds, ls, mq)) return r, func() { r.Close() } } @@ -169,11 +172,12 @@ func TestRouteRunnerExecEmptyBody(t *testing.T) { {Path: "/hotjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg}, }, ) + ls := logs.NewMock() - rnr, cancelrnr := testRunner(t, ds) + rnr, cancelrnr := testRunner(t, ds, ls) defer cancelrnr() - srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull) + srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull) expHeaders := map[string][]string{"X-Function": {"Test"}} emptyBody := `{"echoContent": "_TRX_ID_", "isDebug": true, "isEmptyBody": true}` @@ -258,11 +262,12 @@ func TestRouteRunnerExecution(t *testing.T) { {Path: "/mybigoutputjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg}, }, ) + ls := logs.NewMock() - rnr, cancelrnr := testRunner(t, ds) + rnr, cancelrnr := testRunner(t, ds, ls) defer cancelrnr() - srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull) + srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull) expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}} expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}} @@ -373,7 +378,7 @@ func TestRouteRunnerExecution(t *testing.T) { for i, test := range testCases { if test.expectedLogsSubStr != nil { - if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) { + if !checkLogs(t, i, ls, callIds[i], test.expectedLogsSubStr) { isFailure = true } } @@ -403,7 +408,7 @@ func getDockerId(respBytes []byte) (string, error) { return id, nil } -func checkLogs(t *testing.T, tnum int, ds models.Datastore, callID string, expected []string) bool { +func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool { logReader, err := ds.GetLog(context.Background(), "myapp", callID) if err != nil { @@ -460,7 +465,7 @@ func TestFailedEnqueue(t *testing.T) { err := errors.New("Unable to push task to queue") mq := &errorMQ{err, http.StatusInternalServerError} fnl := logs.NewMock() - rnr, cancelrnr := testRunner(t, ds, mq) + rnr, cancelrnr := testRunner(t, ds, mq, fnl) defer cancelrnr() srv := testServer(ds, mq, fnl, rnr, ServerTypeFull) @@ -511,10 +516,10 @@ func TestRouteRunnerTimeout(t *testing.T) { }, ) - rnr, cancelrnr := testRunner(t, ds) + fnl := logs.NewMock() + rnr, cancelrnr := testRunner(t, ds, fnl) defer cancelrnr() - fnl := logs.NewMock() srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { @@ -581,10 +586,10 @@ func TestRouteRunnerMinimalConcurrentHotSync(t *testing.T) { }, ) - rnr, cancelrnr := testRunner(t, ds) + fnl := logs.NewMock() + rnr, cancelrnr := testRunner(t, ds, fnl) defer cancelrnr() - fnl := logs.NewMock() srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) for i, test := range []struct { diff --git a/api/server/server.go b/api/server/server.go index 1d194e059..4ec983dfa 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -173,8 +173,10 @@ func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server { if nodeType != ServerTypeAPI { opts = append(opts, WithAgentFromEnv()) } else { + // NOTE: ensures logstore is set or there will be troubles opts = append(opts, WithLogstoreFromDatastore()) } + return New(ctx, opts...) } @@ -330,7 +332,7 @@ func WithNodeCertAuthority(ca string) ServerOption { func WithDatastore(ds models.Datastore) ServerOption { return func(ctx context.Context, s *Server) error { - s.datastore = datastore.Wrap(ds) + s.datastore = ds return nil } } @@ -370,7 +372,11 @@ func WithLogstoreFromDatastore() ServerOption { return errors.New("Need a datastore in order to use it as a logstore") } if s.logstore == nil { - s.logstore = s.datastore + if ls, ok := s.datastore.(models.LogStore); ok { + s.logstore = ls + } else { + return errors.New("datastore must implement logstore interface") + } } return nil } @@ -380,9 +386,12 @@ func WithLogstoreFromDatastore() ServerOption { func WithFullAgent() ServerOption { return func(ctx context.Context, s *Server) error { s.nodeType = ServerTypeFull + + // ensure logstore is set (TODO compat only?) if s.logstore == nil { - s.logstore = s.datastore + WithLogstoreFromDatastore()(ctx, s) } + if s.datastore == nil || s.logstore == nil || s.mq == nil { return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") } @@ -464,14 +473,7 @@ func WithAgentFromEnv() ServerOption { return errors.New("LBAgent creation failed") } default: - s.nodeType = ServerTypeFull - if s.logstore == nil { // TODO seems weird? - s.logstore = s.datastore - } - if s.datastore == nil || s.logstore == nil || s.mq == nil { - return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") - } - s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq))) + WithFullAgent()(ctx, s) } return nil } @@ -543,7 +545,9 @@ func New(ctx context.Context, opts ...ServerOption) *Server { s.appListeners = new(appListeners) s.routeListeners = new(routeListeners) + s.datastore = datastore.Wrap(s.datastore) s.datastore = fnext.NewDatastore(s.datastore, s.appListeners, s.routeListeners) + s.logstore = logs.Wrap(s.logstore) return s } diff --git a/api/server/server_test.go b/api/server/server_test.go index 10bfd536f..4afc62330 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "os" "strconv" "strings" @@ -14,6 +15,7 @@ import ( "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/datastore" + "github.com/fnproject/fn/api/datastore/sql" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -94,11 +96,17 @@ func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) *models.Erro func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.LogStore, func()) { os.Remove(tmpDatastoreTests) - ds, err := datastore.New(ctx, "sqlite3://"+tmpDatastoreTests) + uri, err := url.Parse("sqlite3://" + tmpDatastoreTests) + if err != nil { + t.Fatal(err) + } + ss, err := sql.New(ctx, uri) if err != nil { t.Fatalf("Error when creating datastore: %s", err) } - logDB := ds + logDB := logs.Wrap(ss) + ds := datastore.Wrap(ss) + return ds, logDB, func() { os.Remove(tmpDatastoreTests) }