mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
datastore no longer implements logstore (#1013)
* datastore no longer implements logstore the underlying implementation of our sql store implements both the datastore and the logstore interface, however going forward we are likely to encounter datastore implementers that would mock out the logstore interface and not use its methods - signalling a poor interface. this remedies that, now they are 2 completely separate things, which our sqlstore happens to implement both of. related to some recent changes around wrapping, this keeps the imposed metrics and validation wrapping of a servers logstore and datastore, just moving it into New instead of in the opts - this is so that a user can have the underlying datastore in order to set the logstore to it, since wrapping it in a validator/metrics would render it no longer a logstore implementer (i.e. validate datastore doesn't implement the logstore interface), we need to do this after setting the logstore to the datastore if one wasn't provided explicitly. * splits logstore and datastore metrics & validation logic * `make test` should be `make full-test` always. got rid of that so that nobody else has to wait for CI to blow up on them after the tests pass locally ever again. * fix new tests
This commit is contained in:
@@ -49,7 +49,7 @@ jobs:
|
|||||||
# Rebuild fnserver if necessary
|
# Rebuild fnserver if necessary
|
||||||
- run: |
|
- run: |
|
||||||
if [[ -n "$FN_NEEDED" ]]; then
|
if [[ -n "$FN_NEEDED" ]]; then
|
||||||
make full-test -j $(nproc)
|
make test -j $(nproc)
|
||||||
fi
|
fi
|
||||||
|
|
||||||
- deploy:
|
- deploy:
|
||||||
|
|||||||
4
Makefile
4
Makefile
@@ -43,7 +43,7 @@ test-extensions: test-basic
|
|||||||
test-basic: checkfmt pull-images fn-test-utils
|
test-basic: checkfmt pull-images fn-test-utils
|
||||||
./test.sh
|
./test.sh
|
||||||
|
|
||||||
test: checkfmt pull-images test-basic test-middleware test-extensions
|
test: checkfmt pull-images test-basic test-middleware test-extensions test-api test-system
|
||||||
|
|
||||||
test-api: test-basic
|
test-api: test-basic
|
||||||
./api_test.sh sqlite3 4
|
./api_test.sh sqlite3 4
|
||||||
@@ -55,8 +55,6 @@ test-system: test-basic
|
|||||||
./system_test.sh mysql 4 0
|
./system_test.sh mysql 4 0
|
||||||
./system_test.sh postgres 4 0
|
./system_test.sh postgres 4 0
|
||||||
|
|
||||||
full-test: test test-api test-system
|
|
||||||
|
|
||||||
img-busybox:
|
img-busybox:
|
||||||
docker pull busybox
|
docker pull busybox
|
||||||
img-hello:
|
img-hello:
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/fnproject/fn/api/datastore"
|
"github.com/fnproject/fn/api/datastore"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
|
"github.com/fnproject/fn/api/logs"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
"github.com/fnproject/fn/api/mqs"
|
"github.com/fnproject/fn/api/mqs"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@@ -99,8 +100,9 @@ func TestCallConfigurationRequest(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
@@ -242,8 +244,9 @@ func TestCallConfigurationModel(t *testing.T) {
|
|||||||
|
|
||||||
// FromModel doesn't need a datastore, for now...
|
// FromModel doesn't need a datastore, for now...
|
||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
callI, err := a.GetCall(FromModel(cm))
|
callI, err := a.GetCall(FromModel(cm))
|
||||||
@@ -313,8 +316,9 @@ func TestAsyncCallHeaders(t *testing.T) {
|
|||||||
|
|
||||||
// FromModel doesn't need a datastore, for now...
|
// FromModel doesn't need a datastore, for now...
|
||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
callI, err := a.GetCall(FromModel(cm))
|
callI, err := a.GetCall(FromModel(cm))
|
||||||
@@ -439,8 +443,9 @@ func TestReqTooLarge(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cfg.MaxRequestSize = 5
|
cfg.MaxRequestSize = 5
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
_, err = a.GetCall(FromModel(cm))
|
_, err = a.GetCall(FromModel(cm))
|
||||||
@@ -492,8 +497,9 @@ func TestSubmitError(t *testing.T) {
|
|||||||
|
|
||||||
// FromModel doesn't need a datastore, for now...
|
// FromModel doesn't need a datastore, for now...
|
||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -560,7 +566,8 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
bodOne := `{"echoContent":"yodawg"}`
|
bodOne := `{"echoContent":"yodawg"}`
|
||||||
@@ -623,7 +630,8 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) {
|
|||||||
// FromModel doesn't need a datastore, for now...
|
// FromModel doesn't need a datastore, for now...
|
||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
|
||||||
a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock))))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ls, new(mqs.Mock))))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
_, err := a.GetCall(FromModel(call))
|
_, err := a.GetCall(FromModel(call))
|
||||||
@@ -659,7 +667,8 @@ func TestTmpFsRW(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
// Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB
|
// Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB
|
||||||
@@ -763,7 +772,8 @@ func TestTmpFsSize(t *testing.T) {
|
|||||||
|
|
||||||
cfg.MaxTmpFsInodes = 1024
|
cfg.MaxTmpFsInodes = 1024
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
// Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB
|
// Here we tell fn-test-utils to read file /proc/mounts and create a /tmp/salsa of 4MB
|
||||||
@@ -932,7 +942,8 @@ func TestPipesAreClear(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
// test read this body after 5s (after call times out) and make sure we don't get yodawg
|
// test read this body after 5s (after call times out) and make sure we don't get yodawg
|
||||||
@@ -1082,7 +1093,8 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
bodOne := `{"echoContent":"yodawg"}`
|
bodOne := `{"echoContent":"yodawg"}`
|
||||||
@@ -1188,7 +1200,8 @@ func TestNBIOResourceTracker(t *testing.T) {
|
|||||||
cfg.MaxTotalMemory = 280 * 1024 * 1024
|
cfg.MaxTotalMemory = 280 * 1024 * 1024
|
||||||
cfg.HotPoll = 20 * time.Millisecond
|
cfg.HotPoll = 20 * time.Millisecond
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
ls := logs.NewMock()
|
||||||
|
a := New(NewDirectDataAccess(ds, ls, new(mqs.Mock)), WithConfig(cfg))
|
||||||
defer checkClose(t, a)
|
defer checkClose(t, a)
|
||||||
|
|
||||||
reqCount := 20
|
reqCount := 20
|
||||||
@@ -1248,8 +1261,9 @@ type closingDataAccess struct {
|
|||||||
|
|
||||||
func newClosingDataAccess(closeReturn error) *closingDataAccess {
|
func newClosingDataAccess(closeReturn error) *closingDataAccess {
|
||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
ls := logs.NewMock()
|
||||||
return &closingDataAccess{
|
return &closingDataAccess{
|
||||||
DataAccess: NewDirectDataAccess(ds, ds, new(mqs.Mock)),
|
DataAccess: NewDirectDataAccess(ds, ls, new(mqs.Mock)),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
closeReturn: closeReturn,
|
closeReturn: closeReturn,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -192,7 +192,7 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde
|
|||||||
// and Datastore are different, it will call Close on the Logstore as well.
|
// and Datastore are different, it will call Close on the Logstore as well.
|
||||||
func (da *directDataAccess) Close() error {
|
func (da *directDataAccess) Close() error {
|
||||||
err := da.ds.Close()
|
err := da.ds.Close()
|
||||||
if da.ds != da.ls {
|
if ls, ok := da.ds.(models.LogStore); ok && ls != da.ls {
|
||||||
if daErr := da.ls.Close(); daErr != nil {
|
if daErr := da.ls.Close(); daErr != nil {
|
||||||
err = daErr
|
err = daErr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,12 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func New(ctx context.Context, dbURL string) (models.Datastore, error) {
|
func New(ctx context.Context, dbURL string) (models.Datastore, error) {
|
||||||
ds, err := newds(ctx, dbURL) // teehee
|
return newds(ctx, dbURL) // teehee
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return Wrap(ds), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Wrap(ds models.Datastore) models.Datastore {
|
func Wrap(ds models.Datastore) models.Datastore {
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package datastoreutil
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
|
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
|
|
||||||
@@ -84,36 +83,6 @@ func (m *metricds) RemoveRoute(ctx context.Context, appID string, routePath stri
|
|||||||
return m.ds.RemoveRoute(ctx, appID, routePath)
|
return m.ds.RemoveRoute(ctx, appID, routePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricds) InsertCall(ctx context.Context, call *models.Call) error {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ds_insert_call")
|
|
||||||
defer span.End()
|
|
||||||
return m.ds.InsertCall(ctx, call)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricds) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ds_get_call")
|
|
||||||
defer span.End()
|
|
||||||
return m.ds.GetCall(ctx, appName, callID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricds) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ds_get_calls")
|
|
||||||
defer span.End()
|
|
||||||
return m.ds.GetCalls(ctx, filter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricds) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ds_insert_log")
|
|
||||||
defer span.End()
|
|
||||||
return m.ds.InsertLog(ctx, appName, callID, callLog)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *metricds) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ds_get_log")
|
|
||||||
defer span.End()
|
|
||||||
return m.ds.GetLog(ctx, appName, callID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// instant & no context ;)
|
// instant & no context ;)
|
||||||
func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() }
|
func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() }
|
||||||
|
|
||||||
|
|||||||
@@ -132,14 +132,6 @@ func (v *validator) RemoveRoute(ctx context.Context, appID string, routePath str
|
|||||||
return v.Datastore.RemoveRoute(ctx, appID, routePath)
|
return v.Datastore.RemoveRoute(ctx, appID, routePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// callID will never be empty.
|
|
||||||
func (v *validator) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
|
|
||||||
if callID == "" {
|
|
||||||
return nil, models.ErrDatastoreEmptyCallID
|
|
||||||
}
|
|
||||||
return v.Datastore.GetCall(ctx, appName, callID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetDatabase returns the underlying sqlx database implementation
|
// GetDatabase returns the underlying sqlx database implementation
|
||||||
func (v *validator) GetDatabase() *sqlx.DB {
|
func (v *validator) GetDatabase() *sqlx.DB {
|
||||||
return v.Datastore.GetDatabase()
|
return v.Datastore.GetDatabase()
|
||||||
|
|||||||
@@ -97,18 +97,23 @@ const (
|
|||||||
EnvDBPingMaxRetries = "FN_DS_DB_PING_MAX_RETRIES"
|
EnvDBPingMaxRetries = "FN_DS_DB_PING_MAX_RETRIES"
|
||||||
)
|
)
|
||||||
|
|
||||||
type sqlStore struct {
|
var ( // compiler will yell nice things about our upbringing as a child
|
||||||
|
_ models.Datastore = new(SQLStore)
|
||||||
|
_ models.LogStore = new(SQLStore)
|
||||||
|
)
|
||||||
|
|
||||||
|
type SQLStore struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
// New will open the db specified by url, create any tables necessary
|
// New will open the db specified by url, create any tables necessary
|
||||||
// and return a models.Datastore safe for concurrent usage.
|
// and return a models.Datastore safe for concurrent usage.
|
||||||
func New(ctx context.Context, url *url.URL) (models.Datastore, error) {
|
func New(ctx context.Context, url *url.URL) (*SQLStore, error) {
|
||||||
return newDS(ctx, url)
|
return newDS(ctx, url)
|
||||||
}
|
}
|
||||||
|
|
||||||
// for test methods, return concrete type, but don't expose
|
// for test methods, return concrete type, but don't expose
|
||||||
func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) {
|
func newDS(ctx context.Context, url *url.URL) (*SQLStore, error) {
|
||||||
driver := url.Scheme
|
driver := url.Scheme
|
||||||
|
|
||||||
log := common.Logger(ctx)
|
log := common.Logger(ctx)
|
||||||
@@ -158,7 +163,7 @@ func newDS(ctx context.Context, url *url.URL) (*sqlStore, error) {
|
|||||||
db.SetMaxOpenConns(1)
|
db.SetMaxOpenConns(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
sdb := &sqlStore{db: db}
|
sdb := &SQLStore{db: db}
|
||||||
|
|
||||||
// NOTE: runMigrations happens before we create all the tables, so that it
|
// NOTE: runMigrations happens before we create all the tables, so that it
|
||||||
// can detect whether the db did not exist and insert the latest version of
|
// can detect whether the db did not exist and insert the latest version of
|
||||||
@@ -259,7 +264,7 @@ func checkExistence(tx *sqlx.Tx) (bool, error) {
|
|||||||
// check if the db already existed, if the db is brand new then we can skip
|
// check if the db already existed, if the db is brand new then we can skip
|
||||||
// over all the migrations BUT we must be sure to set the right migration
|
// over all the migrations BUT we must be sure to set the right migration
|
||||||
// number so that only current migrations are skipped, not any future ones.
|
// number so that only current migrations are skipped, not any future ones.
|
||||||
func (ds *sqlStore) runMigrations(ctx context.Context, tx *sqlx.Tx, migrations []migratex.Migration) error {
|
func (ds *SQLStore) runMigrations(ctx context.Context, tx *sqlx.Tx, migrations []migratex.Migration) error {
|
||||||
dbExists, err := checkExistence(tx)
|
dbExists, err := checkExistence(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -287,7 +292,7 @@ func latestVersion(migs []migratex.Migration) int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// clear is for tests only, be careful, it deletes all records.
|
// clear is for tests only, be careful, it deletes all records.
|
||||||
func (ds *sqlStore) clear() error {
|
func (ds *SQLStore) clear() error {
|
||||||
return ds.Tx(func(tx *sqlx.Tx) error {
|
return ds.Tx(func(tx *sqlx.Tx) error {
|
||||||
query := tx.Rebind(`DELETE FROM routes`)
|
query := tx.Rebind(`DELETE FROM routes`)
|
||||||
_, err := tx.Exec(query)
|
_, err := tx.Exec(query)
|
||||||
@@ -313,7 +318,7 @@ func (ds *sqlStore) clear() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetAppID(ctx context.Context, appName string) (string, error) {
|
func (ds *SQLStore) GetAppID(ctx context.Context, appName string) (string, error) {
|
||||||
var app models.App
|
var app models.App
|
||||||
query := ds.db.Rebind(ensureAppSelector)
|
query := ds.db.Rebind(ensureAppSelector)
|
||||||
row := ds.db.QueryRowxContext(ctx, query, appName)
|
row := ds.db.QueryRowxContext(ctx, query, appName)
|
||||||
@@ -329,7 +334,7 @@ func (ds *sqlStore) GetAppID(ctx context.Context, appName string) (string, error
|
|||||||
return app.ID, nil
|
return app.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
func (ds *SQLStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
||||||
query := ds.db.Rebind(`INSERT INTO apps (
|
query := ds.db.Rebind(`INSERT INTO apps (
|
||||||
id,
|
id,
|
||||||
name,
|
name,
|
||||||
@@ -370,7 +375,7 @@ func (ds *sqlStore) InsertApp(ctx context.Context, app *models.App) (*models.App
|
|||||||
return app, nil
|
return app, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
|
func (ds *SQLStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.App, error) {
|
||||||
var app models.App
|
var app models.App
|
||||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||||
// NOTE: must query whole object since we're returning app, Update logic
|
// NOTE: must query whole object since we're returning app, Update logic
|
||||||
@@ -415,7 +420,7 @@ func (ds *sqlStore) UpdateApp(ctx context.Context, newapp *models.App) (*models.
|
|||||||
return &app, nil
|
return &app, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) RemoveApp(ctx context.Context, appID string) error {
|
func (ds *SQLStore) RemoveApp(ctx context.Context, appID string) error {
|
||||||
return ds.Tx(func(tx *sqlx.Tx) error {
|
return ds.Tx(func(tx *sqlx.Tx) error {
|
||||||
res, err := tx.ExecContext(ctx, tx.Rebind(`DELETE FROM apps WHERE id=?`), appID)
|
res, err := tx.ExecContext(ctx, tx.Rebind(`DELETE FROM apps WHERE id=?`), appID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -445,7 +450,7 @@ func (ds *sqlStore) RemoveApp(ctx context.Context, appID string) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
func (ds *SQLStore) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
||||||
var app models.App
|
var app models.App
|
||||||
query := ds.db.Rebind(appIDSelector)
|
query := ds.db.Rebind(appIDSelector)
|
||||||
row := ds.db.QueryRowxContext(ctx, query, appID)
|
row := ds.db.QueryRowxContext(ctx, query, appID)
|
||||||
@@ -461,7 +466,7 @@ func (ds *sqlStore) GetAppByID(ctx context.Context, appID string) (*models.App,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetApps retrieves an array of apps according to a specific filter.
|
// GetApps retrieves an array of apps according to a specific filter.
|
||||||
func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
func (ds *SQLStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
||||||
res := []*models.App{}
|
res := []*models.App{}
|
||||||
if filter.NameIn != nil && len(filter.NameIn) == 0 { // this basically makes sure it doesn't return ALL apps
|
if filter.NameIn != nil && len(filter.NameIn) == 0 { // this basically makes sure it doesn't return ALL apps
|
||||||
return res, nil
|
return res, nil
|
||||||
@@ -495,7 +500,7 @@ func (ds *sqlStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*m
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
func (ds *SQLStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
||||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||||
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
|
query := tx.Rebind(`SELECT 1 FROM apps WHERE id=?`)
|
||||||
r := tx.QueryRowContext(ctx, query, route.AppID)
|
r := tx.QueryRowContext(ctx, query, route.AppID)
|
||||||
@@ -557,7 +562,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode
|
|||||||
return route, err
|
return route, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
|
func (ds *SQLStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*models.Route, error) {
|
||||||
var route models.Route
|
var route models.Route
|
||||||
err := ds.Tx(func(tx *sqlx.Tx) error {
|
err := ds.Tx(func(tx *sqlx.Tx) error {
|
||||||
query := tx.Rebind(fmt.Sprintf("%s WHERE app_id=? AND path=?", routeSelector))
|
query := tx.Rebind(fmt.Sprintf("%s WHERE app_id=? AND path=?", routeSelector))
|
||||||
@@ -612,7 +617,7 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m
|
|||||||
return &route, nil
|
return &route, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) RemoveRoute(ctx context.Context, appID string, routePath string) error {
|
func (ds *SQLStore) RemoveRoute(ctx context.Context, appID string, routePath string) error {
|
||||||
query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_id = ?`)
|
query := ds.db.Rebind(`DELETE FROM routes WHERE path = ? AND app_id = ?`)
|
||||||
res, err := ds.db.ExecContext(ctx, query, routePath, appID)
|
res, err := ds.db.ExecContext(ctx, query, routePath, appID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -631,7 +636,7 @@ func (ds *sqlStore) RemoveRoute(ctx context.Context, appID string, routePath str
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetRoute(ctx context.Context, appID, routePath string) (*models.Route, error) {
|
func (ds *SQLStore) GetRoute(ctx context.Context, appID, routePath string) (*models.Route, error) {
|
||||||
rSelectCondition := "%s WHERE app_id=? AND path=?"
|
rSelectCondition := "%s WHERE app_id=? AND path=?"
|
||||||
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
|
query := ds.db.Rebind(fmt.Sprintf(rSelectCondition, routeSelector))
|
||||||
row := ds.db.QueryRowxContext(ctx, query, appID, routePath)
|
row := ds.db.QueryRowxContext(ctx, query, appID, routePath)
|
||||||
@@ -647,7 +652,7 @@ func (ds *sqlStore) GetRoute(ctx context.Context, appID, routePath string) (*mod
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetRoutesByApp retrieves a route with a specific app name.
|
// GetRoutesByApp retrieves a route with a specific app name.
|
||||||
func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appID string, filter *models.RouteFilter) ([]*models.Route, error) {
|
func (ds *SQLStore) GetRoutesByApp(ctx context.Context, appID string, filter *models.RouteFilter) ([]*models.Route, error) {
|
||||||
res := []*models.Route{}
|
res := []*models.Route{}
|
||||||
if filter == nil {
|
if filter == nil {
|
||||||
filter = new(models.RouteFilter)
|
filter = new(models.RouteFilter)
|
||||||
@@ -684,7 +689,7 @@ func (ds *sqlStore) GetRoutesByApp(ctx context.Context, appID string, filter *mo
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
|
func (ds *SQLStore) Tx(f func(*sqlx.Tx) error) error {
|
||||||
tx, err := ds.db.Beginx()
|
tx, err := ds.db.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -697,7 +702,7 @@ func (ds *sqlStore) Tx(f func(*sqlx.Tx) error) error {
|
|||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
|
func (ds *SQLStore) InsertCall(ctx context.Context, call *models.Call) error {
|
||||||
query := ds.db.Rebind(`INSERT INTO calls (
|
query := ds.db.Rebind(`INSERT INTO calls (
|
||||||
id,
|
id,
|
||||||
created_at,
|
created_at,
|
||||||
@@ -725,7 +730,7 @@ func (ds *sqlStore) InsertCall(ctx context.Context, call *models.Call) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
|
func (ds *SQLStore) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
|
||||||
query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector)
|
query := fmt.Sprintf(`%s WHERE id=? AND app_id=?`, callSelector)
|
||||||
query = ds.db.Rebind(query)
|
query = ds.db.Rebind(query)
|
||||||
row := ds.db.QueryRowxContext(ctx, query, callID, appID)
|
row := ds.db.QueryRowxContext(ctx, query, callID, appID)
|
||||||
@@ -741,7 +746,7 @@ func (ds *sqlStore) GetCall(ctx context.Context, appID, callID string) (*models.
|
|||||||
return &call, nil
|
return &call, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
func (ds *SQLStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
||||||
res := []*models.Call{}
|
res := []*models.Call{}
|
||||||
query, args := buildFilterCallQuery(filter)
|
query, args := buildFilterCallQuery(filter)
|
||||||
query = fmt.Sprintf("%s %s", callSelector, query)
|
query = fmt.Sprintf("%s %s", callSelector, query)
|
||||||
@@ -766,7 +771,7 @@ func (ds *sqlStore) GetCalls(ctx context.Context, filter *models.CallFilter) ([]
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error {
|
func (ds *SQLStore) InsertLog(ctx context.Context, appID, callID string, logR io.Reader) error {
|
||||||
// coerce this into a string for sql
|
// coerce this into a string for sql
|
||||||
var log string
|
var log string
|
||||||
if stringer, ok := logR.(fmt.Stringer); ok {
|
if stringer, ok := logR.(fmt.Stringer); ok {
|
||||||
@@ -785,7 +790,7 @@ func (ds *sqlStore) InsertLog(ctx context.Context, appID, callID string, logR io
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *sqlStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
|
func (ds *SQLStore) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
|
||||||
query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`)
|
query := ds.db.Rebind(`SELECT log FROM logs WHERE id=? AND app_id=?`)
|
||||||
row := ds.db.QueryRowContext(ctx, query, callID, appID)
|
row := ds.db.QueryRowContext(ctx, query, callID, appID)
|
||||||
|
|
||||||
@@ -911,11 +916,11 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetDatabase returns the underlying sqlx database implementation
|
// GetDatabase returns the underlying sqlx database implementation
|
||||||
func (ds *sqlStore) GetDatabase() *sqlx.DB {
|
func (ds *SQLStore) GetDatabase() *sqlx.DB {
|
||||||
return ds.db
|
return ds.db
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the database, releasing any open resources.
|
// Close closes the database, releasing any open resources.
|
||||||
func (ds *sqlStore) Close() error {
|
func (ds *SQLStore) Close() error {
|
||||||
return ds.db.Close()
|
return ds.db.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ import (
|
|||||||
// * run all down migrations
|
// * run all down migrations
|
||||||
// * run all up migrations
|
// * run all up migrations
|
||||||
// [ then run tests against that db ]
|
// [ then run tests against that db ]
|
||||||
func newWithMigrations(ctx context.Context, url *url.URL) (*sqlStore, error) {
|
func newWithMigrations(ctx context.Context, url *url.URL) (*SQLStore, error) {
|
||||||
ds, err := newDS(ctx, url)
|
ds, err := newDS(ctx, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -49,16 +49,20 @@ func TestDatastore(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
f := func(t *testing.T) models.Datastore {
|
f := func(t *testing.T) *SQLStore {
|
||||||
os.RemoveAll("sqlite_test_dir")
|
os.RemoveAll("sqlite_test_dir")
|
||||||
ds, err := newDS(ctx, u)
|
ds, err := newDS(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// we don't want to test the validator, really
|
// we don't want to test the validator, really
|
||||||
|
return ds
|
||||||
|
}
|
||||||
|
f2 := func(t *testing.T) models.Datastore {
|
||||||
|
ds := f(t)
|
||||||
return datastoreutil.NewValidator(ds)
|
return datastoreutil.NewValidator(ds)
|
||||||
}
|
}
|
||||||
datastoretest.Test(t, f)
|
datastoretest.Test(t, f2)
|
||||||
|
|
||||||
// also logs
|
// also logs
|
||||||
logstoretest.Test(t, f(t))
|
logstoretest.Test(t, f(t))
|
||||||
@@ -72,7 +76,7 @@ func TestDatastore(t *testing.T) {
|
|||||||
// will down migrate all migrations, up migrate, and run tests again.
|
// will down migrate all migrations, up migrate, and run tests again.
|
||||||
|
|
||||||
both := func(u *url.URL) {
|
both := func(u *url.URL) {
|
||||||
f := func(t *testing.T) models.Datastore {
|
f := func(t *testing.T) *SQLStore {
|
||||||
ds, err := newDS(ctx, u)
|
ds, err := newDS(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@@ -81,16 +85,20 @@ func TestDatastore(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
return ds
|
||||||
|
}
|
||||||
|
f2 := func(t *testing.T) models.Datastore {
|
||||||
|
ds := f(t)
|
||||||
return datastoreutil.NewValidator(ds)
|
return datastoreutil.NewValidator(ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
// test fresh w/o migrations
|
// test fresh w/o migrations
|
||||||
datastoretest.Test(t, f)
|
datastoretest.Test(t, f2)
|
||||||
|
|
||||||
// also test sql implements logstore
|
// also test sql implements logstore
|
||||||
logstoretest.Test(t, f(t))
|
logstoretest.Test(t, f(t))
|
||||||
|
|
||||||
f = func(t *testing.T) models.Datastore {
|
f = func(t *testing.T) *SQLStore {
|
||||||
t.Log("with migrations now!")
|
t.Log("with migrations now!")
|
||||||
ds, err := newWithMigrations(ctx, u)
|
ds, err := newWithMigrations(ctx, u)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -100,11 +108,15 @@ func TestDatastore(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
return ds
|
||||||
|
}
|
||||||
|
f2 = func(t *testing.T) models.Datastore {
|
||||||
|
ds := f(t)
|
||||||
return datastoreutil.NewValidator(ds)
|
return datastoreutil.NewValidator(ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
// test that migrations work & things work with them
|
// test that migrations work & things work with them
|
||||||
datastoretest.Test(t, f)
|
datastoretest.Test(t, f2)
|
||||||
|
|
||||||
// also test sql implements logstore
|
// also test sql implements logstore
|
||||||
logstoretest.Test(t, f(t))
|
logstoretest.Test(t, f(t))
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ import (
|
|||||||
|
|
||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
"github.com/fnproject/fn/api/datastore/sql"
|
"github.com/fnproject/fn/api/datastore/sql"
|
||||||
|
"github.com/fnproject/fn/api/logs/metrics"
|
||||||
"github.com/fnproject/fn/api/logs/s3"
|
"github.com/fnproject/fn/api/logs/s3"
|
||||||
|
"github.com/fnproject/fn/api/logs/validator"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@@ -20,12 +22,19 @@ func New(ctx context.Context, dbURL string) (models.LogStore, error) {
|
|||||||
log.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL")
|
log.WithError(err).WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL")
|
||||||
}
|
}
|
||||||
log.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store")
|
log.WithFields(logrus.Fields{"db": u.Scheme}).Debug("creating log store")
|
||||||
|
var ls models.LogStore
|
||||||
switch u.Scheme {
|
switch u.Scheme {
|
||||||
case "sqlite3", "postgres", "mysql":
|
case "sqlite3", "postgres", "mysql":
|
||||||
return sql.New(ctx, u)
|
ls, err = sql.New(ctx, u)
|
||||||
case "s3":
|
case "s3":
|
||||||
return s3.New(u)
|
ls, err = s3.New(u)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("db type not supported %v", u.Scheme)
|
err = fmt.Errorf("db type not supported %v", u.Scheme)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return ls, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func Wrap(ls models.LogStore) models.LogStore {
|
||||||
|
return validator.NewValidator(metrics.NewLogstore(ls))
|
||||||
}
|
}
|
||||||
|
|||||||
51
api/logs/metrics/metrics.go
Normal file
51
api/logs/metrics/metrics.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/fnproject/fn/api/models"
|
||||||
|
"go.opencensus.io/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewLogstore(ls models.LogStore) models.LogStore {
|
||||||
|
return &metricls{ls}
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricls struct {
|
||||||
|
ls models.LogStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) InsertCall(ctx context.Context, call *models.Call) error {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "ls_insert_call")
|
||||||
|
defer span.End()
|
||||||
|
return m.ls.InsertCall(ctx, call)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) GetCall(ctx context.Context, appName, callID string) (*models.Call, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "ls_get_call")
|
||||||
|
defer span.End()
|
||||||
|
return m.ls.GetCall(ctx, appName, callID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*models.Call, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "ls_get_calls")
|
||||||
|
defer span.End()
|
||||||
|
return m.ls.GetCalls(ctx, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "ls_insert_log")
|
||||||
|
defer span.End()
|
||||||
|
return m.ls.InsertLog(ctx, appName, callID, callLog)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "ls_get_log")
|
||||||
|
defer span.End()
|
||||||
|
return m.ls.GetLog(ctx, appName, callID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricls) Close() error {
|
||||||
|
return m.ls.Close()
|
||||||
|
}
|
||||||
60
api/logs/validator/validator.go
Normal file
60
api/logs/validator/validator.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package validator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/fnproject/fn/api/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewValidator(ls models.LogStore) models.LogStore {
|
||||||
|
return &validator{ls}
|
||||||
|
}
|
||||||
|
|
||||||
|
type validator struct {
|
||||||
|
models.LogStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// callID or appID will never be empty.
|
||||||
|
func (v *validator) InsertLog(ctx context.Context, appID, callID string, callLog io.Reader) error {
|
||||||
|
if callID == "" {
|
||||||
|
return models.ErrDatastoreEmptyCallID
|
||||||
|
}
|
||||||
|
if appID == "" {
|
||||||
|
return models.ErrDatastoreEmptyAppID
|
||||||
|
}
|
||||||
|
return v.LogStore.InsertLog(ctx, appID, callID, callLog)
|
||||||
|
}
|
||||||
|
|
||||||
|
// callID or appID will never be empty.
|
||||||
|
func (v *validator) GetLog(ctx context.Context, appID, callID string) (io.Reader, error) {
|
||||||
|
if callID == "" {
|
||||||
|
return nil, models.ErrDatastoreEmptyCallID
|
||||||
|
}
|
||||||
|
if appID == "" {
|
||||||
|
return nil, models.ErrDatastoreEmptyAppID
|
||||||
|
}
|
||||||
|
return v.LogStore.GetLog(ctx, appID, callID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// callID or appID will never be empty.
|
||||||
|
func (v *validator) InsertCall(ctx context.Context, call *models.Call) error {
|
||||||
|
if call.ID == "" {
|
||||||
|
return models.ErrDatastoreEmptyCallID
|
||||||
|
}
|
||||||
|
if call.AppID == "" {
|
||||||
|
return models.ErrDatastoreEmptyAppID
|
||||||
|
}
|
||||||
|
return v.LogStore.InsertCall(ctx, call)
|
||||||
|
}
|
||||||
|
|
||||||
|
// callID or appID will never be empty.
|
||||||
|
func (v *validator) GetCall(ctx context.Context, appID, callID string) (*models.Call, error) {
|
||||||
|
if callID == "" {
|
||||||
|
return nil, models.ErrDatastoreEmptyCallID
|
||||||
|
}
|
||||||
|
if appID == "" {
|
||||||
|
return nil, models.ErrDatastoreEmptyAppID
|
||||||
|
}
|
||||||
|
return v.LogStore.GetCall(ctx, appID, callID)
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package models
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
@@ -58,9 +59,9 @@ type Datastore interface {
|
|||||||
// ErrDatastoreEmptyRoutePath when routePath is empty. Returns ErrRoutesNotFound when no route exists.
|
// ErrDatastoreEmptyRoutePath when routePath is empty. Returns ErrRoutesNotFound when no route exists.
|
||||||
RemoveRoute(ctx context.Context, appID, routePath string) error
|
RemoveRoute(ctx context.Context, appID, routePath string) error
|
||||||
|
|
||||||
// Implement LogStore methods for convenience
|
|
||||||
LogStore
|
|
||||||
|
|
||||||
// GetDatabase returns the underlying sqlx database implementation
|
// GetDatabase returns the underlying sqlx database implementation
|
||||||
GetDatabase() *sqlx.DB
|
GetDatabase() *sqlx.DB
|
||||||
|
|
||||||
|
// implements io.Closer to shutdown
|
||||||
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -165,7 +165,7 @@ func (s *Server) handleRunnerFinish(c *gin.Context) {
|
|||||||
// TODO this needs UpdateCall functionality to work for async and should only work if:
|
// TODO this needs UpdateCall functionality to work for async and should only work if:
|
||||||
// running->error|timeout|success
|
// running->error|timeout|success
|
||||||
// TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted*
|
// TODO all async will fail here :( all sync will work fine :) -- *feeling conflicted*
|
||||||
if err := s.datastore.InsertCall(ctx, &call); err != nil {
|
if err := s.logstore.InsertCall(ctx, &call); err != nil {
|
||||||
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
||||||
// note: Not returning err here since the job could have already finished successfully.
|
// note: Not returning err here since the job could have already finished successfully.
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ func envTweaker(name, value string) func() {
|
|||||||
|
|
||||||
func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelFunc) {
|
func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelFunc) {
|
||||||
ds := datastore.NewMock()
|
ds := datastore.NewMock()
|
||||||
|
ls := logs.NewMock()
|
||||||
var mq models.MessageQueue = &mqs.Mock{}
|
var mq models.MessageQueue = &mqs.Mock{}
|
||||||
for _, a := range args {
|
for _, a := range args {
|
||||||
switch arg := a.(type) {
|
switch arg := a.(type) {
|
||||||
@@ -49,9 +50,11 @@ func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelF
|
|||||||
ds = arg
|
ds = arg
|
||||||
case models.MessageQueue:
|
case models.MessageQueue:
|
||||||
mq = arg
|
mq = arg
|
||||||
|
case models.LogStore:
|
||||||
|
ls = arg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
r := agent.New(agent.NewDirectDataAccess(ds, ds, mq))
|
r := agent.New(agent.NewDirectDataAccess(ds, ls, mq))
|
||||||
return r, func() { r.Close() }
|
return r, func() { r.Close() }
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,11 +172,12 @@ func TestRouteRunnerExecEmptyBody(t *testing.T) {
|
|||||||
{Path: "/hotjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
{Path: "/hotjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
rnr, cancelrnr := testRunner(t, ds)
|
rnr, cancelrnr := testRunner(t, ds, ls)
|
||||||
defer cancelrnr()
|
defer cancelrnr()
|
||||||
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull)
|
srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull)
|
||||||
|
|
||||||
expHeaders := map[string][]string{"X-Function": {"Test"}}
|
expHeaders := map[string][]string{"X-Function": {"Test"}}
|
||||||
emptyBody := `{"echoContent": "_TRX_ID_", "isDebug": true, "isEmptyBody": true}`
|
emptyBody := `{"echoContent": "_TRX_ID_", "isDebug": true, "isEmptyBody": true}`
|
||||||
@@ -258,11 +262,12 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
{Path: "/mybigoutputjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
{Path: "/mybigoutputjson", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
ls := logs.NewMock()
|
||||||
|
|
||||||
rnr, cancelrnr := testRunner(t, ds)
|
rnr, cancelrnr := testRunner(t, ds, ls)
|
||||||
defer cancelrnr()
|
defer cancelrnr()
|
||||||
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull)
|
srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull)
|
||||||
|
|
||||||
expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
|
expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
|
||||||
expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
|
expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
|
||||||
@@ -373,7 +378,7 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
|
|
||||||
for i, test := range testCases {
|
for i, test := range testCases {
|
||||||
if test.expectedLogsSubStr != nil {
|
if test.expectedLogsSubStr != nil {
|
||||||
if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) {
|
if !checkLogs(t, i, ls, callIds[i], test.expectedLogsSubStr) {
|
||||||
isFailure = true
|
isFailure = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -403,7 +408,7 @@ func getDockerId(respBytes []byte) (string, error) {
|
|||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkLogs(t *testing.T, tnum int, ds models.Datastore, callID string, expected []string) bool {
|
func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool {
|
||||||
|
|
||||||
logReader, err := ds.GetLog(context.Background(), "myapp", callID)
|
logReader, err := ds.GetLog(context.Background(), "myapp", callID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -460,7 +465,7 @@ func TestFailedEnqueue(t *testing.T) {
|
|||||||
err := errors.New("Unable to push task to queue")
|
err := errors.New("Unable to push task to queue")
|
||||||
mq := &errorMQ{err, http.StatusInternalServerError}
|
mq := &errorMQ{err, http.StatusInternalServerError}
|
||||||
fnl := logs.NewMock()
|
fnl := logs.NewMock()
|
||||||
rnr, cancelrnr := testRunner(t, ds, mq)
|
rnr, cancelrnr := testRunner(t, ds, mq, fnl)
|
||||||
defer cancelrnr()
|
defer cancelrnr()
|
||||||
|
|
||||||
srv := testServer(ds, mq, fnl, rnr, ServerTypeFull)
|
srv := testServer(ds, mq, fnl, rnr, ServerTypeFull)
|
||||||
@@ -511,10 +516,10 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
rnr, cancelrnr := testRunner(t, ds)
|
fnl := logs.NewMock()
|
||||||
|
rnr, cancelrnr := testRunner(t, ds, fnl)
|
||||||
defer cancelrnr()
|
defer cancelrnr()
|
||||||
|
|
||||||
fnl := logs.NewMock()
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
@@ -581,10 +586,10 @@ func TestRouteRunnerMinimalConcurrentHotSync(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
rnr, cancelrnr := testRunner(t, ds)
|
fnl := logs.NewMock()
|
||||||
|
rnr, cancelrnr := testRunner(t, ds, fnl)
|
||||||
defer cancelrnr()
|
defer cancelrnr()
|
||||||
|
|
||||||
fnl := logs.NewMock()
|
|
||||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
||||||
|
|
||||||
for i, test := range []struct {
|
for i, test := range []struct {
|
||||||
|
|||||||
@@ -173,8 +173,10 @@ func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
|
|||||||
if nodeType != ServerTypeAPI {
|
if nodeType != ServerTypeAPI {
|
||||||
opts = append(opts, WithAgentFromEnv())
|
opts = append(opts, WithAgentFromEnv())
|
||||||
} else {
|
} else {
|
||||||
|
// NOTE: ensures logstore is set or there will be troubles
|
||||||
opts = append(opts, WithLogstoreFromDatastore())
|
opts = append(opts, WithLogstoreFromDatastore())
|
||||||
}
|
}
|
||||||
|
|
||||||
return New(ctx, opts...)
|
return New(ctx, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -330,7 +332,7 @@ func WithNodeCertAuthority(ca string) ServerOption {
|
|||||||
|
|
||||||
func WithDatastore(ds models.Datastore) ServerOption {
|
func WithDatastore(ds models.Datastore) ServerOption {
|
||||||
return func(ctx context.Context, s *Server) error {
|
return func(ctx context.Context, s *Server) error {
|
||||||
s.datastore = datastore.Wrap(ds)
|
s.datastore = ds
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -370,7 +372,11 @@ func WithLogstoreFromDatastore() ServerOption {
|
|||||||
return errors.New("Need a datastore in order to use it as a logstore")
|
return errors.New("Need a datastore in order to use it as a logstore")
|
||||||
}
|
}
|
||||||
if s.logstore == nil {
|
if s.logstore == nil {
|
||||||
s.logstore = s.datastore
|
if ls, ok := s.datastore.(models.LogStore); ok {
|
||||||
|
s.logstore = ls
|
||||||
|
} else {
|
||||||
|
return errors.New("datastore must implement logstore interface")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -380,9 +386,12 @@ func WithLogstoreFromDatastore() ServerOption {
|
|||||||
func WithFullAgent() ServerOption {
|
func WithFullAgent() ServerOption {
|
||||||
return func(ctx context.Context, s *Server) error {
|
return func(ctx context.Context, s *Server) error {
|
||||||
s.nodeType = ServerTypeFull
|
s.nodeType = ServerTypeFull
|
||||||
|
|
||||||
|
// ensure logstore is set (TODO compat only?)
|
||||||
if s.logstore == nil {
|
if s.logstore == nil {
|
||||||
s.logstore = s.datastore
|
WithLogstoreFromDatastore()(ctx, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.datastore == nil || s.logstore == nil || s.mq == nil {
|
if s.datastore == nil || s.logstore == nil || s.mq == nil {
|
||||||
return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
|
return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
|
||||||
}
|
}
|
||||||
@@ -464,14 +473,7 @@ func WithAgentFromEnv() ServerOption {
|
|||||||
return errors.New("LBAgent creation failed")
|
return errors.New("LBAgent creation failed")
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
s.nodeType = ServerTypeFull
|
WithFullAgent()(ctx, s)
|
||||||
if s.logstore == nil { // TODO seems weird?
|
|
||||||
s.logstore = s.datastore
|
|
||||||
}
|
|
||||||
if s.datastore == nil || s.logstore == nil || s.mq == nil {
|
|
||||||
return errors.New("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
|
|
||||||
}
|
|
||||||
s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq)))
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -543,7 +545,9 @@ func New(ctx context.Context, opts ...ServerOption) *Server {
|
|||||||
s.appListeners = new(appListeners)
|
s.appListeners = new(appListeners)
|
||||||
s.routeListeners = new(routeListeners)
|
s.routeListeners = new(routeListeners)
|
||||||
|
|
||||||
|
s.datastore = datastore.Wrap(s.datastore)
|
||||||
s.datastore = fnext.NewDatastore(s.datastore, s.appListeners, s.routeListeners)
|
s.datastore = fnext.NewDatastore(s.datastore, s.appListeners, s.routeListeners)
|
||||||
|
s.logstore = logs.Wrap(s.logstore)
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -14,6 +15,7 @@ import (
|
|||||||
|
|
||||||
"github.com/fnproject/fn/api/agent"
|
"github.com/fnproject/fn/api/agent"
|
||||||
"github.com/fnproject/fn/api/datastore"
|
"github.com/fnproject/fn/api/datastore"
|
||||||
|
"github.com/fnproject/fn/api/datastore/sql"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
"github.com/fnproject/fn/api/logs"
|
"github.com/fnproject/fn/api/logs"
|
||||||
"github.com/fnproject/fn/api/models"
|
"github.com/fnproject/fn/api/models"
|
||||||
@@ -94,11 +96,17 @@ func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) *models.Erro
|
|||||||
|
|
||||||
func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.LogStore, func()) {
|
func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.LogStore, func()) {
|
||||||
os.Remove(tmpDatastoreTests)
|
os.Remove(tmpDatastoreTests)
|
||||||
ds, err := datastore.New(ctx, "sqlite3://"+tmpDatastoreTests)
|
uri, err := url.Parse("sqlite3://" + tmpDatastoreTests)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
ss, err := sql.New(ctx, uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error when creating datastore: %s", err)
|
t.Fatalf("Error when creating datastore: %s", err)
|
||||||
}
|
}
|
||||||
logDB := ds
|
logDB := logs.Wrap(ss)
|
||||||
|
ds := datastore.Wrap(ss)
|
||||||
|
|
||||||
return ds, logDB, func() {
|
return ds, logDB, func() {
|
||||||
os.Remove(tmpDatastoreTests)
|
os.Remove(tmpDatastoreTests)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user