mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* move calls to logstore, implement s3 closes #482 the basic motivation is that logs and calls will be stored with a very high write rate, while apps and routes will be relatively infrequently updated; it follows that we should likely split up their storage location, to back them with appropriate storage facilities. s3 is a good candidate for ingesting higher write rate data than a sql database, and will make it easier to manage that data set. can read #482 for more detailed justification. summary: * calls api moved from datastore to logstore * logstore used in front-end to serve calls endpoints * agent now throws calls into logstore instead of datastore * s3 implementation of calls api for logstore * s3 logs key changed (nobody using / nbd?) * removed UpdateCall api (not in use) * moved call tests from datastore to logstore tests * mock logstore now tested (prev. sqlite3 only) * logstore tests run against every datastore (mysql, pg; prev. only sqlite3) * simplify NewMock in tests commentary: brunt of the work is implementing the listing of calls in GetCalls for the s3 logstore implementation. the GetCalls API requires returning items in the newest to oldest order, and the s3 api lists items in lexicographic order based on created_at. An easy thing to do here seemed to be to reverse the encoding of our id format to return a lexicographically descending order, since ids are time based, reasonably encoded to be lexicographically sortable, and de-duped (unlike created_at). This seems to work pretty well, it's not perfect around the boundaries of to_time and from_time and a tiny amount of results may be omitted, but to me this doesn't seem like a deal breaker to get 6999 results instead of 7000 when trying to get calls between 3:00pm and 4:00pm Monday 3 weeks ago. Of course, without to_time and from_time, there are no issues in listing results. 
We could use created at and encode it, but it would be an additional marker for point lookup (GetCall) since we would have to search for a created_at stamp, search for ids around that until we find the matching one, just to do a point lookup. So, the tradeoff here seems worth it. There is additional optimization around to_time to seek over newer results (since we have descending order). The other complication in GetCalls is returning a list of calls for a given path. Since the keys to do point lookups are only app_id + call_id, and we need listing across an app as well, this leads us to the 'marker' collection which is sorted by app_id + path + call_id, to allow quick listing by path. All in all, it should be pretty straightforward to follow the implementation and I tried to be lavish with the comments, please let me know if anything needs further clarification in the code. The implementation itself has some glaring inefficiencies, but they're relatively minute: json encoding is kinda lazy, but workable; s3 doesn't offer batch retrieval, so we point look up each call one by one in get call; not re-using buffers -- but the seeking around the keys should all be relatively fast, not too worried about performance really and this isn't a hot path for reads (need to make a cut point and turn this in!). Interestingly, in testing, minio performs significantly worse than pg for storing both logs and calls (or just logs, I tested that too). minio seems to have really high cpu consumption, but in any event, we won't be using minio, we'll be using a cloud object store that implements the s3 api. Anyway, mostly a knock on using minio for high performance, not really anything to do with this, just thought it was interesting. I think it's safe to remove UpdateCall, admittedly this made implementing the s3 api a lot easier. 
This operation may also be something we never need, it was unused at present and was only in the cards for a previous hybrid implementation, which we've now abandoned. If we need, we can always resurrect from git. Also not worried about changing the log key, we need to put a prefix on this thing anyway, but I don't think anybody is using this anyway. in any event, it simply means old logs won't show up through the API, but aside from nobody using this yet, that doesn't seem a big deal breaker really -- new logs will appear fine. future: TODO make logstore implementation optional for datastore, check in front-end at runtime and offer a nil logstore that errors appropriately TODO low hanging fruit optimizations of json encoding, re-using buffers for download, get multiple calls at a time, id reverse encoding could be optimized like normal encoding to not be n^2 TODO api for range removal of logs and calls * address review comments * push id to_time magic into id package * add note about s3 key sizes * fix validation check
854 lines
20 KiB
Go
854 lines
20 KiB
Go
package sql
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/datastore/sql/migratex"
|
|
"github.com/fnproject/fn/api/datastore/sql/migrations"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/go-sql-driver/mysql"
|
|
_ "github.com/go-sql-driver/mysql"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/lib/pq"
|
|
_ "github.com/lib/pq"
|
|
"github.com/mattn/go-sqlite3"
|
|
_ "github.com/mattn/go-sqlite3"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// this aims to be an ANSI-SQL compliant package that uses only question
|
|
// mark syntax for var placement, leaning on sqlx to make compatible all
|
|
// queries to the actual underlying datastore.
|
|
//
|
|
// currently tested and working are postgres, mysql and sqlite3.
|
|
|
|
// TODO routes.created_at should be varchar(256), mysql will store 'text'
|
|
// fields not contiguous with other fields and this field is a fixed size,
|
|
// we'll get better locality with varchar. it's not terribly easy to do this
|
|
// with migrations (sadly, need complex transaction)
|
|
|
|
// tables holds the CREATE TABLE IF NOT EXISTS statements that newDS executes,
// in order, every time a store is opened.
var tables = [...]string{
	// routes: one row per (app_id, path).
	`CREATE TABLE IF NOT EXISTS routes (
	app_id varchar(256) NOT NULL,
	path varchar(256) NOT NULL,
	image varchar(256) NOT NULL,
	format varchar(16) NOT NULL,
	memory int NOT NULL,
	cpus int,
	timeout int NOT NULL,
	idle_timeout int NOT NULL,
	type varchar(16) NOT NULL,
	headers text NOT NULL,
	config text NOT NULL,
	annotations text NOT NULL,
	created_at text,
	updated_at varchar(256),
	PRIMARY KEY (app_id, path)
);`,

	// apps: keyed by name; id is a separate, nullable column.
	`CREATE TABLE IF NOT EXISTS apps (
	id varchar(256),
	name varchar(256) NOT NULL PRIMARY KEY,
	config text NOT NULL,
	annotations text NOT NULL,
	created_at varchar(256),
	updated_at varchar(256)
);`,

	// calls: one row per call id.
	`CREATE TABLE IF NOT EXISTS calls (
	created_at varchar(256) NOT NULL,
	started_at varchar(256) NOT NULL,
	completed_at varchar(256) NOT NULL,
	status varchar(256) NOT NULL,
	id varchar(256) NOT NULL,
	app_id varchar(256) NOT NULL,
	path varchar(256) NOT NULL,
	stats text,
	error text,
	PRIMARY KEY (id)
);`,

	// logs: the log text of a call, keyed by the call id.
	`CREATE TABLE IF NOT EXISTS logs (
	id varchar(256) NOT NULL PRIMARY KEY,
	app_id varchar(256) NOT NULL,
	log text NOT NULL
);`,
}
|
|
|
|
// Shared SELECT fragments. The first two carry no WHERE clause; callers append
// their own conditions before rebinding.
const (
	// routeSelector lists every routes column read back into a models.Route.
	routeSelector = `SELECT app_id, path, image, format, memory, type, cpus, timeout, idle_timeout, headers, config, annotations, created_at, updated_at FROM routes`

	// callSelector lists every calls column read back into a models.Call.
	callSelector = `SELECT id, created_at, started_at, completed_at, status, app_id, path, stats, error FROM calls`

	// appIDSelector fetches a complete app row by its id.
	appIDSelector = `SELECT id, name, config, annotations, created_at, updated_at FROM apps WHERE id=?`

	// ensureAppSelector resolves an app name to its id.
	ensureAppSelector = `SELECT id FROM apps WHERE name=?`
)
|
|
|
|
type sqlStore struct {
|
|
db *sqlx.DB
|
|
}
|
|
|
|
// New will open the db specified by url, create any tables necessary
|
|
// and return a models.Datastore safe for concurrent usage.
|
|
func New(ctx context.Context, url *url.URL) (models.Datastore, error) {
|
|
return newDS(ctx, url)
|
|
}
|
|
|
|
// for test methods, return concrete type, but don't expose
|
|
func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) {
|
|
driver := url.Scheme
|
|
|
|
log := common.Logger(ctx)
|
|
// driver must be one of these for sqlx to work, double check:
|
|
switch driver {
|
|
case "postgres", "pgx", "mysql", "sqlite3":
|
|
default:
|
|
return nil, errors.New("invalid db driver, refer to the code")
|
|
}
|
|
|
|
if driver == "sqlite3" {
|
|
// make all the dirs so we can make the file..
|
|
dir := filepath.Dir(url.Path)
|
|
err := os.MkdirAll(dir, 0755)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
uri := url.String()
|
|
if driver != "postgres" {
|
|
// postgres seems to need this as a prefix in lib/pq, everyone else wants it stripped of scheme
|
|
uri = strings.TrimPrefix(url.String(), url.Scheme+"://")
|
|
}
|
|
|
|
sqldb, err := sql.Open(driver, uri)
|
|
if err != nil {
|
|
log.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't open db")
|
|
return nil, err
|
|
}
|
|
|
|
db := sqlx.NewDb(sqldb, driver)
|
|
// force a connection and test that it worked
|
|
err = pingWithRetry(ctx, 10, time.Second*1, db)
|
|
if err != nil {
|
|
log.WithFields(logrus.Fields{"url": uri}).WithError(err).Error("couldn't ping db")
|
|
return nil, err
|
|
}
|
|
|
|
maxIdleConns := 256 // TODO we need to strip this out of the URL probably
|
|
db.SetMaxIdleConns(maxIdleConns)
|
|
log.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns, "datastore": driver}).Info("datastore dialed")
|
|
|
|
sdb := &sqlStore{db: db}
|
|
|
|
err = sdb.runMigrations(ctx, checkExistence(db), migrations.Migrations)
|
|
if err != nil {
|
|
log.WithError(err).Error("error running migrations")
|
|
return nil, err
|
|
}
|
|
|
|
switch driver {
|
|
case "sqlite3":
|
|
db.SetMaxOpenConns(1)
|
|
}
|
|
for _, v := range tables {
|
|
_, err = db.ExecContext(ctx, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return sdb, nil
|
|
}
|
|
|
|
func pingWithRetry(ctx context.Context, attempts int, sleep time.Duration, db *sqlx.DB) (err error) {
|
|
for i := 0; i < attempts; i++ {
|
|
err = db.PingContext(ctx)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
time.Sleep(sleep)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// checkExistence checks if tables have been created yet, it is not concerned
|
|
// about the existence of the schema migration version (since migrations were
|
|
// added to existing dbs, we need to know whether the db exists without migrations
|
|
// or if it's brand new).
|
|
func checkExistence(db *sqlx.DB) bool {
|
|
query := db.Rebind(`SELECT name FROM apps LIMIT 1`)
|
|
row := db.QueryRow(query)
|
|
|
|
var dummy string
|
|
err := row.Scan(&dummy)
|
|
if err != nil && err != sql.ErrNoRows {
|
|
// TODO we should probably ensure this is a certain 'no such table' error
|
|
// and if it's not that or err no rows, we should probably block start up.
|
|
// if we return false here spuriously, then migrations could be skipped,
|
|
// which would be bad.
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// check if the db already existed, if the db is brand new then we can skip
|
|
// over all the migrations BUT we must be sure to set the right migration
|
|
// number so that only current migrations are skipped, not any future ones.
|
|
func (ds *sqlStore) runMigrations(ctx context.Context, dbExists bool, migrations []migratex.Migration) error {
|
|
if !dbExists {
|
|
// set to highest and bail
|
|
return ds.Tx(func(tx *sqlx.Tx) error {
|
|
return migratex.SetVersion(ctx, tx, latestVersion(migrations), false)
|
|
})
|
|
}
|
|
|
|
// run any migrations needed to get to latest, if any
|
|
return migratex.Up(ctx, ds.db, migrations)
|
|
}
|
|
|
|
// latest version will find the latest version from a list of migration
|
|
// names (not from the db)
|
|
func latestVersion(migs []migratex.Migration) int64 {
|
|
var highest int64
|
|
for _, mig := range migs {
|
|
if mig.Version() > highest {
|
|
highest = mig.Version()
|
|
}
|
|
}
|
|
|
|
return highest
|
|
}
|
|
|
|
// clear is for tests only, be careful, it deletes all records.
|
|
func (ds *sqlStore) clear() error {
|
|
return ds.Tx(func(tx *sqlx.Tx) error {
|
|
query := tx.Rebind(`DELETE FROM routes`)
|
|
_, err := tx.Exec(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query = tx.Rebind(`DELETE FROM calls`)
|
|
_, err = tx.Exec(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query = tx.Rebind(`DELETE FROM apps`)
|
|
_, err = tx.Exec(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query = tx.Rebind(`DELETE FROM logs`)
|
|
_, err = tx.Exec(query)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (ds *sqlStore) GetAppID(ctx context.Context, appName string) (string, error) {
|
|
var app models.App
|
|
query := ds.db.Rebind(ensureAppSelector)
|
|
row := ds.db.QueryRowxContext(ctx, query, appName)
|
|
|
|
err := row.StructScan(&app)
|
|
if err == sql.ErrNoRows {
|
|
return "", models.ErrAppsNotFound
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return app.ID, nil
|
|
}
|
|
|
|
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
|
query := ds.db.Rebind(`INSERT INTO apps (
|
|
id,
|
|
name,
|
|
config,
|
|
annotations,
|
|
created_at,
|
|
updated_at
|
|
)
|
|
VALUES (
|
|
:id,
|
|
:name,
|
|
:config,
|
|
:annotations,
|
|
:created_at,
|
|
:updated_at
|
|
);`)
|
|
_, err := ds.db.NamedExecContext(ctx, query, app)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case *mysql.MySQLError:
|
|
if err.Number == 1062 {
|
|
return nil, models.ErrAppsAlreadyExists
|
|
}
|
|
case *pq.Error:
|
|
if err.Code == "23505" {
|
|
return nil, models.ErrAppsAlreadyExists
|
|
}
|
|
case sqlite3.Error:
|
|
if err.ExtendedCode == sqlite3.ErrConstraintUnique || err.ExtendedCode == sqlite3.ErrConstraintPrimaryKey {
|
|
return nil, models.ErrAppsAlreadyExists
|
|
}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return app, nil
|
|
}
|
|
|
|
func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
|
|
var app models.App
|
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
|
// NOTE: must query whole object since we're returning app, Update logic
|
|
// must only modify modifiable fields (as seen here). need to fix brittle..
|
|
|
|
query := tx.Rebind(appIDSelector)
|
|
row := tx.QueryRowxContext(ctx, query, newapp.ID)
|
|
|
|
err := row.StructScan(&app)
|
|
if err == sql.ErrNoRows {
|
|
return models.ErrAppsNotFound
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
app.Update(newapp)
|
|
err = app.Validate()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query = tx.Rebind(`UPDATE apps SET config=:config, annotations=:annotations, updated_at=:updated_at WHERE name=:name`)
|
|
res, err := tx.NamedExecContext(ctx, query, app)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n, err := res.RowsAffected(); err != nil {
|
|
return err
|
|
} else if n == 0 {
|
|
// inside of the transaction, we are querying for the app, so we know that it exists
|
|
return nil
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &app, nil
|
|
}
|
|
|
|
func (ds *sqlStore) RemoveApp(ctx context.Context, appID string) error {
|
|
return ds.Tx(func(tx *sqlx.Tx) error {
|
|
res, err := tx.ExecContext(ctx, tx.Rebind(`DELETE FROM apps WHERE id=?`), appID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := res.RowsAffected()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n == 0 {
|
|
return models.ErrAppsNotFound
|
|
}
|
|
|
|
deletes := []string{
|
|
`DELETE FROM logs WHERE app_id=?`,
|
|
`DELETE FROM calls WHERE app_id=?`,
|
|
`DELETE FROM routes WHERE app_id=?`,
|
|
}
|
|
for _, stmt := range deletes {
|
|
_, err := tx.ExecContext(ctx, tx.Rebind(stmt), appID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ds *sqlStore) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
|
var app models.App
|
|
query := ds.db.Rebind(appIDSelector)
|
|
row := ds.db.QueryRowxContext(ctx, query, appID)
|
|
|
|
err := row.StructScan(&app)
|
|
if err == sql.ErrNoRows {
|
|
return nil, models.ErrAppsNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &app, err
|
|
}
|
|
|
|
// GetApps retrieves an array of apps according to a specific filter.
|
|
func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
|
res := []*models.App{}
|
|
if filter.NameIn != nil && len(filter.NameIn) == 0 { // this basically makes sure it doesn't return ALL apps
|
|
return res, nil
|
|
}
|
|
query, args, err := buildFilterAppQuery(filter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config, annotations, created_at, updated_at FROM apps %s", query))
|
|
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var app models.App
|
|
err := rows.StructScan(&app)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return res, nil
|
|
}
|
|
return res, err
|
|
}
|
|
res = append(res, &app)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return res, err
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
|
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
|
|
r := tx.QueryRowContext(ctx, query, route.AppID)
|
|
if err := r.Scan(new(int)); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return models.ErrAppsNotFound
|
|
}
|
|
}
|
|
query = tx.Rebind(`SELECT 1 FROM routes WHERE app_id=? AND path=?`)
|
|
same, err := tx.QueryContext(ctx, query, route.AppID, route.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer same.Close()
|
|
if same.Next() {
|
|
return models.ErrRoutesAlreadyExists
|
|
}
|
|
|
|
query = tx.Rebind(`INSERT INTO routes (
|
|
app_id,
|
|
path,
|
|
image,
|
|
format,
|
|
memory,
|
|
cpus,
|
|
type,
|
|
timeout,
|
|
idle_timeout,
|
|
headers,
|
|
config,
|
|
annotations,
|
|
created_at,
|
|
updated_at
|
|
)
|
|
VALUES (
|
|
:app_id,
|
|
:path,
|
|
:image,
|
|
:format,
|
|
:memory,
|
|
:cpus,
|
|
:type,
|
|
:timeout,
|
|
:idle_timeout,
|
|
:headers,
|
|
:config,
|
|
:annotations,
|
|
:created_at,
|
|
:updated_at
|
|
);`)
|
|
|
|
_, err = tx.NamedExecContext(ctx, query, route)
|
|
|
|
return err
|
|
})
|
|
|
|
return route, err
|
|
}
|
|
|
|
func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
|
|
var route models.Route
|
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
|
query := tx.Rebind(fmt.Sprintf("%s WHERE app_id=? AND path=?", routeSelector))
|
|
row := tx.QueryRowxContext(ctx, query, newroute.AppID, newroute.Path)
|
|
|
|
err := row.StructScan(&route)
|
|
if err == sql.ErrNoRows {
|
|
return models.ErrRoutesNotFound
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
route.Update(newroute)
|
|
err = route.Validate()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
query = tx.Rebind(`UPDATE routes SET
|
|
image = :image,
|
|
format = :format,
|
|
memory = :memory,
|
|
cpus = :cpus,
|
|
type = :type,
|
|
timeout = :timeout,
|
|
idle_timeout = :idle_timeout,
|
|
headers = :headers,
|
|
config = :config,
|
|
annotations = :annotations,
|
|
updated_at = :updated_at
|
|
WHERE app_id=:app_id AND path=:path;`)
|
|
|
|
res, err := tx.NamedExecContext(ctx, query, &route)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n, err := res.RowsAffected(); err != nil {
|
|
return err
|
|
} else if n == 0 {
|
|
// inside of the transaction, we are querying for the row, so we know that it exists
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &route, nil
|
|
}
|
|
|
|
func (ds *sqlStore) RemoveRoute(ctx context.Context, appID string, routePath string) error {
|
|
query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_id = ?`)
|
|
res, err := ds.db.ExecContext(ctx, query, routePath, appID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n, err := res.RowsAffected()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if n == 0 {
|
|
return models.ErrRoutesNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ds *sqlStore) GetRoute(ctx context.Context, appID, routePath string) (*models.Route, error) {
|
|
rSelectCondition := "%s WHERE app_id=? AND path=?"
|
|
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
|
|
row := ds.db.QueryRowxContext(ctx, query, appID, routePath)
|
|
|
|
var route models.Route
|
|
err := row.StructScan(&route)
|
|
if err == sql.ErrNoRows {
|
|
return nil, models.ErrRoutesNotFound
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
return &route, nil
|
|
}
|
|
|
|
// GetRoutesByApp retrieves a route with a specific app name.
|
|
func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appID string, filter *models.RouteFilter) ([]*models.Route, error) {
|
|
res := []*models.Route{}
|
|
if filter == nil {
|
|
filter = new(models.RouteFilter)
|
|
}
|
|
|
|
filter.AppID = appID
|
|
filterQuery, args := buildFilterRouteQuery(filter)
|
|
|
|
query := fmt.Sprintf("%s %s", routeSelector, filterQuery)
|
|
query = ds.db.Rebind(query)
|
|
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return res, nil // no error for empty list
|
|
}
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var route models.Route
|
|
err := rows.StructScan(&route)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
res = append(res, &route)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return res, nil // no error for empty list
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
|
|
tx, err := ds.db.Beginx()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = f(tx)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return err
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
|
|
query := ds.db.Rebind(`INSERT INTO calls (
|
|
id,
|
|
created_at,
|
|
started_at,
|
|
completed_at,
|
|
status,
|
|
app_id,
|
|
path,
|
|
stats,
|
|
error
|
|
)
|
|
VALUES (
|
|
:id,
|
|
:created_at,
|
|
:started_at,
|
|
:completed_at,
|
|
:status,
|
|
:app_id,
|
|
:path,
|
|
:stats,
|
|
:error
|
|
);`)
|
|
|
|
_, err := ds.db.NamedExecContext(ctx, query, call)
|
|
return err
|
|
}
|
|
|
|
func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
|
|
query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector)
|
|
query = ds.db.Rebind(query)
|
|
row := ds.db.QueryRowxContext(ctx, query, callID, appID)
|
|
|
|
var call models.Call
|
|
err := row.StructScan(&call)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, models.ErrCallNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
return &call, nil
|
|
}
|
|
|
|
func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
|
res := []*models.Call{}
|
|
query, args := buildFilterCallQuery(filter)
|
|
query = fmt.Sprintf("%s %s", callSelector, query)
|
|
query = ds.db.Rebind(query)
|
|
rows, err := ds.db.QueryxContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var call models.Call
|
|
err := rows.StructScan(&call)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
res = append(res, &call)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (ds *sqlStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error {
|
|
// coerce this into a string for sql
|
|
var log string
|
|
if stringer, ok := logR.(fmt.Stringer); ok {
|
|
log = stringer.String()
|
|
} else {
|
|
// TODO we could optimize for Size / buffer pool, but atm we aren't hitting
|
|
// this code path anyway (a fallback)
|
|
var b bytes.Buffer
|
|
io.Copy(&b, logR)
|
|
log = b.String()
|
|
}
|
|
|
|
query := ds.db.Rebind(`INSERT INTO logs (id, app_id, log) VALUES (?, ?, ?);`)
|
|
_, err := ds.db.ExecContext(ctx, query, callID, appID, log)
|
|
|
|
return err
|
|
}
|
|
|
|
func (ds *sqlStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
|
|
query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`)
|
|
row := ds.db.QueryRowContext(ctx, query, callID, appID)
|
|
|
|
var log string
|
|
err := row.Scan(&log)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, models.ErrCallLogNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return strings.NewReader(log), nil
|
|
}
|
|
|
|
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
|
|
if filter == nil {
|
|
return "", nil
|
|
}
|
|
var b bytes.Buffer
|
|
var args []interface{}
|
|
|
|
where := func(colOp, val string) {
|
|
if val != "" {
|
|
args = append(args, val)
|
|
if len(args) == 1 {
|
|
fmt.Fprintf(&b, `WHERE %s`, colOp)
|
|
} else {
|
|
fmt.Fprintf(&b, ` AND %s`, colOp)
|
|
}
|
|
}
|
|
}
|
|
|
|
where("app_id=? ", filter.AppID)
|
|
where("image=?", filter.Image)
|
|
where("path>?", filter.Cursor)
|
|
// where("path LIKE ?%", filter.PathPrefix) TODO needs escaping
|
|
|
|
fmt.Fprintf(&b, ` ORDER BY path ASC`) // TODO assert this is indexed
|
|
fmt.Fprintf(&b, ` LIMIT ?`)
|
|
args = append(args, filter.PerPage)
|
|
|
|
return b.String(), args
|
|
}
|
|
|
|
func buildFilterAppQuery(filter *models.AppFilter) (string, []interface{}, error) {
|
|
var args []interface{}
|
|
if filter == nil {
|
|
return "", args, nil
|
|
}
|
|
|
|
var b bytes.Buffer
|
|
|
|
// todo: this same thing is in several places in here, DRY it up across this file
|
|
where := func(colOp, val interface{}) {
|
|
if val == nil {
|
|
return
|
|
}
|
|
switch v := val.(type) {
|
|
case string:
|
|
if v == "" {
|
|
return
|
|
}
|
|
case []string:
|
|
if len(v) == 0 {
|
|
return
|
|
}
|
|
}
|
|
args = append(args, val)
|
|
if len(args) == 1 {
|
|
fmt.Fprintf(&b, `WHERE %s`, colOp)
|
|
} else {
|
|
fmt.Fprintf(&b, ` AND %s`, colOp)
|
|
}
|
|
}
|
|
|
|
// where("name LIKE ?%", filter.Name) // TODO needs escaping?
|
|
where("name>?", filter.Cursor)
|
|
where("name IN (?)", filter.NameIn)
|
|
|
|
fmt.Fprintf(&b, ` ORDER BY name ASC`) // TODO assert this is indexed
|
|
fmt.Fprintf(&b, ` LIMIT ?`)
|
|
args = append(args, filter.PerPage)
|
|
if len(filter.NameIn) > 0 {
|
|
return sqlx.In(b.String(), args...)
|
|
}
|
|
return b.String(), args, nil
|
|
}
|
|
|
|
func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
|
|
if filter == nil {
|
|
return "", nil
|
|
}
|
|
var b bytes.Buffer
|
|
var args []interface{}
|
|
|
|
where := func(colOp, val string) {
|
|
if val != "" {
|
|
args = append(args, val)
|
|
if len(args) == 1 {
|
|
fmt.Fprintf(&b, `WHERE %s?`, colOp)
|
|
} else {
|
|
fmt.Fprintf(&b, ` AND %s?`, colOp)
|
|
}
|
|
}
|
|
}
|
|
|
|
where("id<", filter.Cursor)
|
|
if !time.Time(filter.ToTime).IsZero() {
|
|
where("created_at<", filter.ToTime.String())
|
|
}
|
|
if !time.Time(filter.FromTime).IsZero() {
|
|
where("created_at>", filter.FromTime.String())
|
|
}
|
|
where("app_id=", filter.AppID)
|
|
where("path=", filter.Path)
|
|
|
|
fmt.Fprintf(&b, ` ORDER BY id DESC`) // TODO assert this is indexed
|
|
fmt.Fprintf(&b, ` LIMIT ?`)
|
|
args = append(args, filter.PerPage)
|
|
|
|
return b.String(), args
|
|
}
|
|
|
|
// GetDatabase returns the underlying sqlx database implementation
|
|
func (ds *sqlStore) GetDatabase() *sqlx.DB {
|
|
return ds.db
|
|
}
|