Files
fn-serverless/api/datastore/sql/sql.go
Reed Allman 337e962416 add pagination to all list endpoints
calls, apps, and routes listing were previously returning the entire data set,
which just won't scale. this adds pagination with cursoring forward to each of
these endpoints (see the [docs](docs/definitions.md)).

the patch is really mostly tests, shouldn't be that bad to pick through.

some blarble about implementation is in order:

calls are sorted by ids but allow searching within certain `created_at` ranges
(finally). this is because sorting by `created_at` isn't feasible when
combined with paging, as `created_at` is not guaranteed to be unique -- ids
are (eliding theoreticals). i.e. on a page boundary, if there are 200 calls
with the same `created_at`, providing a `cursor` of that `created_at` will
skip over the remaining N calls with that `created_at`.  also using id will be
better on the index anyway (well, less of them). yay having sortable ids! I
can't discern any issues doing this, as even if 200 calls have the same
created_at, they will have different ids, and the sort should allow paginating
them just fine. ids are also url safe, so the id works as the cursor value
just fine.

apps and routes are sorted by alphabetical order. as they aren't guaranteed to
be url safe, we are base64'ing them in the front end to a url safe format and
then returning them, and then base64 decoding them when we get them. this does
mean that they can be relatively large if the path/app is long, but if we
don't want to add ids then they were going to be pretty big anyway. a bonus
is that this kind of obscures them. if somebody has a better idea on
formatting, by all means.

notably, we are not using the sql paging facilities, and we are baking our own
based on cursors, which ends up being much more efficient for querying longer
lists of resources. this also should be easy to implement in other non-sql dbs
and the cursoring formats we can change on the fly since we are just exposing
them as opaque strings. the front end deals with the base64 / formatting, etc
and the back end is taking raw values (strfmt.DateTime or the id for calls).
the cursor that is being passed to/by the user is simply the last resource on the
previous page, so in theory we don't even need to return it, but it does make
it a little easier to use, also, cursor being blank on the last page depends
on page full-ness, so sometimes users will get a cursor when there are no
results on next page (1/N chance, and it's not really end of world -- actually
searching for the next thing would make things more complex). there are ample
tests for this behavior.

I've turned off all query parameters allowing `LIKE` queries on certain listing
endpoints, as we should not expose sql behavior through our API in the event
that we end up not using a sql db down the road. I think we should only allow
prefix matching, which sql can support as well as other types of databases
relatively cheaply, but this is not hooked up here as it didn't 'just work'
when I was fiddling with it (can add later, they're unnecessary and weren't
wired in before in front end).

