diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 404f4069c..d9d067194 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -38,7 +38,7 @@ Then after every change, run make run ``` -to build and run the `functions` binary. It will start Functions using an embedded `Bolt` database running on port `8080`. +to build and run the `functions` binary. It will start Functions using an embedded `sqlite3` database running on port `8080`. ### Test diff --git a/Makefile b/Makefile index 649555797..aab978035 100644 --- a/Makefile +++ b/Makefile @@ -35,10 +35,10 @@ docker-build: docker build --build-arg HTTP_PROXY -t funcy/functions:latest . docker-run: docker-build - docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v ${CURDIR}/data:/app/data -p 8080:8080 funcy/functions + docker run --rm --privileged -it -e NO_PROXY -e HTTP_PROXY -e LOG_LEVEL=debug -e "DB_URL=sqlite3:///app/data/fn.db" -v ${CURDIR}/data:/app/data -p 8080:8080 funcy/functions -docker-test-run-with-bolt: - ./api_test.sh bolt +docker-test-run-with-sqlite3: + ./api_test.sh sqlite3 docker-test-run-with-mysql: ./api_test.sh mysql @@ -46,9 +46,6 @@ docker-test-run-with-mysql: docker-test-run-with-postgres: ./api_test.sh postgres -docker-test-run-with-redis: - ./api_test.sh redis - docker-test: docker run -ti --privileged --rm -e LOG_LEVEL=debug \ -v /var/run/docker.sock:/var/run/docker.sock \ diff --git a/api/datastore/bolt/bolt.go b/api/datastore/bolt/bolt.go deleted file mode 100644 index 1d587169b..000000000 --- a/api/datastore/bolt/bolt.go +++ /dev/null @@ -1,508 +0,0 @@ -package bolt - -import ( - "encoding/json" - "net/url" - "os" - "path/filepath" - "time" - - "context" - - "regexp" - "strings" - - "github.com/Sirupsen/logrus" - "github.com/boltdb/bolt" - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" -) - -type BoltDatastore struct { - routesBucket []byte - appsBucket []byte - logsBucket []byte - extrasBucket []byte - callsBucket []byte - db *bolt.DB - log logrus.FieldLogger -} - -func New(url *url.URL) (models.Datastore, error) { - dir := filepath.Dir(url.Path) - log := logrus.WithFields(logrus.Fields{"db": url.Scheme, "dir": dir}) - err := os.MkdirAll(dir, 0755) - if err != nil { - log.WithError(err).Errorln("Could not create data directory for db") - return nil, err - } - log.WithFields(logrus.Fields{"path": url.Path}).Debug("Creating bolt db") - db, err := bolt.Open(url.Path, 0655, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - log.WithError(err).Errorln("Error on bolt.Open") - return nil, err - } - // I don't think we need a prefix here do we? Made it blank. If we do, we should call the query param "prefix" instead of bucket. - bucketPrefix := "" - if url.Query()["bucket"] != nil { - bucketPrefix = url.Query()["bucket"][0] - } - routesBucketName := []byte(bucketPrefix + "routes") - appsBucketName := []byte(bucketPrefix + "apps") - logsBucketName := []byte(bucketPrefix + "logs") - extrasBucketName := []byte(bucketPrefix + "extras") // todo: think of a better name - callsBucketName := []byte(bucketPrefix + "calls") - err = db.Update(func(tx *bolt.Tx) error { - for _, name := range [][]byte{routesBucketName, appsBucketName, logsBucketName, - extrasBucketName, callsBucketName} { - _, err := tx.CreateBucketIfNotExists(name) - if err != nil { - log.WithError(err).WithFields(logrus.Fields{"name": name}).Error("create bucket") - return err - } - } - return nil - }) - if err != nil { - log.WithError(err).Errorln("Error creating bolt buckets") - return nil, err - } - - ds := &BoltDatastore{ - routesBucket: routesBucketName, - appsBucket: appsBucketName, - logsBucket: logsBucketName, - extrasBucket: extrasBucketName, - callsBucket: callsBucketName, - db: db, - log: log, - } - log.WithFields(logrus.Fields{"prefix": bucketPrefix, "file": url.Path}).Debug("BoltDB initialized") - - return datastoreutil.NewValidator(ds), nil -} - -func (ds *BoltDatastore) InsertTask(ctx context.Context, task *models.Task) error { - var fnCall *models.FnCall - taskID := []byte(task.ID) - - err := ds.db.Update( - func(tx *bolt.Tx) error { - bIm := tx.Bucket(ds.callsBucket) - buf, err := json.Marshal(fnCall.FromTask(task)) - if err != nil { - return err - } - err = bIm.Put(taskID, buf) - if err != nil { - return err - } - return nil - }) - return err -} - -func (ds *BoltDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { - res := models.FnCalls{} - err := ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.callsBucket) - err2 := b.ForEach(func(key, v []byte) error { - call := &models.FnCall{} - err := json.Unmarshal(v, call) - if err != nil { - return err - } - if applyCallFilter(call, filter) { - res = append(res, call) - } - return nil - }) - if err2 != nil { - logrus.WithError(err2).Errorln("Couldn't get calls!") - } - return err2 - }) - return res, err -} - -func (ds *BoltDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { - var res *models.FnCall - err := ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.callsBucket) - v := b.Get([]byte(callID)) - if v != nil { - fnCall := &models.FnCall{} - err := json.Unmarshal(v, fnCall) - if err != nil { - return nil - } - res = fnCall - } else { - return models.ErrCallNotFound - } - return nil - }) - return res, err -} - -func (ds *BoltDatastore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { - appname := []byte(app.Name) - - err := ds.db.Update(func(tx *bolt.Tx) error { - bIm := tx.Bucket(ds.appsBucket) - - v := bIm.Get(appname) - if v != nil { - return models.ErrAppsAlreadyExists - } - - buf, err := json.Marshal(app) - if err != nil { - return err - } - - err = bIm.Put(appname, buf) - if err != nil { - return err - } - bjParent := tx.Bucket(ds.routesBucket) - _, err = bjParent.CreateBucketIfNotExists(appname) - if err != nil { - return err - } - return nil - }) - - return app, err -} - -func (ds *BoltDatastore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { - var app *models.App - appname := []byte(newapp.Name) - - err := ds.db.Update(func(tx *bolt.Tx) error { - bIm := tx.Bucket(ds.appsBucket) - - v := bIm.Get(appname) - if v == nil { - return models.ErrAppsNotFound - } - - err := json.Unmarshal(v, &app) - if err != nil { - return err - } - - app.UpdateConfig(newapp.Config) - - buf, err := json.Marshal(app) - if err != nil { - return err - } - - err = bIm.Put(appname, buf) - if err != nil { - return err - } - bjParent := tx.Bucket(ds.routesBucket) - _, err = bjParent.CreateBucketIfNotExists([]byte(app.Name)) - if err != nil { - return err - } - return nil - }) - - return app, err -} - -func (ds *BoltDatastore) RemoveApp(ctx context.Context, appName string) error { - err := ds.db.Update(func(tx *bolt.Tx) error { - bIm := tx.Bucket(ds.appsBucket) - err := bIm.Delete([]byte(appName)) - if err != nil { - return err - } - bjParent := tx.Bucket(ds.routesBucket) - err = bjParent.DeleteBucket([]byte(appName)) - if err != nil { - return err - } - return nil - }) - return err -} - -func (ds *BoltDatastore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - res := []*models.App{} - err := ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.appsBucket) - err2 := b.ForEach(func(key, v []byte) error { - app := &models.App{} - err := json.Unmarshal(v, app) - if err != nil { - return err - } - if applyAppFilter(app, filter) { - res = append(res, app) - } - return nil - }) - if err2 != nil { - logrus.WithError(err2).Errorln("Couldn't get apps!") - } - return nil - }) - if err != nil { - return nil, err - } - return res, nil -} - -func (ds *BoltDatastore) GetApp(ctx context.Context, name string) (*models.App, error) { - var res *models.App - err := ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.appsBucket) - v := b.Get([]byte(name)) - if v != nil { - app := &models.App{} - err := json.Unmarshal(v, app) - if err != nil { - return err - } - res = app - } else { - return models.ErrAppsNotFound - } - return nil - }) - if err != nil { - return nil, err - } - return res, nil -} - -func (ds *BoltDatastore) getRouteBucketForApp(tx *bolt.Tx, appName string) (*bolt.Bucket, error) { - // todo: should this be reversed? Make a bucket for each app that contains sub buckets for routes, etc - bp := tx.Bucket(ds.routesBucket) - b := bp.Bucket([]byte(appName)) - if b == nil { - return nil, models.ErrAppsNotFound - } - return b, nil -} - -func (ds *BoltDatastore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { - routePath := []byte(route.Path) - - err := ds.db.Update(func(tx *bolt.Tx) error { - b, err := ds.getRouteBucketForApp(tx, route.AppName) - if err != nil { - return err - } - - v := b.Get(routePath) - if v != nil { - return models.ErrRoutesAlreadyExists - } - - buf, err := json.Marshal(route) - if err != nil { - return err - } - - err = b.Put(routePath, buf) - if err != nil { - return err - } - return nil - }) - if err != nil { - return nil, err - } - return route, nil -} - -func (ds *BoltDatastore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { - routePath := []byte(newroute.Path) - - var route *models.Route - - err := ds.db.Update(func(tx *bolt.Tx) error { - b, err := ds.getRouteBucketForApp(tx, newroute.AppName) - if err != nil { - return err - } - - v := b.Get(routePath) - if v == nil { - return models.ErrRoutesNotFound - } - - err = json.Unmarshal(v, &route) - if err != nil { - return err - } - - route.Update(newroute) - - buf, err := json.Marshal(route) - if err != nil { - return err - } - - return b.Put(routePath, buf) - }) - if err != nil { - return nil, err - } - return route, nil -} - -func (ds *BoltDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { - err := ds.db.Update(func(tx *bolt.Tx) error { - b, err := ds.getRouteBucketForApp(tx, appName) - if err != nil { - return err - } - - err = b.Delete([]byte(routePath)) - if err != nil { - return err - } - return nil - }) - if err != nil { - return err - } - return nil -} - -func (ds *BoltDatastore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - var route *models.Route - err := ds.db.View(func(tx *bolt.Tx) error { - b, err := ds.getRouteBucketForApp(tx, appName) - if err != nil { - return err - } - - v := b.Get([]byte(routePath)) - if v == nil { - return models.ErrRoutesNotFound - } - - if v != nil { - err = json.Unmarshal(v, &route) - } - return err - }) - return route, err -} - -func (ds *BoltDatastore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} - err := ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.routesBucket).Bucket([]byte(appName)) - if b == nil { - return nil - } - - i := 0 - c := b.Cursor() - - var k, v []byte - k, v = c.Last() - - // Iterate backwards, newest first - for ; k != nil; k, v = c.Prev() { - var route models.Route - err := json.Unmarshal(v, &route) - if err != nil { - return err - } - if applyRouteFilter(&route, filter) { - i++ - res = append(res, &route) - } - } - return nil - }) - if err != nil { - return nil, err - } - return res, nil -} - -func (ds *BoltDatastore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} - err := ds.db.View(func(tx *bolt.Tx) error { - i := 0 - rbucket := tx.Bucket(ds.routesBucket) - - b := rbucket.Cursor() - var k, v []byte - k, v = b.First() - - // Iterates all buckets - for ; k != nil && v == nil; k, v = b.Next() { - bucket := rbucket.Bucket(k) - r := bucket.Cursor() - var k2, v2 []byte - k2, v2 = r.Last() - // Iterate all routes - for ; k2 != nil; k2, v2 = r.Prev() { - var route models.Route - err := json.Unmarshal(v2, &route) - if err != nil { - return err - } - if applyRouteFilter(&route, filter) { - i++ - res = append(res, &route) - } - } - } - return nil - }) - if err != nil { - return nil, err - } - return res, nil -} - -func (ds *BoltDatastore) Put(ctx context.Context, key, value []byte) error { - ds.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.extrasBucket) // todo: maybe namespace by app? - err := b.Put(key, value) - return err - }) - return nil -} - -func (ds *BoltDatastore) Get(ctx context.Context, key []byte) ([]byte, error) { - var ret []byte - ds.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(ds.extrasBucket) - ret = b.Get(key) - return nil - }) - return ret, nil -} - -func applyAppFilter(app *models.App, filter *models.AppFilter) bool { - if filter != nil && filter.Name != "" { - nameLike, err := regexp.MatchString(strings.Replace(filter.Name, "%", ".*", -1), app.Name) - return err == nil && nameLike - } - - return true -} - -func applyRouteFilter(route *models.Route, filter *models.RouteFilter) bool { - return filter == nil || (filter.Path == "" || route.Path == filter.Path) && - (filter.AppName == "" || route.AppName == filter.AppName) && - (filter.Image == "" || route.Image == filter.Image) -} - -func applyCallFilter(call *models.FnCall, filter *models.CallFilter) bool { - return filter == nil || (filter.AppName == call.AppName) && (filter.Path == call.Path) -} diff --git a/api/datastore/bolt/bolt_test.go b/api/datastore/bolt/bolt_test.go deleted file mode 100644 index 68eefbb57..000000000 --- a/api/datastore/bolt/bolt_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package bolt - -import ( - "net/url" - "os" - "testing" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoretest" -) - -const tmpBolt = "/tmp/func_test_bolt.db" - -func TestDatastore(t *testing.T) { - os.Remove(tmpBolt) - u, err := url.Parse("bolt://" + tmpBolt) - if err != nil { - t.Fatalf("failed to parse url:", err) - } - ds, err := New(u) - if err != nil { - t.Fatalf("failed to create bolt datastore:", err) - } - datastoretest.Test(t, ds) -} diff --git a/api/datastore/datastore.go b/api/datastore/datastore.go index 14ba576c6..67f6a25c4 100644 --- a/api/datastore/datastore.go +++ b/api/datastore/datastore.go @@ -5,10 +5,7 @@ import ( "net/url" "github.com/Sirupsen/logrus" - "gitlab-odx.oracle.com/odx/functions/api/datastore/bolt" - "gitlab-odx.oracle.com/odx/functions/api/datastore/mysql" - "gitlab-odx.oracle.com/odx/functions/api/datastore/postgres" - "gitlab-odx.oracle.com/odx/functions/api/datastore/redis" + "gitlab-odx.oracle.com/odx/functions/api/datastore/sql" "gitlab-odx.oracle.com/odx/functions/api/models" ) @@ -19,14 +16,8 @@ func New(dbURL string) (models.Datastore, error) { } logrus.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating new datastore") switch u.Scheme { - case "bolt": - return bolt.New(u) - case "postgres": - return postgres.New(u) - case "mysql": - return mysql.New(u) - case "redis": - return redis.New(u) + case "sqlite3", "postgres", "mysql": + return sql.New(u) default: return nil, fmt.Errorf("db type not supported %v", u.Scheme) } diff --git a/api/datastore/internal/datastoretest/test.go b/api/datastore/internal/datastoretest/test.go index faf3ca372..172b1e073 100644 --- a/api/datastore/internal/datastoretest/test.go +++ b/api/datastore/internal/datastoretest/test.go @@ -468,47 +468,6 @@ func Test(t *testing.T, ds models.Datastore) { t.Fatalf("Test UpdateRoute inexistent: expected error to be `%v`, but it was `%v`", models.ErrRoutesNotFound, err) } }) - - t.Run("put-get", func(t *testing.T) { - // Testing Put/Get - err := ds.Put(ctx, nil, nil) - if err != models.ErrDatastoreEmptyKey { - t.Log(buf.String()) - t.Fatalf("Test Put(nil,nil): expected error `%v`, but it was `%v`", models.ErrDatastoreEmptyKey, err) - } - - err = ds.Put(ctx, []byte("test"), []byte("success")) - if err != nil { - t.Log(buf.String()) - t.Fatalf("Test Put: unexpected error: %v", err) - } - - val, err := ds.Get(ctx, []byte("test")) - if err != nil { - t.Log(buf.String()) - t.Fatalf("Test Put: unexpected error: %v", err) - } - if string(val) != "success" { - t.Log(buf.String()) - t.Fatalf("Test Get: expected value to be `%v`, but it was `%v`", "success", string(val)) - } - - err = ds.Put(ctx, []byte("test"), nil) - if err != nil { - t.Log(buf.String()) - t.Fatalf("Test Put: unexpected error: %v", err) - } - - val, err = ds.Get(ctx, []byte("test")) - if err != nil { - t.Log(buf.String()) - t.Fatalf("Test Put: unexpected error: %v", err) - } - if string(val) != "" { - t.Log(buf.String()) - t.Fatalf("Test Get: expected value to be `%v`, but it was `%v`", "", string(val)) - } - }) } var testApp = &models.App{ diff --git a/api/datastore/internal/datastoreutil/shared.go b/api/datastore/internal/datastoreutil/shared.go index 2fe74ab5a..0842eb934 100644 --- a/api/datastore/internal/datastoreutil/shared.go +++ b/api/datastore/internal/datastoreutil/shared.go @@ -5,12 +5,13 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/Sirupsen/logrus" "strings" "gitlab-odx.oracle.com/odx/functions/api/models" ) +// TODO scrap for sqlx + type RowScanner interface { Scan(dest ...interface{}) error } @@ -165,213 +166,4 @@ func ScanCall(scanner RowScanner, call *models.FnCall) error { return err } return nil - -} - -func SQLGetCall(db *sql.DB, callSelector, callID, whereStm string) (*models.FnCall, error) { - var call models.FnCall - row := db.QueryRow(fmt.Sprintf(whereStm, callSelector), callID) - err := ScanCall(row, &call) - if err != nil { - return nil, err - } - return &call, nil -} - -func SQLGetCalls(db *sql.DB, cSelector string, filter *models.CallFilter, whereStm, andStm string) (models.FnCalls, error) { - res := models.FnCalls{} - filterQuery, args := BuildFilterCallQuery(filter, whereStm, andStm) - rows, err := db.Query(fmt.Sprintf("%s %s", cSelector, filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var call models.FnCall - err := ScanCall(rows, &call) - if err != nil { - continue - } - res = append(res, &call) - } - if err := rows.Err(); err != nil { - return nil, err - } - return res, nil -} - -func SQLGetApp(db *sql.DB, queryStr string, queryArgs ...interface{}) (*models.App, error) { - row := db.QueryRow(queryStr, queryArgs...) - - var resName string - var config string - err := row.Scan(&resName, &config) - if err != nil { - if err == sql.ErrNoRows { - return nil, models.ErrAppsNotFound - } - return nil, err - } - - res := &models.App{ - Name: resName, - } - - if len(config) > 0 { - err := json.Unmarshal([]byte(config), &res.Config) - if err != nil { - return nil, err - } - } - - return res, nil -} - -func SQLGetApps(db *sql.DB, filter *models.AppFilter, whereStm, selectStm string) ([]*models.App, error) { - res := []*models.App{} - filterQuery, args := BuildFilterAppQuery(filter, whereStm) - rows, err := db.Query(fmt.Sprintf(selectStm, filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var app models.App - err := ScanApp(rows, &app) - - if err != nil { - if err == sql.ErrNoRows { - return res, nil - } - return res, err - } - res = append(res, &app) - } - - if err := rows.Err(); err != nil { - return res, err - } - return res, nil -} - -func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, error) { - db, err := sql.Open(dialect, dataSourceName) - if err != nil { - return nil, err - } - - err = db.Ping() - if err != nil { - return nil, err - } - - maxIdleConns := 30 // c.MaxIdleConnections - db.SetMaxIdleConns(maxIdleConns) - logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info( - fmt.Sprintf("%v datastore dialed", dialect)) - - for _, v := range tables { - _, err = db.Exec(v) - if err != nil { - return nil, err - } - } - - return db, nil -} - -func SQLGetRoutes(db *sql.DB, filter *models.RouteFilter, rSelect string, whereStm, andStm string) ([]*models.Route, error) { - res := []*models.Route{} - filterQuery, args := BuildFilterRouteQuery(filter, whereStm, andStm) - rows, err := db.Query(fmt.Sprintf("%s %s", rSelect, filterQuery), args...) - // todo: check for no rows so we don't respond with a sql 500 err - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var route models.Route - err := ScanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - return res, nil - -} - -func SQLGetRoutesByApp(db *sql.DB, appName string, filter *models.RouteFilter, rSelect, defaultFilterQuery, whereStm, andStm string) ([]*models.Route, error) { - res := []*models.Route{} - var filterQuery string - var args []interface{} - if filter == nil { - filterQuery = defaultFilterQuery - args = []interface{}{appName} - } else { - filter.AppName = appName - filterQuery, args = BuildFilterRouteQuery(filter, whereStm, andStm) - } - rows, err := db.Query(fmt.Sprintf("%s %s", rSelect, filterQuery), args...) - // todo: check for no rows so we don't respond with a sql 500 err - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var route models.Route - err := ScanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - - return res, nil -} - -func SQLGetRoute(db *sql.DB, appName, routePath, rSelectCondition, routeSelector string) (*models.Route, error) { - var route models.Route - - row := db.QueryRow(fmt.Sprintf(rSelectCondition, routeSelector), appName, routePath) - err := ScanRoute(row, &route) - - if err == sql.ErrNoRows { - return nil, models.ErrRoutesNotFound - } else if err != nil { - return nil, err - } - return &route, nil -} - -func SQLRemoveRoute(db *sql.DB, appName, routePath, deleteStm string) error { - res, err := db.Exec(deleteStm, routePath, appName) - - if err != nil { - return err - } - - n, err := res.RowsAffected() - if err != nil { - return err - } - - if n == 0 { - return models.ErrRoutesRemoving - } - - return nil - } diff --git a/api/datastore/internal/datastoreutil/validator.go b/api/datastore/internal/datastoreutil/validator.go index e37e662d8..c747316b8 100644 --- a/api/datastore/internal/datastoreutil/validator.go +++ b/api/datastore/internal/datastoreutil/validator.go @@ -6,62 +6,28 @@ import ( "gitlab-odx.oracle.com/odx/functions/api/models" ) -// Datastore is a copy of models.Datastore, with additional comments on parameter guarantees. -type Datastore interface { - // name will never be empty. - GetApp(ctx context.Context, name string) (*models.App, error) - - GetApps(ctx context.Context, appFilter *models.AppFilter) ([]*models.App, error) - - // app and app.Name will never be nil/empty. - InsertApp(ctx context.Context, app *models.App) (*models.App, error) - UpdateApp(ctx context.Context, app *models.App) (*models.App, error) - - // name will never be empty. - RemoveApp(ctx context.Context, name string) error - - // appName and routePath will never be empty. - GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) - RemoveRoute(ctx context.Context, appName, routePath string) error - - GetRoutes(ctx context.Context, filter *models.RouteFilter) (routes []*models.Route, err error) - - // appName will never be empty - GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) (routes []*models.Route, err error) - - // route will never be nil and route's AppName and Path will never be empty. - InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) - UpdateRoute(ctx context.Context, route *models.Route) (*models.Route, error) - - InsertTask(ctx context.Context, task *models.Task) error - GetTask(ctx context.Context, callID string) (*models.FnCall, error) - GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) - - // key will never be nil/empty - Put(ctx context.Context, key, val []byte) error - Get(ctx context.Context, key []byte) ([]byte, error) -} - // NewValidator returns a models.Datastore which validates certain arguments before delegating to ds. -func NewValidator(ds Datastore) models.Datastore { +func NewValidator(ds models.Datastore) models.Datastore { return &validator{ds} } type validator struct { - ds Datastore + models.Datastore } +// name will never be empty. func (v *validator) GetApp(ctx context.Context, name string) (app *models.App, err error) { if name == "" { return nil, models.ErrDatastoreEmptyAppName } - return v.ds.GetApp(ctx, name) + return v.Datastore.GetApp(ctx, name) } func (v *validator) GetApps(ctx context.Context, appFilter *models.AppFilter) ([]*models.App, error) { - return v.ds.GetApps(ctx, appFilter) + return v.Datastore.GetApps(ctx, appFilter) } +// app and app.Name will never be nil/empty. func (v *validator) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { if app == nil { return nil, models.ErrDatastoreEmptyApp @@ -70,9 +36,10 @@ func (v *validator) InsertApp(ctx context.Context, app *models.App) (*models.App return nil, models.ErrDatastoreEmptyAppName } - return v.ds.InsertApp(ctx, app) + return v.Datastore.InsertApp(ctx, app) } +// app and app.Name will never be nil/empty. func (v *validator) UpdateApp(ctx context.Context, app *models.App) (*models.App, error) { if app == nil { return nil, models.ErrDatastoreEmptyApp @@ -80,17 +47,19 @@ func (v *validator) UpdateApp(ctx context.Context, app *models.App) (*models.App if app.Name == "" { return nil, models.ErrDatastoreEmptyAppName } - return v.ds.UpdateApp(ctx, app) + return v.Datastore.UpdateApp(ctx, app) } +// name will never be empty. func (v *validator) RemoveApp(ctx context.Context, name string) error { if name == "" { return models.ErrDatastoreEmptyAppName } - return v.ds.RemoveApp(ctx, name) + return v.Datastore.RemoveApp(ctx, name) } +// appName and routePath will never be empty. func (v *validator) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { if appName == "" { return nil, models.ErrDatastoreEmptyAppName @@ -99,24 +68,26 @@ func (v *validator) GetRoute(ctx context.Context, appName, routePath string) (*m return nil, models.ErrDatastoreEmptyRoutePath } - return v.ds.GetRoute(ctx, appName, routePath) + return v.Datastore.GetRoute(ctx, appName, routePath) } func (v *validator) GetRoutes(ctx context.Context, routeFilter *models.RouteFilter) (routes []*models.Route, err error) { if routeFilter != nil && routeFilter.AppName != "" { - return v.ds.GetRoutesByApp(ctx, routeFilter.AppName, routeFilter) + return v.Datastore.GetRoutesByApp(ctx, routeFilter.AppName, routeFilter) } - return v.ds.GetRoutes(ctx, routeFilter) + return v.Datastore.GetRoutes(ctx, routeFilter) } +// appName will never be empty func (v *validator) GetRoutesByApp(ctx context.Context, appName string, routeFilter *models.RouteFilter) (routes []*models.Route, err error) { if appName == "" { return nil, models.ErrDatastoreEmptyAppName } - return v.ds.GetRoutesByApp(ctx, appName, routeFilter) + return v.Datastore.GetRoutesByApp(ctx, appName, routeFilter) } +// route will never be nil and route's AppName and Path will never be empty. func (v *validator) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { if route == nil { return nil, models.ErrDatastoreEmptyRoute @@ -128,9 +99,10 @@ func (v *validator) InsertRoute(ctx context.Context, route *models.Route) (*mode return nil, models.ErrDatastoreEmptyRoutePath } - return v.ds.InsertRoute(ctx, route) + return v.Datastore.InsertRoute(ctx, route) } +// route will never be nil and route's AppName and Path will never be empty. func (v *validator) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { if newroute == nil { return nil, models.ErrDatastoreEmptyRoute @@ -141,9 +113,10 @@ func (v *validator) UpdateRoute(ctx context.Context, newroute *models.Route) (*m if newroute.Path == "" { return nil, models.ErrDatastoreEmptyRoutePath } - return v.ds.UpdateRoute(ctx, newroute) + return v.Datastore.UpdateRoute(ctx, newroute) } +// appName and routePath will never be empty. func (v *validator) RemoveRoute(ctx context.Context, appName, routePath string) error { if appName == "" { return models.ErrDatastoreEmptyAppName @@ -152,35 +125,13 @@ func (v *validator) RemoveRoute(ctx context.Context, appName, routePath string) return models.ErrDatastoreEmptyRoutePath } - return v.ds.RemoveRoute(ctx, appName, routePath) -} - -func (v *validator) Put(ctx context.Context, key, value []byte) error { - if len(key) == 0 { - return models.ErrDatastoreEmptyKey - } - - return v.ds.Put(ctx, key, value) -} - -func (v *validator) Get(ctx context.Context, key []byte) ([]byte, error) { - if len(key) == 0 { - return nil, models.ErrDatastoreEmptyKey - } - return v.ds.Get(ctx, key) -} - -func (v *validator) InsertTask(ctx context.Context, task *models.Task) error { - return v.ds.InsertTask(ctx, task) + return v.Datastore.RemoveRoute(ctx, appName, routePath) } +// callID will never be empty. func (v *validator) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { if callID == "" { return nil, models.ErrDatastoreEmptyTaskID } - return v.ds.GetTask(ctx, callID) -} - -func (v *validator) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { - return v.ds.GetTasks(ctx, filter) + return v.Datastore.GetTask(ctx, callID) } diff --git a/api/datastore/mock.go b/api/datastore/mock.go index 6879e0dca..d3a22b2c9 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -4,6 +4,7 @@ import ( "context" "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" + "gitlab-odx.oracle.com/odx/functions/api/logs" "gitlab-odx.oracle.com/odx/functions/api/models" ) @@ -12,13 +13,15 @@ type mock struct { Routes models.Routes Calls models.FnCalls data map[string][]byte + + models.FnLog } func NewMock() models.Datastore { return NewMockInit(nil, nil, nil, nil) } -func NewMockInit(apps models.Apps, routes models.Routes, calls models.FnCalls, logs []*models.FnCallLog) models.Datastore { +func NewMockInit(apps models.Apps, routes models.Routes, calls models.FnCalls, loggos []*models.FnCallLog) models.Datastore { if apps == nil { apps = models.Apps{} } @@ -28,10 +31,10 @@ func NewMockInit(apps models.Apps, routes models.Routes, calls models.FnCalls, l if calls == nil { calls = models.FnCalls{} } - if logs == nil { - logs = []*models.FnCallLog{} + if loggos == nil { + loggos = []*models.FnCallLog{} } - return datastoreutil.NewValidator(&mock{apps, routes, calls, make(map[string][]byte)}) + return datastoreutil.NewValidator(&mock{apps, routes, calls, make(map[string][]byte), logs.NewMock()}) } func (m *mock) GetApp(ctx context.Context, appName string) (app *models.App, err error) { diff --git a/api/datastore/mysql/mysql.go b/api/datastore/mysql/mysql.go deleted file mode 100644 index 63cc22176..000000000 --- a/api/datastore/mysql/mysql.go +++ /dev/null @@ -1,453 +0,0 @@ -package mysql - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "net/url" - - "github.com/go-sql-driver/mysql" - _ "github.com/go-sql-driver/mysql" - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" -) - -const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes ( - app_name varchar(256) NOT NULL, - path varchar(256) NOT NULL, - image varchar(256) NOT NULL, - format varchar(16) NOT NULL, - memory int NOT NULL, - timeout int NOT NULL, - idle_timeout int NOT NULL, - type varchar(16) NOT NULL, - headers text NOT NULL, - config text NOT NULL, - PRIMARY KEY (app_name, path) -);` - -const appsTableCreate = `CREATE TABLE IF NOT EXISTS apps ( - name varchar(256) NOT NULL PRIMARY KEY, - config text NOT NULL -);` - -const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( - id varchar(256) NOT NULL PRIMARY KEY, - value varchar(256) NOT NULL -);` - -const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` - -const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( - created_at varchar(256) NOT NULL, - started_at varchar(256) NOT NULL, - completed_at varchar(256) NOT NULL, - status varchar(256) NOT NULL, - id varchar(256) NOT NULL, - app_name varchar(256) NOT NULL, - path varchar(256) NOT NULL, - PRIMARY KEY (id) -);` - -const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` - -/* -MySQLDatastore defines a basic MySQL Datastore struct. -*/ -type MySQLDatastore struct { - db *sql.DB -} - -/* -New creates a new MySQL Datastore. -*/ -func New(url *url.URL) (models.Datastore, error) { - tables := []string{routesTableCreate, appsTableCreate, - extrasTableCreate, callTableCreate} - dialect := "mysql" - sqlDatastore := &MySQLDatastore{} - dataSourceName := fmt.Sprintf("%s@%s%s", url.User.String(), url.Host, url.Path) - - db, err := datastoreutil.NewDatastore(dataSourceName, dialect, tables) - if err != nil { - return nil, err - } - - sqlDatastore.db = db - return datastoreutil.NewValidator(sqlDatastore), nil - -} - -/* -InsertApp inserts an app to MySQL. -*/ -func (ds *MySQLDatastore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { - var cbyte []byte - var err error - - if app.Config != nil { - cbyte, err = json.Marshal(app.Config) - if err != nil { - return nil, err - } - } - stmt, err := ds.db.Prepare("INSERT apps SET name=?,config=?") - - if err != nil { - return nil, err - } - - _, err = stmt.Exec(app.Name, string(cbyte)) - - if err != nil { - mysqlErr := err.(*mysql.MySQLError) - if mysqlErr.Number == 1062 { - return nil, models.ErrAppsAlreadyExists - } - return nil, err - } - - return app, nil -} - -/* -UpdateApp updates an existing app on MySQL. -*/ -func (ds *MySQLDatastore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { - app := &models.App{Name: newapp.Name} - err := ds.Tx(func(tx *sql.Tx) error { - row := ds.db.QueryRow(`SELECT config FROM apps WHERE name=?`, app.Name) - - var config string - if err := row.Scan(&config); err != nil { - if err == sql.ErrNoRows { - return models.ErrAppsNotFound - } - return err - } - - if config != "" { - err := json.Unmarshal([]byte(config), &app.Config) - if err != nil { - return err - } - } - - app.UpdateConfig(newapp.Config) - - cbyte, err := json.Marshal(app.Config) - if err != nil { - return err - } - - stmt, err := ds.db.Prepare(`UPDATE apps SET config=? WHERE name=?`) - - if err != nil { - return err - } - - res, err := stmt.Exec(string(cbyte), app.Name) - - if err != nil { - return err - } - - if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return models.ErrAppsNotFound - } - return nil - }) - - if err != nil { - return nil, err - } - - return app, nil -} - -/* -RemoveApp removes an existing app on MySQL. -*/ -func (ds *MySQLDatastore) RemoveApp(ctx context.Context, appName string) error { - _, err := ds.db.Exec(` - DELETE FROM apps - WHERE name = ? - `, appName) - - return err -} - -/* -GetApp retrieves an app from MySQL. -*/ -func (ds *MySQLDatastore) GetApp(ctx context.Context, name string) (*models.App, error) { - queryStr := `SELECT name, config FROM apps WHERE name=?` - queryArgs := []interface{}{name} - return datastoreutil.SQLGetApp(ds.db, queryStr, queryArgs...) -} - -/* -GetApps retrieves an array of apps according to a specific filter. -*/ -func (ds *MySQLDatastore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - whereStm := "WHERE name LIKE ?" - selectStm := "SELECT DISTINCT name, config FROM apps %s" - - return datastoreutil.SQLGetApps(ds.db, filter, whereStm, selectStm) -} - -/* -InsertRoute inserts an route to MySQL. -*/ -func (ds *MySQLDatastore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { - hbyte, err := json.Marshal(route.Headers) - if err != nil { - return nil, err - } - - cbyte, err := json.Marshal(route.Config) - if err != nil { - return nil, err - } - - err = ds.Tx(func(tx *sql.Tx) error { - r := tx.QueryRow(`SELECT 1 FROM apps WHERE name=?`, route.AppName) - if err := r.Scan(new(int)); err != nil { - if err == sql.ErrNoRows { - return models.ErrAppsNotFound - } - } - same, err := tx.Query(`SELECT 1 FROM routes WHERE app_name=? AND path=?`, - route.AppName, route.Path) - if err != nil { - return err - } - defer same.Close() - if same.Next() { - return models.ErrRoutesAlreadyExists - } - - _, err = tx.Exec(` - INSERT INTO routes ( - app_name, - path, - image, - format, - memory, - type, - timeout, - idle_timeout, - headers, - config - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, - route.AppName, - route.Path, - route.Image, - route.Format, - route.Memory, - route.Type, - route.Timeout, - route.IdleTimeout, - string(hbyte), - string(cbyte), - ) - return err - }) - - if err != nil { - return nil, err - } - return route, nil -} - -/* -UpdateRoute updates an existing route on MySQL. -*/ -func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { - var route models.Route - err := ds.Tx(func(tx *sql.Tx) error { - row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector), newroute.AppName, newroute.Path) - if err := datastoreutil.ScanRoute(row, &route); err == sql.ErrNoRows { - return models.ErrRoutesNotFound - } else if err != nil { - return err - } - - route.Update(newroute) - - hbyte, err := json.Marshal(route.Headers) - if err != nil { - return err - } - - cbyte, err := json.Marshal(route.Config) - if err != nil { - return err - } - - res, err := tx.Exec(` - UPDATE routes SET - image = ?, - format = ?, - memory = ?, - type = ?, - timeout = ?, - idle_timeout = ?, - headers = ?, - config = ? - WHERE app_name = ? AND path = ?;`, - route.Image, - route.Format, - route.Memory, - route.Type, - route.Timeout, - route.IdleTimeout, - string(hbyte), - string(cbyte), - route.AppName, - route.Path, - ) - - if err != nil { - return err - } - - if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return models.ErrRoutesNotFound - } - - return nil - }) - - if err != nil { - return nil, err - } - return &route, nil -} - -/* -RemoveRoute removes an existing route on MySQL. -*/ -func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { - deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) -} - -/* -GetRoute retrieves a route from MySQL. -*/ -func (ds *MySQLDatastore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - rSelectCondition := "%s WHERE app_name=? AND path=?" - - return datastoreutil.SQLGetRoute(ds.db, appName, routePath, rSelectCondition, routeSelector) -} - -/* -GetRoutes retrieves an array of routes according to a specific filter. -*/ -func (ds *MySQLDatastore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - whereStm := "WHERE %s ?" - andStm := " AND %s ?" - - return datastoreutil.SQLGetRoutes(ds.db, filter, routeSelector, whereStm, andStm) -} - -/* -GetRoutesByApp retrieves a route with a specific app name. -*/ -func (ds *MySQLDatastore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - whereStm := "WHERE %s ?" - andStm := " AND %s ?" - defaultFilterQuery := "WHERE app_name = ?" - - return datastoreutil.SQLGetRoutesByApp(ds.db, appName, filter, routeSelector, defaultFilterQuery, whereStm, andStm) -} - -/* -Put inserts an extra into MySQL. -*/ -func (ds *MySQLDatastore) Put(ctx context.Context, key, value []byte) error { - _, err := ds.db.Exec(` - INSERT INTO extras ( - id, - value - ) - VALUES (?, ?) - ON DUPLICATE KEY UPDATE - value = ? - `, string(key), string(value), string(value)) - - if err != nil { - return err - } - - return nil -} - -/* -Get retrieves the value of a specific extra from MySQL. -*/ -func (ds *MySQLDatastore) Get(ctx context.Context, key []byte) ([]byte, error) { - row := ds.db.QueryRow("SELECT value FROM extras WHERE id=?", key) - - var value string - err := row.Scan(&value) - if err == sql.ErrNoRows { - return nil, nil - } else if err != nil { - return nil, err - } - - return []byte(value), nil -} - -/* -Tx Begins and commits a MySQL Transaction. -*/ -func (ds *MySQLDatastore) Tx(f func(*sql.Tx) error) error { - tx, err := ds.db.Begin() - if err != nil { - return err - } - err = f(tx) - if err != nil { - tx.Rollback() - return err - } - return tx.Commit() -} - -func (ds *MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { - stmt, err := ds.db.Prepare("INSERT calls SET id=?,created_at=?,started_at=?,completed_at=?,status=?,app_name=?,path=?") - if err != nil { - return err - } - _, err = stmt.Exec(task.ID, task.CreatedAt.String(), - task.StartedAt.String(), task.CompletedAt.String(), - task.Status, task.AppName, task.Path) - - if err != nil { - return err - } - - return nil -} - -func (ds *MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { - whereStm := "%s WHERE id=?" - - return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) -} - -func (ds *MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { - whereStm := "WHERE %s ?" - andStm := " AND %s ?" - - return datastoreutil.SQLGetCalls(ds.db, callSelector, filter, whereStm, andStm) -} diff --git a/api/datastore/mysql/mysql_test.go b/api/datastore/mysql/mysql_test.go deleted file mode 100644 index 83697b2c6..000000000 --- a/api/datastore/mysql/mysql_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package mysql - -import ( - "bytes" - "database/sql" - "fmt" - "log" - "net/url" - "os" - "os/exec" - "strconv" - "testing" - "time" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoretest" -) - -const tmpMysql = "mysql://root:root@tcp(%s:%d)/funcs" - -var ( - mysqlHost = func() string { - host := os.Getenv("MYSQL_HOST") - if host == "" { - host = "127.0.0.1" - } - return host - }() - mysqlPort = func() int { - port := os.Getenv("MYSQL_PORT") - if port == "" { - port = "3307" - } - p, err := strconv.Atoi(port) - if err != nil { - panic(err) - } - return p - }() -) - -func prepareMysqlTest(logf, fatalf func(string, ...interface{})) (func(), func()) { - timeout := time.After(60 * time.Second) - wait := 2 * time.Second - var db *sql.DB - var err error - var buf bytes.Buffer - time.Sleep(time.Second * 25) - for { - db, err = sql.Open("mysql", fmt.Sprintf("root:root@tcp(%s:%v)/", - mysqlHost, mysqlPort)) - if err != nil { - fmt.Fprintln(&buf, "failed to connect to mysql:", err) - fmt.Fprintln(&buf, "retrying in:", wait) - } else { - // Ping - if _, err = db.Exec("SELECT 1"); err == nil { - break - } - fmt.Fprintln(&buf, "failed to ping database:", err) - } - select { - case <-timeout: - fmt.Println(buf.String()) - log.Fatal("timed out waiting for mysql") - case <-time.After(wait): - continue - } - } - - _, err = db.Exec("DROP DATABASE IF EXISTS funcs;") - if err != nil { - fmt.Println("failed to drop database:", err) - } - _, err = db.Exec("CREATE DATABASE funcs;") - if err != nil { - fatalf("failed to create database: %s\n", err) - } - _, err = db.Exec(`GRANT ALL PRIVILEGES ON funcs.* TO root@localhost WITH GRANT OPTION;`) - if err != nil { - fatalf("failed to grant priviledges to user 'mysql: %s\n", err) - panic(err) - } - - fmt.Println("mysql for test ready") - return func() { - db, err := sql.Open("mysql", fmt.Sprintf("root:root@tcp(%s:%d)/", - mysqlHost, mysqlPort)) - if err != nil { - fatalf("failed to connect for truncation: %s\n", err) - } - for _, table := range []string{"routes", "apps", "extras"} { - _, err = db.Exec(`TRUNCATE TABLE ` + table) - if err != nil { - fatalf("failed to truncate table %q: %s\n", table, err) - } - } - }, - func() { - tryRun(logf, "stop mysql container", exec.Command("docker", "rm", "-vf", "func-mysql-test")) - } -} - -func TestDatastore(t *testing.T) { - _, close := prepareMysqlTest(t.Logf, t.Fatalf) - defer close() - - u, err := url.Parse(fmt.Sprintf(tmpMysql, mysqlHost, mysqlPort)) - if err != nil { - t.Fatalf("failed to parse url: %s\n", err) - } - ds, err := New(u) - if err != nil { - t.Fatalf("failed to create mysql datastore: %s\n", err) - } - - datastoretest.Test(t, ds) -} - -func tryRun(logf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - logf("failed to %s: %s", desc, b.String()) - } -} - -func mustRun(fatalf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - fatalf("failed to %s: %s", desc, b.String()) - } -} diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go deleted file mode 100644 index cf2193eed..000000000 --- a/api/datastore/postgres/postgres.go +++ /dev/null @@ -1,402 +0,0 @@ -package postgres - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "net/url" - - "github.com/lib/pq" - _ "github.com/lib/pq" - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" -) - -const routesTableCreate = ` -CREATE TABLE IF NOT EXISTS routes ( - app_name character varying(256) NOT NULL, - path text NOT NULL, - image character varying(256) NOT NULL, - format character varying(16) NOT NULL, - memory integer NOT NULL, - timeout integer NOT NULL, - idle_timeout integer NOT NULL, - type character varying(16) NOT NULL, - headers text NOT NULL, - config text NOT NULL, - PRIMARY KEY (app_name, path) -);` - -const appsTableCreate = `CREATE TABLE IF NOT EXISTS apps ( - name character varying(256) NOT NULL PRIMARY KEY, - config text NOT NULL -);` - -const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( - key character varying(256) NOT NULL PRIMARY KEY, - value character varying(256) NOT NULL -);` - -const routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` - -const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls ( - created_at character varying(256) NOT NULL, - started_at character varying(256) NOT NULL, - completed_at character varying(256) NOT NULL, - status character varying(256) NOT NULL, - id character varying(256) NOT NULL, - app_name character varying(256) NOT NULL, - path character varying(256) NOT NULL, - PRIMARY KEY (id) -);` - -const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` - -type PostgresDatastore struct { - db *sql.DB -} - -func New(url *url.URL) (models.Datastore, error) { - tables := []string{routesTableCreate, appsTableCreate, extrasTableCreate, callsTableCreate} - sqlDatastore := &PostgresDatastore{} - dialect := "postgres" - - db, err := datastoreutil.NewDatastore(url.String(), dialect, tables) - if err != nil { - return nil, err - } - - sqlDatastore.db = db - return datastoreutil.NewValidator(sqlDatastore), nil -} - -func (ds *PostgresDatastore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { - var cbyte []byte - var err error - - if app.Config != nil { - cbyte, err = json.Marshal(app.Config) - if err != nil { - return nil, err - } - } - - _, err = ds.db.Exec(`INSERT INTO apps (name, config) VALUES ($1, $2);`, - app.Name, - string(cbyte), - ) - - if err != nil { - pqErr := err.(*pq.Error) - if pqErr.Code == "23505" { - return nil, models.ErrAppsAlreadyExists - } - return nil, err - } - - return app, nil -} - -func (ds *PostgresDatastore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { - app := &models.App{Name: newapp.Name} - err := ds.Tx(func(tx *sql.Tx) error { - row := ds.db.QueryRow("SELECT config FROM apps WHERE name=$1", app.Name) - - var config string - if err := row.Scan(&config); err != nil { - if err == sql.ErrNoRows { - return models.ErrAppsNotFound - } - return err - } - - if len(config) > 0 { - err := json.Unmarshal([]byte(config), &app.Config) - if err != nil { - return err - } - } - - app.UpdateConfig(newapp.Config) - - cbyte, err := json.Marshal(app.Config) - if err != nil { - return err - } - - res, err := ds.db.Exec(`UPDATE apps SET config = $2 WHERE name = $1;`, app.Name, string(cbyte)) - if err != nil { - return err - } - - if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return models.ErrAppsNotFound - } - return nil - }) - - if err != nil { - return nil, err - } - - return app, nil -} - -func (ds *PostgresDatastore) RemoveApp(ctx context.Context, appName string) error { - _, err := ds.db.Exec(`DELETE FROM apps WHERE name = $1`, appName) - - return err -} - -func (ds *PostgresDatastore) GetApp(ctx context.Context, name string) (*models.App, error) { - queryStr := "SELECT name, config FROM apps WHERE name=$1" - queryArgs := []interface{}{name} - - return datastoreutil.SQLGetApp(ds.db, queryStr, queryArgs...) -} - -func (ds *PostgresDatastore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - whereStm := "WHERE name LIKE $1" - selectStm := "SELECT DISTINCT * FROM apps %s" - - return datastoreutil.SQLGetApps(ds.db, filter, whereStm, selectStm) -} - -func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { - hbyte, err := json.Marshal(route.Headers) - if err != nil { - return nil, err - } - - cbyte, err := json.Marshal(route.Config) - if err != nil { - return nil, err - } - - err = ds.Tx(func(tx *sql.Tx) error { - r := tx.QueryRow(`SELECT 1 FROM apps WHERE name=$1`, route.AppName) - if err := r.Scan(new(int)); err != nil { - if err == sql.ErrNoRows { - return models.ErrAppsNotFound - } - return err - } - - same, err := tx.Query(`SELECT 1 FROM routes WHERE app_name=$1 AND path=$2`, - route.AppName, route.Path) - if err != nil { - return err - } - defer same.Close() - if same.Next() { - return models.ErrRoutesAlreadyExists - } - - _, err = tx.Exec(` - INSERT INTO routes ( - app_name, - path, - image, - format, - memory, - type, - timeout, - idle_timeout, - headers, - config - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`, - route.AppName, - route.Path, - route.Image, - route.Format, - route.Memory, - route.Type, - route.Timeout, - route.IdleTimeout, - string(hbyte), - string(cbyte), - ) - return err - }) - - if err != nil { - return nil, err - } - return route, nil -} - -func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { - var route models.Route - err := ds.Tx(func(tx *sql.Tx) error { - row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=$1 AND path=$2", routeSelector), newroute.AppName, newroute.Path) - if err := datastoreutil.ScanRoute(row, &route); err == sql.ErrNoRows { - return models.ErrRoutesNotFound - } else if err != nil { - return err - } - - route.Update(newroute) - - hbyte, err := json.Marshal(route.Headers) - if err != nil { - return err - } - - cbyte, err := json.Marshal(route.Config) - if err != nil { - return err - } - - res, err := tx.Exec(` - UPDATE routes SET - image = $3, - format = $4, - memory = $5, - type = $6, - timeout = $7, - idle_timeout = $8, - headers = $9, - config = $10 - WHERE app_name = $1 AND path = $2;`, - route.AppName, - route.Path, - route.Image, - route.Format, - route.Memory, - route.Type, - route.Timeout, - route.IdleTimeout, - string(hbyte), - string(cbyte), - ) - - if err != nil { - return err - } - - if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return models.ErrRoutesNotFound - } - - return nil - }) - - if err != nil { - return nil, err - } - return &route, nil -} - -func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { - deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2` - return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) -} - -func (ds *PostgresDatastore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - rSelectCondition := "%s WHERE app_name=$1 AND path=$2" - - return datastoreutil.SQLGetRoute(ds.db, appName, routePath, rSelectCondition, routeSelector) -} - -func (ds *PostgresDatastore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - whereStm := "WHERE %s $1" - andStm := " AND %s $%d" - - return datastoreutil.SQLGetRoutes(ds.db, filter, routeSelector, whereStm, andStm) -} - -func (ds *PostgresDatastore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - defaultFilterQuery := "WHERE app_name = $1" - whereStm := "WHERE %s $1" - andStm := " AND %s $%d" - - return datastoreutil.SQLGetRoutesByApp(ds.db, appName, filter, routeSelector, defaultFilterQuery, whereStm, andStm) -} - -func (ds *PostgresDatastore) Put(ctx context.Context, key, value []byte) error { - _, err := ds.db.Exec(` - INSERT INTO extras ( - key, - value - ) - VALUES ($1, $2) - ON CONFLICT (key) DO UPDATE SET - value = $2; - `, string(key), string(value)) - - if err != nil { - return err - } - - return nil -} - -func (ds *PostgresDatastore) Get(ctx context.Context, key []byte) ([]byte, error) { - row := ds.db.QueryRow("SELECT value FROM extras WHERE key=$1", key) - - var value string - err := row.Scan(&value) - if err == sql.ErrNoRows { - return nil, nil - } else if err != nil { - return nil, err - } - - return []byte(value), nil -} - -func (ds *PostgresDatastore) Tx(f func(*sql.Tx) error) error { - tx, err := ds.db.Begin() - if err != nil { - return err - } - err = f(tx) - if err != nil { - tx.Rollback() - return err - } - return tx.Commit() -} - -func (ds *PostgresDatastore) InsertTask(ctx context.Context, task *models.Task) error { - err := ds.Tx(func(tx *sql.Tx) error { - _, err := tx.Exec( - `INSERT INTO calls ( - id, - created_at, - started_at, - completed_at, - status, - app_name, - path) VALUES ($1, $2, $3, $4, $5, $6, $7);`, - task.ID, - task.CreatedAt.String(), - task.StartedAt.String(), - task.CompletedAt.String(), - task.Status, - task.AppName, - task.Path, - ) - return err - }) - return err -} - -func (ds *PostgresDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { - whereStm := "%s WHERE id=$1" - - return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) -} - -func (ds *PostgresDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { - whereStm := "WHERE %s $1" - andStm := " AND %s $2" - - return datastoreutil.SQLGetCalls(ds.db, callSelector, filter, whereStm, andStm) -} diff --git a/api/datastore/postgres/postgres_test.go b/api/datastore/postgres/postgres_test.go deleted file mode 100644 index 2d9d4e053..000000000 --- a/api/datastore/postgres/postgres_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package postgres - -import ( - "bytes" - "database/sql" - "fmt" - "log" - "net/url" - "os" - "os/exec" - "strconv" - "testing" - "time" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoretest" -) - -const tmpPostgres = "postgres://postgres@%s:%d/funcs?sslmode=disable" - -var ( - postgresHost = func() string { - host := os.Getenv("POSTGRES_HOST") - if host == "" { - host = "127.0.0.1" - } - return host - }() - postgresPort = func() int { - port := os.Getenv("POSTGRES_PORT") - if port == "" { - port = "15432" - } - p, err := strconv.Atoi(port) - if err != nil { - panic(err) - } - return p - }() -) - -func preparePostgresTest(logf, fatalf func(string, ...interface{})) (func(), func()) { - timeout := time.After(20 * time.Second) - wait := 500 * time.Millisecond - - for { - db, err := sql.Open("postgres", fmt.Sprintf("postgres://postgres@%s:%d?sslmode=disable", - postgresHost, postgresPort)) - if err != nil { - fmt.Println("failed to connect to postgres:", err) - fmt.Println("retrying in:", wait) - } else { - _, err = db.Exec(`CREATE DATABASE funcs;`) - if err != nil { - fmt.Println("failed to create database:", err) - fmt.Println("retrying in:", wait) - - } else { - _, err = db.Exec(`GRANT ALL PRIVILEGES ON DATABASE funcs TO postgres;`) - if err == nil { - break - } - fmt.Println("failed to grant privileges:", err) - fmt.Println("retrying in:", wait) - } - - } - select { - case <-timeout: - log.Fatal("timed out waiting for postgres") - case <-time.After(wait): - continue - } - } - fmt.Println("postgres for test ready") - return func() { - db, err := sql.Open("postgres", fmt.Sprintf(tmpPostgres, postgresHost, postgresPort)) - if err != nil { - fatalf("failed to connect for truncation: %s\n", err) - } - for _, table := range []string{"routes", "apps", "extras"} { - _, err = db.Exec(`TRUNCATE TABLE ` + table) - if err != nil { - fatalf("failed to truncate table %q: %s\n", table, err) - } - } - }, - func() { - tryRun(logf, "stop postgres container", exec.Command("docker", "rm", "-fv", "func-postgres-test")) - } -} - -func TestDatastore(t *testing.T) { - _, close := preparePostgresTest(t.Logf, t.Fatalf) - defer close() - - u, err := url.Parse(fmt.Sprintf(tmpPostgres, postgresHost, postgresPort)) - if err != nil { - t.Fatalf("failed to parse url: %v", err) - } - ds, err := New(u) - if err != nil { - t.Fatalf("failed to create postgres datastore: %v", err) - } - - datastoretest.Test(t, ds) -} - -func tryRun(logf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - logf("failed to %s: %s", desc, b.String()) - } -} - -func mustRun(fatalf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - fatalf("failed to %s: %s", desc, b.String()) - } -} diff --git a/api/datastore/redis/redis.go b/api/datastore/redis/redis.go deleted file mode 100644 index 558ce96f0..000000000 --- a/api/datastore/redis/redis.go +++ /dev/null @@ -1,399 +0,0 @@ -package redis - -import ( - "context" - "encoding/json" - "fmt" - "net/url" - "regexp" - "strings" - "time" - - "github.com/Sirupsen/logrus" - "github.com/garyburd/redigo/redis" - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" - "gitlab-odx.oracle.com/odx/functions/api/models" -) - -type RedisDataStore struct { - pool *redis.Pool -} - -func New(url *url.URL) (models.Datastore, error) { - pool := &redis.Pool{ - MaxIdle: 512, - // I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea. - MaxActive: 512, - Wait: true, - IdleTimeout: 300 * time.Second, - Dial: func() (redis.Conn, error) { - return redis.DialURL(url.String()) - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, - } - // Force a connection so we can fail in case of error. - conn := pool.Get() - - if err := conn.Err(); err != nil { - logrus.WithError(err).Fatal("Error connecting to redis") - } - ds := &RedisDataStore{ - pool: pool, - } - return datastoreutil.NewValidator(ds), nil -} - -func (ds *RedisDataStore) setApp(app *models.App) (*models.App, error) { - appBytes, err := json.Marshal(app) - if err != nil { - return nil, err - } - - conn := ds.pool.Get() - defer conn.Close() - if _, err := conn.Do("HSET", "apps", app.Name, appBytes); err != nil { - return nil, err - } - return app, nil -} - -func (ds *RedisDataStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HEXISTS", "apps", app.Name) - if err != nil { - return nil, err - } - if exists, err := redis.Bool(reply, err); err != nil { - return nil, err - } else if exists { - return nil, models.ErrAppsAlreadyExists - } - - return ds.setApp(app) -} - -func (ds *RedisDataStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { - app, err := ds.GetApp(ctx, newapp.Name) - if err != nil { - return nil, err - } - - app.UpdateConfig(newapp.Config) - - return ds.setApp(app) -} - -func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error { - conn := ds.pool.Get() - defer conn.Close() - if _, err := conn.Do("HDEL", "apps", appName); err != nil { - return err - } - - return nil -} - -func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, error) { - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGET", "apps", name) - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrAppsNotFound - } - - res := &models.App{} - if err := json.Unmarshal(reply.([]byte), res); err != nil { - return nil, err - } - - return res, nil -} - -func (ds *RedisDataStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - res := []*models.App{} - - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGETALL", "apps") - if err != nil { - return nil, err - } - - apps, err := redis.StringMap(reply, err) - if err != nil { - return nil, err - } - - for _, v := range apps { - var app models.App - if err := json.Unmarshal([]byte(v), &app); err != nil { - return nil, err - } - if applyAppFilter(&app, filter) { - res = append(res, &app) - } - } - return res, nil -} - -func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Route, error) { - buf, err := json.Marshal(route) - if err != nil { - return nil, err - } - - conn := ds.pool.Get() - defer conn.Close() - if _, err := conn.Do("HSET", set, route.Path, buf); err != nil { - return nil, err - } - - return route, nil -} - -func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HEXISTS", "apps", route.AppName) - if err != nil { - return nil, err - } - if exists, err := redis.Bool(reply, err); err != nil { - return nil, err - } else if !exists { - return nil, models.ErrAppsNotFound - } - - hset := fmt.Sprintf("routes:%s", route.AppName) - - reply, err = conn.Do("HEXISTS", hset, route.Path) - if err != nil { - return nil, err - } - - if exists, err := redis.Bool(reply, err); err != nil { - return nil, err - } else if exists { - return nil, models.ErrRoutesAlreadyExists - } - - return ds.setRoute(hset, route) -} - -func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { - route, err := ds.GetRoute(ctx, newroute.AppName, newroute.Path) - if err != nil { - return nil, err - } - - route.Update(newroute) - - hset := fmt.Sprintf("routes:%s", route.AppName) - - return ds.setRoute(hset, route) -} - -func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath string) error { - hset := fmt.Sprintf("routes:%s", appName) - conn := ds.pool.Get() - defer conn.Close() - if n, err := conn.Do("HDEL", hset, routePath); err != nil { - return err - } else if n == 0 { - return models.ErrRoutesRemoving - } - - return nil -} - -func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - hset := fmt.Sprintf("routes:%s", appName) - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGET", hset, routePath) - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrRoutesNotFound - } - - var route models.Route - if err := json.Unmarshal(reply.([]byte), &route); err != nil { - return nil, err - } - - return &route, nil -} - -func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} - - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HKEYS", "apps") - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrRoutesNotFound - } - paths, err := redis.Strings(reply, err) - - for _, path := range paths { - hset := fmt.Sprintf("routes:%s", path) - reply, err := conn.Do("HGETALL", hset) - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrRoutesNotFound - } - routes, err := redis.StringMap(reply, err) - - for _, v := range routes { - var route models.Route - if err := json.Unmarshal([]byte(v), &route); err != nil { - return nil, err - } - if applyRouteFilter(&route, filter) { - res = append(res, &route) - } - } - } - - return res, nil -} - -func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - if filter == nil { - filter = new(models.RouteFilter) - } - filter.AppName = appName - res := []*models.Route{} - - hset := fmt.Sprintf("routes:%s", appName) - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGETALL", hset) - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrRoutesNotFound - } - routes, err := redis.StringMap(reply, err) - - for _, v := range routes { - var route models.Route - if err := json.Unmarshal([]byte(v), &route); err != nil { - return nil, err - } - if applyRouteFilter(&route, filter) { - res = append(res, &route) - } - } - - return res, nil -} - -func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error { - conn := ds.pool.Get() - defer conn.Close() - if _, err := conn.Do("HSET", "extras", key, value); err != nil { - return err - } - - return nil -} - -func (ds *RedisDataStore) Get(ctx context.Context, key []byte) ([]byte, error) { - conn := ds.pool.Get() - defer conn.Close() - value, err := conn.Do("HGET", "extras", key) - if err != nil { - return nil, err - } - - return value.([]byte), nil -} - -func applyAppFilter(app *models.App, filter *models.AppFilter) bool { - if filter != nil && filter.Name != "" { - nameLike, err := regexp.MatchString(strings.Replace(filter.Name, "%", ".*", -1), app.Name) - return err == nil && nameLike - } - - return true -} - -func applyRouteFilter(route *models.Route, filter *models.RouteFilter) bool { - return filter == nil || (filter.Path == "" || route.Path == filter.Path) && - (filter.AppName == "" || route.AppName == filter.AppName) && - (filter.Image == "" || route.Image == filter.Image) -} - -func applyCallFilter(call *models.FnCall, filter *models.CallFilter) bool { - return filter == nil || (filter.Path == "" || call.Path == filter.Path) && - (filter.AppName == "" || call.AppName == filter.AppName) -} - -func (ds *RedisDataStore) InsertTask(ctx context.Context, task *models.Task) error { - taskBytes, err := json.Marshal(task) - if err != nil { - return err - } - - conn := ds.pool.Get() - defer conn.Close() - if _, err := conn.Do("HSET", "calls", task.ID, taskBytes); err != nil { - return err - } - return nil -} - -func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGET", "calls", callID) - if err != nil { - return nil, err - } else if reply == nil { - return nil, models.ErrCallNotFound - } - res := &models.FnCall{} - if err := json.Unmarshal(reply.([]byte), res); err != nil { - return nil, err - } - return res, nil -} - -func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { - res := models.FnCalls{} - - conn := ds.pool.Get() - defer conn.Close() - reply, err := conn.Do("HGETALL", "calls") - if err != nil { - return nil, err - } - - calls, err := redis.StringMap(reply, err) - if err != nil { - return nil, err - } - - for _, v := range calls { - var call models.FnCall - if err := json.Unmarshal([]byte(v), &call); err != nil { - return nil, err - } - if applyCallFilter(&call, filter) { - res = append(res, &call) - } - } - return res, nil - -} diff --git a/api/datastore/redis/redis_test.go b/api/datastore/redis/redis_test.go deleted file mode 100644 index 8ec62860f..000000000 --- a/api/datastore/redis/redis_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package redis - -import ( - "bytes" - "fmt" - "log" - "net/url" - "os" - "os/exec" - "strconv" - "testing" - "time" - - "github.com/garyburd/redigo/redis" - "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoretest" -) - -const tmpRedis = "redis://%s:%d/" - -var ( - redisHost = func() string { - host := os.Getenv("REDIS_HOST") - if host == "" { - host = "127.0.0.1" - } - return host - }() - redisPort = func() int { - port := os.Getenv("REDIS_PORT") - if port == "" { - port = "6301" - } - p, err := strconv.Atoi(port) - if err != nil { - panic(err) - } - return p - }() -) - -func prepareRedisTest(logf, fatalf func(string, ...interface{})) (func(), func()) { - timeout := time.After(20 * time.Second) - - for { - c, err := redis.DialURL(fmt.Sprintf(tmpRedis, redisHost, redisPort)) - if err == nil { - _, err = c.Do("PING") - c.Close() - if err == nil { - break - } - } - fmt.Println("failed to PING redis:", err) - select { - case <-timeout: - log.Fatal("timed out waiting for redis") - case <-time.After(500 * time.Millisecond): - continue - } - } - fmt.Println("redis for test ready") - return func() {}, - func() { - tryRun(logf, "stop redis container", exec.Command("docker", "rm", "-fv", "func-redis-test")) - } -} - -func TestDatastore(t *testing.T) { - _, close := prepareRedisTest(t.Logf, t.Fatalf) - defer close() - - u, err := url.Parse(fmt.Sprintf(tmpRedis, redisHost, redisPort)) - if err != nil { - t.Fatal("failed to parse url: ", err) - } - ds, err := New(u) - if err != nil { - t.Fatal("failed to create redis datastore:", err) - } - - datastoretest.Test(t, ds) -} - -func tryRun(logf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - logf("failed to %s: %s", desc, b.String()) - } -} - -func mustRun(fatalf func(string, ...interface{}), desc string, cmd *exec.Cmd) { - var b bytes.Buffer - cmd.Stderr = &b - if err := cmd.Run(); err != nil { - fatalf("failed to %s: %s", desc, b.String()) - } -} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go new file mode 100644 index 000000000..718bfbbda --- /dev/null +++ b/api/datastore/sql/sql.go @@ -0,0 +1,759 @@ +package sql + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + + "github.com/Sirupsen/logrus" + "github.com/go-sql-driver/mysql" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "github.com/lib/pq" + _ "github.com/lib/pq" + "github.com/mattn/go-sqlite3" + _ "github.com/mattn/go-sqlite3" + "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" + "gitlab-odx.oracle.com/odx/functions/api/models" +) + +// this aims to be an ANSI-SQL compliant package that uses only question +// mark syntax for var placement, leaning on sqlx to make compatible all +// queries to the actual underlying datastore. +// +// currently tested and working are postgres, mysql and sqlite3. + +var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( + app_name varchar(256) NOT NULL, + path varchar(256) NOT NULL, + image varchar(256) NOT NULL, + format varchar(16) NOT NULL, + memory int NOT NULL, + timeout int NOT NULL, + idle_timeout int NOT NULL, + type varchar(16) NOT NULL, + headers text NOT NULL, + config text NOT NULL, + PRIMARY KEY (app_name, path) +);`, + + `CREATE TABLE IF NOT EXISTS apps ( + name varchar(256) NOT NULL PRIMARY KEY, + config text NOT NULL +);`, + + `CREATE TABLE IF NOT EXISTS calls ( + created_at varchar(256) NOT NULL, + started_at varchar(256) NOT NULL, + completed_at varchar(256) NOT NULL, + status varchar(256) NOT NULL, + id varchar(256) NOT NULL, + app_name varchar(256) NOT NULL, + path varchar(256) NOT NULL, + PRIMARY KEY (id) +);`, + + `CREATE TABLE IF NOT EXISTS logs ( + id varchar(256) NOT NULL PRIMARY KEY, + log text NOT NULL +);`, +} + +const ( + routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes` + callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` +) + +type sqlStore struct { + db *sqlx.DB + + // TODO we should prepare all of the statements, rebind them + // and store them all here. +} + +// New will open the db specified by url, create any tables necessary +// and return a models.Datastore safe for concurrent usage. +func New(url *url.URL) (models.Datastore, error) { + driver := url.Scheme + + // driver must be one of these for sqlx to work, double check: + switch driver { + case "postgres", "pgx", "mysql", "sqlite3", "oci8", "ora", "goracle": + default: + return nil, errors.New("invalid db driver, refer to the code") + } + + if driver == "sqlite3" { + // make all the dirs so we can make the file.. + dir := filepath.Dir(url.Path) + err := os.MkdirAll(dir, 0755) + if err != nil { + return nil, err + } + } + + uri := url.String() + if driver != "postgres" { + // postgres seems to need this as a prefix in lib/pq, everyone else wants it stripped of scheme + uri = strings.TrimPrefix(url.String(), url.Scheme+"://") + } + + sqldb, err := sql.Open(driver, uri) + if err != nil { + logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db") + return nil, err + } + + db := sqlx.NewDb(sqldb, driver) + // force a connection and test that it worked + err = db.Ping() + if err != nil { + logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db") + return nil, err + } + + maxIdleConns := 30 // c.MaxIdleConnections + db.SetMaxIdleConns(maxIdleConns) + logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed") + + for _, v := range tables { + _, err = db.Exec(v) + if err != nil { + return nil, err + } + } + + sqlDatastore := &sqlStore{db: db} + return datastoreutil.NewValidator(sqlDatastore), nil +} + +func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { + var cbyte []byte + var err error + if app.Config != nil { + cbyte, err = json.Marshal(app.Config) + if err != nil { + return nil, err + } + } + + query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);") + _, err = ds.db.Exec(query, app.Name, string(cbyte)) + if err != nil { + switch err := err.(type) { + case *mysql.MySQLError: + if err.Number == 1062 { + return nil, models.ErrAppsAlreadyExists + } + case *pq.Error: + if err.Code == "23505" { + return nil, models.ErrAppsAlreadyExists + } + case sqlite3.Error: + if err.ExtendedCode == sqlite3.ErrConstraintUnique || err.ExtendedCode == sqlite3.ErrConstraintPrimaryKey { + return nil, models.ErrAppsAlreadyExists + } + } + return nil, err + } + + return app, nil +} + +func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { + app := &models.App{Name: newapp.Name} + err := ds.Tx(func(tx *sqlx.Tx) error { + query := tx.Rebind(`SELECT config FROM apps WHERE name=?`) + row := tx.QueryRow(query, app.Name) + + var config string + if err := row.Scan(&config); err != nil { + if err == sql.ErrNoRows { + return models.ErrAppsNotFound + } + return err + } + + if config != "" { + err := json.Unmarshal([]byte(config), &app.Config) + if err != nil { + return err + } + } + + app.UpdateConfig(newapp.Config) + + cbyte, err := json.Marshal(app.Config) + if err != nil { + return err + } + + query = tx.Rebind(`UPDATE apps SET config=? WHERE name=?`) + res, err := tx.Exec(query, string(cbyte), app.Name) + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return models.ErrAppsNotFound + } + return nil + }) + + if err != nil { + return nil, err + } + + return app, nil +} + +func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error { + query := ds.db.Rebind(`DELETE FROM apps WHERE name = ?`) + _, err := ds.db.Exec(query, appName) + return err +} + +func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) { + query := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`) + row := ds.db.QueryRow(query, name) + + var resName, config string + err := row.Scan(&resName, &config) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrAppsNotFound + } + return nil, err + } + + res := &models.App{ + Name: resName, + } + + if len(config) > 0 { + err := json.Unmarshal([]byte(config), &res.Config) + if err != nil { + return nil, err + } + } + + return res, nil +} + +// GetApps retrieves an array of apps according to a specific filter. +func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { + res := []*models.App{} + query, args := buildFilterAppQuery(filter) + query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", query)) + rows, err := ds.db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var app models.App + err := scanApp(rows, &app) + + if err != nil { + if err == sql.ErrNoRows { + return res, nil + } + return res, err + } + res = append(res, &app) + } + + if err := rows.Err(); err != nil { + return res, err + } + return res, nil +} + +func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { + hbyte, err := json.Marshal(route.Headers) + if err != nil { + return nil, err + } + + cbyte, err := json.Marshal(route.Config) + if err != nil { + return nil, err + } + + err = ds.Tx(func(tx *sqlx.Tx) error { + query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`) + r := tx.QueryRow(query, route.AppName) + if err := r.Scan(new(int)); err != nil { + if err == sql.ErrNoRows { + return models.ErrAppsNotFound + } + } + query = tx.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`) + same, err := tx.Query(query, route.AppName, route.Path) + if err != nil { + return err + } + defer same.Close() + if same.Next() { + return models.ErrRoutesAlreadyExists + } + + query = tx.Rebind(`INSERT INTO routes ( + app_name, + path, + image, + format, + memory, + type, + timeout, + idle_timeout, + headers, + config + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`) + + _, err = tx.Exec(query, + route.AppName, + route.Path, + route.Image, + route.Format, + route.Memory, + route.Type, + route.Timeout, + route.IdleTimeout, + string(hbyte), + string(cbyte), + ) + + return err + }) + + return route, err +} + +func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { + var route models.Route + err := ds.Tx(func(tx *sqlx.Tx) error { + query := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector)) + row := tx.QueryRow(query, newroute.AppName, newroute.Path) + if err := scanRoute(row, &route); err == sql.ErrNoRows { + return models.ErrRoutesNotFound + } else if err != nil { + return err + } + + route.Update(newroute) + + hbyte, err := json.Marshal(route.Headers) + if err != nil { + return err + } + + cbyte, err := json.Marshal(route.Config) + if err != nil { + return err + } + + query = tx.Rebind(`UPDATE routes SET + image = ?, + format = ?, + memory = ?, + type = ?, + timeout = ?, + idle_timeout = ?, + headers = ?, + config = ? + WHERE app_name=? AND path=?;`) + + res, err := tx.Exec(query, + route.Image, + route.Format, + route.Memory, + route.Type, + route.Timeout, + route.IdleTimeout, + string(hbyte), + string(cbyte), + route.AppName, + route.Path, + ) + + if err != nil { + return err + } + + if n, err := res.RowsAffected(); err != nil { + return err + } else if n == 0 { + return models.ErrRoutesNotFound + } + + return nil + }) + + if err != nil { + return nil, err + } + return &route, nil +} + +func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error { + query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`) + res, err := ds.db.Exec(query, routePath, appName) + if err != nil { + return err + } + + n, err := res.RowsAffected() + if err != nil { + return err + } + + if n == 0 { + return models.ErrRoutesRemoving + } + + return nil +} + +func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { + rSelectCondition := "%s WHERE app_name=? AND path=?" + query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector)) + row := ds.db.QueryRow(query, appName, routePath) + + var route models.Route + err := scanRoute(row, &route) + if err == sql.ErrNoRows { + return nil, models.ErrRoutesNotFound + } else if err != nil { + return nil, err + } + return &route, nil +} + +// GetRoutes retrieves an array of routes according to a specific filter. +func (ds *sqlStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { + res := []*models.Route{} + query, args := buildFilterRouteQuery(filter) + query = fmt.Sprintf("%s %s", routeSelector, query) + query = ds.db.Rebind(query) + rows, err := ds.db.Query(query, args...) + // todo: check for no rows so we don't respond with a sql 500 err + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var route models.Route + err := scanRoute(rows, &route) + if err != nil { + continue + } + res = append(res, &route) + + } + if err := rows.Err(); err != nil { + return nil, err + } + return res, nil +} + +/* +GetRoutesByApp retrieves a route with a specific app name. +*/ +func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { + res := []*models.Route{} + var filterQuery string + var args []interface{} + if filter == nil { + filterQuery = "WHERE app_name = ?" + args = []interface{}{appName} + } else { + filter.AppName = appName + filterQuery, args = buildFilterRouteQuery(filter) + } + + query := fmt.Sprintf("%s %s", routeSelector, filterQuery) + query = ds.db.Rebind(query) + rows, err := ds.db.Query(query, args...) + // todo: check for no rows so we don't respond with a sql 500 err + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var route models.Route + err := scanRoute(rows, &route) + if err != nil { + continue + } + res = append(res, &route) + + } + if err := rows.Err(); err != nil { + return nil, err + } + + return res, nil +} + +func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error { + tx, err := ds.db.Beginx() + if err != nil { + return err + } + err = f(tx) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} + +func (ds *sqlStore) InsertTask(ctx context.Context, task *models.Task) error { + query := ds.db.Rebind(`INSERT INTO calls ( + id, + created_at, + started_at, + completed_at, + status, + app_name, + path + ) + VALUES (?, ?, ?, ?, ?, ?, ?);`) + + _, err := ds.db.Exec(query, task.ID, task.CreatedAt.String(), + task.StartedAt.String(), task.CompletedAt.String(), + task.Status, task.AppName, task.Path) + if err != nil { + return err + } + + return nil +} + +func (ds *sqlStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + query := fmt.Sprintf(`%s WHERE id=?`, callSelector) + query = ds.db.Rebind(query) + row := ds.db.QueryRow(query, callID) + + var call models.FnCall + err := scanCall(row, &call) + if err != nil { + return nil, err + } + return &call, nil +} + +func (ds *sqlStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + res := models.FnCalls{} + query, args := buildFilterCallQuery(filter) + query = fmt.Sprintf("%s %s", callSelector, query) + query = ds.db.Rebind(query) + rows, err := ds.db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var call models.FnCall + err := scanCall(rows, &call) + if err != nil { + continue + } + res = append(res, &call) + } + if err := rows.Err(); err != nil { + return nil, err + } + return res, nil +} + +func (ds *sqlStore) InsertLog(ctx context.Context, callID, callLog string) error { + query := ds.db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`) + _, err := ds.db.Exec(query, callID, callLog) + return err +} + +func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.FnCallLog, error) { + query := ds.db.Rebind(`SELECT log FROM logs WHERE id=?`) + row := ds.db.QueryRow(query, callID) + + var log string + err := row.Scan(&log) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrCallLogNotFound + } + return nil, err + } + + return &models.FnCallLog{ + CallID: callID, + Log: log, + }, nil +} + +func (ds *sqlStore) DeleteLog(ctx context.Context, callID string) error { + query := ds.db.Rebind(`DELETE FROM logs WHERE id=?`) + _, err := ds.db.Exec(query, callID) + return err +} + +// TODO scrap for sqlx scanx ?? some things aren't perfect (e.g. config is a json string) +type RowScanner interface { + Scan(dest ...interface{}) error +} + +func ScanLog(scanner RowScanner, log *models.FnCallLog) error { + return scanner.Scan( + &log.CallID, + &log.Log, + ) +} + +func scanRoute(scanner RowScanner, route *models.Route) error { + var headerStr string + var configStr string + + err := scanner.Scan( + &route.AppName, + &route.Path, + &route.Image, + &route.Format, + &route.Memory, + &route.Type, + &route.Timeout, + &route.IdleTimeout, + &headerStr, + &configStr, + ) + if err != nil { + return err + } + + if len(headerStr) > 0 { + err = json.Unmarshal([]byte(headerStr), &route.Headers) + if err != nil { + return err + } + } + + if len(configStr) > 0 { + err = json.Unmarshal([]byte(configStr), &route.Config) + if err != nil { + return err + } + } + + return nil +} + +func scanApp(scanner RowScanner, app *models.App) error { + var configStr string + + err := scanner.Scan( + &app.Name, + &configStr, + ) + if err != nil { + return err + } + if len(configStr) > 0 { + err = json.Unmarshal([]byte(configStr), &app.Config) + if err != nil { + return err + } + } + + return nil +} + +func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) { + if filter == nil { + return "", nil + } + var b bytes.Buffer + var args []interface{} + + where := func(colOp, val string) { + if val != "" { + args = append(args, val) + if len(args) == 1 { + fmt.Fprintf(&b, `WHERE %s?`, colOp) + } else { + fmt.Fprintf(&b, ` AND %s?`, colOp) + } + } + } + + where("path=", filter.Path) + where("app_name=", filter.AppName) + where("image=", filter.Image) + + return b.String(), args +} + +func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}) { + if filter == nil || filter.Name == "" { + return "", nil + } + + return "WHERE name LIKE ?", []interface{}{filter.Name} +} + +func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) { + if filter == nil { + return "", nil + } + var b bytes.Buffer + var args []interface{} + + where := func(colOp, val string) { + if val != "" { + args = append(args, val) + if len(args) == 1 { + fmt.Fprintf(&b, `WHERE %s?`, colOp) + } else { + fmt.Fprintf(&b, ` AND %s?`, colOp) + } + } + } + + where("path=", filter.Path) + where("app_name=", filter.AppName) + + return b.String(), args +} + +func scanCall(scanner RowScanner, call *models.FnCall) error { + err := scanner.Scan( + &call.ID, + &call.CreatedAt, + &call.StartedAt, + &call.CompletedAt, + &call.Status, + &call.AppName, + &call.Path, + ) + + if err == sql.ErrNoRows { + return models.ErrCallNotFound + } else if err != nil { + return err + } + return nil +} diff --git a/api/logs/bolt.go b/api/logs/bolt.go deleted file mode 100644 index 46c5366b1..000000000 --- a/api/logs/bolt.go +++ /dev/null @@ -1,126 +0,0 @@ -package logs - -import ( - "encoding/json" - "net/url" - "os" - "path/filepath" - "time" - - "context" - - "github.com/Sirupsen/logrus" - "github.com/boltdb/bolt" - "gitlab-odx.oracle.com/odx/functions/api/models" -) - -type BoltLogDatastore struct { - callLogsBucket []byte - db *bolt.DB - log logrus.FieldLogger - datastore models.Datastore -} - -func NewBolt(url *url.URL) (models.FnLog, error) { - dir := filepath.Dir(url.Path) - log := logrus.WithFields(logrus.Fields{"logdb": url.Scheme, "dir": dir}) - err := os.MkdirAll(dir, 0755) - if err != nil { - log.WithError(err).Errorln("Could not create data directory for log.db") - return nil, err - } - log.WithFields(logrus.Fields{"path": url.Path}).Debug("Creating bolt log.db") - db, err := bolt.Open(url.Path, 0655, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - log.WithError(err).Errorln("Error on bolt.Open") - return nil, err - } - // I don't think we need a prefix here do we? Made it blank. If we do, we should call the query param "prefix" instead of bucket. - bucketPrefix := "" - if url.Query()["bucket"] != nil { - bucketPrefix = url.Query()["bucket"][0] - } - callLogsBucketName := []byte(bucketPrefix + "call_logs") - err = db.Update(func(tx *bolt.Tx) error { - for _, name := range [][]byte{callLogsBucketName} { - _, err := tx.CreateBucketIfNotExists(name) - if err != nil { - log.WithError(err).WithFields(logrus.Fields{"name": name}).Error("create bucket") - return err - } - } - return nil - }) - if err != nil { - log.WithError(err).Errorln("Error creating bolt buckets") - return nil, err - } - - fnl := &BoltLogDatastore{ - callLogsBucket: callLogsBucketName, - db: db, - log: log, - } - log.WithFields(logrus.Fields{"prefix": bucketPrefix, "file": url.Path}).Debug("BoltDB initialized") - - return NewValidator(fnl), nil -} - -func (fnl *BoltLogDatastore) InsertLog(ctx context.Context, callID string, callLog string) error { - log := &models.FnCallLog{ - CallID: callID, - Log: callLog, - } - id := []byte(callID) - err := fnl.db.Update( - func(tx *bolt.Tx) error { - bIm := tx.Bucket(fnl.callLogsBucket) - buf, err := json.Marshal(log) - if err != nil { - return err - } - err = bIm.Put(id, buf) - if err != nil { - return err - } - return nil - }) - - return err -} - -func (fnl *BoltLogDatastore) GetLog(ctx context.Context, callID string) (*models.FnCallLog, error) { - var res *models.FnCallLog - err := fnl.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(fnl.callLogsBucket) - v := b.Get([]byte(callID)) - if v != nil { - fnCall := &models.FnCallLog{} - err := json.Unmarshal(v, fnCall) - if err != nil { - return nil - } - res = fnCall - } else { - return models.ErrCallLogNotFound - } - return nil - }) - return res, err -} - -func (fnl *BoltLogDatastore) DeleteLog(ctx context.Context, callID string) error { - _, err := fnl.GetLog(ctx, callID) - //means object does not exist - if err != nil { - return nil - } - - id := []byte(callID) - err = fnl.db.Update(func(tx *bolt.Tx) error { - bIm := tx.Bucket(fnl.callLogsBucket) - err := bIm.Delete(id) - return err - }) - return err -} diff --git a/api/logs/bolt_test.go b/api/logs/bolt_test.go deleted file mode 100644 index 4ded5d5d3..000000000 --- a/api/logs/bolt_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package logs - -import ( - "net/url" - "os" - "testing" - - "gitlab-odx.oracle.com/odx/functions/api/datastore/bolt" - logTesting "gitlab-odx.oracle.com/odx/functions/api/logs/testing" -) - -const tmpLogDb = "/tmp/func_test_log.db" -const tmpDatastore = "/tmp/func_test_datastore.db" - -func TestDatastore(t *testing.T) { - os.Remove(tmpLogDb) - os.Remove(tmpDatastore) - uLog, err := url.Parse("bolt://" + tmpLogDb) - if err != nil { - t.Fatalf("failed to parse url: %v", err) - } - uDatastore, err := url.Parse("bolt://" + tmpDatastore) - - fnl, err := NewBolt(uLog) - if err != nil { - t.Fatalf("failed to create bolt log datastore: %v", err) - } - ds, err := bolt.New(uDatastore) - if err != nil { - t.Fatalf("failed to create bolt datastore: %v", err) - } - logTesting.Test(t, fnl, ds) -} diff --git a/api/logs/log.go b/api/logs/log.go index d9d075468..3d66b6058 100644 --- a/api/logs/log.go +++ b/api/logs/log.go @@ -2,9 +2,11 @@ package logs import ( "fmt" - "github.com/Sirupsen/logrus" - "gitlab-odx.oracle.com/odx/functions/api/models" "net/url" + + "github.com/Sirupsen/logrus" + "gitlab-odx.oracle.com/odx/functions/api/datastore/sql" + "gitlab-odx.oracle.com/odx/functions/api/models" ) func New(dbURL string) (models.FnLog, error) { @@ -12,10 +14,10 @@ func New(dbURL string) (models.FnLog, error) { if err != nil { logrus.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL") } - logrus.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating new datastore") + logrus.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store") switch u.Scheme { - case "bolt": - return NewBolt(u) + case "sqlite3", "postgres", "mysql": + return sql.New(u) default: return nil, fmt.Errorf("db type not supported %v", u.Scheme) } diff --git a/api/logs/log_test.go b/api/logs/log_test.go new file mode 100644 index 000000000..850441b5f --- /dev/null +++ b/api/logs/log_test.go @@ -0,0 +1,26 @@ +package logs + +import ( + "net/url" + "os" + "testing" + + "gitlab-odx.oracle.com/odx/functions/api/datastore/sql" + logTesting "gitlab-odx.oracle.com/odx/functions/api/logs/testing" +) + +const tmpLogDb = "/tmp/func_test_log.db" + +func TestDatastore(t *testing.T) { + os.Remove(tmpLogDb) + uLog, err := url.Parse("sqlite3://" + tmpLogDb) + if err != nil { + t.Fatalf("failed to parse url: %v", err) + } + + ds, err := sql.New(uLog) + if err != nil { + t.Fatalf("failed to create sqlite3 datastore: %v", err) + } + logTesting.Test(t, ds, ds) +} diff --git a/api/models/datastore.go b/api/models/datastore.go index 2db6d897a..53827e69f 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -6,7 +6,6 @@ import ( ) type Datastore interface { - // GetApp gets an App by name. // Returns ErrDatastoreEmptyAppName for empty appName. // Returns ErrAppsNotFound if no app is found. @@ -28,7 +27,7 @@ type Datastore interface { // RemoveApp removes the App named appName. Returns ErrDatastoreEmptyAppName if appName is empty. // Returns ErrAppsNotFound if an App is not found. - //TODO remove routes automatically? #528 + // TODO remove routes automatically? #528 RemoveApp(ctx context.Context, appName string) error // GetRoute looks up a matching Route for appName and the literal request route routePath. @@ -63,10 +62,8 @@ type Datastore interface { GetTask(ctx context.Context, callID string) (*FnCall, error) GetTasks(ctx context.Context, filter *CallFilter) (FnCalls, error) - // The following provide a generic key value store for arbitrary data, can be used by extensions to store extra data - // todo: should we namespace these by app? Then when an app is deleted, it can delete any of this extra data too. - Put(context.Context, []byte, []byte) error - Get(context.Context, []byte) ([]byte, error) + // Implement FnLog methods for convenience + FnLog } var ( diff --git a/api/models/logs.go b/api/models/logs.go index 04ebcfabc..74bb594f0 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -5,7 +5,15 @@ import ( ) type FnLog interface { + // InsertLog will insert the log at callID, overwriting if it previously + // existed. InsertLog(ctx context.Context, callID string, callLog string) error + + // GetLog will return the log at callID, an error will be returned if the log + // cannot be found. GetLog(ctx context.Context, callID string) (*FnCallLog, error) + + // DeleteLog will remove the log at callID, it will not return an error if + // the log does not exist before removal. DeleteLog(ctx context.Context, callID string) error } diff --git a/api/server/init.go b/api/server/init.go index bff20fb1b..9b745c9e4 100644 --- a/api/server/init.go +++ b/api/server/init.go @@ -23,8 +23,8 @@ func init() { viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) viper.SetDefault(EnvLogLevel, "info") viper.SetDefault(EnvMQURL, fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) - viper.SetDefault(EnvDBURL, fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) - viper.SetDefault(EnvLOGDBURL, fmt.Sprintf("bolt://%s/data/log.db?bucket=funcs", cwd)) + viper.SetDefault(EnvDBURL, fmt.Sprintf("sqlite3://%s/data/fn.db", cwd)) + viper.SetDefault(EnvLOGDBURL, "") // default to just using DB url viper.SetDefault(EnvPort, 8080) viper.SetDefault(EnvAPIURL, fmt.Sprintf("http://127.0.0.1:%d", viper.GetInt(EnvPort))) viper.AutomaticEnv() // picks up env vars automatically diff --git a/api/server/server.go b/api/server/server.go index 1f7a9fe34..2e3f08016 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -70,9 +70,12 @@ func NewFromEnv(ctx context.Context) *Server { logrus.WithError(err).Fatal("Error initializing message queue.") } - logDB, err := logs.New(viper.GetString(EnvLOGDBURL)) - if err != nil { - logrus.WithError(err).Fatal("Error initializing logs store.") + var logDB models.FnLog = ds + if ldb := viper.GetString(EnvLOGDBURL); ldb != "" && ldb != viper.GetString(EnvDBURL) { + logDB, err = logs.New(viper.GetString(EnvLOGDBURL)) + if err != nil { + logrus.WithError(err).Fatal("Error initializing logs store.") + } } apiURL := viper.GetString(EnvAPIURL) diff --git a/api/server/server_test.go b/api/server/server_test.go index c1bf22a71..86a4884c8 100644 --- a/api/server/server_test.go +++ b/api/server/server_test.go @@ -13,15 +13,13 @@ import ( "github.com/gin-gonic/gin" "gitlab-odx.oracle.com/odx/functions/api/datastore" - "gitlab-odx.oracle.com/odx/functions/api/logs" "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/mqs" "gitlab-odx.oracle.com/odx/functions/api/runner" "gitlab-odx.oracle.com/odx/functions/api/server/internal/routecache" ) -var tmpDatastoreBolt = "/tmp/func_test_bolt_datastore.db" -var tmpLogBolt = "/tmp/func_test_bolt_log.db" +var tmpDatastoreTests = "/tmp/func_test_datastore.db" func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner) *Server { ctx := context.Background() @@ -82,28 +80,23 @@ func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) models.Error return errResp } -func prepareBolt(ctx context.Context, t *testing.T) (models.Datastore, models.FnLog, func()) { - os.Remove(tmpDatastoreBolt) - os.Remove(tmpLogBolt) - ds, err := datastore.New("bolt://" + tmpDatastoreBolt) +func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.FnLog, func()) { + os.Remove(tmpDatastoreTests) + ds, err := datastore.New("sqlite3://" + tmpDatastoreTests) if err != nil { t.Fatalf("Error when creating datastore: %s", err) } - logDB, err := logs.New("bolt://" + tmpLogBolt) - if err != nil { - t.Fatalf("Error when creating log store: %s", err) - } + logDB := ds return ds, logDB, func() { - os.Remove(tmpDatastoreBolt) - os.Remove(tmpLogBolt) + os.Remove(tmpDatastoreTests) } } func TestFullStack(t *testing.T) { ctx := context.Background() buf := setLogBuffer() - ds, logDB, closeBolt := prepareBolt(ctx, t) - defer closeBolt() + ds, logDB, close := prepareDB(ctx, t) + defer close() rnr, rnrcancel := testRunner(t) defer rnrcancel() diff --git a/api_test.sh b/api_test.sh index 202cb9563..3aaf5a280 100755 --- a/api_test.sh +++ b/api_test.sh @@ -1,9 +1,7 @@ set -ex - case "$1" in - - "bolt" ) + "sqlite3" ) docker rm -fv func-server || echo No prev func-server container docker run --name func-server --privileged -v /var/run/docker.sock:/var/run/docker.sock -d -e NO_PROXY -e HTTP_PROXY -e DOCKER_HOST=${DOCKER_HOST} -e LOG_LEVEL=debug -p 8080:8080 funcy/functions @@ -26,27 +24,13 @@ case "$1" in docker rm -fv func-postgres-test || echo No prev test db container docker rm -fv func-server || echo No prev func-server container - docker run --name func-postgres-test -e "POSTGRES_DB=funcs" -p 5432:5432 -d postgres + docker run --name func-postgres-test -e "POSTGRES_DB=funcs" -e "POSTGRES_PASSWORD=root" -p 5432:5432 -d postgres sleep 8 export POSTGRES_HOST="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' func-postgres-test)" export POSTGRES_PORT=5432 - docker run --name func-server --privileged -d -e NO_PROXY -e HTTP_PROXY -e DOCKER_HOST=${DOCKER_HOST} -e LOG_LEVEL=debug -e "DB_URL=postgres://postgres@${POSTGRES_HOST}:${POSTGRES_PORT}/funcs?sslmode=disable" -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock funcy/functions + docker run --name func-server --privileged -d -e NO_PROXY -e HTTP_PROXY -e DOCKER_HOST=${DOCKER_HOST} -e LOG_LEVEL=debug -e "DB_URL=postgres://postgres:root@${POSTGRES_HOST}:${POSTGRES_PORT}/funcs?sslmode=disable" -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock funcy/functions ;; - - "redis" ) - docker rm -fv func-redis-test|| echo No prev redis test db container - docker rm -fv func-server || echo No prev func-server container - - docker run --name func-redis-test -p 6379:6379 -d redis - sleep 8 - export REDIS_HOST="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' func-redis-test)" - export REDIS_PORT=6379 - docker run --name func-server --privileged -d -e NO_PROXY -e HTTP_PROXY -e DOCKER_HOST=${DOCKER_HOST} -e LOG_LEVEL=debug -e "DB_URL=redis://${REDIS_HOST}:${REDIS_PORT}/" -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock funcy/functions - - ;; - - esac case ${DOCKER_LOCATION:-localhost} in diff --git a/build.ps1 b/build.ps1 index 4267a1354..5fd40a214 100644 --- a/build.ps1 +++ b/build.ps1 @@ -26,7 +26,7 @@ function build () { } function run () { - docker run --rm --name functions -it -v /var/run/docker.sock:/var/run/docker.sock -e LOG_LEVEL=debug -e "DB_URL=bolt:///app/data/bolt.db" -v $PWD/data:/app/data -p 8080:8080 treeder/functions + docker run --rm --name functions -it -v /var/run/docker.sock:/var/run/docker.sock -e LOG_LEVEL=debug -e "DB_URL=sqlite3:///app/data/fn.db" -v $PWD/data:/app/data -p 8080:8080 treeder/functions } switch ($cmd) diff --git a/docs/operating/databases/README.md b/docs/operating/databases/README.md index 0b23275ee..809dd9c5c 100644 --- a/docs/operating/databases/README.md +++ b/docs/operating/databases/README.md @@ -7,14 +7,12 @@ We currently support the following databases and they are passed in via the `DB_ docker run -e "DB_URL=postgres://user:pass@localhost:6212/mydb" ... ``` -## [Bolt](https://github.com/boltdb/bolt) (default) +## sqlite3 (default) -URL: `bolt:///functions/data/functions.db` +URL: `sqlite3:///functions/data/functions.db` -Bolt is an embedded database which stores to disk. If you want to use this, be sure you don't lose the data directory by mounting -the directory on your host. eg: `docker run -v $PWD/data:/functions/data -e DB_URL=bolt:///functions/data/bolt.db ...` - -[More on BoltDB](boltdb.md) +SQLite3 is an embedded database which stores to disk. If you want to use this, be sure you don't lose the data directory by mounting +the directory on your host. eg: `docker run -v $PWD/data:/functions/data -e DB_URL=sqlite3:///functions/data/fn.db ...` ## [PostgreSQL](http://www.postgresql.org/) diff --git a/docs/operating/databases/boltdb.md b/docs/operating/databases/boltdb.md index e5a9dde21..0f447b11e 100644 --- a/docs/operating/databases/boltdb.md +++ b/docs/operating/databases/boltdb.md @@ -1,11 +1,11 @@ # Oracle Functions using BoltDB -BoltDB is the default database, you just need to run the API. +SQLite3 is the default database, you just need to run the API. ## Persistent To keep it persistent, add a volume flag to the command: ``` -docker run --rm -it --privileged -v $PWD/bolt.db:/app/bolt.db -p 8080:8080 treeder/functions +docker run --rm -it --privileged -v $PWD/fn.db:/app/fn.db -p 8080:8080 treeder/functions ``` diff --git a/docs/operating/logs/README.md b/docs/operating/logs/README.md index e2d66de5e..e6a75aaa9 100644 --- a/docs/operating/logs/README.md +++ b/docs/operating/logs/README.md @@ -1,19 +1,20 @@ - # Function logs -We currently support the following function logs stores and they are passed in via the `LOGSTORE_URL` environment variable. For example: -Maximum size of single log entry: 4Mb - +We currently support the following function logs stores and they are passed in +via the `LOGSTORE_URL` environment variable. For example: ```sh -docker run -e "LOGSTORE_URL=bolt:///functions/logs/bolt.db" ... +docker run -e "LOGSTORE_URL=sqlite3:///functions/logs/fn.db" ... ``` -## [Bolt](https://github.com/boltdb/bolt) (default) +settings `LOGSTORE_URL` to `DB_URL` will put logs in the same database as +other data, this is not recommended for production. -URL: `bolt:///functions/logs/bolt.db` +## sqlite3 (default) -Bolt is an embedded database which stores to disk. If you want to use this, be sure you don't lose the data directory by mounting -the directory on your host. eg: `docker run -v $PWD/data:/functions/data -e LOGSTORE_URL=bolt:///functions/data/bolt.db ...` +example URL: `sqlite3:///functions/logs/fn.db` -[More on BoltDB](../databases/boltdb.md) +sqlite3 is an embedded database which stores to disk. If you want to use this, be sure you don't lose the data directory by mounting +the directory on your host. eg: `docker run -v $PWD/data:/functions/data -e LOGSTORE_URL=sqlite3:///functions/data/fn.db ...` + +sqlite3 isn't recommended for production environments diff --git a/docs/operating/options.md b/docs/operating/options.md index 888653291..283a3fa91 100644 --- a/docs/operating/options.md +++ b/docs/operating/options.md @@ -11,7 +11,7 @@ docker run -e VAR_NAME=VALUE ... | Env Variables | Description | Default values | | --------------|-------------|----------------| -| DB_URL | The database URL to use in URL format. See [Databases](databases/README.md) for more information. | bolt:///app/data/bolt.db | +| DB_URL | The database URL to use in URL format. See [Databases](databases/README.md) for more information. | sqlite3:///app/data/fn.db | | MQ_URL | The message queue to use in URL format. See [Message Queues](mqs/README.md) for more information. | bolt:///app/data/worker_mq.db | | API_URL | The primary Oracle Functions API URL to that this instance will talk to. In a production environment, this would be your load balancer URL. | N/A | | PORT | Sets the port to run on | 8080 | diff --git a/fn/tests/calls_test.go b/fn/tests/calls_test.go index eb996954b..9cebfff8d 100644 --- a/fn/tests/calls_test.go +++ b/fn/tests/calls_test.go @@ -65,7 +65,7 @@ func TestCalls(t *testing.T) { t.Run("get-real-call", func(t *testing.T) { callID := CallAsync(t, u, &bytes.Buffer{}) - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) cfg := &call.GetCallsCallParams{ Call: callID, Context: s.Context, diff --git a/fn/tests/exec_test.go b/fn/tests/exec_test.go index 10a393841..2fad34e3d 100644 --- a/fn/tests/exec_test.go +++ b/fn/tests/exec_test.go @@ -204,7 +204,7 @@ func TestRouteExecutions(t *testing.T) { u.Path = path.Join(u.Path, "r", s.AppName, routePath) callID := CallAsync(t, u, &bytes.Buffer{}) - time.Sleep(5 * time.Second) + time.Sleep(7 * time.Second) cfg := &operations.GetCallsCallLogParams{ Call: callID, diff --git a/fn/tests/utils.go b/fn/tests/utils.go index d201a6375..73b107fad 100644 --- a/fn/tests/utils.go +++ b/fn/tests/utils.go @@ -52,7 +52,7 @@ func CheckAppResponseError(t *testing.T, err error) { case *apps.PostAppsDefault: msg := err.(*apps.PostAppsDefault).Payload.Error.Message - code := err.(*apps.DeleteAppsAppDefault).Code() + code := err.(*apps.PostAppsDefault).Code() t.Fatalf("Unexpected error occurred: %v. Status code: %v", msg, code) return diff --git a/glide.yaml b/glide.yaml index a5027530a..339f74c2d 100644 --- a/glide.yaml +++ b/glide.yaml @@ -69,5 +69,6 @@ import: subpackages: - bson - package: github.com/jmoiron/sqlx +- package: github.com/mattn/go-sqlite3 testImport: - package: github.com/vrischmann/envconfig diff --git a/test.sh b/test.sh index fbd49d9bf..9615e3466 100755 --- a/test.sh +++ b/test.sh @@ -6,8 +6,6 @@ docker rm -fv func-postgres-test || echo No prev test db container docker run --name func-postgres-test -p 15432:5432 -d postgres docker rm -fv func-mysql-test || echo No prev mysql test db container docker run --name func-mysql-test -p 3307:3306 -e MYSQL_DATABASE=funcs -e MYSQL_ROOT_PASSWORD=root -d mysql -docker rm -fv func-redis-test|| echo No prev redis test db container -docker run --name func-redis-test -p 6301:6379 -d redis sleep 5 case ${DOCKER_LOCATION:-localhost} in localhost) @@ -16,9 +14,6 @@ localhost) export MYSQL_HOST=localhost export MYSQL_PORT=3307 - - export REDIS_HOST=localhost - export REDIS_PORT=6301 ;; docker_ip) if [[ ! -z ${DOCKER_HOST} ]] @@ -30,9 +25,6 @@ docker_ip) export MYSQL_HOST=${DOCKER_IP:-localhost} export MYSQL_PORT=3307 - - export REDIS_HOST=${DOCKER_IP:-localhost} - export REDIS_PORT=6301 ;; container_ip) export POSTGRES_HOST="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' func-postgres-test)" @@ -40,9 +32,6 @@ container_ip) export MYSQL_HOST="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' func-mysql-test)" export MYSQL_PORT=3306 - - export REDIS_HOST="$(docker inspect -f '{{.NetworkSettings.IPAddress}}' func-redis-test)" - export REDIS_PORT=6379 ;; esac