Add support for Function and Trigger domain objects (#1060)

Vast commit, includes:

 * Introduces the Trigger domain entity.
 * Introduces the Fns domain entity.
 * V2 of the API for interacting with the new entities in swaggerv2.yml
 * Adds v2 end points for Apps to support PUT updates.
 * Rewrites the datastore level tests into a new pattern.
 * V2 routes use entity ID over name as the path parameter.
This commit is contained in:
Tom Coupland
2018-06-25 15:37:06 +01:00
committed by GitHub
parent a5abecaafb
commit 3ebff051a4
76 changed files with 5820 additions and 892 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/fnproject/fn/api/datastore"
"github.com/fnproject/fn/api/datastore/sql/dbhelper"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/logs"
"github.com/sirupsen/logrus"
)
@@ -77,11 +78,40 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
PRIMARY KEY (id)
);`,
`CREATE TABLE IF NOT EXISTS triggers (
id varchar(256) NOT NULL PRIMARY KEY,
name varchar(256) NOT NULL,
app_id varchar(256) NOT NULL,
fn_id varchar(256) NOT NULL,
created_at varchar(256) NOT NULL,
updated_at varchar(256) NOT NULL,
type varchar(256) NOT NULL,
source varchar(256) NOT NULL,
annotations text NOT NULL,
CONSTRAINT name_app_id_fn_id_unique UNIQUE (app_id, fn_id,name)
);`,
`CREATE TABLE IF NOT EXISTS logs (
id varchar(256) NOT NULL PRIMARY KEY,
app_id varchar(256) NOT NULL,
log text NOT NULL
);`,
`CREATE TABLE IF NOT EXISTS fns (
id varchar(256) NOT NULL PRIMARY KEY,
name varchar(256) NOT NULL,
app_id varchar(256) NOT NULL,
image varchar(256) NOT NULL,
format varchar(16) NOT NULL,
memory int NOT NULL,
timeout int NOT NULL,
idle_timeout int NOT NULL,
config text NOT NULL,
annotations text NOT NULL,
created_at varchar(256) NOT NULL,
updated_at varchar(256) NOT NULL,
CONSTRAINT name_app_id_unique UNIQUE (app_id, name)
);`,
}
const (
@@ -90,6 +120,12 @@ const (
appIDSelector = `SELECT id, name, config, annotations, syslog_url, created_at, updated_at FROM apps WHERE id=?`
ensureAppSelector = `SELECT id FROM apps WHERE name=?`
fnSelector = `SELECT id,name,app_id,image,format,memory,timeout,idle_timeout,config,annotations,created_at,updated_at FROM fns`
fnIDSelector = fnSelector + ` WHERE id=?`
triggerSelector = `SELECT id,name,app_id,fn_id,type,source,annotations,created_at,updated_at FROM triggers`
triggerIDSelector = triggerSelector + ` WHERE id=?`
EnvDBPingMaxRetries = "FN_DS_DB_PING_MAX_RETRIES"
)
@@ -301,6 +337,18 @@ func (ds *SQLStore) clear() error {
return err
}
query = tx.Rebind(`DELETE FROM triggers`)
_, err = tx.Exec(query)
if err != nil {
return err
}
query = tx.Rebind(`DELETE FROM fns`)
_, err = tx.Exec(query)
if err != nil {
return err
}
query = tx.Rebind(`DELETE FROM logs`)
_, err = tx.Exec(query)
return err
@@ -323,7 +371,17 @@ func (ds *SQLStore) GetAppID(ctx context.Context, appName string) (string, error
return app.ID, nil
}
func (ds *SQLStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
func (ds *SQLStore) InsertApp(ctx context.Context, newApp *models.App) (*models.App, error) {
app := newApp.Clone()
app.CreatedAt = common.DateTime(time.Now())
app.UpdatedAt = app.CreatedAt
app.ID = id.New().String()
if app.Config == nil {
// keeps the json from being nil
app.Config = map[string]string{}
}
query := ds.db.Rebind(`INSERT INTO apps (
id,
name,
@@ -355,6 +413,7 @@ func (ds *SQLStore) InsertApp(ctx context.Context, app *models.App) (*models.App
func (ds *SQLStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
var app models.App
err := ds.Tx(func(tx *sqlx.Tx) error {
// NOTE: must query whole object since we're returning app, Update logic
// must only modify modifiable fields (as seen here). need to fix brittle..
@@ -370,6 +429,9 @@ func (ds *SQLStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
return err
}
if newapp.Name != "" && app.Name != newapp.Name {
return models.ErrAppsNameImmutable
}
app.Update(newapp)
err = app.Validate()
if err != nil {
@@ -416,6 +478,8 @@ func (ds *SQLStore) RemoveApp(ctx context.Context, appID string) error {
`DELETE FROM logs WHERE app_id=?`,
`DELETE FROM calls WHERE app_id=?`,
`DELETE FROM routes WHERE app_id=?`,
`DELETE FROM fns WHERE app_id=?`,
`DELETE FROM triggers WHERE app_id=?`,
}
for _, stmt := range deletes {
_, err := tx.ExecContext(ctx, tx.Rebind(stmt), appID)
@@ -445,15 +509,17 @@ func (ds *SQLStore) GetAppByID(ctx context.Context, appID string) (*models.App,
// GetApps retrieves an array of apps according to a specific filter.
func (ds *SQLStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
res := []*models.App{}
var res []*models.App
if filter.NameIn != nil && len(filter.NameIn) == 0 { // this basically makes sure it doesn't return ALL apps
return res, nil
}
query, args, err := buildFilterAppQuery(filter)
if err != nil {
return nil, err
}
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config, annotations, syslog_url, created_at, updated_at FROM apps %s", query))
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT id, name, config, annotations, syslog_url, created_at, updated_at FROM apps %s", query))
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
@@ -478,7 +544,11 @@ func (ds *SQLStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
return res, nil
}
func (ds *SQLStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
func (ds *SQLStore) InsertRoute(ctx context.Context, newRoute *models.Route) (*models.Route, error) {
route := newRoute.Clone()
route.CreatedAt = common.DateTime(time.Now())
route.UpdatedAt = route.CreatedAt
err := ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
r := tx.QueryRowContext(ctx, query, route.AppID)
@@ -658,6 +728,7 @@ func (ds *SQLStore) GetRoutesByApp(ctx context.Context, appID string, filter *mo
}
res = append(res, &route)
}
if err := rows.Err(); err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
@@ -667,6 +738,190 @@ func (ds *SQLStore) GetRoutesByApp(ctx context.Context, appID string, filter *mo
return res, nil
}
func (ds *SQLStore) InsertFn(ctx context.Context, newFn *models.Fn) (*models.Fn, error) {
fn := newFn.Clone()
fn.ID = id.New().String()
fn.CreatedAt = common.DateTime(time.Now())
fn.UpdatedAt = fn.CreatedAt
err := newFn.Validate()
if err != nil {
return nil, err
}
err = ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
r := tx.QueryRowContext(ctx, query, fn.AppID)
if err := r.Scan(new(int)); err != nil {
if err == sql.ErrNoRows {
return models.ErrAppsNotFound
}
}
query = tx.Rebind(`INSERT INTO fns (
id,
name,
app_id,
image,
format,
memory,
timeout,
idle_timeout,
config,
annotations,
created_at,
updated_at
)
VALUES (
:id,
:name,
:app_id,
:image,
:format,
:memory,
:timeout,
:idle_timeout,
:config,
:annotations,
:created_at,
:updated_at
);`)
_, err = tx.NamedExecContext(ctx, query, fn)
return err
})
if err != nil {
if ds.helper.IsDuplicateKeyError(err) {
return nil, models.ErrFnsExists
}
return nil, err
}
return fn, nil
}
func (ds *SQLStore) UpdateFn(ctx context.Context, fn *models.Fn) (*models.Fn, error) {
err := ds.Tx(func(tx *sqlx.Tx) error {
var dst models.Fn
query := tx.Rebind(fnIDSelector)
row := tx.QueryRowxContext(ctx, query, fn.ID)
err := row.StructScan(&dst)
if err == sql.ErrNoRows {
return models.ErrFnsNotFound
} else if err != nil {
return err
}
dst.Update(fn)
err = dst.Validate()
if err != nil {
return err
}
fn = &dst // set for query & to return
query = tx.Rebind(`UPDATE fns SET
name = :name,
image = :image,
format = :format,
memory = :memory,
timeout = :timeout,
idle_timeout = :idle_timeout,
config = :config,
annotations = :annotations,
updated_at = :updated_at
WHERE id=:id;`)
_, err = tx.NamedExecContext(ctx, query, fn)
return err
})
if err != nil {
return nil, err
}
return fn, nil
}
func (ds *SQLStore) GetFns(ctx context.Context, filter *models.FnFilter) ([]*models.Fn, error) {
var res []*models.Fn // for json empty list
if filter == nil {
filter = new(models.FnFilter)
}
filterQuery, args := buildFilterFnQuery(filter)
query := fmt.Sprintf("%s %s", fnSelector, filterQuery)
query = ds.db.Rebind(query)
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
}
return nil, err
}
defer rows.Close()
for rows.Next() {
var fn models.Fn
err := rows.StructScan(&fn)
if err != nil {
continue
}
res = append(res, &fn)
}
if err := rows.Err(); err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
}
}
return res, nil
}
func (ds *SQLStore) GetFnByID(ctx context.Context, fnID string) (*models.Fn, error) {
query := ds.db.Rebind(fmt.Sprintf("%s WHERE id=?", fnSelector))
row := ds.db.QueryRowxContext(ctx, query, fnID)
var fn models.Fn
err := row.StructScan(&fn)
if err == sql.ErrNoRows {
return nil, models.ErrFnsNotFound
} else if err != nil {
return nil, err
}
return &fn, nil
}
func (ds *SQLStore) RemoveFn(ctx context.Context, fnID string) error {
return ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(fmt.Sprintf("%s WHERE id=?", fnSelector))
row := tx.QueryRowxContext(ctx, query, fnID)
var fn models.Fn
err := row.StructScan(&fn)
if err == sql.ErrNoRows {
return models.ErrFnsNotFound
}
query = tx.Rebind(`DELETE FROM triggers WHERE fn_id=?`)
_, err = tx.ExecContext(ctx, query, fnID)
if err != nil {
return err
}
query = tx.Rebind(`DELETE FROM fns WHERE id=?`)
_, err = tx.ExecContext(ctx, query, fnID)
return err
})
}
func (ds *SQLStore) Tx(f func(*sqlx.Tx) error) error {
tx, err := ds.db.Beginx()
if err != nil {
@@ -791,20 +1046,9 @@ func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
var b bytes.Buffer
var args []interface{}
where := func(colOp, val string) {
if val != "" {
args = append(args, val)
if len(args) == 1 {
fmt.Fprintf(&b, `WHERE %s`, colOp)
} else {
fmt.Fprintf(&b, ` AND %s`, colOp)
}
}
}
where("app_id=? ", filter.AppID)
where("image=?", filter.Image)
where("path>?", filter.Cursor)
args = where(&b, args, "app_id=? ", filter.AppID)
args = where(&b, args, "image=?", filter.Image)
args = where(&b, args, "path>?", filter.Cursor)
// where("path LIKE ?%", filter.PathPrefix) TODO needs escaping
fmt.Fprintf(&b, ` ORDER BY path ASC`) // TODO assert this is indexed
@@ -822,32 +1066,9 @@ func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}, error
var b bytes.Buffer
// todo: this same thing is in several places in here, DRY it up across this file
where := func(colOp, val interface{}) {
if val == nil {
return
}
switch v := val.(type) {
case string:
if v == "" {
return
}
case []string:
if len(v) == 0 {
return
}
}
args = append(args, val)
if len(args) == 1 {
fmt.Fprintf(&b, `WHERE %s`, colOp)
} else {
fmt.Fprintf(&b, ` AND %s`, colOp)
}
}
// where("name LIKE ?%", filter.Name) // TODO needs escaping?
where("name>?", filter.Cursor)
where("name IN (?)", filter.NameIn)
args = where(&b, args, "name>?", filter.Cursor)
args = where(&b, args, "name IN (?)", filter.NameIn)
fmt.Fprintf(&b, ` ORDER BY name ASC`) // TODO assert this is indexed
fmt.Fprintf(&b, ` LIMIT ?`)
@@ -865,26 +1086,15 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
var b bytes.Buffer
var args []interface{}
where := func(colOp, val string) {
if val != "" {
args = append(args, val)
if len(args) == 1 {
fmt.Fprintf(&b, `WHERE %s?`, colOp)
} else {
fmt.Fprintf(&b, ` AND %s?`, colOp)
}
}
}
where("id<", filter.Cursor)
args = where(&b, args, "id<?", filter.Cursor)
if !time.Time(filter.ToTime).IsZero() {
where("created_at<", filter.ToTime.String())
args = where(&b, args, "created_at<?", filter.ToTime.String())
}
if !time.Time(filter.FromTime).IsZero() {
where("created_at>", filter.FromTime.String())
args = where(&b, args, "created_at>?", filter.FromTime.String())
}
where("app_id=", filter.AppID)
where("path=", filter.Path)
args = where(&b, args, "app_id=?", filter.AppID)
args = where(&b, args, "path=?", filter.Path)
fmt.Fprintf(&b, ` ORDER BY id DESC`) // TODO assert this is indexed
fmt.Fprintf(&b, ` LIMIT ?`)
@@ -893,9 +1103,276 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
return b.String(), args
}
// GetDatabase returns the underlying sqlx database implementation
func (ds *SQLStore) GetDatabase() *sqlx.DB {
return ds.db
func buildFilterFnQuery(filter *models.FnFilter) (string, []interface{}) {
if filter == nil {
return "", nil
}
var b bytes.Buffer
var args []interface{}
// where(fmt.Sprintf("image LIKE '%s%%'"), filter.Image) // TODO needs escaping, prob we want prefix query to ignore tags
args = where(&b, args, "app_id=? ", filter.AppID)
args = where(&b, args, "name>?", filter.Cursor)
fmt.Fprintf(&b, ` ORDER BY name ASC`)
if filter.PerPage > 0 {
fmt.Fprintf(&b, ` LIMIT ?`)
args = append(args, filter.PerPage)
}
return b.String(), args
}
func where(b *bytes.Buffer, args []interface{}, colOp string, val interface{}) []interface{} {
if val == nil {
return args
}
switch v := val.(type) {
case string:
if v == "" {
return args
}
case []string:
if len(v) == 0 {
return args
}
}
args = append(args, val)
if len(args) == 1 {
fmt.Fprintf(b, `WHERE %s`, colOp)
} else {
fmt.Fprintf(b, ` AND %s`, colOp)
}
return args
}
func (ds *SQLStore) InsertTrigger(ctx context.Context, newTrigger *models.Trigger) (*models.Trigger, error) {
trigger := newTrigger.Clone()
trigger.CreatedAt = common.DateTime(time.Now())
trigger.UpdatedAt = trigger.CreatedAt
trigger.ID = id.New().String()
err := trigger.Validate()
if err != nil {
return nil, err
}
err = ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
r := tx.QueryRowContext(ctx, query, trigger.AppID)
if err := r.Scan(new(int)); err != nil {
if err == sql.ErrNoRows {
return models.ErrAppsNotFound
}
}
query = tx.Rebind(`SELECT app_id FROM fns WHERE id=?`)
r = tx.QueryRowContext(ctx, query, trigger.FnID)
var app_id string
if err := r.Scan(&app_id); err != nil {
if err == sql.ErrNoRows {
return models.ErrFnsNotFound
}
}
if app_id != trigger.AppID {
return models.ErrTriggerFnIDNotSameApp
}
query = tx.Rebind(`INSERT INTO triggers (
id,
name,
app_id,
fn_id,
created_at,
updated_at,
type,
source,
annotations
)
VALUES (
:id,
:name,
:app_id,
:fn_id,
:created_at,
:updated_at,
:type,
:source,
:annotations
);`)
_, err = tx.NamedExecContext(ctx, query, trigger)
return err
})
if err != nil {
if ds.helper.IsDuplicateKeyError(err) {
return nil, models.ErrTriggerExists
}
return nil, err
}
return trigger, err
}
func (ds *SQLStore) UpdateTrigger(ctx context.Context, trigger *models.Trigger) (*models.Trigger, error) {
err := ds.Tx(func(tx *sqlx.Tx) error {
var dst models.Trigger
query := tx.Rebind(triggerIDSelector)
row := tx.QueryRowxContext(ctx, query, trigger.ID)
err := row.StructScan(&dst)
if err != nil && err != sql.ErrNoRows {
return err
} else if err == sql.ErrNoRows {
return models.ErrTriggerNotFound
}
dst.Update(trigger)
err = dst.Validate()
if err != nil {
return err
}
trigger = &dst // set for query & to return
query = tx.Rebind(`UPDATE triggers SET
name = :name,
fn_id = :fn_id,
updated_at = :updated_at,
source = :source,
annotations = :annotations
WHERE id = :id;`)
_, err = tx.NamedExecContext(ctx, query, trigger)
return err
})
if err != nil {
return nil, err
}
return trigger, nil
}
func (ds *SQLStore) GetTrigger(ctx context.Context, appId, fnId, triggerName string) (*models.Trigger, error) {
var trigger models.Trigger
query := ds.db.Rebind(fmt.Sprintf("%s WHERE name=? AND app_id=? AND fn_id=?", fnSelector))
row := ds.db.QueryRowxContext(ctx, query, triggerName, appId, fnId)
err := row.StructScan(&trigger)
if err == sql.ErrNoRows {
return nil, models.ErrTriggerNotFound
}
if err != nil {
return nil, err
}
return &trigger, nil
}
func (ds *SQLStore) RemoveTrigger(ctx context.Context, triggerId string) error {
query := ds.db.Rebind(`DELETE FROM triggers WHERE id = ?;`)
res, err := ds.db.ExecContext(ctx, query, triggerId)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n == 0 {
return models.ErrTriggerNotFound
}
return nil
}
func (ds *SQLStore) GetTriggerByID(ctx context.Context, triggerID string) (*models.Trigger, error) {
var trigger models.Trigger
query := ds.db.Rebind(triggerIDSelector)
row := ds.db.QueryRowxContext(ctx, query, triggerID)
err := row.StructScan(&trigger)
if err == sql.ErrNoRows {
return nil, models.ErrTriggerNotFound
}
if err != nil {
return nil, err
}
return &trigger, nil
}
func buildFilterTriggerQuery(filter *models.TriggerFilter) (string, []interface{}) {
var b bytes.Buffer
var args []interface{}
fmt.Fprintf(&b, `app_id = ?`)
args = append(args, filter.AppID)
if filter.FnID != "" {
fmt.Fprintf(&b, ` AND fn_id = ?`)
args = append(args, filter.FnID)
}
if filter.Name != "" {
fmt.Fprintf(&b, ` AND name = ?`)
args = append(args, filter.Name)
}
if filter.Cursor != "" {
fmt.Fprintf(&b, ` AND id > ?`)
args = append(args, filter.Cursor)
}
fmt.Fprintf(&b, ` ORDER BY name ASC`)
if filter.PerPage != 0 {
fmt.Fprintf(&b, ` LIMIT ?`)
args = append(args, filter.PerPage)
}
return b.String(), args
}
func (ds *SQLStore) GetTriggers(ctx context.Context, filter *models.TriggerFilter) ([]*models.Trigger, error) {
var res []*models.Trigger // for json empty list
if filter == nil {
filter = new(models.TriggerFilter)
}
filterQuery, args := buildFilterTriggerQuery(filter)
logrus.Error(filterQuery, args)
query := fmt.Sprintf("%s WHERE %s", triggerSelector, filterQuery)
query = ds.db.Rebind(query)
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
}
return nil, err
}
defer rows.Close()
for rows.Next() {
var trigger models.Trigger
err := rows.StructScan(&trigger)
if err != nil {
continue
}
res = append(res, &trigger)
}
if err := rows.Err(); err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
}
}
return res, nil
}
// Close closes the database, releasing any open resources.