diff --git a/.gitignore b/.gitignore index a8bce7ef9..e146a75ec 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ private.sh *.pem data/ .vscode/ +fn/func.yaml diff --git a/README.md b/README.md index 532075b73..ad903e4cc 100644 --- a/README.md +++ b/README.md @@ -158,4 +158,3 @@ For more information, see: [https://github.com/treeder/functions-ui](https://git ### Check out the [Tutorial Series](examples/tutorial/). It will demonstrate some of Oracle Functions capabilities through a series of exmaples. We'll try to show examples in most major languages. This is a great place to start! - diff --git a/api/const.go b/api/const.go index 3fdaec09c..31cfa0542 100644 --- a/api/const.go +++ b/api/const.go @@ -4,7 +4,7 @@ package api const ( AppName string = "app_name" Path string = "path" - + Call string = "call" // Short forms for API URLs CApp string = "app" CRoute string = "route" diff --git a/api/datastore/bolt/bolt.go b/api/datastore/bolt/bolt.go index 30c22d93e..c194cc042 100644 --- a/api/datastore/bolt/bolt.go +++ b/api/datastore/bolt/bolt.go @@ -23,6 +23,7 @@ type BoltDatastore struct { appsBucket []byte logsBucket []byte extrasBucket []byte + callsBucket []byte db *bolt.DB log logrus.FieldLogger } @@ -50,8 +51,9 @@ func New(url *url.URL) (models.Datastore, error) { appsBucketName := []byte(bucketPrefix + "apps") logsBucketName := []byte(bucketPrefix + "logs") extrasBucketName := []byte(bucketPrefix + "extras") // todo: think of a better name + callsBucketName := []byte(bucketPrefix + "calls") err = db.Update(func(tx *bolt.Tx) error { - for _, name := range [][]byte{routesBucketName, appsBucketName, logsBucketName, extrasBucketName} { + for _, name := range [][]byte{routesBucketName, appsBucketName, logsBucketName, extrasBucketName, callsBucketName} { _, err := tx.CreateBucketIfNotExists(name) if err != nil { log.WithError(err).WithFields(logrus.Fields{"name": name}).Error("create bucket") @@ -70,6 +72,7 @@ func New(url *url.URL) (models.Datastore, error) { appsBucket: appsBucketName, logsBucket: logsBucketName, extrasBucket: extrasBucketName, + callsBucket: callsBucketName, db: db, log: log, } @@ -78,6 +81,72 @@ func New(url *url.URL) (models.Datastore, error) { return datastoreutil.NewValidator(ds), nil } + +func (ds *BoltDatastore) InsertTask(ctx context.Context, task *models.Task) error { + var fnCall *models.FnCall + taskID := []byte(task.ID) + + err := ds.db.Update( + func(tx *bolt.Tx) error { + bIm := tx.Bucket(ds.callsBucket) + buf, err := json.Marshal(fnCall.FromTask(task)) + if err != nil { + return err + } + err = bIm.Put(taskID, buf) + if err != nil { + return err + } + return nil + }) + return err +} + +func (ds *BoltDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + res := models.FnCalls{} + err := ds.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(ds.callsBucket) + err2 := b.ForEach(func(key, v []byte) error { + call := &models.FnCall{} + err := json.Unmarshal(v, call) + if err != nil { + return err + } + if applyCallFilter(call, filter) { + res = append(res, call) + } + return nil + }) + if err2 != nil { + logrus.WithError(err2).Errorln("Couldn't get calls!") + } + return err2 + }) + return res, err +} + + +func (ds *BoltDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + var res *models.FnCall + err := ds.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(ds.callsBucket) + v := b.Get([]byte(callID)) + if v != nil { + fnCall := &models.FnCall{} + err := json.Unmarshal(v, fnCall) + if err != nil { + return nil + } + res = fnCall + } else { + return models.ErrCallNotFound + } + return nil + }) + return res, err +} + + func (ds *BoltDatastore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { appname := []byte(app.Name) @@ -99,7 +168,7 @@ func (ds *BoltDatastore) InsertApp(ctx context.Context, app *models.App) (*model return err } bjParent := tx.Bucket(ds.routesBucket) - _, err = bjParent.CreateBucketIfNotExists([]byte(app.Name)) + _, err = bjParent.CreateBucketIfNotExists(appname) if err != nil { return err } @@ -435,3 +504,7 @@ func applyRouteFilter(route *models.Route, filter *models.RouteFilter) bool { (filter.AppName == "" || route.AppName == filter.AppName) && (filter.Image == "" || route.Image == filter.Image) } + +func applyCallFilter(call *models.FnCall, filter *models.CallFilter) bool { + return filter == nil || (filter.AppName == call.AppName) && (filter.Path == call.Path) +} diff --git a/api/datastore/internal/datastoretest/test.go b/api/datastore/internal/datastoretest/test.go index 3a72b6c63..097742cc1 100644 --- a/api/datastore/internal/datastoretest/test.go +++ b/api/datastore/internal/datastoretest/test.go @@ -12,9 +12,12 @@ import ( "net/url" "os" "reflect" + "time" "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" + "github.com/go-openapi/strfmt" + "github.com/satori/go.uuid" ) func setLogBuffer() *bytes.Buffer { @@ -41,6 +44,50 @@ func Test(t *testing.T, ds models.Datastore) { ctx := context.Background() + task := &models.Task{} + task.CreatedAt = strfmt.DateTime(time.Now()) + task.Status = "success" + task.StartedAt = strfmt.DateTime(time.Now()) + task.CompletedAt = strfmt.DateTime(time.Now()) + task.AppName = testApp.Name + task.Path = testRoute.Path + + t.Run("call-insert", func(t *testing.T) { + task.ID = uuid.NewV4().String() + err := ds.InsertTask(ctx, task) + if err != nil { + t.Log(buf.String()) + t.Fatalf("Test InsertTask(ctx, &task): unexpected error `%v`", err) + } + }) + + t.Run("call-get", func(t *testing.T) { + task.ID = uuid.NewV4().String() + ds.InsertTask(ctx, task) + newTask, err := ds.GetTask(ctx, task.ID) + if err != nil { + t.Fatalf("Test GetTask(ctx, task.ID): unexpected error `%v`", err) + } + if task.ID != newTask.ID { + t.Log(buf.String()) + t.Fatalf("Test GetTask(ctx, task.ID): unexpected error `%v`", err) + } + }) + + t.Run("calls-get", func(t *testing.T) { + filter := &models.CallFilter{AppName: task.AppName, Path:task.Path} + task.ID = uuid.NewV4().String() + ds.InsertTask(ctx, task) + calls, err := ds.GetTasks(ctx, filter) + if err != nil { + t.Fatalf("Test GetTasks(ctx, filter): unexpected error `%v`", err) + } + if len(calls) == 0 { + t.Log(buf.String()) + t.Fatalf("Test GetTasks(ctx, filter): unexpected error `%v`", err) + } + }) + t.Run("apps", func(t *testing.T) { // Testing insert app _, err := ds.InsertApp(ctx, nil) diff --git a/api/datastore/internal/datastoreutil/shared.go b/api/datastore/internal/datastoreutil/shared.go new file mode 100644 index 000000000..f1c57747a --- /dev/null +++ b/api/datastore/internal/datastoreutil/shared.go @@ -0,0 +1,375 @@ +package datastoreutil + +import ( + "bytes" + "database/sql" + "encoding/json" + "github.com/Sirupsen/logrus" + "fmt" + "strings" + + "gitlab-odx.oracle.com/odx/functions/api/models" +) + +type RowScanner interface { + Scan(dest ...interface{}) error +} + + +func ScanRoute(scanner RowScanner, route *models.Route) error { + var headerStr string + var configStr string + + err := scanner.Scan( + &route.AppName, + &route.Path, + &route.Image, + &route.Format, + &route.MaxConcurrency, + &route.Memory, + &route.Type, + &route.Timeout, + &route.IdleTimeout, + &headerStr, + &configStr, + ) + if err != nil { + return err + } + + if len(headerStr) > 0 { + err = json.Unmarshal([]byte(headerStr), &route.Headers) + if err != nil { + return err + } + } + + if len(configStr) > 0 { + err = json.Unmarshal([]byte(configStr), &route.Config) + if err != nil { + return err + } + } + + return nil +} + +func ScanApp(scanner RowScanner, app *models.App) error { + var configStr string + + err := scanner.Scan( + &app.Name, + &configStr, + ) + if err != nil { + return err + } + if len(configStr) > 0 { + err = json.Unmarshal([]byte(configStr), &app.Config) + if err != nil { + return err + } + } + + return nil + +} + +func BuildFilterRouteQuery(filter *models.RouteFilter, whereStm, andStm string) (string, []interface{}) { + if filter == nil { + return "", nil + } + var b bytes.Buffer + var args []interface{} + + where := func(colOp, val string) { + if val != "" { + args = append(args, val) + if len(args) == 1 { + fmt.Fprintf(&b, whereStm, colOp) + } else { + //TODO: maybe better way to detect/driver SQL dialect-specific things + if strings.Contains(whereStm, "$") { + // PgSQL specific + fmt.Fprintf(&b, andStm, colOp, len(args)) + } else { + // MySQL specific + fmt.Fprintf(&b, andStm, colOp) + } + } + } + } + + where("path =", filter.Path) + where("app_name =", filter.AppName) + where("image =", filter.Image) + + return b.String(), args +} + + +func BuildFilterAppQuery(filter *models.AppFilter, whereStm string) (string, []interface{}) { + if filter == nil { + return "", nil + } + + if filter.Name != "" { + return whereStm, []interface{}{filter.Name} + } + + return "", nil +} + + +func BuildFilterCallQuery(filter *models.CallFilter, whereStm, andStm string) (string, []interface{}) { + if filter == nil { + return "", nil + } + var b bytes.Buffer + var args []interface{} + + where := func(colOp, val string) { + if val != "" { + args = append(args, val) + if len(args) == 1 { + fmt.Fprintf(&b, whereStm, colOp) + } else { + fmt.Fprintf(&b, andStm, colOp) + } + } + } + + where("path =", filter.Path) + where("app_name =", filter.AppName) + + return b.String(), args +} + +func ScanCall(scanner RowScanner, call *models.FnCall) error { + err := scanner.Scan( + &call.ID, + &call.CreatedAt, + &call.StartedAt, + &call.CompletedAt, + &call.Status, + &call.AppName, + &call.Path, + ) + + if err == sql.ErrNoRows { + return models.ErrCallNotFound + } else if err != nil { + return err + } + return nil + +} + +func SQLGetCall(db *sql.DB, callSelector, callID, whereStm string) (*models.FnCall, error) { + var call models.FnCall + row := db.QueryRow(fmt.Sprintf(whereStm, callSelector), callID) + err := ScanCall(row, &call) + if err != nil { + return nil, err + } + return &call, nil +} + +func SQLGetCalls(db *sql.DB, cSelector string, filter *models.CallFilter, whereStm, andStm string) (models.FnCalls, error) { + res := models.FnCalls{} + filterQuery, args := BuildFilterCallQuery(filter, whereStm, andStm) + rows, err := db.Query(fmt.Sprintf("%s %s", cSelector, filterQuery), args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var call models.FnCall + err := ScanCall(rows, &call) + if err != nil { + continue + } + res = append(res, &call) + } + if err := rows.Err(); err != nil { + return nil, err + } + return res, nil +} + +func SQLGetApp(db *sql.DB, queryStr string, queryArgs ...interface{}) (*models.App, error) { + row := db.QueryRow(queryStr, queryArgs...) + + var resName string + var config string + err := row.Scan(&resName, &config) + if err != nil { + if err == sql.ErrNoRows { + return nil, models.ErrAppsNotFound + } + return nil, err + } + + res := &models.App{ + Name: resName, + } + + if len(config) > 0 { + err := json.Unmarshal([]byte(config), &res.Config) + if err != nil { + return nil, err + } + } + + return res, nil +} + +func SQLGetApps(db *sql.DB, filter *models.AppFilter, whereStm, selectStm string) ([]*models.App, error) { + res := []*models.App{} + filterQuery, args := BuildFilterAppQuery(filter, whereStm) + rows, err := db.Query(fmt.Sprintf(selectStm, filterQuery), args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var app models.App + err := ScanApp(rows, &app) + + if err != nil { + if err == sql.ErrNoRows { + return res, nil + } + return res, err + } + res = append(res, &app) + } + + if err := rows.Err(); err != nil { + return res, err + } + return res, nil +} + +func NewDatastore(dataSourceName, dialect string, tables []string) (*sql.DB, error) { + db, err := sql.Open(dialect, dataSourceName) + if err != nil { + return nil, err + } + + err = db.Ping() + if err != nil { + return nil, err + } + + maxIdleConns := 30 // c.MaxIdleConnections + db.SetMaxIdleConns(maxIdleConns) + logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info( + fmt.Sprintf("%v datastore dialed", dialect)) + + + for _, v := range tables { + _, err = db.Exec(v) + if err != nil { + return nil, err + } + } + + return db, nil +} + +func SQLGetRoutes(db *sql.DB, filter *models.RouteFilter, rSelect string, whereStm, andStm string) ([]*models.Route, error) { + res := []*models.Route{} + filterQuery, args := BuildFilterRouteQuery(filter, whereStm, andStm) + rows, err := db.Query(fmt.Sprintf("%s %s", rSelect, filterQuery), args...) + // todo: check for no rows so we don't respond with a sql 500 err + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var route models.Route + err := ScanRoute(rows, &route) + if err != nil { + continue + } + res = append(res, &route) + + } + if err := rows.Err(); err != nil { + return nil, err + } + return res, nil + +} + +func SQLGetRoutesByApp(db *sql.DB, appName string, filter *models.RouteFilter, rSelect, defaultFilterQuery, whereStm, andStm string) ([]*models.Route, error) { + res := []*models.Route{} + var filterQuery string + var args []interface{} + if filter == nil { + filterQuery = defaultFilterQuery + args = []interface{}{appName} + } else { + filter.AppName = appName + filterQuery, args = BuildFilterRouteQuery(filter, whereStm, andStm) + } + rows, err := db.Query(fmt.Sprintf("%s %s", rSelect, filterQuery), args...) + // todo: check for no rows so we don't respond with a sql 500 err + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var route models.Route + err := ScanRoute(rows, &route) + if err != nil { + continue + } + res = append(res, &route) + + } + if err := rows.Err(); err != nil { + return nil, err + } + + return res, nil +} + +func SQLGetRoute(db *sql.DB, appName, routePath, rSelectCondition, routeSelector string) (*models.Route, error) { + var route models.Route + + row := db.QueryRow(fmt.Sprintf(rSelectCondition, routeSelector), appName, routePath) + err := ScanRoute(row, &route) + + if err == sql.ErrNoRows { + return nil, models.ErrRoutesNotFound + } else if err != nil { + return nil, err + } + return &route, nil +} + +func SQLRemoveRoute(db *sql.DB, appName, routePath, deleteStm string) error { + res, err := db.Exec(deleteStm, routePath, appName) + + if err != nil { + return err + } + + n, err := res.RowsAffected() + if err != nil { + return err + } + + if n == 0 { + return models.ErrRoutesRemoving + } + + return nil + +} \ No newline at end of file diff --git a/api/datastore/internal/datastoreutil/validator.go b/api/datastore/internal/datastoreutil/validator.go index 83b0757e3..e37e662d8 100644 --- a/api/datastore/internal/datastoreutil/validator.go +++ b/api/datastore/internal/datastoreutil/validator.go @@ -33,6 +33,10 @@ type Datastore interface { InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) UpdateRoute(ctx context.Context, route *models.Route) (*models.Route, error) + InsertTask(ctx context.Context, task *models.Task) error + GetTask(ctx context.Context, callID string) (*models.FnCall, error) + GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) + // key will never be nil/empty Put(ctx context.Context, key, val []byte) error Get(ctx context.Context, key []byte) ([]byte, error) @@ -164,4 +168,19 @@ func (v *validator) Get(ctx context.Context, key []byte) ([]byte, error) { return nil, models.ErrDatastoreEmptyKey } return v.ds.Get(ctx, key) -} \ No newline at end of file +} + +func (v *validator) InsertTask(ctx context.Context, task *models.Task) error { + return v.ds.InsertTask(ctx, task) +} + +func (v *validator) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + if callID == "" { + return nil, models.ErrDatastoreEmptyTaskID + } + return v.ds.GetTask(ctx, callID) +} + +func (v *validator) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + return v.ds.GetTasks(ctx, filter) +} diff --git a/api/datastore/mock.go b/api/datastore/mock.go index a5a7a8185..63e4daf7c 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -8,23 +8,27 @@ import ( ) type mock struct { - Apps []*models.App - Routes []*models.Route + Apps models.Apps + Routes models.Routes + Calls models.FnCalls data map[string][]byte } func NewMock() models.Datastore { - return NewMockInit(nil, nil) + return NewMockInit(nil, nil, nil) } -func NewMockInit(apps []*models.App, routes []*models.Route) models.Datastore { +func NewMockInit(apps models.Apps, routes models.Routes, calls models.FnCalls) models.Datastore { if apps == nil { - apps = []*models.App{} + apps = models.Apps{} } if routes == nil { - routes = []*models.Route{} + routes = models.Routes{} } - return datastoreutil.NewValidator(&mock{apps, routes, make(map[string][]byte)}) + if calls == nil { + calls = models.FnCalls{} + } + return datastoreutil.NewValidator(&mock{apps, routes, calls, make(map[string][]byte)}) } func (m *mock) GetApp(ctx context.Context, appName string) (app *models.App, err error) { @@ -137,3 +141,23 @@ func (m *mock) Put(ctx context.Context, key, value []byte) error { func (m *mock) Get(ctx context.Context, key []byte) ([]byte, error) { return m.data[string(key)], nil } + +func (m *mock) InsertTask(ctx context.Context, task *models.Task) error { + var call *models.FnCall + m.Calls = append(m.Calls, call.FromTask(task)) + return nil +} + +func (m *mock) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + for _, t := range m.Calls { + if t.ID == callID { + return t, nil + } + } + + return nil, models.ErrCallNotFound +} + +func (m *mock) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + return m.Calls, nil +} diff --git a/api/datastore/mysql/mysql.go b/api/datastore/mysql/mysql.go index 85c11a70d..e6f50e565 100644 --- a/api/datastore/mysql/mysql.go +++ b/api/datastore/mysql/mysql.go @@ -1,14 +1,12 @@ package mysql import ( - "bytes" "context" "database/sql" "encoding/json" "fmt" "net/url" - "github.com/Sirupsen/logrus" "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql" "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" @@ -42,13 +40,19 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` -type rowScanner interface { - Scan(dest ...interface{}) error -} +const callTableCreate = `CREATE TABLE IF NOT EXISTS calls ( + created_at varchar(256) NOT NULL, + started_at varchar(256) NOT NULL, + completed_at varchar(256) NOT NULL, + status varchar(256) NOT NULL, + id varchar(256) NOT NULL, + app_name varchar(256) NOT NULL, + path varchar(256) NOT NULL, + PRIMARY KEY (id) +);` + +const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` -type rowQuerier interface { - QueryRow(query string, args ...interface{}) *sql.Row -} /* MySQLDatastore defines a basic MySQL Datastore struct. @@ -61,33 +65,19 @@ type MySQLDatastore struct { New creates a new MySQL Datastore. */ func New(url *url.URL) (models.Datastore, error) { - u := fmt.Sprintf("%s@%s%s", url.User.String(), url.Host, url.Path) - db, err := sql.Open("mysql", u) + tables := []string{routesTableCreate, appsTableCreate, extrasTableCreate, callTableCreate} + dialect := "mysql" + sqlDatastore := &MySQLDatastore{} + dataSourceName := fmt.Sprintf("%s@%s%s", url.User.String(), url.Host, url.Path) + + db, err := datastoreutil.NewDatastore(dataSourceName, dialect, tables) if err != nil { return nil, err } - err = db.Ping() - if err != nil { - return nil, err - } + sqlDatastore.db = db + return datastoreutil.NewValidator(sqlDatastore), nil - maxIdleConns := 30 - db.SetMaxIdleConns(maxIdleConns) - logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info("MySQL dialed") - - pg := &MySQLDatastore{ - db: db, - } - - for _, v := range []string{routesTableCreate, appsTableCreate, extrasTableCreate} { - _, err = db.Exec(v) - if err != nil { - return nil, err - } - } - - return datastoreutil.NewValidator(pg), nil } /* @@ -188,78 +178,26 @@ func (ds *MySQLDatastore) RemoveApp(ctx context.Context, appName string) error { WHERE name = ? `, appName) - if err != nil { - return err - } - - return nil + return err } /* GetApp retrieves an app from MySQL. */ func (ds *MySQLDatastore) GetApp(ctx context.Context, name string) (*models.App, error) { - row := ds.db.QueryRow(`SELECT name, config FROM apps WHERE name=?`, name) - - var resName string - var config string - err := row.Scan(&resName, &config) - - res := &models.App{ - Name: resName, - } - - json.Unmarshal([]byte(config), &res.Config) - - if err != nil { - if err == sql.ErrNoRows { - return nil, models.ErrAppsNotFound - } - return nil, err - } - - return res, nil -} - -func scanApp(scanner rowScanner, app *models.App) error { - var configStr string - - err := scanner.Scan( - &app.Name, - &configStr, - ) - - json.Unmarshal([]byte(configStr), &app.Config) - - return err + queryStr := `SELECT name, config FROM apps WHERE name=?` + queryArgs := []interface{}{name} + return datastoreutil.SQLGetApp(ds.db, queryStr, queryArgs...) } /* GetApps retrieves an array of apps according to a specific filter. */ func (ds *MySQLDatastore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - res := []*models.App{} - filterQuery, args := buildFilterAppQuery(filter) - rows, err := ds.db.Query(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() + whereStm := "WHERE name LIKE ?" + selectStm := "SELECT DISTINCT name, config FROM apps %s" - for rows.Next() { - var app models.App - err := scanApp(rows, &app) - - if err != nil { - break - } - res = append(res, &app) - } - - if err := rows.Err(); err != nil { - return res, err - } - return res, nil + return datastoreutil.SQLGetApps(ds.db, filter, whereStm, selectStm) } /* @@ -336,7 +274,7 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout var route models.Route err := ds.Tx(func(tx *sql.Tx) error { row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector), newroute.AppName, newroute.Path) - if err := scanRoute(row, &route); err == sql.ErrNoRows { + if err := datastoreutil.ScanRoute(row, &route); err == sql.ErrNoRows { return models.ErrRoutesNotFound } else if err != nil { return err @@ -402,174 +340,39 @@ func (ds *MySQLDatastore) UpdateRoute(ctx context.Context, newroute *models.Rout RemoveRoute removes an existing route on MySQL. */ func (ds *MySQLDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { - res, err := ds.db.Exec(` - DELETE FROM routes - WHERE path = ? AND app_name = ? - `, routePath, appName) + deleteStm := `DELETE FROM routes WHERE path = ? AND app_name = ?` - if err != nil { - return err - } - - n, err := res.RowsAffected() - if err != nil { - return err - } - - if n == 0 { - return models.ErrRoutesRemoving - } - - return nil -} - -func scanRoute(scanner rowScanner, route *models.Route) error { - var headerStr string - var configStr string - - err := scanner.Scan( - &route.AppName, - &route.Path, - &route.Image, - &route.Format, - &route.MaxConcurrency, - &route.Memory, - &route.Type, - &route.Timeout, - &route.IdleTimeout, - &headerStr, - &configStr, - ) - if err != nil { - return err - } - - if headerStr == "" { - return models.ErrRoutesNotFound - } - - if err := json.Unmarshal([]byte(headerStr), &route.Headers); err != nil { - return err - } - return json.Unmarshal([]byte(configStr), &route.Config) + return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } /* GetRoute retrieves a route from MySQL. */ func (ds *MySQLDatastore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - var route models.Route + rSelectCondition := "%s WHERE app_name=? AND path=?" - row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector), appName, routePath) - err := scanRoute(row, &route) - - if err == sql.ErrNoRows { - return nil, models.ErrRoutesNotFound - } else if err != nil { - return nil, err - } - return &route, nil + return datastoreutil.SQLGetRoute(ds.db, appName, routePath, rSelectCondition, routeSelector) } /* GetRoutes retrieves an array of routes according to a specific filter. */ func (ds *MySQLDatastore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} - filterQuery, args := buildFilterRouteQuery(filter) - rows, err := ds.db.Query(fmt.Sprintf("%s %s", routeSelector, filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() + whereStm := "WHERE %s ?" + andStm := " AND %s ?" - for rows.Next() { - var route models.Route - err := scanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - return res, nil + return datastoreutil.SQLGetRoutes(ds.db, filter, routeSelector, whereStm, andStm) } /* GetRoutesByApp retrieves a route with a specific app name. */ func (ds *MySQLDatastore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} + whereStm := "WHERE %s ?" + andStm := " AND %s ?" + defaultFilterQuery := "WHERE app_name = ?" - var filterQuery string - var args []interface{} - if filter == nil { - filterQuery = "WHERE app_name = ?" - args = []interface{}{appName} - } else { - filter.AppName = appName - filterQuery, args = buildFilterRouteQuery(filter) - } - rows, err := ds.db.Query(fmt.Sprintf("%s %s", routeSelector, filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var route models.Route - err := scanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - - return res, nil -} - -func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}) { - if filter == nil { - return "", nil - } - - if filter.Name != "" { - return "WHERE name LIKE ?", []interface{}{filter.Name} - } - - return "", nil -} - -func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) { - if filter == nil { - return "", nil - } - var b bytes.Buffer - var args []interface{} - - where := func(colOp, val string) { - if val != "" { - args = append(args, val) - if len(args) == 1 { - fmt.Fprintf(&b, "WHERE %s ?", colOp) - } else { - fmt.Fprintf(&b, " AND %s ?", colOp) - } - } - } - - where("path =", filter.Path) - where("app_name =", filter.AppName) - where("image =", filter.Image) - - return b.String(), args + return datastoreutil.SQLGetRoutesByApp(ds.db, appName, filter, routeSelector, defaultFilterQuery, whereStm, andStm) } /* @@ -625,3 +428,32 @@ func (ds *MySQLDatastore) Tx(f func(*sql.Tx) error) error { } return tx.Commit() } + +func (ds * MySQLDatastore) InsertTask(ctx context.Context, task *models.Task) error { + stmt, err := ds.db.Prepare("INSERT calls SET id=?,created_at=?,started_at=?,completed_at=?,status=?,app_name=?,path=?") + if err != nil { + return err + } + _, err = stmt.Exec(task.ID, task.CreatedAt.String(), + task.StartedAt.String(), task.CompletedAt.String(), + task.Status, task.AppName, task.Path) + + if err != nil { + return err + } + + return nil +} + +func (ds * MySQLDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + whereStm := "%s WHERE id=?" + + return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) +} + +func (ds * MySQLDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + whereStm := "WHERE %s ?" + andStm := " AND %s ?" + + return datastoreutil.SQLGetCalls(ds.db, callSelector, filter, whereStm, andStm) +} diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go index 25d210b1c..f598ea4d7 100644 --- a/api/datastore/postgres/postgres.go +++ b/api/datastore/postgres/postgres.go @@ -8,8 +8,6 @@ import ( "context" - "bytes" - "github.com/Sirupsen/logrus" "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" "gitlab-odx.oracle.com/odx/functions/api/models" "github.com/lib/pq" @@ -44,41 +42,35 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( const routeSelector = `SELECT app_name, path, image, format, maxc, memory, type, timeout, idle_timeout, headers, config FROM routes` -type rowScanner interface { - Scan(dest ...interface{}) error -} +const callsTableCreate = `CREATE TABLE IF NOT EXISTS calls ( + created_at character varying(256) NOT NULL, + started_at character varying(256) NOT NULL, + completed_at character varying(256) NOT NULL, + status character varying(256) NOT NULL, + id character varying(256) NOT NULL, + app_name character varying(256) NOT NULL, + path character varying(256) NOT NULL, + PRIMARY KEY (id) +);` + +const callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls` type PostgresDatastore struct { db *sql.DB } func New(url *url.URL) (models.Datastore, error) { - db, err := sql.Open("postgres", url.String()) + tables := []string{routesTableCreate, appsTableCreate, extrasTableCreate, callsTableCreate} + sqlDatastore := &PostgresDatastore{} + dialect := "postgres" + + db, err := datastoreutil.NewDatastore(url.String(), dialect, tables) if err != nil { return nil, err } - err = db.Ping() - if err != nil { - return nil, err - } - - maxIdleConns := 30 // c.MaxIdleConnections - db.SetMaxIdleConns(maxIdleConns) - logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info("Postgres dialed") - - pg := &PostgresDatastore{ - db: db, - } - - for _, v := range []string{routesTableCreate, appsTableCreate, extrasTableCreate} { - _, err = db.Exec(v) - if err != nil { - return nil, err - } - } - - return datastoreutil.NewValidator(pg), nil + sqlDatastore.db = db + return datastoreutil.NewValidator(sqlDatastore), nil } func (ds *PostgresDatastore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { @@ -156,93 +148,23 @@ func (ds *PostgresDatastore) UpdateApp(ctx context.Context, newapp *models.App) } func (ds *PostgresDatastore) RemoveApp(ctx context.Context, appName string) error { - _, err := ds.db.Exec(` - DELETE FROM apps - WHERE name = $1 - `, appName) + _, err := ds.db.Exec(`DELETE FROM apps WHERE name = $1`, appName) - if err != nil { - return err - } - - return nil + return err } func (ds *PostgresDatastore) GetApp(ctx context.Context, name string) (*models.App, error) { - row := ds.db.QueryRow("SELECT name, config FROM apps WHERE name=$1", name) + queryStr := "SELECT name, config FROM apps WHERE name=$1" + queryArgs := []interface{}{name} - var resName string - var config string - err := row.Scan(&resName, &config) - if err != nil { - if err == sql.ErrNoRows { - return nil, models.ErrAppsNotFound - } - return nil, err - } - - res := &models.App{ - Name: resName, - } - - if len(config) > 0 { - err := json.Unmarshal([]byte(config), &res.Config) - if err != nil { - return nil, err - } - } - - return res, nil -} - -func scanApp(scanner rowScanner, app *models.App) error { - var configStr string - - err := scanner.Scan( - &app.Name, - &configStr, - ) - if err != nil { - return err - } - - if len(configStr) > 0 { - err = json.Unmarshal([]byte(configStr), &app.Config) - if err != nil { - return err - } - } - - return nil + return datastoreutil.SQLGetApp(ds.db, queryStr, queryArgs...) } func (ds *PostgresDatastore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { - res := []*models.App{} + whereStm := "WHERE name LIKE $1" + selectStm := "SELECT DISTINCT * FROM apps %s" - filterQuery, args := buildFilterAppQuery(filter) - rows, err := ds.db.Query(fmt.Sprintf("SELECT DISTINCT * FROM apps %s", filterQuery), args...) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var app models.App - err := scanApp(rows, &app) - - if err != nil { - if err == sql.ErrNoRows { - return res, nil - } - return res, err - } - res = append(res, &app) - } - - if err := rows.Err(); err != nil { - return res, err - } - return res, nil + return datastoreutil.SQLGetApps(ds.db, filter, whereStm, selectStm) } func (ds *PostgresDatastore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { @@ -315,7 +237,7 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R var route models.Route err := ds.Tx(func(tx *sql.Tx) error { row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=$1 AND path=$2", routeSelector), newroute.AppName, newroute.Path) - if err := scanRoute(row, &route); err == sql.ErrNoRows { + if err := datastoreutil.ScanRoute(row, &route); err == sql.ErrNoRows { return models.ErrRoutesNotFound } else if err != nil { return err @@ -378,173 +300,30 @@ func (ds *PostgresDatastore) UpdateRoute(ctx context.Context, newroute *models.R } func (ds *PostgresDatastore) RemoveRoute(ctx context.Context, appName, routePath string) error { - res, err := ds.db.Exec(` - DELETE FROM routes - WHERE path = $1 AND app_name = $2 - `, routePath, appName) + deleteStm := `DELETE FROM routes WHERE path = $1 AND app_name = $2` - if err != nil { - return err - } - - n, err := res.RowsAffected() - if err != nil { - return err - } - - if n == 0 { - return models.ErrRoutesRemoving - } - - return nil -} - -func scanRoute(scanner rowScanner, route *models.Route) error { - var headerStr string - var configStr string - - err := scanner.Scan( - &route.AppName, - &route.Path, - &route.Image, - &route.Format, - &route.MaxConcurrency, - &route.Memory, - &route.Type, - &route.Timeout, - &route.IdleTimeout, - &headerStr, - &configStr, - ) - if err != nil { - return err - } - - if len(headerStr) > 0 { - err = json.Unmarshal([]byte(headerStr), &route.Headers) - if err != nil { - return err - } - } - - if len(configStr) > 0 { - err = json.Unmarshal([]byte(configStr), &route.Config) - if err != nil { - return err - } - } - - return nil + return datastoreutil.SQLRemoveRoute(ds.db, appName, routePath, deleteStm) } func (ds *PostgresDatastore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - var route models.Route + rSelectCondition := "%s WHERE app_name=$1 AND path=$2" - row := ds.db.QueryRow(fmt.Sprintf("%s WHERE app_name=$1 AND path=$2", routeSelector), appName, routePath) - err := scanRoute(row, &route) - - if err == sql.ErrNoRows { - return nil, models.ErrRoutesNotFound - } else if err != nil { - return nil, err - } - return &route, nil + return datastoreutil.SQLGetRoute(ds.db, appName, routePath, rSelectCondition, routeSelector) } func (ds *PostgresDatastore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} - filterQuery, args := buildFilterRouteQuery(filter) - rows, err := ds.db.Query(fmt.Sprintf("%s %s", routeSelector, filterQuery), args...) - // todo: check for no rows so we don't respond with a sql 500 err - if err != nil { - return nil, err - } - defer rows.Close() + whereStm := "WHERE %s $1" + andStm := " AND %s $%d" - for rows.Next() { - var route models.Route - err := scanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - return res, nil + return datastoreutil.SQLGetRoutes(ds.db, filter, routeSelector, whereStm, andStm) } func (ds *PostgresDatastore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) { - res := []*models.Route{} + defaultFilterQuery := "WHERE app_name = $1" + whereStm := "WHERE %s $1" + andStm := " AND %s $%d" - var filterQuery string - var args []interface{} - if filter == nil { - filterQuery = "WHERE app_name = $1" - args = []interface{}{appName} - } else { - filter.AppName = appName - filterQuery, args = buildFilterRouteQuery(filter) - } - rows, err := ds.db.Query(fmt.Sprintf("%s %s", routeSelector, filterQuery), args...) - // todo: check for no rows so we don't respond with a sql 500 err - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var route models.Route - err := scanRoute(rows, &route) - if err != nil { - continue - } - res = append(res, &route) - - } - if err := rows.Err(); err != nil { - return nil, err - } - - return res, nil -} -func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}) { - if filter == nil { - return "", nil - } - - if filter.Name != "" { - return "WHERE name LIKE $1", []interface{}{filter.Name} - } - - return "", nil -} - -func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) { - if filter == nil { - return "", nil - } - var b bytes.Buffer - var args []interface{} - - where := func(colOp, val string) { - if val != "" { - args = append(args, val) - if len(args) == 1 { - fmt.Fprintf(&b, "WHERE %s $1", colOp) - } else { - fmt.Fprintf(&b, " AND %s $%d", colOp, len(args)) - } - } - } - - where("path =", filter.Path) - where("app_name =", filter.AppName) - where("image =", filter.Image) - - return b.String(), args + return datastoreutil.SQLGetRoutesByApp(ds.db, appName, filter, routeSelector, defaultFilterQuery, whereStm, andStm) } func (ds *PostgresDatastore) Put(ctx context.Context, key, value []byte) error { @@ -591,3 +370,40 @@ func (ds *PostgresDatastore) Tx(f func(*sql.Tx) error) error { } return tx.Commit() } + +func (ds *PostgresDatastore) InsertTask(ctx context.Context, task *models.Task) error { + err := ds.Tx(func(tx *sql.Tx) error { + _, err := tx.Exec( + `INSERT INTO calls ( + id, + created_at, + started_at, + completed_at, + status, + app_name, + path) VALUES ($1, $2, $3, $4, $5, $6, $7);`, + task.ID, + task.CreatedAt.String(), + task.StartedAt.String(), + task.CompletedAt.String(), + task.Status, + task.AppName, + task.Path, + ) + return err + }) + return err +} + +func (ds *PostgresDatastore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + whereStm := "%s WHERE id=$1" + + return datastoreutil.SQLGetCall(ds.db, callSelector, callID, whereStm) +} + +func (ds *PostgresDatastore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + whereStm := "WHERE %s $1" + andStm := " AND %s $2" + + return datastoreutil.SQLGetCalls(ds.db, callSelector, filter, whereStm, andStm) +} diff --git a/api/datastore/redis/redis.go b/api/datastore/redis/redis.go index ff9f216d4..5e7c67ed5 100644 --- a/api/datastore/redis/redis.go +++ b/api/datastore/redis/redis.go @@ -310,3 +310,65 @@ func applyRouteFilter(route *models.Route, filter *models.RouteFilter) bool { (filter.AppName == "" || route.AppName == filter.AppName) && (filter.Image == "" || route.Image == filter.Image) } + +func applyCallFilter(call *models.FnCall, filter *models.CallFilter) bool { + return filter == nil || (filter.Path == "" || call.Path == filter.Path) && + (filter.AppName == "" || call.AppName == filter.AppName) +} + +func (ds *RedisDataStore) InsertTask(ctx context.Context, task *models.Task) error { + _, err := ds.conn.Do("HEXISTS", "calls", task.ID) + if err != nil { + return err + } + + taskBytes, err := json.Marshal(task) + if err != nil { + return err + } + + if _, err := ds.conn.Do("HSET", "calls", task.ID, taskBytes); err != nil { + return err + } + return nil +} + +func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { + reply, err := ds.conn.Do("HGET", "calls", callID) + if err != nil { + return nil, err + } else if reply == nil { + return nil, models.ErrCallNotFound + } + res := &models.FnCall{} + if err := json.Unmarshal(reply.([]byte), res); err != nil { + return nil, err + } + return res, nil +} + +func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { + res := models.FnCalls{} + + reply, err := ds.conn.Do("HGETALL", "calls") + if err != nil { + return nil, err + } + + calls, err := redis.StringMap(reply, err) + if err != nil { + return nil, err + } + + for _, v := range calls { + var call models.FnCall + if err := json.Unmarshal([]byte(v), &call); err != nil { + return nil, err + } + if applyCallFilter(&call, filter) { + res = append(res, &call) + } + } + return res, nil + +} diff --git a/api/models/app.go b/api/models/app.go index 4bb8e4e90..7c4de7d58 100644 --- a/api/models/app.go +++ b/api/models/app.go @@ -6,20 +6,24 @@ import ( ) type Apps []*App +type Tasks []*Task +type FnCalls []*FnCall var ( - ErrAppsAlreadyExists = errors.New("App already exists") - ErrAppsCreate = errors.New("Could not create app") - ErrAppsGet = errors.New("Could not get app from datastore") - ErrAppsList = errors.New("Could not list apps from datastore") - ErrAppsMissingNew = errors.New("Missing new application") - ErrAppsNameImmutable = errors.New("Could not update app - name is immutable") - ErrAppsNotFound = errors.New("App not found") - ErrAppsNothingToUpdate = errors.New("Nothing to update") - ErrAppsRemoving = errors.New("Could not remove app from datastore") - ErrAppsUpdate = errors.New("Could not update app") - ErrDeleteAppsWithRoutes = errors.New("Cannot remove apps with routes") - ErrUsableImage = errors.New("Image not found") + ErrAppsAlreadyExists = errors.New("App already exists") + ErrAppsCreate = errors.New("Could not create app") + ErrAppsGet = errors.New("Could not get app from datastore") + ErrAppsList = errors.New("Could not list apps from datastore") + ErrAppsMissingNew = errors.New("Missing new application") + ErrAppsNameImmutable = errors.New("Could not update app - name is immutable") + ErrAppsNotFound = errors.New("App not found") + ErrAppsNothingToUpdate = errors.New("Nothing to update") + ErrAppsRemoving = errors.New("Could not remove app from datastore") + ErrAppsUpdate = errors.New("Could not update app") + ErrDeleteAppsWithRoutes = errors.New("Cannot remove apps with routes") + ErrUsableImage = errors.New("Image not found") + ErrCallNotFound = errors.New("Call not found") + ErrTaskInvalidAppAndRoute = errors.New("Unable to get call for given app and route") ) type App struct { diff --git a/api/models/datastore.go b/api/models/datastore.go index ef28b1a31..2db6d897a 100644 --- a/api/models/datastore.go +++ b/api/models/datastore.go @@ -58,6 +58,11 @@ type Datastore interface { // ErrDatastoreEmptyRoutePath when routePath is empty. Returns ErrRoutesNotFound when no route exists. RemoveRoute(ctx context.Context, appName, routePath string) error + // InsertTask inserts a task + InsertTask(ctx context.Context, task *Task) error + GetTask(ctx context.Context, callID string) (*FnCall, error) + GetTasks(ctx context.Context, filter *CallFilter) (FnCalls, error) + // The following provide a generic key value store for arbitrary data, can be used by extensions to store extra data // todo: should we namespace these by app? Then when an app is deleted, it can delete any of this extra data too. Put(context.Context, []byte, []byte) error @@ -70,4 +75,5 @@ var ( ErrDatastoreEmptyApp = errors.New("Missing app") ErrDatastoreEmptyRoute = errors.New("Missing route") ErrDatastoreEmptyKey = errors.New("Missing key") + ErrDatastoreEmptyTaskID = errors.New("Missing task ID") ) diff --git a/api/models/task.go b/api/models/task.go index 2117147db..164557b44 100644 --- a/api/models/task.go +++ b/api/models/task.go @@ -28,6 +28,30 @@ const ( FormatHTTP = "http" ) +type FnCall struct { + IDStatus + CompletedAt strfmt.DateTime `json:"completed_at,omitempty"` + CreatedAt strfmt.DateTime `json:"created_at,omitempty"` + StartedAt strfmt.DateTime `json:"started_at,omitempty"` + AppName string `json:"app_name,omitempty"` + Path string `json:"path"` + +} + +func (fnCall *FnCall) FromTask(task *Task) *FnCall { + return &FnCall{ + CreatedAt:task.CreatedAt, + StartedAt:task.StartedAt, + CompletedAt:task.CompletedAt, + AppName:task.AppName, + Path:task.Path, + IDStatus: IDStatus{ + ID:task.ID, + Status:task.Status, + }, + } +} + /*Task task swagger:model Task @@ -151,3 +175,8 @@ func (m *Task) validateReason(formats strfmt.Registry) error { return nil } + +type CallFilter struct { + Path string + AppName string +} diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index a8f0e08cb..80aeb2cb5 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -88,14 +88,14 @@ func deleteTask(url string, task *models.Task) error { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner) { +func RunAsyncRunner(ctx context.Context, tasksrv string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { u := tasksrvURL(tasksrv) - startAsyncRunners(ctx, u, tasks, rnr) + startAsyncRunners(ctx, u, tasks, rnr, ds) <-ctx.Done() } -func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner) { +func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, rnr *Runner, ds models.Datastore) { var wg sync.WaitGroup ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"runner": "async"}) for { @@ -129,23 +129,24 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request, log.Debug("Running task:", task.ID) wg.Add(1) + go func() { defer wg.Done() // Process Task - if _, err := RunTask(tasks, ctx, getCfg(task)); err != nil { + _, err := RunTrackedTask(task, tasks, ctx, getCfg(task), ds) + if err != nil { log.WithError(err).Error("Cannot run task") } + log.Debug("Processed task") }() - log.Debug("Processed task") - // Delete task from queue if err := deleteTask(url, task); err != nil { log.WithError(err).Error("Cannot delete task") continue } - log.Info("Task complete") + log.Info("Task complete") } } } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 0df988090..5d305acf4 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -18,6 +18,8 @@ import ( "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/mqs" "gitlab-odx.oracle.com/odx/functions/api/runner/task" + "gitlab-odx.oracle.com/odx/functions/api/datastore" + "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" ) func setLogBuffer() *bytes.Buffer { @@ -198,6 +200,15 @@ func testRunner(t *testing.T) (*Runner, context.CancelFunc) { return r, cancel } +type RunResult struct { + drivers.RunResult +} + +func (r RunResult) Status() string { + return "success" +} + + func TestAsyncRunnersGracefulShutdown(t *testing.T) { buf := setLogBuffer() mockTask := getMockTask() @@ -211,16 +222,15 @@ func TestAsyncRunnersGracefulShutdown(t *testing.T) { go func() { for t := range tasks { t.Response <- task.Response{ - Result: nil, + Result: RunResult{}, Err: nil, } } }() - rnr, cancel := testRunner(t) defer cancel() - startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr) + startAsyncRunners(ctx, ts.URL+"/tasks", tasks, rnr, datastore.NewMock()) if err := ctx.Err(); err != context.DeadlineExceeded { t.Log(buf.String()) diff --git a/api/runner/drivers/docker/docker.go b/api/runner/drivers/docker/docker.go index 47dc540f9..8dd3569e7 100644 --- a/api/runner/drivers/docker/docker.go +++ b/api/runner/drivers/docker/docker.go @@ -216,7 +216,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests // task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe cmd = strings.Fields(task.Command()) - logrus.WithFields(logrus.Fields{"task_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") + logrus.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") } envvars := make([]string, 0, len(task.EnvVars())) @@ -251,11 +251,11 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask container.Config.Volumes[containerDir] = struct{}{} mapn := fmt.Sprintf("%s:%s", hostDir, containerDir) container.HostConfig.Binds = append(container.HostConfig.Binds, mapn) - logrus.WithFields(logrus.Fields{"volumes": mapn, "task_id": task.Id()}).Debug("setting volumes") + logrus.WithFields(logrus.Fields{"volumes": mapn, "call_id": task.Id()}).Debug("setting volumes") } if wd := task.WorkDir(); wd != "" { - logrus.WithFields(logrus.Fields{"wd": wd, "task_id": task.Id()}).Debug("setting work dir") + logrus.WithFields(logrus.Fields{"wd": wd, "call_id": task.Id()}).Debug("setting work dir") container.Config.WorkingDir = wd } @@ -270,7 +270,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask if err != nil { // since we retry under the hood, if the container gets created and retry fails, we can just ignore error if err != docker.ErrContainerAlreadyExists { - logrus.WithFields(logrus.Fields{"task_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory, + logrus.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory, "cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name, "image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": containerName, }).WithError(err).Error("Could not create container") @@ -473,7 +473,7 @@ func (drv *DockerDriver) collectStats(ctx context.Context, container string, tas }) if err != nil && err != io.ErrClosedPipe { - logrus.WithError(err).WithFields(logrus.Fields{"container": container, "task_id": task.Id()}).Error("error streaming docker stats for task") + logrus.WithError(err).WithFields(logrus.Fields{"container": container, "call_id": task.Id()}).Error("error streaming docker stats for task") } }() diff --git a/api/runner/worker.go b/api/runner/worker.go index f264d3c5b..c8ace58e0 100644 --- a/api/runner/worker.go +++ b/api/runner/worker.go @@ -10,9 +10,11 @@ import ( "github.com/Sirupsen/logrus" uuid "github.com/satori/go.uuid" + "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/runner/drivers" "gitlab-odx.oracle.com/odx/functions/api/runner/protocol" "gitlab-odx.oracle.com/odx/functions/api/runner/task" + "github.com/go-openapi/strfmt" ) // hot functions - theory of operation @@ -62,6 +64,25 @@ import ( // Terminate // (internal clock) + +// RunTrackedTask is just a wrapper for shared logic for async/sync runners +func RunTrackedTask(newTask *models.Task, tasks chan task.Request, ctx context.Context, cfg *task.Config, ds models.Datastore) (drivers.RunResult, error) { + startedAt := strfmt.DateTime(time.Now()) + newTask.StartedAt = startedAt + + result, err := RunTask(tasks, ctx, cfg) + + completedAt := strfmt.DateTime(time.Now()) + status := result.Status() + newTask.CompletedAt = completedAt + newTask.Status = status + + err = ds.InsertTask(ctx, newTask) + + return result, err +} + + // RunTask helps sending a task.Request into the common concurrency stream. // Refer to StartWorkers() to understand what this is about. func RunTask(tasks chan task.Request, ctx context.Context, cfg *task.Config) (drivers.RunResult, error) { diff --git a/api/server/apps_test.go b/api/server/apps_test.go index ee0c70e58..7eb50fc63 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -99,7 +99,7 @@ func TestAppDelete(t *testing.T) { {datastore.NewMockInit( []*models.App{{ Name: "myapp", - }},nil, + }},nil, nil, ), "/v1/apps/myapp", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) @@ -219,14 +219,14 @@ func TestAppUpdate(t *testing.T) { {datastore.NewMockInit( []*models.App{{ Name: "myapp", - }}, nil, + }}, nil, nil, ), "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil}, // Addresses #380 {datastore.NewMockInit( []*models.App{{ Name: "myapp", - }}, nil, + }}, nil,nil, ), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusBadRequest, nil}, } { rnr, cancel := testRunner(t) diff --git a/api/server/call_get.go b/api/server/call_get.go new file mode 100644 index 000000000..35d3549f2 --- /dev/null +++ b/api/server/call_get.go @@ -0,0 +1,22 @@ +package server + +import ( + "context" + "net/http" + + "github.com/gin-gonic/gin" + "gitlab-odx.oracle.com/odx/functions/api" +) + +func (s *Server) handleCallGet(c *gin.Context) { + ctx := c.MustGet("ctx").(context.Context) + + callID := c.Param(api.Call) + callObj, err := s.Datastore.GetTask(ctx, callID) + if err != nil { + handleErrorResponse(c, err) + return + } + + c.JSON(http.StatusOK, fnCallResponse{"Successfully loaded call", callObj}) +} diff --git a/api/server/call_list.go b/api/server/call_list.go new file mode 100644 index 000000000..30f3023a0 --- /dev/null +++ b/api/server/call_list.go @@ -0,0 +1,35 @@ +package server + +import ( + "context" + "net/http" + + "github.com/gin-gonic/gin" + "gitlab-odx.oracle.com/odx/functions/api" + "gitlab-odx.oracle.com/odx/functions/api/models" +) + +func (s *Server) handleCallList(c *gin.Context) { + ctx := c.MustGet("ctx").(context.Context) + + appName, ok := c.MustGet(api.AppName).(string) + if ok && appName == "" { + c.JSON(http.StatusBadRequest, models.ErrRoutesValidationMissingAppName) + return + } + appRoute, ok := c.MustGet(api.Path).(string) + if ok && appRoute == "" { + c.JSON(http.StatusBadRequest, models.ErrRoutesValidationMissingPath) + return + } + + filter := models.CallFilter{AppName:appName, Path:appRoute} + + calls, err := s.Datastore.GetTasks(ctx, &filter) + if err != nil { + handleErrorResponse(c, err) + return + } + + c.JSON(http.StatusOK, fnCallsResponse{"Successfully listed calls", calls}) +} diff --git a/api/server/error_response.go b/api/server/error_response.go index 4fd4fbc5e..21b38d018 100644 --- a/api/server/error_response.go +++ b/api/server/error_response.go @@ -20,6 +20,7 @@ var errStatusCode = map[error]int{ models.ErrAppsAlreadyExists: http.StatusConflict, models.ErrRoutesNotFound: http.StatusNotFound, models.ErrRoutesAlreadyExists: http.StatusConflict, + models.ErrCallNotFound: http.StatusNotFound, } func handleErrorResponse(c *gin.Context, err error) { diff --git a/api/server/routes_test.go b/api/server/routes_test.go index caa4d4e6e..7ebf1fefa 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -80,7 +80,7 @@ func TestRouteDelete(t *testing.T) { {datastore.NewMockInit(nil, []*models.Route{ {Path: "/myroute", AppName: "a"}, - }, + }, nil, ), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil}, } { rnr, cancel := testRunner(t) @@ -206,7 +206,7 @@ func TestRouteUpdate(t *testing.T) { AppName: "a", Path: "/myroute/do", }, - }, + }, nil, ), "/v1/apps/a/routes/myroute/do", `{ "route": { "image": "funcy/hello" } }`, http.StatusOK, nil}, // Addresses #381 @@ -216,7 +216,7 @@ func TestRouteUpdate(t *testing.T) { AppName: "a", Path: "/myroute/do", }, - }, + }, nil, ), "/v1/apps/a/routes/myroute/do", `{ "route": { "path": "/otherpath" } }`, http.StatusBadRequest, nil}, } { rnr, cancel := testRunner(t) diff --git a/api/server/runner.go b/api/server/runner.go index f9a0e6f75..fbfe0f321 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -14,6 +14,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" + "github.com/go-openapi/strfmt" uuid "github.com/satori/go.uuid" "gitlab-odx.oracle.com/odx/functions/api" "gitlab-odx.oracle.com/odx/functions/api/models" @@ -151,7 +152,7 @@ func (s *Server) loadroutes(ctx context.Context, filter models.RouteFilter) ([]* } // TODO: Should remove *gin.Context from these functions, should use only context.Context -func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) { +func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue, ) (ok bool) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": found.Path, "image": found.Image}) params, match := matchRoute(found.Path, route) @@ -215,6 +216,15 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun } s.Runner.Enqueue() + createdAt := strfmt.DateTime(time.Now()) + newTask := &models.Task{} + newTask.Image = &cfg.Image + newTask.ID = cfg.ID + newTask.CreatedAt = createdAt + newTask.Path = found.Path + newTask.EnvVars = cfg.Env + newTask.AppName = cfg.AppName + switch found.Type { case "async": // Read payload @@ -224,24 +234,18 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun c.JSON(http.StatusBadRequest, simpleError(models.ErrInvalidPayload)) return true } - // Create Task priority := int32(0) - task := &models.Task{} - task.Image = &cfg.Image - task.ID = cfg.ID - task.Path = found.Path - task.AppName = cfg.AppName - task.Priority = &priority - task.EnvVars = cfg.Env - task.Payload = string(pl) + newTask.Priority = &priority + newTask.Payload = string(pl) // Push to queue - enqueue(c, s.MQ, task) + enqueue(c, s.MQ, newTask) log.Info("Added new task to queue") - c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) + c.JSON(http.StatusAccepted, map[string]string{"call_id": newTask.ID}) default: - result, err := runner.RunTask(s.tasks, ctx, cfg) + result, err := runner.RunTrackedTask(newTask, s.tasks, ctx, cfg, s.Datastore) + if err != nil { c.JSON(http.StatusInternalServerError, runnerResponse{ RequestID: cfg.ID, @@ -256,6 +260,10 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun c.Header(k, v[0]) } + // this will help users to track sync execution in a manner of async + // FN_CALL_ID is an equivalent of call_id + c.Header("FN_CALL_ID", newTask.ID) + switch result.Status() { case "success": c.Data(http.StatusOK, "", stdout.Bytes()) diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index 72b0e7954..297e81821 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -48,7 +48,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { {Type: "async", Path: "/myroute", AppName: "myapp", Image: "funcy/hello", Config: map[string]string{"test": "true"}}, {Type: "async", Path: "/myerror", AppName: "myapp", Image: "funcy/error", Config: map[string]string{"test": "true"}}, {Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "funcy/hello", Config: map[string]string{"test": "true"}}, - }, + }, nil, ) mq := &mqs.Mock{} diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 6043af0c6..37f4409a1 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -33,7 +33,7 @@ func TestRouteRunnerGet(t *testing.T) { srv := testServer(datastore.NewMockInit( []*models.App{ {Name: "myapp", Config: models.Config{}}, - }, nil, + }, nil, nil, ), &mqs.Mock{}, rnr, tasks) for i, test := range []struct { @@ -76,7 +76,7 @@ func TestRouteRunnerPost(t *testing.T) { srv := testServer(datastore.NewMockInit( []*models.App{ {Name: "myapp", Config: models.Config{}}, - }, nil, + }, nil, nil, ), &mqs.Mock{}, rnr, tasks) for i, test := range []struct { @@ -130,7 +130,7 @@ func TestRouteRunnerExecution(t *testing.T) { []*models.Route{ {Path: "/myroute", AppName: "myapp", Image: "funcy/hello", Headers: map[string][]string{"X-Function": {"Test"}}}, {Path: "/myerror", AppName: "myapp", Image: "funcy/error", Headers: map[string][]string{"X-Function": {"Test"}}}, - }, + }, nil, ), &mqs.Mock{}, rnr, tasks) for i, test := range []struct { @@ -187,7 +187,7 @@ func TestRouteRunnerTimeout(t *testing.T) { }, []*models.Route{ {Path: "/sleeper", AppName: "myapp", Image: "funcy/sleeper", Timeout: 1}, - }, + }, nil, ), &mqs.Mock{}, rnr, tasks) for i, test := range []struct { diff --git a/api/server/server.go b/api/server/server.go index 00380de16..bf1de836e 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -245,7 +245,7 @@ func (s *Server) startGears(ctx context.Context) { }) svr.AddFunc(func(ctx context.Context) { - runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner) + runner.RunAsyncRunner(ctx, s.apiURL, s.tasks, s.Runner, s.Datastore) }) svr.AddFunc(func(ctx context.Context) { @@ -274,6 +274,8 @@ func (s *Server) bindHandlers(ctx context.Context) { v1.GET("/routes", s.handleRouteList) + v1.GET("/calls/:call", s.handleCallGet) + apps := v1.Group("/apps/:app") { apps.GET("/routes", s.handleRouteList) @@ -281,6 +283,7 @@ func (s *Server) bindHandlers(ctx context.Context) { apps.GET("/routes/*route", s.handleRouteGet) apps.PATCH("/routes/*route", s.handleRouteUpdate) apps.DELETE("/routes/*route", s.handleRouteDelete) + apps.GET("/calls/*route", s.handleCallList) } } @@ -316,3 +319,13 @@ type tasksResponse struct { Message string `json:"message"` Task models.Task `json:"tasksResponse"` } + +type fnCallResponse struct { + Message string `json:"message"` + Call *models.FnCall `json:"call"` +} + +type fnCallsResponse struct { + Message string `json:"message"` + Calls models.FnCalls `json:"calls"` +} diff --git a/docs/definitions.md b/docs/definitions.md index e2be0bb5b..997ad20eb 100644 --- a/docs/definitions.md +++ b/docs/definitions.md @@ -86,7 +86,7 @@ curl -H "Content-Type: application/json" -X POST -d '{ "config": { "key": "value", "key2": "value2", - "keyN": "valueN", + "keyN": "valueN" }, "headers": { "content-type": [ @@ -148,6 +148,98 @@ To define the function execution as `hot function` you set it as one of the foll This properties are only used if the function is in `hot function` mode -#### max_concurrency (string) -This property defines the maximum amount of concurrent hot functions instances the function should have (per Oracle Functions node). \ No newline at end of file +## Calls and their statuses + +### Sync/Async Call statuses + +With each function call, no matter would that be sync or async server makes a record of this it. +While execution async function server returns `call_id`: + +```json + { + "call_id": "f5621e8b-725a-4ba9-8323-b8cdc02ce37e" + } +``` +that can be used to track call status using following command: + +```sh + + curl -v -X GET ${API_URL}/v1/calls/f5621e8b-725a-4ba9-8323-b8cdc02ce37 + +``` + +```json +{ + "message": "Successfully loaded call", + "call": { + "id": "f5621e8b-725a-4ba9-8323-b8cdc02ce37e", + "status": "success", + "completed_at": "2017-06-02T15:31:30.887+03:00", + "created_at": "2017-06-02T15:31:30.597+03:00", + "started_at": "2017-06-02T15:31:30.597+03:00", + "app_name": "newapp", + "path": "/envsync" + } +} + +``` + +Server response contains timestamps(created, started, completed) and execution status for this call. + +For sync call `call_id` can be retrieved from HTTP headers: +```sh +curl -v localhost:8080/r/newapp/envsync +* Trying ::1... +* TCP_NODELAY set +* Connected to localhost (::1) port 8080 (#0) +> GET /r/newapp/envsync HTTP/1.1 +> Host: localhost:8080 +> User-Agent: curl/7.51.0 +> Accept: */* +> +< HTTP/1.1 200 OK +< Fn_call_id: f5621e8b-725a-4ba9-8323-b8cdc02ce37e +< Date: Fri, 02 Jun 2017 12:31:30 GMT +< Content-Length: 489 +< Content-Type: text/plain; charset=utf-8 +< +... +``` +Corresponding HTTP header is `Fn_call_id`. + +### Per-route calls + +In order get list of per-route calls please use following command: + +```sh +curl -X GET ${API_URL}/v1/app/{app}/calls/{route} + +``` +Server will replay with following JSON response: + +```json +{ + "message": "Successfully listed calls", + "calls": [ + { + "id": "80b12325-4c0c-5fc1-b7d3-dccf234b48fc", + "status": "success", + "completed_at": "2017-06-02T15:31:22.976+03:00", + "created_at": "2017-06-02T15:31:22.691+03:00", + "started_at": "2017-06-02T15:31:22.691+03:00", + "app_name": "newapp", + "path": "/envsync" + }, + { + "id": "beec888b-3868-59e3-878d-281f6b6f0cbc", + "status": "success", + "completed_at": "2017-06-02T15:31:30.887+03:00", + "created_at": "2017-06-02T15:31:30.597+03:00", + "started_at": "2017-06-02T15:31:30.597+03:00", + "app_name": "newapp", + "path": "/envsync" + } + ] +} +``` diff --git a/docs/swagger.yml b/docs/swagger.yml index 59d953eba..f61c629ce 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -6,7 +6,7 @@ swagger: '2.0' info: title: Oracle Functions description: The open source serverless platform. - version: "0.1.29" + version: "0.1.30" # the domain of the service host: "127.0.0.1:8080" # array of all schemes that your API supports @@ -317,6 +317,55 @@ paths: schema: $ref: '#/definitions/Error' + /calls/{call}: + get: + summary: Get call information + description: Get call information + tags: + - Call + parameters: + - name: call + description: Call ID. + required: true + type: string + in: path + responses: + 200: + description: Call found + schema: + $ref: '#/definitions/CallWrapper' + 404: + description: Call not found. + schema: + $ref: '#/definitions/Error' + + /apps/{app}/calls/{route}: + get: + summary: Get route-bound calls. + description: Get route-bound calls. + tags: + - Call + parameters: + - name: app + description: App name. + required: true + type: string + in: path + - name: route + description: App route. + required: true + type: string + in: path + responses: + 200: + description: Calls found + schema: + $ref: '#/definitions/CallsWrapper' + 404: + description: Calls not found. + schema: + $ref: '#/definitions/Error' + /tasks: get: summary: Get next task. @@ -451,6 +500,68 @@ definitions: error: $ref: '#/definitions/ErrorBody' + CallsWrapper: + type: object + required: + - app + - route + properties: + app: + type: string + description: "Name of this app." + readOnly: true + route: + type: string + description: "Name of this app." + readOnly: true + + CallWrapper: + type: object + required: + - call + properties: + call: + type: string + description: "Call ID." + readOnly: true + + + Call: + allOf: + - type: object + properties: + id: + type: string + description: Call UUID ID. + readOnly: true + status: + type: string + description: Call execution status. + readOnly: true + app_name: + type: string + description: App name that is assigned to a route that is being executed. + readOnly: true + path: + type: string + description: App route that is being executed. + readOnly: true + created_at: + type: string + format: date-time + description: Time when call was submitted. Always in UTC. + readOnly: true + started_at: + type: string + format: date-time + description: Time when call started execution. Always in UTC. + readOnly: true + completed_at: + type: string + format: date-time + description: Time when call completed, whether it was successul or failed. Always in UTC. + readOnly: true + Task: allOf: - $ref: "#/definitions/NewTask"