fn-serverless/api/agent/data_access.go
Tom Coupland d56a49b321 Remove V1 endpoints and Routes (#1210)
Largely a removal job; however, many tests, particularly system-level
ones, relied on Routes. These have been migrated to use Fns.

* Add 410 response to swagger
* No app names in log tags
* Adding constraint in GetCall for FnID
* Adding test to check FnID is required on call
* Add fn_id to call selector
* Fix text in docker mem warning
* Correct buildConfig func name
* Test fix up
* Removing CPU setting from Agent test

CPU setting has been deprecated, but the code base is still riddled
with it. This just removes it from this layer. Really we need to
remove it from Call.

* Remove fn id check on calls
* Reintroduce fn id required on call
* Adding fnID to calls for execute test
* Correct setting of app id in middleware
* Removes root middleware's ability to redirect fn invocations
* Add over sized test check
* Removing call fn id check
2018-09-17 16:44:51 +01:00

192 lines
5.8 KiB
Go

package agent

import (
	"context"
	"io"
	"time"

	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/common/singleflight"
	"github.com/fnproject/fn/api/models"
	"github.com/patrickmn/go-cache"
)

// ReadDataAccess represents the read operations required to operate a load balancer node.
type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnID string) (*models.Fn, error)
}

// DequeueDataAccess abstracts an underlying dequeue for async runners.
type DequeueDataAccess interface {
	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)
}

// EnqueueDataAccess abstracts an underlying enqueue for async queueing.
type EnqueueDataAccess interface {
	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error
}

// CallHandler consumes the start and finish events for a call.
// This is effectively a callback that is allowed to read the logs.
// TODO: Deprecate this - it could be a CallListener except it also consumes logs.
type CallHandler interface {
	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error
	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}

// DataAccess combines the read, dequeue and call-handling operations an agent requires.
type DataAccess interface {
	ReadDataAccess
	DequeueDataAccess
	CallHandler
}

// cachedDataAccess wraps a ReadDataAccess and caches the results of GetAppByID.
type cachedDataAccess struct {
	ReadDataAccess
	cache        *cache.Cache
	singleflight singleflight.SingleFlight
}

// NewCachedDataAccess returns a ReadDataAccess whose GetAppByID results are
// cached for a short period (5s default expiration, 1m cleanup interval).
func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess {
	cda := &cachedDataAccess{
		ReadDataAccess: da,
		cache:          cache.New(5*time.Second, 1*time.Minute),
	}
	return cda
}

func appIDCacheKey(appID string) string {
	return "a:" + appID
}

// GetAppID is not cached; it delegates directly to the underlying ReadDataAccess.
func (da *cachedDataAccess) GetAppID(ctx context.Context, appName string) (string, error) {
	return da.ReadDataAccess.GetAppID(ctx, appName)
}

// GetAppByID returns the app from cache when present; otherwise it fetches it
// from the underlying ReadDataAccess, collapsing concurrent lookups for the
// same app via singleflight, and stores the result in the cache.
func (da *cachedDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
	key := appIDCacheKey(appID)
	app, ok := da.cache.Get(key)
	if ok {
		return app.(*models.App), nil
	}

	resp, err := da.singleflight.Do(key,
		func() (interface{}, error) {
			return da.ReadDataAccess.GetAppByID(ctx, appID)
		})
	if err != nil {
		return nil, err
	}
	app = resp.(*models.App)
	da.cache.Set(key, app, cache.DefaultExpiration)
	return app.(*models.App), nil
}
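
// Illustrative usage sketch (not part of the original file): wrapping a
// datastore-backed ReadDataAccess with NewCachedDataAccess so that repeated
// GetAppByID lookups within the 5 second TTL are served from the cache,
// while concurrent misses for the same app are collapsed by singleflight.
// `ds` stands in for any ReadDataAccess implementation.
func exampleCachedAppLookup(ctx context.Context, ds ReadDataAccess, appID string) error {
	cached := NewCachedDataAccess(ds)

	// First lookup goes to the underlying ReadDataAccess and populates the cache.
	if _, err := cached.GetAppByID(ctx, appID); err != nil {
		return err
	}

	// A second lookup within the TTL is served from the cache without
	// touching the underlying data access.
	_, err := cached.GetAppByID(ctx, appID)
	return err
}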

type directDataAccess struct {
	mq models.MessageQueue
	ls models.LogStore
}

type directDequeue struct {
	mq models.MessageQueue
}

func (ddq *directDequeue) Dequeue(ctx context.Context) (*models.Call, error) {
	return ddq.mq.Reserve(ctx)
}

// NewDirectDequeueAccess returns a DequeueDataAccess backed directly by the given message queue.
func NewDirectDequeueAccess(mq models.MessageQueue) DequeueDataAccess {
	return &directDequeue{
		mq: mq,
	}
}

type directEnqueue struct {
	mq models.MessageQueue
}

// NewDirectEnqueueAccess returns an EnqueueDataAccess backed directly by the given message queue.
func NewDirectEnqueueAccess(mq models.MessageQueue) EnqueueDataAccess {
	return &directEnqueue{
		mq: mq,
	}
}

func (da *directEnqueue) Enqueue(ctx context.Context, mCall *models.Call) error {
	_, err := da.mq.Push(ctx, mCall)
	return err
	// TODO: Insert a call in the datastore with the 'queued' state
}

func NewDirectCallDataAccess(ls models.LogStore, mq models.MessageQueue) CallHandler {
	da := &directDataAccess{
		mq: mq,
		ls: ls,
	}
	return da
}

func (da *directDataAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
	_, err := da.mq.Push(ctx, mCall)
	return err
	// TODO: Insert a call in the datastore with the 'queued' state
}

func (da *directDataAccess) Start(ctx context.Context, mCall *models.Call) error {
	// TODO: Access the datastore and try a Compare-And-Swap to set the call to
	// 'running'. If it fails, delete the message from the MQ and return an
	// error. If it is successful, don't do anything - the message will be
	// removed when the call Finish'es.
	// At the moment we don't have the queued/running/finished mechanics, so we
	// remove the message here.
	return da.mq.Delete(ctx, mCall)
}

func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error {
	// This means that we could potentially store an error / timeout status for a
	// call that ran successfully [from the user's perspective].
	// TODO: this should be an update, really.
	if err := da.ls.InsertCall(ctx, mCall); err != nil {
		common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
		// note: not returning err here since the job could have already finished successfully
	}

	if err := da.ls.InsertLog(ctx, mCall, stderr); err != nil {
		common.Logger(ctx).WithError(err).Error("error uploading log")
		// note: not returning err here since the job could have already finished successfully
	}

	if async {
		// XXX (reed): delete MQ message, eventually
		// YYY (hhexo): yes, once we have the queued/running/finished mechanics
		// return cda.mq.Delete(ctx, mCall)
	}
	return nil
}
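
// Illustrative sketch (not part of the original file): how a runner might
// drive a CallHandler around executing a call. `run` is a hypothetical
// function that executes the call and returns its stderr stream; error
// handling is abbreviated.
func exampleHandleCall(ctx context.Context, h CallHandler, call *models.Call, run func(*models.Call) (io.Reader, error)) error {
	// Start acknowledges the call; for the direct implementation above this
	// removes the message from the MQ.
	if err := h.Start(ctx, call); err != nil {
		return err
	}
	stderr, err := run(call)
	if err != nil {
		return err
	}
	// Finish records the call and its logs via the log store.
	return h.Finish(ctx, call, stderr, false)
}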

type noAsyncEnqueueAccess struct{}

func (noAsyncEnqueueAccess) Enqueue(ctx context.Context, mCall *models.Call) error {
	return models.ErrAsyncUnsupported
}

// NewUnsupportedAsyncEnqueueAccess is a backstop that errors when you try to enqueue an async operation on a server that doesn't support async.
func NewUnsupportedAsyncEnqueueAccess() EnqueueDataAccess {
	return &noAsyncEnqueueAccess{}
}
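
// Illustrative sketch (not part of the original file): selecting enqueue and
// dequeue access depending on whether async is enabled. `mq` is a hypothetical
// models.MessageQueue; nil stands in for a server built without async support.
func exampleAsyncAccess(mq models.MessageQueue) (EnqueueDataAccess, DequeueDataAccess) {
	if mq == nil {
		// Enqueue attempts will fail with models.ErrAsyncUnsupported.
		return NewUnsupportedAsyncEnqueueAccess(), nil
	}
	return NewDirectEnqueueAccess(mq), NewDirectDequeueAccess(mq)
}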