mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
mask models.Call blank fields in api, sqlx
sqlx has nice facilities for using structs to do queries and using their fields, so decided to move us all over to this. now when you take a look at models.Call it's really obvious what's in db and what's not. added omitempty to some json fields that were bleeding through api too. deletes a lot of code in the sql package for scanning and made some queries use struct based sqlx methods now which seem easier to read than what we previously had. moves all json stuff into sql.Valuer and sql.Scanner methods in models/config.go, these are the only 2 types that ever need this. sadly, sqlx would have done this marshaling for us, but to keep compat, I added json. we can do some migrations later maybe for a more efficient encoding, but did not want to fuss with it today. it seems like we should probably aim to keep models.Call as small as possible in the db as there will be a lot of them. interestingly, most functions platforms I looked at do not seem to expose this kind of information that I could find. so, i think only having timestamps, status, id, app, path and maybe docker stats is really all that should be in here (agree w/ Denys on 284 as these and logs will end up taking up most db space in prod. notably, payload, headers, and env vars could be extremely large and in the general case they are always a copy of the routes (this breaks apart when routes are updated, which would be useful considering we don't have versioning -- versioning may be cheaper). removed unused field in apps too this is lined up behind #349 so that I could use the tests... closes #345 closes #142 closes #284
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -75,9 +74,6 @@ const (
|
||||
|
||||
type sqlStore struct {
|
||||
db *sqlx.DB
|
||||
|
||||
// TODO we should prepare all of the statements, rebind them
|
||||
// and store them all here.
|
||||
}
|
||||
|
||||
// New will open the db specified by url, create any tables necessary
|
||||
@@ -140,17 +136,8 @@ func New(url *url.URL) (models.Datastore, error) {
|
||||
}
|
||||
|
||||
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
||||
var cbyte []byte
|
||||
var err error
|
||||
if app.Config != nil {
|
||||
cbyte, err = json.Marshal(app.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);")
|
||||
_, err = ds.db.ExecContext(ctx, query, app.Name, string(cbyte))
|
||||
query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (:name, :config);")
|
||||
_, err := ds.db.NamedExecContext(ctx, query, app)
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *mysql.MySQLError:
|
||||
@@ -176,32 +163,19 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
|
||||
app := &models.App{Name: newapp.Name}
|
||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||
query := tx.Rebind(`SELECT config FROM apps WHERE name=?`)
|
||||
row := tx.QueryRowContext(ctx, query, app.Name)
|
||||
row := tx.QueryRowxContext(ctx, query, app.Name)
|
||||
|
||||
var config string
|
||||
if err := row.Scan(&config); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return models.ErrAppsNotFound
|
||||
}
|
||||
err := row.StructScan(app)
|
||||
if err == sql.ErrNoRows {
|
||||
return models.ErrAppsNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config != "" {
|
||||
err := json.Unmarshal([]byte(config), &app.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
app.UpdateConfig(newapp.Config)
|
||||
|
||||
cbyte, err := json.Marshal(app.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
query = tx.Rebind(`UPDATE apps SET config=? WHERE name=?`)
|
||||
res, err := tx.ExecContext(ctx, query, string(cbyte), app.Name)
|
||||
query = tx.Rebind(`UPDATE apps SET config=:config WHERE name=:name`)
|
||||
res, err := tx.NamedExecContext(ctx, query, app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -254,29 +228,16 @@ func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error {
|
||||
|
||||
func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) {
|
||||
query := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`)
|
||||
row := ds.db.QueryRowContext(ctx, query, name)
|
||||
row := ds.db.QueryRowxContext(ctx, query, name)
|
||||
|
||||
var resName, config string
|
||||
err := row.Scan(&resName, &config)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, models.ErrAppsNotFound
|
||||
}
|
||||
var res models.App
|
||||
err := row.StructScan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, models.ErrAppsNotFound
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := &models.App{
|
||||
Name: resName,
|
||||
}
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal([]byte(config), &res.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// GetApps retrieves an array of apps according to a specific filter.
|
||||
@@ -284,7 +245,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
|
||||
res := []*models.App{}
|
||||
query, args := buildFilterAppQuery(filter)
|
||||
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", query))
|
||||
rows, err := ds.db.QueryContext(ctx, query, args...)
|
||||
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -292,8 +253,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
|
||||
|
||||
for rows.Next() {
|
||||
var app models.App
|
||||
err := scanApp(rows, &app)
|
||||
|
||||
err := rows.StructScan(&app)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return res, nil
|
||||
@@ -310,17 +270,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
|
||||
}
|
||||
|
||||
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
||||
hbyte, err := json.Marshal(route.Headers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cbyte, err := json.Marshal(route.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = ds.Tx(func(tx *sqlx.Tx) error {
|
||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||
query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`)
|
||||
r := tx.QueryRowContext(ctx, query, route.AppName)
|
||||
if err := r.Scan(new(int)); err != nil {
|
||||
@@ -350,20 +300,20 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
|
||||
headers,
|
||||
config
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`)
|
||||
VALUES (
|
||||
:app_name,
|
||||
:path,
|
||||
:image,
|
||||
:format,
|
||||
:memory,
|
||||
:type,
|
||||
:timeout,
|
||||
:idle_timeout,
|
||||
:headers,
|
||||
:config
|
||||
);`)
|
||||
|
||||
_, err = tx.ExecContext(ctx, query,
|
||||
route.AppName,
|
||||
route.Path,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
route.IdleTimeout,
|
||||
string(hbyte),
|
||||
string(cbyte),
|
||||
)
|
||||
_, err = tx.NamedExecContext(ctx, query, route)
|
||||
|
||||
return err
|
||||
})
|
||||
@@ -375,8 +325,10 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
|
||||
var route models.Route
|
||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||
query := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector))
|
||||
row := tx.QueryRowContext(ctx, query, newroute.AppName, newroute.Path)
|
||||
if err := scanRoute(row, &route); err == sql.ErrNoRows {
|
||||
row := tx.QueryRowxContext(ctx, query, newroute.AppName, newroute.Path)
|
||||
|
||||
err := row.StructScan(&route)
|
||||
if err == sql.ErrNoRows {
|
||||
return models.ErrRoutesNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
@@ -384,40 +336,18 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
|
||||
|
||||
route.Update(newroute)
|
||||
|
||||
hbyte, err := json.Marshal(route.Headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cbyte, err := json.Marshal(route.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
query = tx.Rebind(`UPDATE routes SET
|
||||
image = ?,
|
||||
format = ?,
|
||||
memory = ?,
|
||||
type = ?,
|
||||
timeout = ?,
|
||||
idle_timeout = ?,
|
||||
headers = ?,
|
||||
config = ?
|
||||
WHERE app_name=? AND path=?;`)
|
||||
|
||||
res, err := tx.ExecContext(ctx, query,
|
||||
route.Image,
|
||||
route.Format,
|
||||
route.Memory,
|
||||
route.Type,
|
||||
route.Timeout,
|
||||
route.IdleTimeout,
|
||||
string(hbyte),
|
||||
string(cbyte),
|
||||
route.AppName,
|
||||
route.Path,
|
||||
)
|
||||
image = :image,
|
||||
format = :format,
|
||||
memory = :memory,
|
||||
type = :type,
|
||||
timeout = :timeout,
|
||||
idle_timeout = :idle_timeout,
|
||||
headers = :headers,
|
||||
config = :config
|
||||
WHERE app_name=:app_name AND path=:path;`)
|
||||
|
||||
res, err := tx.NamedExecContext(ctx, query, &route)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -460,10 +390,10 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string)
|
||||
func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
|
||||
rSelectCondition := "%s WHERE app_name=? AND path=?"
|
||||
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
|
||||
row := ds.db.QueryRowContext(ctx, query, appName, routePath)
|
||||
row := ds.db.QueryRowxContext(ctx, query, appName, routePath)
|
||||
|
||||
var route models.Route
|
||||
err := scanRoute(row, &route)
|
||||
err := row.StructScan(&route)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, models.ErrRoutesNotFound
|
||||
} else if err != nil {
|
||||
@@ -484,7 +414,7 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *
|
||||
|
||||
query := fmt.Sprintf("%s %s", routeSelector, filterQuery)
|
||||
query = ds.db.Rebind(query)
|
||||
rows, err := ds.db.QueryContext(ctx, query, args...)
|
||||
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return res, nil // no error for empty list
|
||||
@@ -495,12 +425,11 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *
|
||||
|
||||
for rows.Next() {
|
||||
var route models.Route
|
||||
err := scanRoute(rows, &route)
|
||||
err := rows.StructScan(&route)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
res = append(res, &route)
|
||||
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
@@ -534,28 +463,27 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
|
||||
app_name,
|
||||
path
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?);`)
|
||||
VALUES (
|
||||
:id,
|
||||
:created_at,
|
||||
:started_at,
|
||||
:completed_at,
|
||||
:status,
|
||||
:app_name,
|
||||
:path
|
||||
);`)
|
||||
|
||||
_, err := ds.db.ExecContext(ctx, query, call.ID, call.CreatedAt.String(),
|
||||
call.StartedAt.String(), call.CompletedAt.String(),
|
||||
call.Status, call.AppName, call.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
_, err := ds.db.NamedExecContext(ctx, query, call)
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO calls are not fully qualified in this backend currently. need to discuss,
|
||||
// if we store the whole thing then it adds a lot of disk space and then we can
|
||||
// make async only queue hints instead of entire calls (mq a lot smaller space wise). pick.
|
||||
func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
|
||||
query := fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)
|
||||
query = ds.db.Rebind(query)
|
||||
row := ds.db.QueryRowContext(ctx, query, callID, appName)
|
||||
row := ds.db.QueryRowxContext(ctx, query, callID, appName)
|
||||
|
||||
var call models.Call
|
||||
err := scanCall(row, &call)
|
||||
err := row.StructScan(&call)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -567,7 +495,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
|
||||
query, args := buildFilterCallQuery(filter)
|
||||
query = fmt.Sprintf("%s %s", callSelector, query)
|
||||
query = ds.db.Rebind(query)
|
||||
rows, err := ds.db.QueryContext(ctx, query, args...)
|
||||
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -575,7 +503,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
|
||||
|
||||
for rows.Next() {
|
||||
var call models.Call
|
||||
err := scanCall(rows, &call)
|
||||
err := rows.StructScan(&call)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -631,75 +559,6 @@ func (ds *sqlStore) DeleteLog(ctx context.Context, appName, callID string) error
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO scrap for sqlx scanx ?? some things aren't perfect (e.g. config is a json string)
|
||||
type RowScanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
func ScanLog(scanner RowScanner, log *models.CallLog) error {
|
||||
return scanner.Scan(
|
||||
&log.CallID,
|
||||
&log.Log,
|
||||
)
|
||||
}
|
||||
|
||||
func scanRoute(scanner RowScanner, route *models.Route) error {
|
||||
var headerStr string
|
||||
var configStr string
|
||||
|
||||
err := scanner.Scan(
|
||||
&route.AppName,
|
||||
&route.Path,
|
||||
&route.Image,
|
||||
&route.Format,
|
||||
&route.Memory,
|
||||
&route.Type,
|
||||
&route.Timeout,
|
||||
&route.IdleTimeout,
|
||||
&headerStr,
|
||||
&configStr,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(headerStr) > 0 {
|
||||
err = json.Unmarshal([]byte(headerStr), &route.Headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(configStr) > 0 {
|
||||
err = json.Unmarshal([]byte(configStr), &route.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanApp(scanner RowScanner, app *models.App) error {
|
||||
var configStr string
|
||||
|
||||
err := scanner.Scan(
|
||||
&app.Name,
|
||||
&configStr,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(configStr) > 0 {
|
||||
err = json.Unmarshal([]byte(configStr), &app.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
|
||||
if filter == nil {
|
||||
return "", nil
|
||||
@@ -794,25 +653,6 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
|
||||
return b.String(), args
|
||||
}
|
||||
|
||||
func scanCall(scanner RowScanner, call *models.Call) error {
|
||||
err := scanner.Scan(
|
||||
&call.ID,
|
||||
&call.CreatedAt,
|
||||
&call.StartedAt,
|
||||
&call.CompletedAt,
|
||||
&call.Status,
|
||||
&call.AppName,
|
||||
&call.Path,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return models.ErrCallNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDatabase returns the underlying sqlx database implementation
|
||||
func (ds *sqlStore) GetDatabase() *sqlx.DB {
|
||||
return ds.db
|
||||
|
||||
Reference in New Issue
Block a user