Files
fn-serverless/api/datastore/sql/sql.go
Denis Makogon 9066dca750 Use context-bound SQL methods
Changes:
   Exec     -> ExecContext
   Query    -> QueryContext
   QueryRow -> QueryRowContext

   Rebind and store all possible queries before using them

Closes: #276
2017-09-06 21:48:28 +03:00

784 lines
18 KiB
Go

package sql
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/sirupsen/logrus"
"github.com/fnproject/fn/api/models"
"github.com/go-sql-driver/mysql"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
_ "github.com/lib/pq"
"github.com/mattn/go-sqlite3"
_ "github.com/mattn/go-sqlite3"
)
// this aims to be an ANSI-SQL compliant package that uses only question
// mark syntax for var placement, leaning on sqlx to make compatible all
// queries to the actual underlying datastore.
//
// currently tested and working are postgres, mysql and sqlite3.
//
// tables holds the CREATE TABLE IF NOT EXISTS statements that New executes, in
// order, right after connecting: routes, apps, calls and logs.
var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes (
app_name varchar(256) NOT NULL,
path varchar(256) NOT NULL,
image varchar(256) NOT NULL,
format varchar(16) NOT NULL,
memory int NOT NULL,
timeout int NOT NULL,
idle_timeout int NOT NULL,
type varchar(16) NOT NULL,
headers text NOT NULL,
config text NOT NULL,
PRIMARY KEY (app_name, path)
);`,
`CREATE TABLE IF NOT EXISTS apps (
name varchar(256) NOT NULL PRIMARY KEY,
config text NOT NULL
);`,
`CREATE TABLE IF NOT EXISTS calls (
created_at varchar(256) NOT NULL,
started_at varchar(256) NOT NULL,
completed_at varchar(256) NOT NULL,
status varchar(256) NOT NULL,
id varchar(256) NOT NULL,
app_name varchar(256) NOT NULL,
path varchar(256) NOT NULL,
PRIMARY KEY (id)
);`,
`CREATE TABLE IF NOT EXISTS logs (
id varchar(256) NOT NULL PRIMARY KEY,
log text NOT NULL
);`,
}
const (
// routeSelector is the SELECT prefix shared by all route reads. Its column
// order matches the destinations scanRoute passes to Scan, so the two must be
// changed together.
routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config FROM routes`
// callSelector is the SELECT prefix shared by all call reads. Its column order
// matches the destinations scanCall passes to Scan.
callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path FROM calls`
)
// sqlStore is the models.Datastore implementation returned by New. Besides the
// database handle it carries every fixed query its methods use; New passes each
// of them through db.Rebind once, before the store is handed out.
type sqlStore struct {
// db is the sqlx handle all statements and transactions run on.
db *sqlx.DB
// Queries against the apps table.
insertAppQuery string
updateAppQuery string
getAppQuery string
selectAppConfigQuery string
removeAppQuery string
// Queries used by the route methods. checkRouteAppQuery reads the apps table
// (does the owning app exist?); the others read or write the routes table.
insertRouteQuery string
updateRouteQuery string
checkRouteAppQuery string
checkRouteQuery string
getRouteQuery string
removeRouteQuery string
// Queries against the calls table.
insertCallQuery string
getCallQuery string
// Queries against the logs table.
insertLogQuery string
getLogQuery string
deleteLogQuery string
}
// New will open the db specified by url, create any tables necessary
// and return a models.Datastore safe for concurrent usage.
//
// The URL scheme selects the driver and must be one of the names listed in the
// switch below, otherwise an error is returned. For sqlite3 the directory that
// contains the database file is created first and the pool is limited to a
// single open connection. For every driver except postgres the "scheme://"
// prefix is stripped before the remainder is handed to sql.Open.
//
// If the initial ping or the creation of a table fails, the freshly opened
// handle is closed before the error is returned so it is not leaked.
//
// NOTE(review): uri is logged on failure and may embed credentials — confirm
// this is acceptable for the deployment's log sinks.
func New(url *url.URL) (models.Datastore, error) {
	driver := url.Scheme

	// driver must be one of these for sqlx to work, double check:
	switch driver {
	case "postgres", "pgx", "mysql", "sqlite3", "oci8", "ora", "goracle":
	default:
		return nil, errors.New("invalid db driver, refer to the code")
	}

	if driver == "sqlite3" {
		// make all the dirs so we can make the file..
		dir := filepath.Dir(url.Path)
		err := os.MkdirAll(dir, 0755)
		if err != nil {
			return nil, err
		}
	}

	uri := url.String()
	if driver != "postgres" {
		// postgres seems to need this as a prefix in lib/pq, everyone else wants it stripped of scheme
		uri = strings.TrimPrefix(url.String(), url.Scheme+"://")
	}

	sqldb, err := sql.Open(driver, uri)
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db")
		return nil, err
	}

	db := sqlx.NewDb(sqldb, driver)
	// force a connection and test that it worked
	err = db.Ping()
	if err != nil {
		logrus.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db")
		// Best-effort cleanup: the ping error is what the caller needs to see.
		db.Close()
		return nil, err
	}

	maxIdleConns := 256 // TODO we need to strip this out of the URL probably
	db.SetMaxIdleConns(maxIdleConns)
	logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed")

	switch driver {
	case "sqlite3":
		db.SetMaxOpenConns(1)
	}

	for _, v := range tables {
		_, err = db.Exec(v)
		if err != nil {
			// Do not leave the pool open when the schema could not be created.
			db.Close()
			return nil, err
		}
	}

	// Rebind every fixed query once, translating the '?' placeholders to the
	// bind syntax of the selected driver.
	dstore := &sqlStore{
		db: db,
		insertAppQuery: db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);"),
		selectAppConfigQuery: db.Rebind(`SELECT config FROM apps WHERE name=?`),
		updateAppQuery: db.Rebind(`UPDATE apps SET config=? WHERE name=?`),
		removeAppQuery: db.Rebind(`DELETE FROM apps WHERE name = ?`),
		getAppQuery: db.Rebind(`SELECT name, config FROM apps WHERE name=?`),
		checkRouteAppQuery: db.Rebind(`SELECT 1 FROM apps WHERE name=?`),
		checkRouteQuery: db.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`),
		insertRouteQuery: db.Rebind(`INSERT INTO routes (
app_name,
path,
image,
format,
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`),
		getRouteQuery: db.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector)),
		updateRouteQuery: db.Rebind(`UPDATE routes SET
image = ?,
format = ?,
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name=? AND path=?;`),
		removeRouteQuery: db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`),
		insertCallQuery: db.Rebind(`INSERT INTO calls (
id,
created_at,
started_at,
completed_at,
status,
app_name,
path
)
VALUES (?, ?, ?, ?, ?, ?, ?);`),
		getCallQuery: db.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)),
		insertLogQuery: db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`),
		getLogQuery: db.Rebind(`SELECT log FROM logs WHERE id=?`),
		deleteLogQuery: db.Rebind(`DELETE FROM logs WHERE id=?`),
	}

	return dstore, nil
}
// InsertApp stores app in the apps table and returns it unchanged. The config
// is written as its JSON encoding, or as an empty string when app.Config is nil.
//
// A primary-key / unique violation reported by the mysql, postgres or sqlite3
// driver is translated to models.ErrAppsAlreadyExists; any other error is
// returned as-is.
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
	var config string
	if app.Config != nil {
		raw, err := json.Marshal(app.Config)
		if err != nil {
			return nil, err
		}
		config = string(raw)
	}

	_, err := ds.db.ExecContext(ctx, ds.insertAppQuery, app.Name, config)
	if err == nil {
		return app, nil
	}

	// Each driver reports a duplicate key with its own error type and code.
	duplicate := false
	switch e := err.(type) {
	case *mysql.MySQLError:
		duplicate = e.Number == 1062
	case *pq.Error:
		duplicate = e.Code == "23505"
	case sqlite3.Error:
		duplicate = e.ExtendedCode == sqlite3.ErrConstraintUnique ||
			e.ExtendedCode == sqlite3.ErrConstraintPrimaryKey
	}
	if duplicate {
		return nil, models.ErrAppsAlreadyExists
	}
	return nil, err
}
// UpdateApp merges newapp.Config into the stored config of the app named
// newapp.Name and writes the result back, all inside one transaction. It
// returns the app with the merged config, or models.ErrAppsNotFound when no
// such app is stored.
func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
	updated := &models.App{Name: newapp.Name}

	txErr := ds.Tx(func(tx *sqlx.Tx) error {
		var stored string
		err := tx.QueryRowContext(ctx, ds.selectAppConfigQuery, updated.Name).Scan(&stored)
		if err == sql.ErrNoRows {
			return models.ErrAppsNotFound
		}
		if err != nil {
			return err
		}

		// An empty column means no config was stored; nothing to decode.
		if stored != "" {
			if err := json.Unmarshal([]byte(stored), &updated.Config); err != nil {
				return err
			}
		}

		updated.UpdateConfig(newapp.Config)

		merged, err := json.Marshal(updated.Config)
		if err != nil {
			return err
		}

		res, err := tx.ExecContext(ctx, ds.updateAppQuery, string(merged), updated.Name)
		if err != nil {
			return err
		}

		// Zero affected rows is not treated as an error: the row was read
		// inside this same transaction, so it is known to exist.
		_, err = res.RowsAffected()
		return err
	})
	if txErr != nil {
		return nil, txErr
	}

	return updated, nil
}
// RemoveApp deletes the apps row named appName. The statement result is
// discarded, so deleting an app that does not exist is not reported as an error.
func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error {
	if _, err := ds.db.ExecContext(ctx, ds.removeAppQuery, appName); err != nil {
		return err
	}
	return nil
}
// GetApp loads the app called name, decoding its JSON config when the stored
// value is non-empty. It returns models.ErrAppsNotFound when no row matches.
func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) {
	var (
		storedName string
		config     string
	)

	err := ds.db.QueryRowContext(ctx, ds.getAppQuery, name).Scan(&storedName, &config)
	switch {
	case err == sql.ErrNoRows:
		return nil, models.ErrAppsNotFound
	case err != nil:
		return nil, err
	}

	app := &models.App{Name: storedName}
	if config != "" {
		if err := json.Unmarshal([]byte(config), &app.Config); err != nil {
			return nil, err
		}
	}

	return app, nil
}
// GetApps returns the apps selected by filter; a nil filter or an empty
// filter.Name selects every app. The result is never nil on success.
//
// When scanning a row or iterating fails, the apps collected so far are
// returned together with the error.
func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
	apps := []*models.App{}

	where, args := buildFilterAppQuery(filter)
	stmt := ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", where))

	rows, err := ds.db.QueryContext(ctx, stmt, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		app := new(models.App)
		scanErr := scanApp(rows, app)
		if scanErr == sql.ErrNoRows {
			return apps, nil
		}
		if scanErr != nil {
			return apps, scanErr
		}
		apps = append(apps, app)
	}

	return apps, rows.Err()
}
// InsertRoute stores route, with its headers and config JSON-encoded, and
// returns it unchanged. The existence checks and the insert run in a single
// transaction.
//
// It returns models.ErrAppsNotFound when route.AppName names no stored app and
// models.ErrRoutesAlreadyExists when a route with the same app name and path
// is already stored. On any error the returned route is nil.
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
	hbyte, err := json.Marshal(route.Headers)
	if err != nil {
		return nil, err
	}

	cbyte, err := json.Marshal(route.Config)
	if err != nil {
		return nil, err
	}

	err = ds.Tx(func(tx *sqlx.Tx) error {
		var found int

		// The owning app must exist. Any failure other than "no rows" is a
		// real database error and must abort the insert rather than be ignored.
		err := tx.QueryRowContext(ctx, ds.checkRouteAppQuery, route.AppName).Scan(&found)
		if err == sql.ErrNoRows {
			return models.ErrAppsNotFound
		} else if err != nil {
			return err
		}

		// A successful scan means a route with this key is already stored.
		// QueryRow releases its result set inside Scan, so nothing is left
		// open on the transaction when the INSERT below runs.
		err = tx.QueryRowContext(ctx, ds.checkRouteQuery, route.AppName, route.Path).Scan(&found)
		if err == nil {
			return models.ErrRoutesAlreadyExists
		} else if err != sql.ErrNoRows {
			return err
		}

		_, err = tx.ExecContext(ctx, ds.insertRouteQuery,
			route.AppName,
			route.Path,
			route.Image,
			route.Format,
			route.Memory,
			route.Type,
			route.Timeout,
			route.IdleTimeout,
			string(hbyte),
			string(cbyte),
		)
		return err
	})
	if err != nil {
		return nil, err
	}

	return route, nil
}
// UpdateRoute loads the route identified by newroute.AppName and
// newroute.Path, applies newroute on top of it via Route.Update and writes the
// result back, all inside one transaction. It returns the updated route, or
// models.ErrRoutesNotFound when no such route is stored.
func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
	var route models.Route

	err := ds.Tx(func(tx *sqlx.Tx) error {
		row := tx.QueryRowContext(ctx, ds.getRouteQuery, newroute.AppName, newroute.Path)
		if err := scanRoute(row, &route); err == sql.ErrNoRows {
			return models.ErrRoutesNotFound
		} else if err != nil {
			return err
		}

		route.Update(newroute)

		hbyte, err := json.Marshal(route.Headers)
		if err != nil {
			return err
		}

		cbyte, err := json.Marshal(route.Config)
		if err != nil {
			return err
		}

		// Must be the route UPDATE: its ten placeholders line up with the ten
		// arguments below (eight SET columns, then app_name and path).
		res, err := tx.ExecContext(ctx, ds.updateRouteQuery,
			route.Image,
			route.Format,
			route.Memory,
			route.Type,
			route.Timeout,
			route.IdleTimeout,
			string(hbyte),
			string(cbyte),
			route.AppName,
			route.Path,
		)
		if err != nil {
			return err
		}

		// Zero affected rows is not treated as an error: the row was read
		// inside this same transaction, so it is known to exist.
		_, err = res.RowsAffected()
		return err
	})
	if err != nil {
		return nil, err
	}

	return &route, nil
}
// RemoveRoute deletes the route stored under appName and routePath. It returns
// models.ErrRoutesNotFound when the DELETE affected no rows.
func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
	// removeRouteQuery binds path first, then app_name.
	result, err := ds.db.ExecContext(ctx, ds.removeRouteQuery, routePath, appName)
	if err != nil {
		return err
	}

	deleted, err := result.RowsAffected()
	switch {
	case err != nil:
		return err
	case deleted == 0:
		return models.ErrRoutesNotFound
	default:
		return nil
	}
}
// GetRoute loads the route stored under appName and routePath. It returns
// models.ErrRoutesNotFound when no row matches.
func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
	found := new(models.Route)

	err := scanRoute(ds.db.QueryRowContext(ctx, ds.getRouteQuery, appName, routePath), found)
	switch {
	case err == sql.ErrNoRows:
		return nil, models.ErrRoutesNotFound
	case err != nil:
		return nil, err
	}

	return found, nil
}
// GetRoutes returns the routes selected by filter; a nil filter selects every
// route. The result is never nil on success.
//
// Rows that fail to scan or whose JSON columns fail to decode are skipped
// without an error being reported.
func (ds *sqlStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) {
	routes := []*models.Route{}

	where, args := buildFilterRouteQuery(filter)
	stmt := ds.db.Rebind(fmt.Sprintf("%s %s", routeSelector, where))

	rows, err := ds.db.QueryContext(ctx, stmt, args...)
	// todo: check for no rows so we don't respond with a sql 500 err
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		route := new(models.Route)
		if scanRoute(rows, route) == nil {
			routes = append(routes, route)
		}
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return routes, nil
}
// GetRoutesByApp returns the routes that belong to the app called appName,
// optionally narrowed further by filter (path and image). The result is never
// nil on success.
//
// The caller's filter is not modified: the app name is applied to a copy.
// Rows that fail to scan or whose JSON columns fail to decode are skipped
// without an error being reported.
//
// NOTE(review): with a non-nil filter and an empty appName,
// buildFilterRouteQuery omits the app_name condition entirely, so routes of
// every app can match — confirm callers never pass an empty appName.
func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *models.RouteFilter) ([]*models.Route, error) {
	res := []*models.Route{}

	var filterQuery string
	var args []interface{}
	if filter == nil {
		filterQuery = "WHERE app_name = ?"
		args = []interface{}{appName}
	} else {
		// Scope a private copy to this app instead of writing through the
		// caller's pointer.
		scoped := *filter
		scoped.AppName = appName
		filterQuery, args = buildFilterRouteQuery(&scoped)
	}

	query := fmt.Sprintf("%s %s", routeSelector, filterQuery)
	query = ds.db.Rebind(query)

	rows, err := ds.db.QueryContext(ctx, query, args...)
	// todo: check for no rows so we don't respond with a sql 500 err
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var route models.Route
		err := scanRoute(rows, &route)
		if err != nil {
			// best effort: drop the row that could not be read
			continue
		}
		res = append(res, &route)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}
	return res, nil
}
// Tx runs f inside a database transaction. The transaction is committed when f
// returns nil; it is rolled back when f returns an error (which is then
// returned unchanged) or when f panics.
func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
	tx, err := ds.db.Beginx()
	if err != nil {
		return err
	}
	// Rollback after a completed Commit only reports sql.ErrTxDone, so it is
	// safe to defer unconditionally. Deferring also releases the connection
	// if f panics, which matters most for sqlite3 where the pool holds a
	// single connection.
	defer tx.Rollback()

	if err := f(tx); err != nil {
		return err
	}
	return tx.Commit()
}
// InsertCall stores call in the calls table. The three timestamps are written
// as the strings produced by their String methods.
func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
	// Order follows the column list of insertCallQuery.
	values := []interface{}{
		call.ID,
		call.CreatedAt.String(),
		call.StartedAt.String(),
		call.CompletedAt.String(),
		call.Status,
		call.AppName,
		call.Path,
	}

	_, err := ds.db.ExecContext(ctx, ds.insertCallQuery, values...)
	return err
}
// TODO calls are not fully qualified in this backend currently. need to discuss,
// if we store the whole thing then it adds a lot of disk space and then we can
// make async only queue hints instead of entire calls (mq a lot smaller space wise). pick.

