Addressing review comments

reverting query string caching in favour of Go 1.9 sqlx features
 moving context definition out of call.End to upper level
This commit is contained in:
Denis Makogon
2017-09-06 09:21:59 +03:00
parent 6a541139a9
commit 9a89366d1b
5 changed files with 86 additions and 97 deletions

View File

@@ -222,9 +222,8 @@ func (a *agent) Submit(callI Call) error {
a.stats.Complete() a.stats.Complete()
// TODO if the context is timed out here we need to allocate some more time... // TODO if the context is timed out here we need to allocate new one
// right now this only works b/c the db isn't using the context call.End(context.Background(), err)
call.End(ctx, err)
return err return err
} }

View File

@@ -287,11 +287,7 @@ func (c *call) End(ctx context.Context, err error) {
// XXX (reed): delete MQ message, eventually // XXX (reed): delete MQ message, eventually
} }
// need newer context because original one may be modified if err := c.ds.InsertCall(opentracing.ContextWithSpan(ctx, span), c.Call); err != nil {
// and no longer be valid for further context-bound operations
if err := c.ds.InsertCall(context.Background(), c.Call); err != nil {
// TODO we should just log this error not return it to user?
// just issue storing call status but call is run
logrus.WithError(err).Error("error inserting call into datastore") logrus.WithError(err).Error("error inserting call into datastore")
} }
} }

View File