* remove route listing across apps (unused)
* fix panic when doing `/app//`. this is prob possible for other types of
endpoints, out of scope here. added a guard in front of all endpoints for this
* adds `from_time` and `to_time` query parameters to calls, so you can e.g.
list the last hour of tasks. these are not required and default to
oldest/newest.
* hooked back up the datastore tests to the sql db, only run with sqlite atm,
but these are useful, added a lot to them too.
* added a bunch of tests to the front end, so pretty sure this all works now.
* added to swagger, we'll need to re-gen. also wrote some words about
pagination workings, I'm not sure how best to link to these, feedback welcome.
* not sure how we want to manage indexes, but we may need to add some (looking
at created_at, mostly)
* `?route` changed to `?path` in routes listing, to keep consistency with
everything else
* don't 404 when searching for calls where the route doesn't exist, just
return an empty list (it's a query param ffs)

closes #141
2017-09-20 06:50:49 -07:00

820 lines
18 KiB
Go

package sql
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/fnproject/fn/api/models"
"github.com/go-sql-driver/mysql"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
_ "github.com/lib/pq"
"github.com/mattn/go-sqlite3"
_ "github.com/mattn/go-sqlite3"
"github.com/sirupsen/logrus"
)
// this aims to be an ANSI-SQL compliant package that uses only question
// mark syntax for var placement, leaning on sqlx to make compatible all
// queries to the actual underlying datastore.
//
// currently tested and working are postgres, mysql and sqlite3.
//
// tables lists the CREATE TABLE IF NOT EXISTS statements for the routes, apps,
// calls and logs tables. New executes each statement, in this order, right
// after connecting, so they must stay safe to re-run against an existing db.
var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
app_name varchar(256) NOT NULL,
path varchar(256) NOT NULL,
image varchar(256) NOT NULL,
format varchar(16) NOT NULL,
memory int NOT NULL,
timeout int NOT NULL,
idle_timeout int NOT NULL,
type varchar(16) NOT NULL,
headers text NOT NULL,
config text NOT NULL,
PRIMARY KEY (app_name, path)
);`,
`CREATE TABLE IF NOT EXISTS apps (
name varchar(256) NOT NULL PRIMARY KEY,
config text NOT NULL
);`,
`CREATE TABLE IF NOT EXISTS calls (
created_at varchar(256) NOT NULL,
started_at varchar(256) NOT NULL,
completed_at varchar(256) NOT NULL,
status varchar(256) NOT NULL,
id varchar(256) NOT NULL,
app_name varchar(256) NOT NULL,
path varchar(256) NOT NULL,
PRIMARY KEY (id)
);`,
`CREATE TABLE IF NOT EXISTS logs (
id varchar(256) NOT NULL PRIMARY KEY,
app_name varchar(256) NOT NULL,
log text NOT NULL
);`,
}
// Shared SELECT prefixes. Callers append their own WHERE / ORDER BY / LIMIT
// clauses to these.
const (
// routeSelector selects every routes column; the column order must match the
// destination order used by scanRoute.
routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
// callSelector selects every calls column; the column order must match the
// destination order used by scanCall.
callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls`
)
// sqlStore is the datastore implementation returned by New. Its methods run
// their queries through the wrapped sqlx handle.
type sqlStore struct {
// db is the sqlx handle opened and pinged by New.
db *sqlx.DB
// TODO we should prepare all of the statements, rebind them
// and store them all here.
}
// New will open the db specified by url, create any tables necessary
// and return a models.Datastore safe for concurrent usage.
//
// The url scheme selects the driver and must be one of the drivers listed in
// the switch below. For sqlite3 the parent directory of url.Path is created
// first. If pinging the database or creating a table fails, the handle opened
// here is closed before the error is returned, so a failed New does not leak
// connections.
func New(url *url.URL) (models.Datastore, error) {
	driver := url.Scheme

	// driver must be one of these for sqlx to work, double check:
	switch driver {
	case "postgres", "pgx", "mysql", "sqlite3", "oci8", "ora", "goracle":
	default:
		return nil, errors.New("invalid db driver, refer to the code")
	}

	if driver == "sqlite3" {
		// make all the dirs so we can make the file..
		dir := filepath.Dir(url.Path)
		err := os.MkdirAll(dir, 0755)
		if err != nil {
			return nil, err
		}
	}

	uri := url.String()
	if driver != "postgres" {
		// postgres seems to need this as a prefix in lib/pq, everyone else wants it stripped of scheme
		uri = strings.TrimPrefix(url.String(), url.Scheme+"://")
	}

	sqldb, err := sql.Open(driver, uri)
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db")
		return nil, err
	}

	db := sqlx.NewDb(sqldb, driver)
	// force a connection and test that it worked
	err = db.Ping()
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db")
		// best effort: the ping error is the one the caller needs to see
		_ = db.Close()
		return nil, err
	}

	maxIdleConns := 256 // TODO we need to strip this out of the URL probably
	db.SetMaxIdleConns(maxIdleConns)
	logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed")

	switch driver {
	case "sqlite3":
		db.SetMaxOpenConns(1)
	}

	for _, v := range tables {
		_, err = db.Exec(v)
		if err != nil {
			// best effort: the table creation error is the one the caller needs to see
			_ = db.Close()
			return nil, err
		}
	}

	return &sqlStore{db: db}, nil
}
// InsertApp stores a new app row holding app.Name and the JSON encoding of
// app.Config (an empty string when Config is nil).
//
// It returns app unchanged on success. A duplicate/primary-key violation
// reported by the mysql, postgres or sqlite3 driver is translated to
// models.ErrAppsAlreadyExists; every other error is returned as is.
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
	var config []byte
	if app.Config != nil {
		encoded, err := json.Marshal(app.Config)
		if err != nil {
			return nil, err
		}
		config = encoded
	}

	stmt := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);")
	_, err := ds.db.ExecContext(ctx, stmt, app.Name, string(config))
	if err == nil {
		return app, nil
	}

	// each driver reports a key collision with its own error type and code
	duplicate := false
	switch e := err.(type) {
	case *mysql.MySQLError:
		duplicate = e.Number == 1062
	case *pq.Error:
		duplicate = e.Code == "23505"
	case sqlite3.Error:
		duplicate = e.ExtendedCode == sqlite3.ErrConstraintUnique ||
			e.ExtendedCode == sqlite3.ErrConstraintPrimaryKey
	}
	if duplicate {
		return nil, models.ErrAppsAlreadyExists
	}
	return nil, err
}
// UpdateApp loads the stored config of the app named newapp.Name, applies
// newapp.Config to it through App.UpdateConfig and writes the result back,
// all inside a single transaction.
//
// It returns the app with the resulting config, models.ErrAppsNotFound when
// no app has that name, or the JSON/database error that stopped the update.
func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
	merged := &models.App{Name: newapp.Name}

	txErr := ds.Tx(func(tx *sqlx.Tx) error {
		// read what is currently stored for this app
		var stored string
		sel := tx.Rebind(`SELECT config FROM apps WHERE name=?`)
		switch err := tx.QueryRowContext(ctx, sel, merged.Name).Scan(&stored); {
		case err == sql.ErrNoRows:
			return models.ErrAppsNotFound
		case err != nil:
			return err
		}

		if stored != "" {
			if err := json.Unmarshal([]byte(stored), &merged.Config); err != nil {
				return err
			}
		}

		merged.UpdateConfig(newapp.Config)

		encoded, err := json.Marshal(merged.Config)
		if err != nil {
			return err
		}

		upd := tx.Rebind(`UPDATE apps SET config=? WHERE name=?`)
		res, err := tx.ExecContext(ctx, upd, string(encoded), merged.Name)
		if err != nil {
			return err
		}

		// zero affected rows is fine: the SELECT above ran in this same
		// transaction, so we know the app exists. only a failure to read
		// the count is reported.
		_, err = res.RowsAffected()
		return err
	})
	if txErr != nil {
		return nil, txErr
	}
	return merged, nil
}
// RemoveApp deletes the app named appName and then its logs, calls and
// routes, all in one transaction.
//
// It returns models.ErrAppsNotFound when no app row was deleted, in which
// case the dependent tables are left untouched.
func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error {
	// rows keyed by app_name in the other tables, removed after the app itself
	dependents := [...]string{
		`DELETE FROM logs WHERE app_name=?`,
		`DELETE FROM calls WHERE app_name=?`,
		`DELETE FROM routes WHERE app_name=?`,
	}

	return ds.Tx(func(tx *sqlx.Tx) error {
		res, err := tx.ExecContext(ctx, tx.Rebind(`DELETE FROM apps WHERE name=?`), appName)
		if err != nil {
			return err
		}

		switch n, err := res.RowsAffected(); {
		case err != nil:
			return err
		case n == 0:
			return models.ErrAppsNotFound
		}

		for i := range dependents {
			if _, err := tx.ExecContext(ctx, tx.Rebind(dependents[i]), appName); err != nil {
				return err
			}
		}
		return nil
	})
}
// GetApp looks up a single app by name and decodes its JSON config when the
// stored config is non-empty.
//
// It returns models.ErrAppsNotFound when there is no such row.
func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) {
	var (
		found  string
		config string
	)

	stmt := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`)
	switch err := ds.db.QueryRowContext(ctx, stmt, name).Scan(&found, &config); {
	case err == sql.ErrNoRows:
		return nil, models.ErrAppsNotFound
	case err != nil:
		return nil, err
	}

	app := &models.App{Name: found}
	if config == "" {
		return app, nil
	}
	if err := json.Unmarshal([]byte(config), &app.Config); err != nil {
		return nil, err
	}
	return app, nil
}
// GetApps lists the apps selected by filter, using the WHERE / ORDER BY /
// LIMIT clause produced by buildFilterAppQuery.
//
// The returned slice is never nil on success. If scanning or iterating fails
// part way through, the apps read so far are returned together with the error.
func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
	apps := []*models.App{}

	clause, args := buildFilterAppQuery(filter)
	stmt := ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", clause))

	rows, err := ds.db.QueryContext(ctx, stmt, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		app := new(models.App)
		switch err := scanApp(rows, app); err {
		case nil:
			apps = append(apps, app)
		case sql.ErrNoRows:
			return apps, nil
		default:
			return apps, err
		}
	}
	return apps, rows.Err()
}
// InsertRoute stores route, with its headers and config JSON-encoded, after
// verifying inside the same transaction that the owning app exists and that
// no route with the same app name and path is already present.
//
// It returns models.ErrAppsNotFound when route.AppName has no app row and
// models.ErrRoutesAlreadyExists when the (app_name, path) pair is taken. Any
// database error hit while running either check aborts the insert and is
// returned. The route argument is returned as passed in, alongside the error.
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
	hbyte, err := json.Marshal(route.Headers)
	if err != nil {
		return nil, err
	}

	cbyte, err := json.Marshal(route.Config)
	if err != nil {
		return nil, err
	}

	err = ds.Tx(func(tx *sqlx.Tx) error {
		query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`)
		r := tx.QueryRowContext(ctx, query, route.AppName)
		if err := r.Scan(new(int)); err != nil {
			if err == sql.ErrNoRows {
				return models.ErrAppsNotFound
			}
			// a failed existence check must not fall through to the insert
			return err
		}

		query = tx.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`)
		same, err := tx.QueryContext(ctx, query, route.AppName, route.Path)
		if err != nil {
			return err
		}
		defer same.Close()

		if same.Next() {
			return models.ErrRoutesAlreadyExists
		}
		// Next also returns false on failure; tell that apart from "no rows"
		if err := same.Err(); err != nil {
			return err
		}

		query = tx.Rebind(`INSERT INTO routes (
app_name,
path,
image,
format,
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`)

		_, err = tx.ExecContext(ctx, query,
			route.AppName,
			route.Path,
			route.Image,
			route.Format,
			route.Memory,
			route.Type,
			route.Timeout,
			route.IdleTimeout,
			string(hbyte),
			string(cbyte),
		)

		return err
	})

	return route, err
}
// UpdateRoute loads the route identified by newroute.AppName and
// newroute.Path, applies newroute to it through Route.Update and writes every
// mutable column back, all inside a single transaction.
//
// It returns the resulting route, models.ErrRoutesNotFound when the route
// does not exist, or the JSON/database error that stopped the update.
func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
	stored := new(models.Route)

	txErr := ds.Tx(func(tx *sqlx.Tx) error {
		sel := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector))
		current := tx.QueryRowContext(ctx, sel, newroute.AppName, newroute.Path)
		switch err := scanRoute(current, stored); {
		case err == sql.ErrNoRows:
			return models.ErrRoutesNotFound
		case err != nil:
			return err
		}

		stored.Update(newroute)

		headers, err := json.Marshal(stored.Headers)
		if err != nil {
			return err
		}

		config, err := json.Marshal(stored.Config)
		if err != nil {
			return err
		}

		upd := tx.Rebind(`UPDATE routes SET
image = ?,
format = ?,
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name=? AND path=?;`)

		values := []interface{}{
			stored.Image,
			stored.Format,
			stored.Memory,
			stored.Type,
			stored.Timeout,
			stored.IdleTimeout,
			string(headers),
			string(config),
			stored.AppName,
			stored.Path,
		}
		res, err := tx.ExecContext(ctx, upd, values...)
		if err != nil {
			return err
		}

		// zero affected rows is fine: the SELECT above ran in this same
		// transaction, so we know the row exists. only a failure to read
		// the count is reported.
		_, err = res.RowsAffected()
		return err
	})
	if txErr != nil {
		return nil, txErr
	}
	return stored, nil
}
// RemoveRoute deletes the route with the given path from the given app.
//
// It returns models.ErrRoutesNotFound when no row was deleted.
func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
	stmt := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`)

	res, err := ds.db.ExecContext(ctx, stmt, routePath, appName)
	if err != nil {
		return err
	}

	switch n, err := res.RowsAffected(); {
	case err != nil:
		return err
	case n == 0:
		return models.ErrRoutesNotFound
	default:
		return nil
	}
}
// GetRoute fetches the single route identified by appName and routePath.
//
// It returns models.ErrRoutesNotFound when there is no such row.
func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
	stmt := ds.db.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector))

	found := new(models.Route)
	switch err := scanRoute(ds.db.QueryRowContext(ctx, stmt, appName, routePath), found); {
	case err == sql.ErrNoRows:
		return nil, models.ErrRoutesNotFound
	case err != nil:
		return nil, err
	}
	return found, nil
}
// GetRoutesByApp lists the routes of the app named appName that match filter.
// A nil filter is treated as an empty one. The query clause comes from
// buildFilterRouteQuery, so results are ordered by path and limited to the
// filter's PerPage.
//
// The caller's filter is not modified: the app name is set on a private copy.
// The returned slice is never nil on success. A row that fails to scan, or an
// error raised while iterating, fails the whole call rather than yielding a
// silently shortened page, because callers use page fullness to decide
// whether more results exist.
func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) {
	res := []*models.Route{}

	// scope the query to the app on a copy, leaving the caller's filter alone
	var scoped models.RouteFilter
	if filter != nil {
		scoped = *filter
	}
	scoped.AppName = appName

	filterQuery, args := buildFilterRouteQuery(&scoped)
	query := fmt.Sprintf("%s %s", routeSelector, filterQuery)
	query = ds.db.Rebind(query)

	rows, err := ds.db.QueryContext(ctx, query, args...)
	if err != nil {
		if err == sql.ErrNoRows {
			return res, nil // no error for empty list
		}
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var route models.Route
		if err := scanRoute(rows, &route); err != nil {
			return nil, err
		}
		res = append(res, &route)
	}
	if err := rows.Err(); err != nil {
		if err == sql.ErrNoRows {
			return res, nil // no error for empty list
		}
		return nil, err
	}

	return res, nil
}
// Tx runs f inside a new transaction. When f returns an error the transaction
// is rolled back and that error is returned; otherwise the transaction is
// committed and the commit result is returned.
func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
	tx, beginErr := ds.db.Beginx()
	if beginErr != nil {
		return beginErr
	}

	if runErr := f(tx); runErr != nil {
		// the error from f is the one reported; the rollback result is dropped
		tx.Rollback()
		return runErr
	}
	return tx.Commit()
}
// InsertCall stores one row in the calls table for call. The three timestamps
// are written using their String() form.
func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
	stmt := ds.db.Rebind(`INSERT INTO calls (
id,
created_at,
started_at,
completed_at,
status,
app_name,
path
)
VALUES (?, ?, ?, ?, ?, ?, ?);`)

	// same order as the column list above
	values := []interface{}{
		call.ID,
		call.CreatedAt.String(),
		call.StartedAt.String(),
		call.CompletedAt.String(),
		call.Status,
		call.AppName,
		call.Path,
	}

	_, err := ds.db.ExecContext(ctx, stmt, values...)
	return err
}
// TODO calls are not fully qualified in this backend currently. need to discuss,
// if we store the whole thing then it adds a lot of disk space and then we can
// make async only queue hints instead of entire calls (mq a lot smaller space wise). pick.