// GetCall loads the call with id callID that belongs to app appName. When no
// row matches, the models.ErrCallNotFound produced by scanCall is returned.
func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
	found := new(models.Call)

	// getCallQuery binds the call id first, then the app name.
	if err := scanCall(ds.db.QueryRowContext(ctx, ds.getCallQuery, callID, appName), found); err != nil {
		return nil, err
	}
	return found, nil
}
// GetCalls returns the calls selected by filter; a nil filter selects every
// call. The result is never nil on success.
//
// Rows that fail to scan are skipped without an error being reported.
func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
	calls := []*models.Call{}

	where, args := buildFilterCallQuery(filter)
	stmt := ds.db.Rebind(fmt.Sprintf("%s %s", callSelector, where))

	rows, err := ds.db.QueryContext(ctx, stmt, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		call := new(models.Call)
		if scanCall(rows, call) == nil {
			calls = append(calls, call)
		}
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return calls, nil
}
// InsertLog stores callLog in the logs table under the id callID.
func (ds *sqlStore) InsertLog(ctx context.Context, callID, callLog string) error {
	if _, err := ds.db.ExecContext(ctx, ds.insertLogQuery, callID, callLog); err != nil {
		return err
	}
	return nil
}
// GetLog loads the log stored for callID. It returns models.ErrCallLogNotFound
// when no row matches.
func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog, error) {
	var text string

	err := ds.db.QueryRowContext(ctx, ds.getLogQuery, callID).Scan(&text)
	switch {
	case err == sql.ErrNoRows:
		return nil, models.ErrCallLogNotFound
	case err != nil:
		return nil, err
	}

	return &models.CallLog{CallID: callID, Log: text}, nil
}
// DeleteLog deletes the logs row stored for callID. The statement result is
// discarded, so deleting a log that does not exist is not reported as an error.
func (ds *sqlStore) DeleteLog(ctx context.Context, callID string) error {
	if _, err := ds.db.ExecContext(ctx, ds.deleteLogQuery, callID); err != nil {
		return err
	}
	return nil
}
// RowScanner is the one method the scan helpers in this file need from a query
// result. The helpers are called both with the single-row value returned by
// QueryRowContext and with the row iterator returned by QueryContext.
//
// TODO scrap for sqlx scanx ?? some things aren't perfect (e.g. config is a json string)
type RowScanner interface {
// Scan copies the columns of the current row into dest.
Scan(dest ...interface{}) error
}
// ScanLog reads the current row of scanner into log: the first column into
// log.CallID and the second into log.Log. The error from Scan is returned
// unchanged.
func ScanLog(scanner RowScanner, log *models.CallLog) error {
	dest := []interface{}{&log.CallID, &log.Log}
	return scanner.Scan(dest...)
}
// scanRoute reads the current row of scanner into route. The columns must
// arrive in routeSelector order. The headers and config columns hold JSON text
// and are decoded into route.Headers and route.Config; an empty column leaves
// the corresponding field untouched.
func scanRoute(scanner RowScanner, route *models.Route) error {
	var headers, config string

	if err := scanner.Scan(
		&route.AppName,
		&route.Path,
		&route.Image,
		&route.Format,
		&route.Memory,
		&route.Type,
		&route.Timeout,
		&route.IdleTimeout,
		&headers,
		&config,
	); err != nil {
		return err
	}

	if headers != "" {
		if err := json.Unmarshal([]byte(headers), &route.Headers); err != nil {
			return err
		}
	}

	if config != "" {
		return json.Unmarshal([]byte(config), &route.Config)
	}
	return nil
}
// scanApp reads the current row of scanner (name, config) into app. The config
// column holds JSON text and is decoded into app.Config; an empty column
// leaves app.Config untouched.
func scanApp(scanner RowScanner, app *models.App) error {
	var config string

	if err := scanner.Scan(&app.Name, &config); err != nil {
		return err
	}

	if config == "" {
		return nil
	}
	return json.Unmarshal([]byte(config), &app.Config)
}
// buildFilterRouteQuery turns filter into a WHERE clause with '?' placeholders
// plus the matching argument list. Only non-empty fields contribute a
// condition, in the order path, app_name, image, joined with AND. A nil filter
// or a filter with no field set yields an empty clause and nil arguments.
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
	if filter == nil {
		return "", nil
	}

	candidates := [...]struct{ colOp, val string }{
		{"path=", filter.Path},
		{"app_name=", filter.AppName},
		{"image=", filter.Image},
	}

	var (
		conds []string
		args  []interface{}
	)
	for _, c := range candidates {
		if c.val == "" {
			continue
		}
		conds = append(conds, c.colOp+"?")
		args = append(args, c.val)
	}

	if len(conds) == 0 {
		return "", args
	}
	return "WHERE " + strings.Join(conds, " AND "), args
}
// buildFilterAppQuery turns filter into a WHERE clause matching the app name
// with LIKE, plus its single argument. A nil filter or an empty filter.Name
// yields an empty clause and nil arguments.
func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}) {
	if filter != nil && filter.Name != "" {
		return "WHERE name LIKE ?", []interface{}{filter.Name}
	}
	return "", nil
}
// buildFilterCallQuery turns filter into a WHERE clause with '?' placeholders
// plus the matching argument list. Only non-empty fields contribute a
// condition, in the order app_name, path, joined with AND. A nil filter or a
// filter with no field set yields an empty clause and nil arguments.
func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
	if filter == nil {
		return "", nil
	}

	candidates := [...]struct{ colOp, val string }{
		{"app_name=", filter.AppName},
		{"path=", filter.Path},
	}

	var (
		conds []string
		args  []interface{}
	)
	for _, c := range candidates {
		if c.val == "" {
			continue
		}
		conds = append(conds, c.colOp+"?")
		args = append(args, c.val)
	}

	if len(conds) == 0 {
		return "", args
	}
	return "WHERE " + strings.Join(conds, " AND "), args
}
// scanCall reads the current row of scanner into call. The columns must arrive
// in callSelector order. sql.ErrNoRows is translated to
// models.ErrCallNotFound; every other error is returned unchanged.
func scanCall(scanner RowScanner, call *models.Call) error {
	dest := []interface{}{
		&call.ID,
		&call.CreatedAt,
		&call.StartedAt,
		&call.CompletedAt,
		&call.Status,
		&call.AppName,
		&call.Path,
	}

	err := scanner.Scan(dest...)
	if err == sql.ErrNoRows {
		return models.ErrCallNotFound
	}
	return err
}
// GetDatabase returns the underlying sqlx database implementation, i.e. the
// same *sqlx.DB handle this store runs all of its own queries on.
func (ds *sqlStore) GetDatabase() *sqlx.DB {
return ds.db
}