Merge pull request #351 from fnproject/clean-model

mask models.Call blank fields in api, sqlx
This commit is contained in:
Reed Allman
2017-09-26 11:43:53 -07:00
committed by GitHub
7 changed files with 198 additions and 276 deletions

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"log"
"net/http"
"reflect"
"testing"
"time"
@@ -440,7 +439,7 @@ func Test(t *testing.T, dsf func() models.Datastore) {
var expected models.Route = *testRoute
if !reflect.DeepEqual(*route, expected) {
t.Log(buf.String())
t.Fatalf("Test InsertApp: expected to insert:\n%v\nbut got:\n%v", expected, route)
t.Fatalf("Test InsertApp: expected to insert:\n%v\nbut got:\n%v", expected, *route)
}
}
@@ -456,7 +455,7 @@ func Test(t *testing.T, dsf func() models.Datastore) {
"SECOND": "2",
"THIRD": "3",
},
Headers: http.Header{
Headers: models.Headers{
"First": []string{"test"},
"Second": []string{"test", "test"},
"Third": []string{"test", "test2"},
@@ -480,7 +479,7 @@ func Test(t *testing.T, dsf func() models.Datastore) {
"SECOND": "2",
"THIRD": "3",
},
Headers: http.Header{
Headers: models.Headers{
"First": []string{"test"},
"Second": []string{"test", "test"},
"Third": []string{"test", "test2"},
@@ -500,7 +499,7 @@ func Test(t *testing.T, dsf func() models.Datastore) {
"SECOND": "",
"THIRD": "3",
},
Headers: http.Header{
Headers: models.Headers{
"First": []string{"test2"},
"Second": nil,
},
@@ -522,7 +521,7 @@ func Test(t *testing.T, dsf func() models.Datastore) {
"FIRST": "first",
"THIRD": "3",
},
Headers: http.Header{
Headers: models.Headers{
"First": []string{"test2"},
"Third": []string{"test", "test2"},
},

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
@@ -75,9 +74,6 @@ const (
type sqlStore struct {
db *sqlx.DB
// TODO we should prepare all of the statements, rebind them
// and store them all here.
}
// New will open the db specified by url, create any tables necessary
@@ -140,17 +136,8 @@ func New(url *url.URL) (models.Datastore, error) {
}
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
var cbyte []byte
var err error
if app.Config != nil {
cbyte, err = json.Marshal(app.Config)
if err != nil {
return nil, err
}
}
query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (?, ?);")
_, err = ds.db.ExecContext(ctx, query, app.Name, string(cbyte))
query := ds.db.Rebind("INSERT INTO apps (name, config) VALUES (:name, :config);")
_, err := ds.db.NamedExecContext(ctx, query, app)
if err != nil {
switch err := err.(type) {
case *mysql.MySQLError:
@@ -176,32 +163,19 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
app := &models.App{Name: newapp.Name}
err := ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(`SELECT config FROM apps WHERE name=?`)
row := tx.QueryRowContext(ctx, query, app.Name)
row := tx.QueryRowxContext(ctx, query, app.Name)
var config string
if err := row.Scan(&config); err != nil {
if err == sql.ErrNoRows {
return models.ErrAppsNotFound
}
err := row.StructScan(app)
if err == sql.ErrNoRows {
return models.ErrAppsNotFound
} else if err != nil {
return err
}
if config != "" {
err := json.Unmarshal([]byte(config), &app.Config)
if err != nil {
return err
}
}
app.UpdateConfig(newapp.Config)
cbyte, err := json.Marshal(app.Config)
if err != nil {
return err
}
query = tx.Rebind(`UPDATE apps SET config=? WHERE name=?`)
res, err := tx.ExecContext(ctx, query, string(cbyte), app.Name)
query = tx.Rebind(`UPDATE apps SET config=:config WHERE name=:name`)
res, err := tx.NamedExecContext(ctx, query, app)
if err != nil {
return err
}
@@ -254,29 +228,16 @@ func (ds *sqlStore) RemoveApp(ctx context.Context, appName string) error {
func (ds *sqlStore) GetApp(ctx context.Context, name string) (*models.App, error) {
query := ds.db.Rebind(`SELECT name, config FROM apps WHERE name=?`)
row := ds.db.QueryRowContext(ctx, query, name)
row := ds.db.QueryRowxContext(ctx, query, name)
var resName, config string
err := row.Scan(&resName, &config)
if err != nil {
if err == sql.ErrNoRows {
return nil, models.ErrAppsNotFound
}
var res models.App
err := row.StructScan(&res)
if err == sql.ErrNoRows {
return nil, models.ErrAppsNotFound
} else if err != nil {
return nil, err
}
res := &models.App{
Name: resName,
}
if len(config) > 0 {
err := json.Unmarshal([]byte(config), &res.Config)
if err != nil {
return nil, err
}
}
return res, nil
return &res, nil
}
// GetApps retrieves an array of apps according to a specific filter.
@@ -284,7 +245,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
res := []*models.App{}
query, args := buildFilterAppQuery(filter)
query = ds.db.Rebind(fmt.Sprintf("SELECT DISTINCT name, config FROM apps %s", query))
rows, err := ds.db.QueryContext(ctx, query, args...)
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
}
@@ -292,8 +253,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
for rows.Next() {
var app models.App
err := scanApp(rows, &app)
err := rows.StructScan(&app)
if err != nil {
if err == sql.ErrNoRows {
return res, nil
@@ -310,17 +270,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
}
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
hbyte, err := json.Marshal(route.Headers)
if err != nil {
return nil, err
}
cbyte, err := json.Marshal(route.Config)
if err != nil {
return nil, err
}
err = ds.Tx(func(tx *sqlx.Tx) error {
err := ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(`SELECT 1 FROM apps WHERE name=?`)
r := tx.QueryRowContext(ctx, query, route.AppName)
if err := r.Scan(new(int)); err != nil {
@@ -350,20 +300,20 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
headers,
config
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`)
VALUES (
:app_name,
:path,
:image,
:format,
:memory,
:type,
:timeout,
:idle_timeout,
:headers,
:config
);`)
_, err = tx.ExecContext(ctx, query,
route.AppName,
route.Path,
route.Image,
route.Format,
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
)
_, err = tx.NamedExecContext(ctx, query, route)
return err
})
@@ -375,8 +325,10 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
var route models.Route
err := ds.Tx(func(tx *sqlx.Tx) error {
query := tx.Rebind(fmt.Sprintf("%s WHERE app_name=? AND path=?", routeSelector))
row := tx.QueryRowContext(ctx, query, newroute.AppName, newroute.Path)
if err := scanRoute(row, &route); err == sql.ErrNoRows {
row := tx.QueryRowxContext(ctx, query, newroute.AppName, newroute.Path)
err := row.StructScan(&route)
if err == sql.ErrNoRows {
return models.ErrRoutesNotFound
} else if err != nil {
return err
@@ -384,40 +336,18 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
route.Update(newroute)
hbyte, err := json.Marshal(route.Headers)
if err != nil {
return err
}
cbyte, err := json.Marshal(route.Config)
if err != nil {
return err
}
query = tx.Rebind(`UPDATE routes SET
image = ?,
format = ?,
memory = ?,
type = ?,
timeout = ?,
idle_timeout = ?,
headers = ?,
config = ?
WHERE app_name=? AND path=?;`)
res, err := tx.ExecContext(ctx, query,
route.Image,
route.Format,
route.Memory,
route.Type,
route.Timeout,
route.IdleTimeout,
string(hbyte),
string(cbyte),
route.AppName,
route.Path,
)
image = :image,
format = :format,
memory = :memory,
type = :type,
timeout = :timeout,
idle_timeout = :idle_timeout,
headers = :headers,
config = :config
WHERE app_name=:app_name AND path=:path;`)
res, err := tx.NamedExecContext(ctx, query, &route)
if err != nil {
return err
}
@@ -460,10 +390,10 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appName, routePath string)
func (ds *sqlStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
rSelectCondition := "%s WHERE app_name=? AND path=?"
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
row := ds.db.QueryRowContext(ctx, query, appName, routePath)
row := ds.db.QueryRowxContext(ctx, query, appName, routePath)
var route models.Route
err := scanRoute(row, &route)
err := row.StructScan(&route)
if err == sql.ErrNoRows {
return nil, models.ErrRoutesNotFound
} else if err != nil {
@@ -484,7 +414,7 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *
query := fmt.Sprintf("%s %s", routeSelector, filterQuery)
query = ds.db.Rebind(query)
rows, err := ds.db.QueryContext(ctx, query, args...)
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
if err == sql.ErrNoRows {
return res, nil // no error for empty list
@@ -495,12 +425,11 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appName string, filter *
for rows.Next() {
var route models.Route
err := scanRoute(rows, &route)
err := rows.StructScan(&route)
if err != nil {
continue
}
res = append(res, &route)
}
if err := rows.Err(); err != nil {
if err == sql.ErrNoRows {
@@ -534,28 +463,27 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
app_name,
path
)
VALUES (?, ?, ?, ?, ?, ?, ?);`)
VALUES (
:id,
:created_at,
:started_at,
:completed_at,
:status,
:app_name,
:path
);`)
_, err := ds.db.ExecContext(ctx, query, call.ID, call.CreatedAt.String(),
call.StartedAt.String(), call.CompletedAt.String(),
call.Status, call.AppName, call.Path)
if err != nil {
return err
}
return nil
_, err := ds.db.NamedExecContext(ctx, query, call)
return err
}
// TODO calls are not fully qualified in this backend currently. need to discuss,
// if we store the whole thing then it adds a lot of disk space and then we can
// make async only queue hints instead of entire calls (mq a lot smaller space wise). pick.
func (ds *sqlStore) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
query := fmt.Sprintf(`%s WHERE id=? AND app_name=?`, callSelector)
query = ds.db.Rebind(query)
row := ds.db.QueryRowContext(ctx, query, callID, appName)
row := ds.db.QueryRowxContext(ctx, query, callID, appName)
var call models.Call
err := scanCall(row, &call)
err := row.StructScan(&call)
if err != nil {
return nil, err
}
@@ -567,7 +495,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
query, args := buildFilterCallQuery(filter)
query = fmt.Sprintf("%s %s", callSelector, query)
query = ds.db.Rebind(query)
rows, err := ds.db.QueryContext(ctx, query, args...)
rows, err := ds.db.QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
}
@@ -575,7 +503,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
for rows.Next() {
var call models.Call
err := scanCall(rows, &call)
err := rows.StructScan(&call)
if err != nil {
continue
}
@@ -631,75 +559,6 @@ func (ds *sqlStore) DeleteLog(ctx context.Context, appName, callID string) error
return err
}
// TODO scrap for sqlx scanx ?? some things aren't perfect (e.g. config is a json string)
type RowScanner interface {
Scan(dest ...interface{}) error
}
func ScanLog(scanner RowScanner, log *models.CallLog) error {
return scanner.Scan(
&log.CallID,
&log.Log,
)
}
func scanRoute(scanner RowScanner, route *models.Route) error {
var headerStr string
var configStr string
err := scanner.Scan(
&route.AppName,
&route.Path,
&route.Image,
&route.Format,
&route.Memory,
&route.Type,
&route.Timeout,
&route.IdleTimeout,
&headerStr,
&configStr,
)
if err != nil {
return err
}
if len(headerStr) > 0 {
err = json.Unmarshal([]byte(headerStr), &route.Headers)
if err != nil {
return err
}
}
if len(configStr) > 0 {
err = json.Unmarshal([]byte(configStr), &route.Config)
if err != nil {
return err
}
}
return nil
}
func scanApp(scanner RowScanner, app *models.App) error {
var configStr string
err := scanner.Scan(
&app.Name,
&configStr,
)
if err != nil {
return err
}
if len(configStr) > 0 {
err = json.Unmarshal([]byte(configStr), &app.Config)
if err != nil {
return err
}
}
return nil
}
func buildFilterRouteQuery(filter *models.RouteFilter) (string, []interface{}) {
if filter == nil {
return "", nil
@@ -794,25 +653,6 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
return b.String(), args
}
func scanCall(scanner RowScanner, call *models.Call) error {
err := scanner.Scan(
&call.ID,
&call.CreatedAt,
&call.StartedAt,
&call.CompletedAt,
&call.Status,
&call.AppName,
&call.Path,
)
if err == sql.ErrNoRows {
return models.ErrCallNotFound
} else if err != nil {
return err
}
return nil
}
// GetDatabase returns the underlying sqlx database implementation
func (ds *sqlStore) GetDatabase() *sqlx.DB {
return ds.db

View File

@@ -1,9 +1,8 @@
package models
type App struct {
Name string `json:"name"`
Routes Routes `json:"routes,omitempty"`
Config `json:"config"`
Name string `json:"name" db:"name"`
Config Config `json:"config" db:"config"`
}
func (a *App) Validate() error {
@@ -24,11 +23,6 @@ func (a *App) Validate() error {
func (a *App) Clone() *App {
var c App
c.Name = a.Name
if a.Routes != nil {
for i := range a.Routes {
c.Routes = append(c.Routes, a.Routes[i].Clone())
}
}
if a.Config != nil {
c.Config = make(Config)
for k, v := range a.Config {

View File

@@ -23,15 +23,15 @@ const (
var possibleStatuses = [...]string{"delayed", "queued", "running", "success", "error", "cancelled"}
type CallLog struct {
CallID string `json:"call_id"`
Log string `json:"log"`
AppName string `json:"app_name"`
CallID string `json:"call_id" db:"id"`
Log string `json:"log" db:"log"`
AppName string `json:"app_name" db:"app_name"`
}
// Call is a representation of a specific invocation of a route.
type Call struct {
// Unique identifier representing a specific call.
ID string `json:"id"`
ID string `json:"id" db:"id"`
// NOTE: this is stale, retries are not implemented atm, but this is nice, so leaving
// States and valid transitions.
@@ -68,65 +68,65 @@ type Call struct {
// - bad_exit - exited with non-zero status due to program termination/crash.
// * cancelled - cancelled via API. More information in the reason field.
// - client_request - Request was cancelled by a client.
Status string `json:"status"`
Status string `json:"status" db:"status"`
// App this call belongs to.
AppName string `json:"app_name"`
AppName string `json:"app_name" db:"app_name"`
// Path of the route that is responsible for this call
Path string `json:"path"`
Path string `json:"path" db:"path"`
// Name of Docker image to use.
Image string `json:"image"`
Image string `json:"image,omitempty" db:"-"`
// Number of seconds to wait before queueing the call for consumption for the
// first time. Must be a positive integer. Calls with a delay start in state
// "delayed" and transition to "running" after delay seconds.
Delay int32 `json:"delay,omitempty"`
Delay int32 `json:"delay,omitempty" db:"-"`
// Type indicates whether a task is to be run synchronously or asynchronously.
Type string `json:"type,omitempty"`
Type string `json:"type,omitempty" db:"-"`
// Format is the format to pass input into the function.
Format string `json:"format,omitempty"`
Format string `json:"format,omitempty" db:"-"`
// Payload for the call. This is only used by async calls, to store their input.
// TODO should we copy it into here too for debugging sync?
Payload string `json:"payload,omitempty"`
Payload string `json:"payload,omitempty" db:"-"`
// Full request url that spawned this invocation.
URL string `json:"url,omitempty"`
URL string `json:"url,omitempty" db:"-"`
// Method of the http request used to make this call.
Method string `json:"method,omitempty"`
Method string `json:"method,omitempty" db:"-"`
// Priority of the call. Higher has more priority. 3 levels from 0-2. Calls
// at same priority are processed in FIFO order.
Priority *int32 `json:"priority"`
Priority *int32 `json:"priority,omitempty" db:"-"`
// Maximum runtime in seconds.
Timeout int32 `json:"timeout,omitempty"`
Timeout int32 `json:"timeout,omitempty" db:"-"`
// Hot function idle timeout in seconds before termination.
IdleTimeout int32 `json:"idle_timeout,omitempty"`
IdleTimeout int32 `json:"idle_timeout,omitempty" db:"-"`
// Memory is the amount of RAM this call is allocated.
Memory uint64 `json:"memory,omitempty"`
Memory uint64 `json:"memory,omitempty" db:"-"`
// BaseEnv are the env vars for hot containers, not request specific.
BaseEnv map[string]string `json:"base_env,omitempty"`
BaseEnv map[string]string `json:"base_env,omitempty" db:"-"`
// Env vars for the call. Comes from the ones set on the Route.
EnvVars map[string]string `json:"env_vars,omitempty"`
EnvVars map[string]string `json:"env_vars,omitempty" db:"-"`
// Time when call completed, whether it was successul or failed. Always in UTC.
CompletedAt strfmt.DateTime `json:"completed_at,omitempty"`
CompletedAt strfmt.DateTime `json:"completed_at,omitempty" db:"completed_at"`
// Time when call was submitted. Always in UTC.
CreatedAt strfmt.DateTime `json:"created_at,omitempty"`
CreatedAt strfmt.DateTime `json:"created_at,omitempty" db:"created_at"`
// Time when call started execution. Always in UTC.
StartedAt strfmt.DateTime `json:"started_at,omitempty"`
StartedAt strfmt.DateTime `json:"started_at,omitempty" db:"started_at"`
}
type CallFilter struct {

View File

@@ -1,7 +1,96 @@
package models
import (
"bytes"
"database/sql/driver"
"encoding/json"
"fmt"
"net/http"
)
type Config map[string]string
func (c *Config) Validate() error {
return nil
}
// implements sql.Valuer, returning a string
func (c Config) Value() (driver.Value, error) {
if len(c) < 1 {
return driver.Value(string("")), nil
}
var b bytes.Buffer
err := json.NewEncoder(&b).Encode(c)
// return a string type
return driver.Value(b.String()), err
}
// implements sql.Scanner
func (c *Config) Scan(value interface{}) error {
if value == nil {
*c = nil
return nil
}
bv, err := driver.String.ConvertValue(value)
if err == nil {
var b []byte
switch x := bv.(type) {
case []byte:
b = x
case string:
b = []byte(x)
}
if len(b) > 0 {
return json.Unmarshal(b, c)
} else {
*c = nil
return nil
}
}
// otherwise, return an error
return fmt.Errorf("config invalid db format: %T %T value, err: %v", value, bv, err)
}
// Headers is an http.Header that implements additional methods.
type Headers http.Header
// implements sql.Valuer, returning a string
func (h Headers) Value() (driver.Value, error) {
if len(h) < 1 {
return driver.Value(string("")), nil
}
var b bytes.Buffer
err := json.NewEncoder(&b).Encode(h)
// return a string type
return driver.Value(b.String()), err
}
// implements sql.Scanner
func (h *Headers) Scan(value interface{}) error {
if value == nil {
*h = nil
return nil
}
bv, err := driver.String.ConvertValue(value)
if err == nil {
var b []byte
switch x := bv.(type) {
case []byte:
b = x
case string:
b = []byte(x)
}
if len(b) > 0 {
return json.Unmarshal(b, h)
} else {
*h = nil
return nil
}
}
// otherwise, return an error
return fmt.Errorf("headers invalid db format: %T %T value, err: %v", value, bv, err)
}

View File

@@ -15,16 +15,16 @@ const (
type Routes []*Route
type Route struct {
AppName string `json:"app_name"`
Path string `json:"path"`
Image string `json:"image"`
Memory uint64 `json:"memory"`
Headers http.Header `json:"headers"`
Type string `json:"type"`
Format string `json:"format"`
Timeout int32 `json:"timeout"`
IdleTimeout int32 `json:"idle_timeout"`
Config `json:"config"`
AppName string `json:"app_name" db:"app_name"`
Path string `json:"path" db:"path"`
Image string `json:"image" db:"image"`
Memory uint64 `json:"memory" db:"memory"`
Headers Headers `json:"headers" db:"headers"`
Type string `json:"type" db:"type"`
Format string `json:"format" db":format"`
Timeout int32 `json:"timeout" db:"timeout"`
IdleTimeout int32 `json:"idle_timeout" db:"idle_timeout"`
Config Config `json:"config" db:"config"`
}
// SetDefaults sets zeroed field to defaults.
@@ -42,7 +42,7 @@ func (r *Route) SetDefaults() {
}
if r.Headers == nil {
r.Headers = http.Header{}
r.Headers = Headers(http.Header{})
}
if r.Config == nil {
@@ -144,11 +144,11 @@ func (r *Route) Update(new *Route) {
}
if new.Headers != nil {
if r.Headers == nil {
r.Headers = make(http.Header)
r.Headers = Headers(make(http.Header))
}
for k, v := range new.Headers {
if len(v) == 0 {
r.Headers.Del(k)
http.Header(r.Headers).Del(k)
} else {
r.Headers[k] = v
}

View File

@@ -109,7 +109,7 @@ func TestRouteDelete(t *testing.T) {
buf := setLogBuffer()
routes := []*models.Route{{AppName: "a", Path: "/myroute"}}
apps := []*models.App{{Name: "a", Routes: routes, Config: nil}}
apps := []*models.App{{Name: "a", Config: nil}}
for i, test := range []struct {
ds models.Datastore