diff --git a/api/datastore/sql/migratex/migrate.go b/api/datastore/sql/migratex/migrate.go index 659ccdc55..97095c443 100644 --- a/api/datastore/sql/migratex/migrate.go +++ b/api/datastore/sql/migratex/migrate.go @@ -12,6 +12,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/lib/pq" + "github.com/sirupsen/logrus" ) var ( @@ -76,25 +77,19 @@ func (m MigFields) Version() int64 { return m.Versi // TODO instance must have `multiStatements` set to true ? -func Up(ctx context.Context, db *sqlx.DB, migs []Migration) error { - return migrate(ctx, db, migs, true) +func Up(ctx context.Context, tx *sqlx.Tx, migs []Migration) error { + return migrate(ctx, tx, migs, true) } -func Down(ctx context.Context, db *sqlx.DB, migs []Migration) error { - return migrate(ctx, db, migs, false) +func Down(ctx context.Context, tx *sqlx.Tx, migs []Migration) error { + return migrate(ctx, tx, migs, false) } -func migrate(ctx context.Context, db *sqlx.DB, migs []Migration, up bool) error { - var curVersion int64 // could be NilVersion, is ok - err := tx(ctx, db, func(tx *sqlx.Tx) error { - var dirty bool - var err error - curVersion, dirty, err = Version(ctx, tx) - if dirty { - return dirtyErr(curVersion) - } - return err - }) +func migrate(ctx context.Context, tx *sqlx.Tx, migs []Migration, up bool) error { + curVersion, dirty, err := Version(ctx, tx) + if dirty { + return dirtyErr(curVersion) + } if err != nil { return err } @@ -119,7 +114,7 @@ func migrate(ctx context.Context, db *sqlx.DB, migs []Migration, up bool) error // XXX(reed): we could more gracefully handle concurrent databases trying to // run migrations here by handling error and feeding back the version. // get something working mode for now... - err := run(ctx, db, m, up) + err := run(ctx, tx, m, up) if err != nil { return err } @@ -129,19 +124,6 @@ func migrate(ctx context.Context, db *sqlx.DB, migs []Migration, up bool) error return nil } -func tx(ctx context.Context, db *sqlx.DB, f func(*sqlx.Tx) error) error { - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - err = f(tx) - if err != nil { - tx.Rollback() - return err - } - return tx.Commit() -} - func withLock(ctx context.Context, tx *sqlx.Tx, f func(*sqlx.Tx) error) error { err := lock(ctx, tx) if err != nil { @@ -188,45 +170,43 @@ func (m MultiError) Error() string { return strings.Join(strs, "\n") } -func run(ctx context.Context, db *sqlx.DB, m Migration, up bool) error { - return tx(ctx, db, func(tx *sqlx.Tx) error { - return withLock(ctx, tx, func(tx *sqlx.Tx) error { - // within the transaction, we need to check the version and ensure this - // migration has not already been applied. - curVersion, dirty, err := Version(ctx, tx) - if dirty { - return dirtyErr(curVersion) - } +func run(ctx context.Context, tx *sqlx.Tx, m Migration, up bool) error { + return withLock(ctx, tx, func(tx *sqlx.Tx) error { + // within the transaction, we need to check the version and ensure this + // migration has not already been applied. + curVersion, dirty, err := Version(ctx, tx) + if dirty { + return dirtyErr(curVersion) + } - // enforce monotonicity - if up && curVersion != NilVersion && m.Version() != curVersion+1 { - return fmt.Errorf("non-contiguous migration attempted up: %v != %v", m.Version(), curVersion+1) - } else if !up && m.Version() != curVersion { // down is always unraveling - return fmt.Errorf("non-contiguous migration attempted down: %v != %v", m.Version(), curVersion) - } + // enforce monotonicity + if up && curVersion != NilVersion && m.Version() != curVersion+1 { + return fmt.Errorf("non-contiguous migration attempted up: %v != %v", m.Version(), curVersion+1) + } else if !up && m.Version() != curVersion { // down is always unraveling + return fmt.Errorf("non-contiguous migration attempted down: %v != %v", m.Version(), curVersion) + } - // TODO is this robust enough? we could check - version := m.Version() - if !up { - version = m.Version() - 1 - } + // TODO is this robust enough? we could check + version := m.Version() + if !up { + version = m.Version() - 1 + } - // TODO we don't need the dirty bit anymore since we're using transactions? - err = SetVersion(ctx, tx, version, true) + // TODO we don't need the dirty bit anymore since we're using transactions? + err = SetVersion(ctx, tx, version, true) - if up { - err = m.Up(ctx, tx) - } else { - err = m.Down(ctx, tx) - } + if up { + err = m.Up(ctx, tx) + } else { + err = m.Down(ctx, tx) + } - if err != nil { - return migrateErr(version, up, err) - } + if err != nil { + return migrateErr(version, up, err) + } - err = SetVersion(ctx, tx, version, false) - return err - }) + err = SetVersion(ctx, tx, version, false) + return err }) } @@ -297,7 +277,8 @@ func unlock(ctx context.Context, tx *sqlx.Tx) error { func SetVersion(ctx context.Context, tx *sqlx.Tx, version int64, dirty bool) error { err := ensureVersionTable(ctx, tx) if err != nil { - return nil + logrus.WithError(err).Error("error ensuring version table") + return err } // TODO need to handle down migration better @@ -305,12 +286,14 @@ func SetVersion(ctx context.Context, tx *sqlx.Tx, version int64, dirty bool) err // this just nukes the whole table which is kinda lame. query := tx.Rebind("DELETE FROM " + MigrationsTable) if _, err := tx.Exec(query); err != nil { + logrus.WithError(err).Error("error deleting version table") return err } if version >= 0 { query = tx.Rebind(`INSERT INTO ` + MigrationsTable + ` (version, dirty) VALUES (?, ?)`) if _, err := tx.ExecContext(ctx, query, version, dirty); err != nil { + logrus.WithError(err).Error("error updating version table") return err } } diff --git a/api/datastore/sql/migratex/migrate_test.go b/api/datastore/sql/migratex/migrate_test.go index 39312ca07..33b9052fb 100644 --- a/api/datastore/sql/migratex/migrate_test.go +++ b/api/datastore/sql/migratex/migrate_test.go @@ -38,7 +38,14 @@ func TestMigrateUp(t *testing.T) { ctx := context.Background() - err = tx(ctx, db, func(tx *sqlx.Tx) error { + do := func() error { + tx, err := db.Beginx() + if err != nil { + return err + } + + defer tx.Commit() + version, dirty, err := Version(ctx, tx) if version != NilVersion || err != nil || dirty { return fmt.Errorf("version err: %v %v", err, dirty) @@ -48,7 +55,7 @@ func TestMigrateUp(t *testing.T) { return errors.New("found existing version in db, nuke it") } - err = Up(ctx, db, []Migration{x}) + err = Up(ctx, tx, []Migration{x}) if err != nil { return err } @@ -61,7 +68,15 @@ func TestMigrateUp(t *testing.T) { if version != x.Version() { return errors.New("version did not update, migration should have ran.") } + return nil + } + err = do() + if err != nil { + t.Fatalf("couldn't run migrations: %v", err) + } + + do = func() error { // make sure the table is there. // TODO find a db agnostic way of doing this. // query := db.Rebind(`SELECT foo FROM sqlite_master WHERE type = 'table'`) @@ -76,9 +91,10 @@ func TestMigrateUp(t *testing.T) { return fmt.Errorf("migration version worked but migration didn't work: %v", result) } return nil - }) + } + err = do() if err != nil { - t.Fatalf("bad things happened: %v", err) + t.Fatalf("migration check failed: %v", err) } } diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 8fb16b38a..82eff90e6 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -151,23 +151,43 @@ func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) { db.SetMaxIdleConns(maxIdleConns) log.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed") - sdb := &sqlStore{db: db} - - err = sdb.runMigrations(ctx, checkExistence(db), migrations.Migrations) - if err != nil { - log.WithError(err).Error("error running migrations") - return nil, err - } - - switch driver { + switch driver { // NOTE: fixes weird sqlite3 behavior case "sqlite3": db.SetMaxOpenConns(1) } - for _, v := range tables { - _, err = db.ExecContext(ctx, v) + + sdb := &sqlStore{db: db} + + // NOTE: runMigrations happens before we create all the tables, so that it + // can detect whether the db did not exist and insert the latest version of + // the migrations BEFORE the tables are created (it uses table info to + // determine that). + // + // we either create all the tables with the latest version of the schema, + // insert the latest version to the migration table and bail without running + // any migrations. + // OR + // run all migrations necessary to get up to the latest, inserting that version, + // [and the tables exist so CREATE IF NOT EXIST guards us when we run the create queries]. + err = sdb.Tx(func(tx *sqlx.Tx) error { + err = sdb.runMigrations(ctx, tx, migrations.Migrations) if err != nil { - return nil, err + log.WithError(err).Error("error running migrations") + return err } + + for _, v := range tables { + _, err = tx.ExecContext(ctx, v) + if err != nil { + log.WithError(err).Error("error creating tables") + return err + } + } + return nil + }) + + if err != nil { + return nil, err } return sdb, nil @@ -208,35 +228,47 @@ func pingWithRetry(ctx context.Context, db *sqlx.DB) (err error) { // about the existence of the schema migration version (since migrations were // added to existing dbs, we need to know whether the db exists without migrations // or if it's brand new). -func checkExistence(db *sqlx.DB) bool { - query := db.Rebind(`SELECT name FROM apps LIMIT 1`) - row := db.QueryRow(query) +func checkExistence(tx *sqlx.Tx) (bool, error) { + query := tx.Rebind(`SELECT count(*) + FROM information_schema.TABLES + WHERE TABLE_NAME = 'apps' +`) - var dummy string - err := row.Scan(&dummy) - if err != nil && err != sql.ErrNoRows { - // TODO we should probably ensure this is a certain 'no such table' error - // and if it's not that or err no rows, we should probably block start up. - // if we return false here spuriously, then migrations could be skipped, - // which would be bad. - return false + if tx.DriverName() == "sqlite3" { + // sqlite3 is special, of course + query = tx.Rebind(`SELECT count(*) + FROM sqlite_master + WHERE name = 'apps' + `) } - return true + + row := tx.QueryRow(query) + + var count int + err := row.Scan(&count) + if err != nil { + return false, err + } + + exists := count > 0 + return exists, nil } // check if the db already existed, if the db is brand new then we can skip // over all the migrations BUT we must be sure to set the right migration // number so that only current migrations are skipped, not any future ones. -func (ds *sqlStore) runMigrations(ctx context.Context, dbExists bool, migrations []migratex.Migration) error { +func (ds *sqlStore) runMigrations(ctx context.Context, tx *sqlx.Tx, migrations []migratex.Migration) error { + dbExists, err := checkExistence(tx) + if err != nil { + return err + } if !dbExists { // set to highest and bail - return ds.Tx(func(tx *sqlx.Tx) error { - return migratex.SetVersion(ctx, tx, latestVersion(migrations), false) - }) + return migratex.SetVersion(ctx, tx, latestVersion(migrations), false) } // run any migrations needed to get to latest, if any - return migratex.Up(ctx, ds.db, migrations) + return migratex.Up(ctx, tx, migrations) } // latest version will find the latest version from a list of migration diff --git a/api/datastore/sql/sql_test.go b/api/datastore/sql/sql_test.go index 5f15b3dba..d78356d5c 100644 --- a/api/datastore/sql/sql_test.go +++ b/api/datastore/sql/sql_test.go @@ -12,6 +12,7 @@ import ( "github.com/fnproject/fn/api/datastore/sql/migrations" logstoretest "github.com/fnproject/fn/api/logs/testing" "github.com/fnproject/fn/api/models" + "github.com/jmoiron/sqlx" ) // since New with fresh dbs skips all migrations: @@ -25,7 +26,9 @@ func newWithMigrations(ctx context.Context, url *url.URL) (*sqlStore, error) { return nil, err } - err = migratex.Down(ctx, ds.db, migrations.Migrations) + err = ds.Tx(func(tx *sqlx.Tx) error { + return migratex.Down(ctx, tx, migrations.Migrations) + }) if err != nil { return nil, err }