From faaf5846cec0f246cea82b233d0dbc80ef425b56 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Tue, 2 Jan 2018 20:32:10 +0200 Subject: [PATCH] Use retry func while trying to ping SQL datastore (#630) * Use retry func while trying to ping SQL datastore - implements retry func specifically for SQL datastore ping - fmt fixes - using sqlx.Db.PingContext instead of sqlx.Db.Ping - propogate context to SQL datastore * Rely on context from ServerOpt * Consolidate log instances * Cleanup * Fix server usage in API tests --- api/datastore/datastore.go | 15 +++--- api/datastore/sql/sql.go | 31 +++++++---- api/datastore/sql/sql_test.go | 14 ++--- api/logs/log.go | 11 ++-- api/logs/log_test.go | 4 +- api/server/server.go | 96 ++++++++++++++++++++++------------- api/server/server_options.go | 10 ++-- api/server/server_test.go | 2 +- 8 files changed, 117 insertions(+), 66 deletions(-) diff --git a/api/datastore/datastore.go b/api/datastore/datastore.go index 3d118d2c9..e93236ca4 100644 --- a/api/datastore/datastore.go +++ b/api/datastore/datastore.go @@ -1,17 +1,19 @@ package datastore import ( + "context" "fmt" "net/url" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore/internal/datastoreutil" "github.com/fnproject/fn/api/datastore/sql" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" ) -func New(dbURL string) (models.Datastore, error) { - ds, err := newds(dbURL) // teehee +func New(ctx context.Context, dbURL string) (models.Datastore, error) { + ds, err := newds(ctx, dbURL) // teehee if err != nil { return nil, err } @@ -19,15 +21,16 @@ func New(dbURL string) (models.Datastore, error) { return datastoreutil.MetricDS(datastoreutil.NewValidator(ds)), nil } -func newds(dbURL string) (models.Datastore, error) { +func newds(ctx context.Context, dbURL string) (models.Datastore, error) { + log := common.Logger(ctx) u, err := url.Parse(dbURL) if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") + log.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") } - logrus.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating new datastore") + log.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating new datastore") switch u.Scheme { case "sqlite3", "postgres", "mysql": - return sql.New(u) + return sql.New(ctx, u) default: return nil, fmt.Errorf("db type not supported %v", u.Scheme) } diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index d1d55325e..474dc96be 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore/sql/migrations" "github.com/fnproject/fn/api/models" "github.com/go-sql-driver/mysql" @@ -96,14 +97,15 @@ type sqlStore struct { // New will open the db specified by url, create any tables necessary // and return a models.Datastore safe for concurrent usage. -func New(url *url.URL) (models.Datastore, error) { - return newDS(url) +func New(ctx context.Context, url *url.URL) (models.Datastore, error) { + return newDS(ctx, url) } // for test methods, return concrete type, but don't expose -func newDS(url *url.URL) (*sqlStore, error) { +func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) { driver := url.Scheme + log := common.Logger(ctx) // driver must be one of these for sqlx to work, double check: switch driver { case "postgres", "pgx", "mysql", "sqlite3": @@ -128,25 +130,25 @@ func newDS(url *url.URL) (*sqlStore, error) { sqldb, err := sql.Open(driver, uri) if err != nil { - logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db") + log.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db") return nil, err } db := sqlx.NewDb(sqldb, driver) // force a connection and test that it worked - err = db.Ping() + err = pingWithRetry(ctx, 10, time.Second*1, db) if err != nil { - logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db") + log.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db") return nil, err } maxIdleConns := 256 // TODO we need to strip this out of the URL probably db.SetMaxIdleConns(maxIdleConns) - logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed") + log.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed") err = runMigrations(url.String(), checkExistence(db)) // original url string if err != nil { - logrus.WithError(err).Error("error running migrations") + log.WithError(err).Error("error running migrations") return nil, err } @@ -155,7 +157,7 @@ func newDS(url *url.URL) (*sqlStore, error) { db.SetMaxOpenConns(1) } for _, v := range tables { - _, err = db.Exec(v) + _, err = db.ExecContext(ctx, v) if err != nil { return nil, err } @@ -164,6 +166,17 @@ func newDS(url *url.URL) (*sqlStore, error) { return &sqlStore{db: db}, nil } +func pingWithRetry(ctx context.Context, attempts int, sleep time.Duration, db *sqlx.DB) (err error) { + for i := 0; i < attempts; i++ { + err = db.PingContext(ctx) + if err == nil { + return nil + } + time.Sleep(sleep) + } + return err +} + // checkExistence checks if tables have been created yet, it is not concerned // about the existence of the schema migration version (since migrations were // added to existing dbs, we need to know whether the db exists without migrations diff --git a/api/datastore/sql/sql_test.go b/api/datastore/sql/sql_test.go index a66d594b4..eef879ef8 100644 --- a/api/datastore/sql/sql_test.go +++ b/api/datastore/sql/sql_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "context" "github.com/fnproject/fn/api/datastore/internal/datastoretest" "github.com/fnproject/fn/api/datastore/internal/datastoreutil" "github.com/fnproject/fn/api/models" @@ -15,8 +16,8 @@ import ( // * run all down migrations // * run all up migrations // [ then run tests against that db ] -func newWithMigrations(url *url.URL) (*sqlStore, error) { - ds, err := newDS(url) +func newWithMigrations(ctx context.Context, url *url.URL) (*sqlStore, error) { + ds, err := newDS(ctx, url) if err != nil { return nil, err } @@ -32,7 +33,7 @@ func newWithMigrations(url *url.URL) (*sqlStore, error) { } // go through New, to ensure our Up logic works in there... - ds, err = newDS(url) + ds, err = newDS(ctx, url) if err != nil { return nil, err } @@ -41,6 +42,7 @@ func newWithMigrations(url *url.URL) (*sqlStore, error) { } func TestDatastore(t *testing.T) { + ctx := context.Background() defer os.RemoveAll("sqlite_test_dir") u, err := url.Parse("sqlite3://sqlite_test_dir") if err != nil { @@ -48,7 +50,7 @@ func TestDatastore(t *testing.T) { } f := func(t *testing.T) models.Datastore { os.RemoveAll("sqlite_test_dir") - ds, err := newDS(u) + ds, err := newDS(ctx, u) if err != nil { t.Fatal(err) } @@ -67,7 +69,7 @@ func TestDatastore(t *testing.T) { both := func(u *url.URL) { f := func(t *testing.T) models.Datastore { - ds, err := newDS(u) + ds, err := newDS(ctx, u) if err != nil { t.Fatal(err) } @@ -83,7 +85,7 @@ func TestDatastore(t *testing.T) { f = func(t *testing.T) models.Datastore { t.Log("with migrations now!") - ds, err := newWithMigrations(u) + ds, err := newWithMigrations(ctx, u) if err != nil { t.Fatal(err) } diff --git a/api/logs/log.go b/api/logs/log.go index a6fdf882b..4c9747aa6 100644 --- a/api/logs/log.go +++ b/api/logs/log.go @@ -4,21 +4,24 @@ import ( "fmt" "net/url" + "context" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore/sql" "github.com/fnproject/fn/api/logs/s3" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" ) -func New(dbURL string) (models.LogStore, error) { +func New(ctx context.Context, dbURL string) (models.LogStore, error) { + log := common.Logger(ctx) u, err := url.Parse(dbURL) if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") + log.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") } - logrus.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store") + log.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store") switch u.Scheme { case "sqlite3", "postgres", "mysql": - return sql.New(u) + return sql.New(ctx, u) case "s3": return s3.New(u) default: diff --git a/api/logs/log_test.go b/api/logs/log_test.go index eccaba466..5e7f715b2 100644 --- a/api/logs/log_test.go +++ b/api/logs/log_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "context" "github.com/fnproject/fn/api/datastore/sql" logTesting "github.com/fnproject/fn/api/logs/testing" ) @@ -13,12 +14,13 @@ const tmpLogDb = "/tmp/func_test_log.db" func TestDatastore(t *testing.T) { os.Remove(tmpLogDb) + ctx := context.Background() uLog, err := url.Parse("sqlite3://" + tmpLogDb) if err != nil { t.Fatalf("failed to parse url: %v", err) } - ds, err := sql.New(uLog) + ds, err := sql.New(ctx, uLog) if err != nil { t.Fatalf("failed to create sqlite3 datastore: %v", err) } diff --git a/api/server/server.go b/api/server/server.go index c638219d0..5ab5c3563 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -15,6 +15,7 @@ import ( "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/agent/hybrid" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" @@ -116,74 +117,96 @@ func pwd() string { } func WithDBURL(dbURL string) ServerOption { - if dbURL != "" { - ds, err := datastore.New(dbURL) - if err != nil { - logrus.WithError(err).Fatalln("Error initializing datastore.") + return func(ctx context.Context, s *Server) error { + if dbURL != "" { + ds, err := datastore.New(ctx, dbURL) + if err != nil { + return err + } + s.datastore = ds } - return WithDatastore(ds) + return nil } - return noop } func WithMQURL(mqURL string) ServerOption { - if mqURL != "" { - mq, err := mqs.New(mqURL) - if err != nil { - logrus.WithError(err).Fatal("Error initializing message queue.") + return func(ctx context.Context, s *Server) error { + if mqURL != "" { + mq, err := mqs.New(mqURL) + if err != nil { + return err + } + s.mq = mq } - return WithMQ(mq) + return nil } - return noop } func WithLogURL(logstoreURL string) ServerOption { - if ldb := logstoreURL; ldb != "" { - logDB, err := logs.New(logstoreURL) - if err != nil { - logrus.WithError(err).Fatal("Error initializing logs store.") + return func(ctx context.Context, s *Server) error { + if ldb := logstoreURL; ldb != "" { + logDB, err := logs.New(ctx, logstoreURL) + if err != nil { + return err + } + s.logstore = logDB } - return WithLogstore(logDB) + return nil } - return noop } func WithRunnerURL(runnerURL string) ServerOption { - if runnerURL != "" { - cl, err := hybrid.NewClient(runnerURL) - if err != nil { - logrus.WithError(err).Fatal("Error initializing runner API client.") + return func(ctx context.Context, s *Server) error { + if runnerURL != "" { + cl, err := hybrid.NewClient(runnerURL) + if err != nil { + return err + } + s.agent = agent.New(agent.NewCachedDataAccess(cl)) } - return WithAgent(agent.New(agent.NewCachedDataAccess(cl))) + return nil } - return noop } -func noop(s *Server) {} - func WithType(t ServerNodeType) ServerOption { - return func(s *Server) { s.nodeType = t } + return func(ctx context.Context, s *Server) error { + s.nodeType = t + return nil + } } func WithDatastore(ds models.Datastore) ServerOption { - return func(s *Server) { s.datastore = ds } + return func(ctx context.Context, s *Server) error { + s.datastore = ds + return nil + } } func WithMQ(mq models.MessageQueue) ServerOption { - return func(s *Server) { s.mq = mq } + return func(ctx context.Context, s *Server) error { + s.mq = mq + return nil + } } func WithLogstore(ls models.LogStore) ServerOption { - return func(s *Server) { s.logstore = ls } + return func(ctx context.Context, s *Server) error { + s.logstore = ls + return nil + } } func WithAgent(agent agent.Agent) ServerOption { - return func(s *Server) { s.agent = agent } + return func(ctx context.Context, s *Server) error { + s.agent = agent + return nil + } } // New creates a new Functions server with the opts given. For convenience, users may // prefer to use NewFromEnv but New is more flexible if needed. func New(ctx context.Context, opts ...ServerOption) *Server { + log := common.Logger(ctx) s := &Server{ Router: gin.New(), // Almost everything else is configured through opts (see NewFromEnv for ex.) or below @@ -193,7 +216,10 @@ func New(ctx context.Context, opts ...ServerOption) *Server { if opt == nil { continue } - opt(s) + err := opt(ctx, s) + if err != nil { + log.WithError(err).Fatal("Error during server opt initialization.") + } } if s.logstore == nil { // TODO seems weird? @@ -205,18 +231,18 @@ func New(ctx context.Context, opts ...ServerOption) *Server { switch s.nodeType { case ServerTypeAPI: if s.agent != nil { - logrus.Info("shutting down agent configured for api node") + log.Info("shutting down agent configured for api node") s.agent.Close() } s.agent = nil case ServerTypeRunner: if s.agent == nil { - logrus.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.") + log.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.") } default: s.nodeType = ServerTypeFull if s.datastore == nil || s.logstore == nil || s.mq == nil { - logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") + log.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.") } // TODO force caller to use WithAgent option? at this point we need to use the ds/ls/mq configured after all opts are run. diff --git a/api/server/server_options.go b/api/server/server_options.go index 159636a4b..784a36203 100644 --- a/api/server/server_options.go +++ b/api/server/server_options.go @@ -8,17 +8,19 @@ import ( "github.com/gin-gonic/gin" ) -type ServerOption func(*Server) +type ServerOption func(context.Context, *Server) error -func EnableShutdownEndpoint(halt context.CancelFunc) ServerOption { - return func(s *Server) { +func EnableShutdownEndpoint(ctx context.Context, halt context.CancelFunc) ServerOption { + return func(ctx context.Context, s *Server) error { s.Router.GET("/shutdown", s.handleShutdown(halt)) + return nil } } func LimitRequestBody(max int64) ServerOption { - return func(s *Server) { + return func(ctx context.Context, s *Server) error { s.Router.Use(limitRequestBody(max)) + return nil } } diff --git a/api/server/server_test.go b/api/server/server_test.go index 69ba6ee6c..95b590fcc 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -100,7 +100,7 @@ func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) models.Error func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.LogStore, func()) { os.Remove(tmpDatastoreTests) - ds, err := datastore.New("sqlite3://" + tmpDatastoreTests) + ds, err := datastore.New(ctx, "sqlite3://"+tmpDatastoreTests) if err != nil { t.Fatalf("Error when creating datastore: %s", err) }