mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* datastore no longer implements logstore the underlying implementation of our sql store implements both the datastore and the logstore interface, however going forward we are likely to encounter datastore implementers that would mock out the logstore interface and not use its methods - signalling a poor interface. this remedies that, now they are 2 completely separate things, which our sqlstore happens to implement both of. related to some recent changes around wrapping, this keeps the imposed metrics and validation wrapping of a servers logstore and datastore, just moving it into New instead of in the opts - this is so that a user can have the underlying datastore in order to set the logstore to it, since wrapping it in a validator/metrics would render it no longer a logstore implementer (i.e. validate datastore doesn't implement the logstore interface), we need to do this after setting the logstore to the datastore if one wasn't provided explicitly. * splits logstore and datastore metrics & validation logic * `make test` should be `make full-test` always. got rid of that so that nobody else has to wait for CI to blow up on them after the tests pass locally ever again. * fix new tests
205 lines
6.3 KiB
Go
205 lines
6.3 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/fnproject/fn/api/common"
|
|
"github.com/fnproject/fn/api/common/singleflight"
|
|
"github.com/fnproject/fn/api/models"
|
|
"github.com/patrickmn/go-cache"
|
|
)
|
|
|
|
// DataAccess abstracts the datastore and message queue operations done by the
|
|
// agent, so that API nodes and runner nodes can work with the same interface
|
|
// but actually operate on the data in different ways (by direct access or by
|
|
// mediation through an API node).
|
|
type DataAccess interface {
|
|
GetAppID(ctx context.Context, appName string) (string, error)
|
|
|
|
// GetAppByID abstracts querying the datastore for an app.
|
|
GetAppByID(ctx context.Context, appID string) (*models.App, error)
|
|
|
|
// GetRoute abstracts querying the datastore for a route within an app.
|
|
GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error)
|
|
|
|
// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
|
|
Enqueue(ctx context.Context, mCall *models.Call) error
|
|
|
|
// Dequeue will query the queue for the next available Call that can be run
|
|
// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
|
|
Dequeue(ctx context.Context) (*models.Call, error)
|
|
|
|
// Start will attempt to start the provided Call within an appropriate
|
|
// context.
|
|
Start(ctx context.Context, mCall *models.Call) error
|
|
|
|
// Finish will notify the system that the Call has been processed, and
|
|
// fulfill the reservation in the queue if the call came from a queue.
|
|
Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
|
|
|
|
// Close will wait for any pending operations to complete and
|
|
// shuts down connections to the underlying datastore/queue resources.
|
|
// Close is not safe to be called from multiple threads.
|
|
io.Closer
|
|
}
|
|
|
|
// CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
|
|
type CachedDataAccess struct {
|
|
DataAccess
|
|
|
|
cache *cache.Cache
|
|
singleflight singleflight.SingleFlight
|
|
}
|
|
|
|
func NewCachedDataAccess(da DataAccess) DataAccess {
|
|
cda := &CachedDataAccess{
|
|
DataAccess: da,
|
|
cache: cache.New(5*time.Second, 1*time.Minute),
|
|
}
|
|
return cda
|
|
}
|
|
|
|
func routeCacheKey(app, path string) string {
|
|
return "r:" + app + "\x00" + path
|
|
}
|
|
|
|
func appIDCacheKey(appID string) string {
|
|
return "a:" + appID
|
|
}
|
|
|
|
func (da *CachedDataAccess) GetAppID(ctx context.Context, appName string) (string, error) {
|
|
return da.DataAccess.GetAppID(ctx, appName)
|
|
}
|
|
|
|
func (da *CachedDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
|
key := appIDCacheKey(appID)
|
|
app, ok := da.cache.Get(key)
|
|
if ok {
|
|
return app.(*models.App), nil
|
|
}
|
|
|
|
resp, err := da.singleflight.Do(key,
|
|
func() (interface{}, error) {
|
|
return da.DataAccess.GetAppByID(ctx, appID)
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
app = resp.(*models.App)
|
|
da.cache.Set(key, app, cache.DefaultExpiration)
|
|
return app.(*models.App), nil
|
|
}
|
|
|
|
func (da *CachedDataAccess) GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error) {
|
|
key := routeCacheKey(appID, routePath)
|
|
r, ok := da.cache.Get(key)
|
|
if ok {
|
|
return r.(*models.Route), nil
|
|
}
|
|
|
|
resp, err := da.singleflight.Do(key,
|
|
func() (interface{}, error) {
|
|
return da.DataAccess.GetRoute(ctx, appID, routePath)
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r = resp.(*models.Route)
|
|
da.cache.Set(key, r, cache.DefaultExpiration)
|
|
return r.(*models.Route), nil
|
|
}
|
|
|
|
// Close invokes close on the underlying DataAccess
|
|
func (da *CachedDataAccess) Close() error {
|
|
return da.DataAccess.Close()
|
|
}
|
|
|
|
type directDataAccess struct {
|
|
mq models.MessageQueue
|
|
ds models.Datastore
|
|
ls models.LogStore
|
|
}
|
|
|
|
func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess {
|
|
da := &directDataAccess{
|
|
mq: mq,
|
|
ds: ds,
|
|
ls: ls,
|
|
}
|
|
return da
|
|
}
|
|
|
|
func (da *directDataAccess) GetAppID(ctx context.Context, appName string) (string, error) {
|
|
return da.ds.GetAppID(ctx, appName)
|
|
}
|
|
|
|
func (da *directDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
|
|
return da.ds.GetAppByID(ctx, appID)
|
|
}
|
|
|
|
func (da *directDataAccess) GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error) {
|
|
return da.ds.GetRoute(ctx, appID, routePath)
|
|
}
|
|
|
|
func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
|
|
_, err := da.mq.Push(ctx, mCall)
|
|
return err
|
|
// TODO: Insert a call in the datastore with the 'queued' state
|
|
}
|
|
|
|
func (da *directDataAccess) Dequeue(ctx context.Context) (*models.Call, error) {
|
|
return da.mq.Reserve(ctx)
|
|
}
|
|
|
|
func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error {
|
|
// TODO Access datastore and try a Compare-And-Swap to set the call to
|
|
// 'running'. If it fails, delete the message from the MQ and return an
|
|
// error. If it is successful, don't do anything - the message will be
|
|
// removed when the call Finish'es.
|
|
|
|
// At the moment we don't have the queued/running/finished mechanics so we
|
|
// remove the message here.
|
|
return da.mq.Delete(ctx, mCall)
|
|
}
|
|
|
|
func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error {
|
|
// this means that we could potentially store an error / timeout status for a
|
|
// call that ran successfully [by a user's perspective]
|
|
// TODO: this should be update, really
|
|
if err := da.ls.InsertCall(ctx, mCall); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
if err := da.ls.InsertLog(ctx, mCall.AppID, mCall.ID, stderr); err != nil {
|
|
common.Logger(ctx).WithError(err).Error("error uploading log")
|
|
// note: Not returning err here since the job could have already finished successfully.
|
|
}
|
|
|
|
if async {
|
|
// XXX (reed): delete MQ message, eventually
|
|
// YYY (hhexo): yes, once we have the queued/running/finished mechanics
|
|
// return da.mq.Delete(ctx, mCall)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close calls close on the underlying Datastore and MessageQueue. If the Logstore
|
|
// and Datastore are different, it will call Close on the Logstore as well.
|
|
func (da *directDataAccess) Close() error {
|
|
err := da.ds.Close()
|
|
if ls, ok := da.ds.(models.LogStore); ok && ls != da.ls {
|
|
if daErr := da.ls.Close(); daErr != nil {
|
|
err = daErr
|
|
}
|
|
}
|
|
if mqErr := da.mq.Close(); mqErr != nil {
|
|
err = mqErr
|
|
}
|
|
return err
|
|
}
|