// GetCall fetches the call with id callID that belongs to appName. A missing
// row surfaces as models.ErrCallNotFound (translated by scanCall).
func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
	stmt := ds.db.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector))

	found := new(models.Call)
	if err := scanCall(ds.db.QueryRowContext(ctx, stmt, callID, appName), found); err != nil {
		return nil, err
	}
	return found, nil
}
// GetCalls lists the calls selected by filter, using the WHERE / ORDER BY /
// LIMIT clause produced by buildFilterCallQuery.
//
// The returned slice is never nil on success. A row that fails to scan, or an
// error raised while iterating, fails the whole call rather than yielding a
// silently shortened page, because callers use page fullness to decide
// whether more results exist.
func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
	res := []*models.Call{}

	query, args := buildFilterCallQuery(filter)
	query = fmt.Sprintf("%s %s", callSelector, query)
	query = ds.db.Rebind(query)

	rows, err := ds.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var call models.Call
		if err := scanCall(rows, &call); err != nil {
			return nil, err
		}
		res = append(res, &call)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return res, nil
}
// InsertLog stores the contents of logR as the log for callID under appName.
//
// When logR also implements fmt.Stringer its String() value is stored
// directly; otherwise logR is read to the end first. A read failure is
// returned without inserting anything, so a partially read log is never
// stored as if it were complete.
func (ds *sqlStore) InsertLog(ctx context.Context, appName, callID string, logR io.Reader) error {
	// coerce this into a string for sql
	var log string
	if stringer, ok := logR.(fmt.Stringer); ok {
		log = stringer.String()
	} else {
		// TODO we could optimize for Size / buffer pool, but atm we aren't hitting
		// this code path anyway (a fallback)
		var b bytes.Buffer
		if _, err := io.Copy(&b, logR); err != nil {
			return err
		}
		log = b.String()
	}

	query := ds.db.Rebind(`INSERT INTO logs (id, app_name, log) VALUES (?, ?, ?);`)
	_, err := ds.db.ExecContext(ctx, query, callID, appName, log)
	return err
}
// GetLog fetches the stored log text for callID under appName.
//
// It returns models.ErrCallLogNotFound when there is no such row.
func (ds *sqlStore) GetLog(ctx context.Context, appName, callID string) (*models.CallLog, error) {
	stmt := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_name=?`)

	var text string
	switch err := ds.db.QueryRowContext(ctx, stmt, callID, appName).Scan(&text); {
	case err == sql.ErrNoRows:
		return nil, models.ErrCallLogNotFound
	case err != nil:
		return nil, err
	}

	found := &models.CallLog{
		CallID:  callID,
		Log:     text,
		AppName: appName,
	}
	return found, nil
}
// DeleteLog removes the log stored for callID under appName. Deleting a log
// that does not exist is not reported as an error by this method.
func (ds *sqlStore) DeleteLog(ctx context.Context, appName, callID string) error {
	stmt := ds.db.Rebind(`DELETE FROM logs WHERE id=? AND app_name=?`)
	if _, err := ds.db.ExecContext(ctx, stmt, callID, appName); err != nil {
		return err
	}
	return nil
}
// TODO scrap for sqlx scanx ?? some things aren't perfect (e.g. config is a json string)

// RowScanner is the Scan method the scan helpers in this file need. In this
// file it is handed both single-row results (QueryRowContext) and multi-row
// cursors (QueryContext), so one helper serves both.
type RowScanner interface {
// Scan copies the columns of the current row into dest, in order.
Scan(dest ...interface{}) error
}
// ScanLog reads two columns from scanner into log: first the call id, then
// the log text. It returns whatever error Scan reports.
func ScanLog(scanner RowScanner, log *models.CallLog) error {
	dest := []interface{}{&log.CallID, &log.Log}
	return scanner.Scan(dest...)
}
// scanRoute reads one row, laid out in routeSelector column order, into
// route. The headers and config columns hold JSON and are decoded only when
// non-empty.
func scanRoute(scanner RowScanner, route *models.Route) error {
	var headers, config string

	dest := []interface{}{
		&route.AppName,
		&route.Path,
		&route.Image,
		&route.Format,
		&route.Memory,
		&route.Type,
		&route.Timeout,
		&route.IdleTimeout,
		&headers,
		&config,
	}
	if err := scanner.Scan(dest...); err != nil {
		return err
	}

	if headers != "" {
		if err := json.Unmarshal([]byte(headers), &route.Headers); err != nil {
			return err
		}
	}
	if config != "" {
		if err := json.Unmarshal([]byte(config), &route.Config); err != nil {
			return err
		}
	}
	return nil
}
// scanApp reads a (name, config) row into app. The config column holds JSON
// and is decoded only when non-empty.
func scanApp(scanner RowScanner, app *models.App) error {
	var config string
	if err := scanner.Scan(&app.Name, &config); err != nil {
		return err
	}
	if config == "" {
		return nil
	}
	return json.Unmarshal([]byte(config), &app.Config)
}
// buildFilterRouteQuery turns filter into the clause appended after
// routeSelector, plus the positional arguments for its placeholders.
//
// Each non-empty field (AppName, Image, Cursor) adds one condition; the first
// is introduced by WHERE and the rest by AND. The clause always ends with
// ORDER BY path ASC LIMIT ?, with filter.PerPage as the final argument. A nil
// filter produces an empty clause and nil arguments.
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
	if filter == nil {
		return "", nil
	}

	conds := [...]struct{ colOp, val string }{
		{"app_name=? ", filter.AppName},
		{"image=?", filter.Image},
		{"path>?", filter.Cursor},
		// {"path LIKE ?%", filter.PathPrefix} TODO needs escaping
	}

	var b bytes.Buffer
	var args []interface{}
	for _, c := range conds {
		if c.val == "" {
			continue
		}
		format := ` AND %s`
		if len(args) == 0 {
			format = `WHERE %s`
		}
		fmt.Fprintf(&b, format, c.colOp)
		args = append(args, c.val)
	}

	fmt.Fprintf(&b, ` ORDER BY path ASC`) // TODO assert this is indexed
	fmt.Fprintf(&b, ` LIMIT ?`)
	args = append(args, filter.PerPage)

	return b.String(), args
}
// buildFilterAppQuery turns filter into the clause appended to the apps
// listing query, plus the positional arguments for its placeholders.
//
// A non-empty Cursor adds WHERE name>?. The clause always ends with
// ORDER BY name ASC LIMIT ?, with filter.PerPage as the final argument. A nil
// filter produces an empty clause and nil arguments.
func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}) {
	if filter == nil {
		return "", nil
	}

	var b bytes.Buffer
	var args []interface{}

	// TODO name prefix matching ("name LIKE ?%" on filter.Name) needs escaping?
	if cursor := filter.Cursor; cursor != "" {
		// the cursor is the only condition, so it always opens the WHERE
		fmt.Fprintf(&b, `WHERE %s`, "name>?")
		args = append(args, cursor)
	}

	fmt.Fprintf(&b, ` ORDER BY name ASC`) // TODO assert this is indexed
	fmt.Fprintf(&b, ` LIMIT ?`)
	args = append(args, filter.PerPage)

	return b.String(), args
}
// buildFilterCallQuery turns filter into the clause appended after
// callSelector, plus the positional arguments for its placeholders.
//
// Conditions are added, in this order, for a non-empty Cursor (id<), a
// non-zero ToTime (created_at<), a non-zero FromTime (created_at>), a
// non-empty AppName and a non-empty Path; the first is introduced by WHERE
// and the rest by AND. The clause always ends with ORDER BY id DESC LIMIT ?,
// with filter.PerPage as the final argument. A nil filter produces an empty
// clause and nil arguments.
func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
	if filter == nil {
		return "", nil
	}

	type cond struct{ colOp, val string }

	conds := []cond{{"id<", filter.Cursor}}
	if !time.Time(filter.ToTime).IsZero() {
		conds = append(conds, cond{"created_at<", filter.ToTime.String()})
	}
	if !time.Time(filter.FromTime).IsZero() {
		conds = append(conds, cond{"created_at>", filter.FromTime.String()})
	}
	conds = append(conds,
		cond{"app_name=", filter.AppName},
		cond{"path=", filter.Path},
	)

	var b bytes.Buffer
	var args []interface{}
	for _, c := range conds {
		if c.val == "" {
			continue
		}
		format := ` AND %s?`
		if len(args) == 0 {
			format = `WHERE %s?`
		}
		fmt.Fprintf(&b, format, c.colOp)
		args = append(args, c.val)
	}

	fmt.Fprintf(&b, ` ORDER BY id DESC`) // TODO assert this is indexed
	fmt.Fprintf(&b, ` LIMIT ?`)
	args = append(args, filter.PerPage)

	return b.String(), args
}
// scanCall reads one row, laid out in callSelector column order, into call.
// sql.ErrNoRows is translated to models.ErrCallNotFound; any other error is
// returned unchanged.
func scanCall(scanner RowScanner, call *models.Call) error {
	dest := []interface{}{
		&call.ID,
		&call.CreatedAt,
		&call.StartedAt,
		&call.CompletedAt,
		&call.Status,
		&call.AppName,
		&call.Path,
	}

	switch err := scanner.Scan(dest...); err {
	case nil:
		return nil
	case sql.ErrNoRows:
		return models.ErrCallNotFound
	default:
		return err
	}
}
// GetDatabase returns the underlying sqlx database implementation.
// This is the very handle the store itself queries through, not a copy.
func (ds *sqlStore) GetDatabase() *sqlx.DB {
return ds.db
}