diff --git a/api/agent/agent.go b/api/agent/agent.go index 0d3538232..8187da493 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -222,9 +222,8 @@ func (a *agent) Submit(callI Call) error { a.stats.Complete() - // TODO if the context is timed out here we need to allocate some more time... - // right now this only works b/c the db isn't using the context - call.End(ctx, err) + // TODO if the context is timed out here we need to allocate new one + call.End(context.Background(), err) return err } diff --git a/api/agent/call.go b/api/agent/call.go index 8faded451..e636e3ef9 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -287,11 +287,7 @@ func (c *call) End(ctx context.Context, err error) { // XXX (reed): delete MQ message, eventually } - // need newer context because original one may be modified - // and no longer be valid for further context-bound operations - if err := c.ds.InsertCall(context.Background(), c.Call); err != nil { - // TODO we should just log this error not return it to user? - // just issue storing call status but call is run + if err := c.ds.InsertCall(opentracing.ContextWithSpan(ctx, span), c.Call); err != nil { logrus.WithError(err).Error("error inserting call into datastore") } } diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index fdd1bf4bc..ff394d7fa 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -10,6 +10,7 @@ import ( "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" + "github.com/opentracing/opentracing-go" "github.com/sirupsen/logrus" ) @@ -56,7 +57,7 @@ func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logD reqID: reqID, }) - // TODO / NOTE: we want linew to be first becauase limitw may error if limit + // TODO / NOTE: we want linew to be first because limitw may error if limit // is reached but we still want to log. we should probably ignore hitting the // limit error since we really just want to not write too much to db and // that's handled as is. put buffers back last to avoid misuse, if there's @@ -110,7 +111,7 @@ type logWriter struct { func (l *logWriter) Write(b []byte) (int, error) { log := common.Logger(l.ctx) log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID}) - log.Println(string(b)) + log.Debug(string(b)) return len(b), nil } @@ -182,7 +183,9 @@ type dbWriter struct { } func (w *dbWriter) Close() error { - return w.db.InsertLog(w.ctx, w.reqID, w.String()) + span, ctx := opentracing.StartSpanFromContext(context.Background(), "agent_log_write") + defer span.Finish() + return w.db.InsertLog(ctx, w.reqID, w.String()) } func (w *dbWriter) Write(b []byte) (int, error) { diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 8726613b5..87d3ad13a 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -71,23 +71,10 @@ const ( ) type sqlStore struct { - db *sqlx.DB - insertAppQuery string - updateAppQuery string - getAppQuery string - selectAppConfigQuery string - removeAppQuery string - insertRouteQuery string - updateRouteQuery string - checkRouteAppQuery string - checkRouteQuery string - getRouteQuery string - removeRouteQuery string - insertCallQuery string - getCallQuery string - insertLogQuery string - getLogQuery string - deleteLogQuery string + db *sqlx.DB + + // TODO we should prepare all of the statements, rebind them + // and store them all here. } // New will open the db specified by url, create any tables necessary @@ -146,56 +133,7 @@ func New(url *url.URL) (models.Datastore, error) { } } - dstore := &sqlStore{ - db: db, - insertAppQuery: db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);"), - selectAppConfigQuery: db.Rebind(`SELECT config FROM apps WHERE name=?`), - updateAppQuery: db.Rebind(`UPDATE apps SET config=? WHERE name=?`), - removeAppQuery: db.Rebind(`DELETE FROM apps WHERE name = ?`), - getAppQuery: db.Rebind(`SELECT name, config FROM apps WHERE name=?`), - checkRouteAppQuery: db.Rebind(`SELECT 1 FROM apps WHERE name=?`), - checkRouteQuery: db.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`), - insertRouteQuery: db.Rebind(`INSERT INTO routes ( - app_name, - path, - image, - format, - memory, - type, - timeout, - idle_timeout, - headers, - config - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`), - getRouteQuery: db.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector)), - updateRouteQuery: db.Rebind(`UPDATE routes SET - image = ?, - format = ?, - memory = ?, - type = ?, - timeout = ?, - idle_timeout = ?, - headers = ?, - config = ? - WHERE app_name=? AND path=?;`), - removeRouteQuery: db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`), - insertCallQuery: db.Rebind(`INSERT INTO calls ( - id, - created_at, - started_at, - completed_at, - status, - app_name, - path - ) - VALUES (?, ?, ?, ?, ?, ?, ?);`), - getCallQuery: db.Rebind(fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)), - insertLogQuery: db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`), - getLogQuery: db.Rebind(`SELECT log FROM logs WHERE id=?`), - deleteLogQuery: db.Rebind(`DELETE FROM logs WHERE id=?`), - } - return dstore, nil + return &sqlStore{db: db}, nil } func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { @@ -208,7 +146,8 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App } } - _, err = ds.db.ExecContext(ctx, ds.insertAppQuery, app.Name, string(cbyte)) + query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);") + _, err = ds.db.ExecContext(ctx, query, app.Name, string(cbyte)) if err != nil { switch err := err.(type) { case *mysql.MySQLError: @@ -233,7 +172,8 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) { app := &models.App{Name: newapp.Name} err := ds.Tx(func(tx *sqlx.Tx) error { - row := tx.QueryRowContext(ctx, ds.selectAppConfigQuery, app.Name) + query := tx.Rebind(`SELECT config FROM apps WHERE name=?`) + row := tx.QueryRowContext(ctx, query, app.Name) var config string if err := row.Scan(&config); err != nil { @@ -257,7 +197,8 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models. return err } - res, err := tx.ExecContext(ctx, ds.updateAppQuery, string(cbyte), app.Name) + query = tx.Rebind(`UPDATE apps SET config=? WHERE name=?`) + res, err := tx.ExecContext(ctx, query, string(cbyte), app.Name) if err != nil { return err } @@ -279,12 +220,14 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models. } func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error { - _, err := ds.db.ExecContext(ctx, ds.removeAppQuery, appName) + query := ds.db.Rebind(`DELETE FROM apps WHERE name = ?`) + _, err := ds.db.ExecContext(ctx, query, appName) return err } func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) { - row := ds.db.QueryRowContext(ctx, ds.getAppQuery, name) + query := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`) + row := ds.db.QueryRowContext(ctx, query, name) var resName, config string err := row.Scan(&resName, &config) @@ -351,13 +294,15 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode } err = ds.Tx(func(tx *sqlx.Tx) error { - r := tx.QueryRowContext(ctx, ds.checkRouteAppQuery, route.AppName) + query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`) + r := tx.QueryRowContext(ctx, query, route.AppName) if err := r.Scan(new(int)); err != nil { if err == sql.ErrNoRows { return models.ErrAppsNotFound } } - same, err := tx.QueryContext(ctx, ds.checkRouteQuery, route.AppName, route.Path) + query = tx.Rebind(`SELECT 1 FROM routes WHERE app_name=? AND path=?`) + same, err := tx.QueryContext(ctx, query, route.AppName, route.Path) if err != nil { return err } @@ -366,7 +311,21 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode return models.ErrRoutesAlreadyExists } - _, err = tx.ExecContext(ctx, ds.insertRouteQuery, + query = tx.Rebind(`INSERT INTO routes ( + app_name, + path, + image, + format, + memory, + type, + timeout, + idle_timeout, + headers, + config + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`) + + _, err = tx.ExecContext(ctx, query, route.AppName, route.Path, route.Image, @@ -388,7 +347,8 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) { var route models.Route err := ds.Tx(func(tx *sqlx.Tx) error { - row := tx.QueryRowContext(ctx, ds.getRouteQuery, newroute.AppName, newroute.Path) + query := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector)) + row := tx.QueryRowContext(ctx, query, newroute.AppName, newroute.Path) if err := scanRoute(row, &route); err == sql.ErrNoRows { return models.ErrRoutesNotFound } else if err != nil { @@ -407,7 +367,18 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m return err } - res, err := tx.ExecContext(ctx, ds.updateRouteQuery, + query = tx.Rebind(`UPDATE routes SET + image = ?, + format = ?, + memory = ?, + type = ?, + timeout = ?, + idle_timeout = ?, + headers = ?, + config = ? + WHERE app_name=? AND path=?;`) + + res, err := tx.ExecContext(ctx, query, route.Image, route.Format, route.Memory, @@ -441,7 +412,8 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m } func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) error { - res, err := ds.db.ExecContext(ctx, ds.removeRouteQuery, routePath, appName) + query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_name = ?`) + res, err := ds.db.ExecContext(ctx, query, routePath, appName) if err != nil { return err } @@ -459,7 +431,9 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string) } func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { - row := ds.db.QueryRowContext(ctx, ds.getRouteQuery, appName, routePath) + rSelectCondition := "%s WHERE app_name=? AND path=?" + query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector)) + row := ds.db.QueryRowContext(ctx, query, appName, routePath) var route models.Route err := scanRoute(row, &route) @@ -475,7 +449,8 @@ func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*m func (ds *sqlStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { res := []*models.Route{} query, args := buildFilterRouteQuery(filter) - query = ds.db.Rebind(fmt.Sprintf("%s %s", routeSelector, query)) + query = fmt.Sprintf("%s %s", routeSelector, query) + query = ds.db.Rebind(query) rows, err := ds.db.QueryContext(ctx, query, args...) // todo: check for no rows so we don't respond with a sql 500 err if err != nil { @@ -552,7 +527,18 @@ func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error { } func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { - _, err := ds.db.ExecContext(ctx, ds.insertCallQuery, call.ID, call.CreatedAt.String(), + query := ds.db.Rebind(`INSERT INTO calls ( + id, + created_at, + started_at, + completed_at, + status, + app_name, + path + ) + VALUES (?, ?, ?, ?, ?, ?, ?);`) + + _, err := ds.db.ExecContext(ctx, query, call.ID, call.CreatedAt.String(), call.StartedAt.String(), call.CompletedAt.String(), call.Status, call.AppName, call.Path) if err != nil { @@ -566,7 +552,9 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error { // if we store the whole thing then it adds a lot of disk space and then we can // make async only queue hints instead of entire calls (mq a lot smaller space wise). pick. func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) { - row := ds.db.QueryRowContext(ctx, ds.getCallQuery, callID, appName) + query := fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector) + query = ds.db.Rebind(query) + row := ds.db.QueryRowContext(ctx, query, callID, appName) var call models.Call err := scanCall(row, &call) @@ -602,12 +590,14 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([] } func (ds *sqlStore) InsertLog(ctx context.Context, callID, callLog string) error { - _, err := ds.db.ExecContext(ctx, ds.insertLogQuery, callID, callLog) + query := ds.db.Rebind(`INSERT INTO logs (id, log) VALUES (?, ?);`) + _, err := ds.db.ExecContext(ctx, query, callID, callLog) return err } func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog, error) { - row := ds.db.QueryRowContext(ctx, ds.getLogQuery, callID) + query := ds.db.Rebind(`SELECT log FROM logs WHERE id=?`) + row := ds.db.QueryRowContext(ctx, query, callID) var log string err := row.Scan(&log) @@ -625,7 +615,8 @@ func (ds *sqlStore) GetLog(ctx context.Context, callID string) (*models.CallLog, } func (ds *sqlStore) DeleteLog(ctx context.Context, callID string) error { - _, err := ds.db.ExecContext(ctx, ds.deleteLogQuery, callID) + query := ds.db.Rebind(`DELETE FROM logs WHERE id=?`) + _, err := ds.db.ExecContext(ctx, query, callID) return err } diff --git a/go-fmt.sh b/go-fmt.sh index 7b309228b..49769b25c 100755 --- a/go-fmt.sh +++ b/go-fmt.sh @@ -1,7 +1,7 @@ #! /bin/sh set -e -listFilesExit () { +function listFilesExit () { echo The following files need to have go fmt ran: echo $NEED_TO_FORMAT exit 1