@@ -10,6 +10,7 @@ import (
"github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@@ -56,7 +57,7 @@ func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logD
reqID: reqID, reqID: reqID,
}) })
// TODO / NOTE: we want linew to be first becauase limitw may error if limit // TODO / NOTE: we want linew to be first because limitw may error if limit
// is reached but we still want to log. we should probably ignore hitting the // is reached but we still want to log. we should probably ignore hitting the
// limit error since we really just want to not write too much to db and // limit error since we really just want to not write too much to db and
// that's handled as is. put buffers back last to avoid misuse, if there's // that's handled as is. put buffers back last to avoid misuse, if there's
@@ -110,7 +111,7 @@ type logWriter struct {
func (l *logWriter) Write(b []byte) (int, error) { func (l *logWriter) Write(b []byte) (int, error) {
log := common.Logger(l.ctx) log := common.Logger(l.ctx)
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID}) log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID})
log.Println(string(b)) log.Debug(string(b))
return len(b), nil return len(b), nil
} }
@@ -182,7 +183,9 @@ type dbWriter struct {
} }
func (w *dbWriter) Close() error { func (w *dbWriter) Close() error {
return w.db.InsertLog(w.ctx, w.reqID, w.String()) span, ctx := opentracing.StartSpanFromContext(context.Background(), "agent_log_write")
defer span.Finish()
return w.db.InsertLog(ctx, w.reqID, w.String())
} }
func (w *dbWriter) Write(b []byte) (int, error) { func (w *dbWriter) Write(b []byte) (int, error) {

View File

@@ -71,23 +71,10 @@ const (
) )
type sqlStore struct { type sqlStore struct {
db *sqlx.DB db *sqlx.DB
insertAppQuery string
updateAppQuery string // TODO we should prepare all of the statements, rebind them
getAppQuery string // and store them all here.
selectAppConfigQuery string
removeAppQuery string
insertRouteQuery string
updateRouteQuery string
checkRouteAppQuery string
checkRouteQuery string
getRouteQuery string
removeRouteQuery string
insertCallQuery string
getCallQuery string
insertLogQuery string
getLogQuery string
deleteLogQuery string
} }
// New will open the db specified by url, create any tables necessary // New will open the db specified by url, create any tables necessary
@@ -146,56 +133,7 @@ func New(url *url.URL) (models.Datastore, error) {
} }
} }
dstore := &sqlStore{ return &sqlStore{db: db}, nil
db: db,
insertAppQuery: db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);"),
selectAppConfigQuery: db.Rebind(`SELECT config FROM apps WHERE name=?`),
updateAppQuery: db.Rebind(`UPDATE apps SET config=? WHERE name=?`),
removeAppQuery: db.Rebind(`DELETE FROM apps WHERE name = ?`),
getAppQuery: db.Rebind(`SELECT name, config FROM apps WHERE name=?`),
checkRouteAppQuery: db.Rebind(`SELECT 1 FROM apps WHERE name=?`),
checkRouteQuery: db.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`),
insertRouteQuery: db.Rebind(`INSERT INTO routes (
app_name,
path,
image,
format,
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`),
getRouteQuery: db.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector)),
updateRouteQuery: db.Rebind(`UPDATE routes SET
image = ?,
format = ?,
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name=? AND path=?;`),
removeRouteQuery: db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`),
insertCallQuery: db.Rebind(`INSERT INTO calls (
id,
created_at,
started_at,
completed_at,
status,
app_name,
path
)
VALUES (?, ?, ?, ?, ?, ?, ?);`),
getCallQuery: db.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)),
insertLogQuery: db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`),
getLogQuery: db.Rebind(`SELECT log FROM logs WHERE id=?`),
deleteLogQuery: db.Rebind(`DELETE FROM logs WHERE id=?`),
}
return dstore, nil
} }
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
@@ -208,7 +146,8 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App
} }
} }
_, err = ds.db.ExecContext(ctx, ds.insertAppQuery, app.Name, string(cbyte)) query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);")
_, err = ds.db.ExecContext(ctx, query, app.Name, string(cbyte))
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case *mysql.MySQLError: case *mysql.MySQLError:
@@ -233,7 +172,8 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App
func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
app := &models.App{Name: newapp.Name} app := &models.App{Name: newapp.Name}
err := ds.Tx(func(tx *sqlx.Tx) error { err := ds.Tx(func(tx *sqlx.Tx) error {
row := tx.QueryRowContext(ctx, ds.selectAppConfigQuery, app.Name) query := tx.Rebind(`SELECT config FROM apps WHERE name=?`)
row := tx.QueryRowContext(ctx, query, app.Name)
var config string var config string
if err := row.Scan(&config); err != nil { if err := row.Scan(&config); err != nil {
@@ -257,7 +197,8 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
return err return err
} }
res, err := tx.ExecContext(ctx, ds.updateAppQuery, string(cbyte), app.Name) query = tx.Rebind(`UPDATE apps SET config=? WHERE name=?`)
res, err := tx.ExecContext(ctx, query, string(cbyte), app.Name)
if err != nil { if err != nil {
return err return err
} }
@@ -279,12 +220,14 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
} }
func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error { func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error {
_, err := ds.db.ExecContext(ctx, ds.removeAppQuery, appName) query := ds.db.Rebind(`DELETE FROM apps WHERE name = ?`)
_, err := ds.db.ExecContext(ctx, query, appName)
return err return err
} }
func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) { func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) {
row := ds.db.QueryRowContext(ctx, ds.getAppQuery, name) query := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`)
row := ds.db.QueryRowContext(ctx, query, name)
var resName, config string var resName, config string
err := row.Scan(&resName, &config) err := row.Scan(&resName, &config)
@@ -351,13 +294,15 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
} }
err = ds.Tx(func(tx *sqlx.Tx) error { err = ds.Tx(func(tx *sqlx.Tx) error {
r := tx.QueryRowContext(ctx, ds.checkRouteAppQuery, route.AppName) query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`)
r := tx.QueryRowContext(ctx, query, route.AppName)
if err := r.Scan(new(int)); err != nil { if err := r.Scan(new(int)); err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return models.ErrAppsNotFound return models.ErrAppsNotFound
} }
} }
same, err := tx.QueryContext(ctx, ds.checkRouteQuery, route.AppName, route.Path) query = tx.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`)
same, err := tx.QueryContext(ctx, query, route.AppName, route.Path)
if err != nil { if err != nil {
return err return err
} }
@@ -366,7 +311,21 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
return models.ErrRoutesAlreadyExists return models.ErrRoutesAlreadyExists
} }
_, err = tx.ExecContext(ctx, ds.insertRouteQuery, query = tx.Rebind(`INSERT INTO routes (
app_name,
path,
image,
format,
memory,
type,
timeout,
idle_timeout,
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`)
_, err = tx.ExecContext(ctx, query,
route.AppName, route.AppName,
route.Path, route.Path,
route.Image, route.Image,
@@ -388,7 +347,8 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
var route models.Route var route models.Route
err := ds.Tx(func(tx *sqlx.Tx) error { err := ds.Tx(func(tx *sqlx.Tx) error {
row := tx.QueryRowContext(ctx, ds.getRouteQuery, newroute.AppName, newroute.Path) query := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector))
row := tx.QueryRowContext(ctx, query, newroute.AppName, newroute.Path)
if err := scanRoute(row, &route); err == sql.ErrNoRows { if err := scanRoute(row, &route); err == sql.ErrNoRows {
return models.ErrRoutesNotFound return models.ErrRoutesNotFound
} else if err != nil { } else if err != nil {
@@ -407,7 +367,18 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
return err return err
} }
res, err := tx.ExecContext(ctx, ds.updateRouteQuery, query = tx.Rebind(`UPDATE routes SET
image = ?,
format = ?,
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name=? AND path=?;`)
res, err := tx.ExecContext(ctx, query,
route.Image, route.Image,
route.Format, route.Format,
route.Memory, route.Memory,
@@ -441,7 +412,8 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
} }
func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error { func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
res, err := ds.db.ExecContext(ctx, ds.removeRouteQuery, routePath, appName) query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`)
res, err := ds.db.ExecContext(ctx, query, routePath, appName)
if err != nil { if err != nil {
return err return err
} }
@@ -459,7 +431,9 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string)
} }
func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
row := ds.db.QueryRowContext(ctx, ds.getRouteQuery, appName, routePath) rSelectCondition := "%s WHERE app_name=? AND path=?"
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
row := ds.db.QueryRowContext(ctx, query, appName, routePath)
var route models.Route var route models.Route
err := scanRoute(row, &route) err := scanRoute(row, &route)
@@ -475,7 +449,8 @@ func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*m
func (ds *sqlStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { func (ds *sqlStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) {
res := []*models.Route{} res := []*models.Route{}
query, args := buildFilterRouteQuery(filter) query, args := buildFilterRouteQuery(filter)
query = ds.db.Rebind(fmt.Sprintf("%s %s", routeSelector, query)) query = fmt.Sprintf("%s %s", routeSelector, query)
query = ds.db.Rebind(query)
rows, err := ds.db.QueryContext(ctx, query, args...) rows, err := ds.db.QueryContext(ctx, query, args...)
// todo: check for no rows so we don't respond with a sql 500 err // todo: check for no rows so we don't respond with a sql 500 err
if err != nil { if err != nil {
@@ -552,7 +527,18 @@ func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
} }
func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
_, err := ds.db.ExecContext(ctx, ds.insertCallQuery, call.ID, call.CreatedAt.String(), query := ds.db.Rebind(`INSERT INTO calls (
id,
created_at,
started_at,
completed_at,
status,
app_name,
path
)
VALUES (?, ?, ?, ?, ?, ?, ?);`)
_, err := ds.db.ExecContext(ctx, query, call.ID, call.CreatedAt.String(),
call.StartedAt.String(), call.CompletedAt.String(), call.StartedAt.String(), call.CompletedAt.String(),
call.Status, call.AppName, call.Path) call.Status, call.AppName, call.Path)
if err != nil { if err != nil {
@@ -566,7 +552,9 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
// if we store the whole thing then it adds a lot of disk space and then we can // if we store the whole thing then it adds a lot of disk space and then we can
// make async only queue hints instead of entire calls (mq a lot smaller space wise). pick. // make async only queue hints instead of entire calls (mq a lot smaller space wise). pick.
func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
row := ds.db.QueryRowContext(ctx, ds.getCallQuery, callID, appName) query := fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)
query = ds.db.Rebind(query)
row := ds.db.QueryRowContext(ctx, query, callID, appName)
var call models.Call var call models.Call
err := scanCall(row, &call) err := scanCall(row, &call)
@@ -602,12 +590,14 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
} }
func (ds *sqlStore) InsertLog(ctx context.Context, callID, callLog string) error { func (ds *sqlStore) InsertLog(ctx context.Context, callID, callLog string) error {
_, err := ds.db.ExecContext(ctx, ds.insertLogQuery, callID, callLog) query := ds.db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`)
_, err := ds.db.ExecContext(ctx, query, callID, callLog)
return err return err
} }
func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog, error) { func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog, error) {
row := ds.db.QueryRowContext(ctx, ds.getLogQuery, callID) query := ds.db.Rebind(`SELECT log FROM logs WHERE id=?`)
row := ds.db.QueryRowContext(ctx, query, callID)
var log string var log string
err := row.Scan(&log) err := row.Scan(&log)
@@ -625,7 +615,8 @@ func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog,
} }
func (ds *sqlStore) DeleteLog(ctx context.Context, callID string) error { func (ds *sqlStore) DeleteLog(ctx context.Context, callID string) error {
_, err := ds.db.ExecContext(ctx, ds.deleteLogQuery, callID) query := ds.db.Rebind(`DELETE FROM logs WHERE id=?`)
_, err := ds.db.ExecContext(ctx, query, callID)
return err return err
} }

View File

@@ -1,7 +1,7 @@
#! /bin/sh #! /bin/sh
set -e set -e
listFilesExit () { function listFilesExit () {
echo The following files need to have go fmt ran: echo The following files need to have go fmt ran:
echo $NEED_TO_FORMAT echo $NEED_TO_FORMAT
exit 1 exit 1