Files
fn-serverless/api/agent/lb_agent.go
Denis Makogon 3c15ca6ea6 App ID (#641)
* App ID

* Clean-up

* Use ID or name to reference apps

* Can use app by name or ID

* Get rid of AppName for routes API and model

 routes API is completely backwards-compatible
 routes API accepts both app ID and name

* Get rid of AppName from calls API and model

* Fixing tests

* Get rid of AppName from logs API and model

* Restrict API to work with app names only

* Addressing review comments

* Fix for hybrid mode

* Fix rebase problems

* Addressing review comments

* Addressing review comments pt.2

* Fixing test issue

* Addressing review comments pt.3

* Updated docstring

* Adjust UpdateApp SQL implementation to work with app IDs instead of names

* Fixing tests

* fmt after rebase

* Make tests green again!

* Use GetAppByID wherever it is necessary

 - adding new v2 endpoints to keep hybrid api/runner mode working
 - extract CallBase from Call object to expose that to a user
   (it doesn't include any app reference, as we do for all other API objects)

* Get rid of GetAppByName

* Adjusting server router setup

* Make hybrid work again

* Fix datastore tests

* Fixing tests

* Do not ignore app_id

* Resolve issues after rebase

* Updating test to make it work as it was

* Tabula rasa for migrations

* Adding calls API test

 - we need to ensure we give "App not found" for the missing app and missing call in first place
 - making previous test work (request missing call for the existing app)

* Make datastore tests work fine with correctly applied migrations

* Make CallFunction middleware work again

 had to adjust its implementation to set app ID before proceeding

* The biggest rebase ever made

* Fix 8's migration

* Fix tests

* Fix hybrid client

* Fix tests problem

* Increment app ID migration version

* Fixing TestAppUpdate

* Fix rebase issues

* Addressing review comments

* Renew vendor

* Updated swagger doc per recommendations
2018-03-26 11:19:36 -07:00

227 lines
5.4 KiB
Go

package agent
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/fnproject/fn/fnext"
)
type remoteSlot struct {
lbAgent *lbAgent
}
func (s *remoteSlot) exec(ctx context.Context, call pool.RunnerCall) error {
a := s.lbAgent
err := a.placer.PlaceCall(a.rp, ctx, call)
if err != nil {
logrus.WithError(err).Error("Failed to place call")
}
return err
}
func (s *remoteSlot) Close(ctx context.Context) error {
return nil
}
func (s *remoteSlot) Error() error {
return nil
}
type naivePlacer struct {
}
func NewNaivePlacer() pool.Placer {
return &naivePlacer{}
}
func minDuration(f, s time.Duration) time.Duration {
if f < s {
return f
}
return s
}
func (sp *naivePlacer) PlaceCall(rp pool.RunnerPool, ctx context.Context, call pool.RunnerCall) error {
timeout := time.After(call.SlotDeadline().Sub(time.Now()))
for {
select {
case <-ctx.Done():
return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
default:
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
for _, r := range runners {
placed, err := r.TryExec(ctx, call)
if placed {
return err
}
}
}
remaining := call.SlotDeadline().Sub(time.Now())
if remaining <= 0 {
return models.ErrCallTimeoutServerBusy
}
// backoff
time.Sleep(minDuration(retryWaitInterval, remaining))
}
}
}
const (
runnerReconnectInterval = 5 * time.Second
// sleep time to attempt placement across all runners before retrying
retryWaitInterval = 10 * time.Millisecond
// sleep time when scaling from 0 to 1 runners
noCapacityWaitInterval = 1 * time.Second
// amount of time to wait to place a request on a runner
placementTimeout = 15 * time.Second
runnerPoolShutdownTimeout = 5 * time.Second
)
type lbAgent struct {
delegatedAgent Agent
rp pool.RunnerPool
placer pool.Placer
wg sync.WaitGroup // Needs a good name
shutdown chan struct{}
}
func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) {
agent := createAgent(da, false)
a := &lbAgent{
delegatedAgent: agent,
rp: rp,
placer: p,
}
return a, nil
}
// GetAppID is to get the match of an app name to its ID
func (a *lbAgent) GetAppID(ctx context.Context, appName string) (string, error) {
return a.delegatedAgent.GetAppID(ctx, appName)
}
// GetAppByID is to get the app by ID
func (a *lbAgent) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
return a.delegatedAgent.GetAppByID(ctx, appID)
}
// GetCall delegates to the wrapped agent but disables the capacity check as
// this agent isn't actually running the call.
func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
opts = append(opts, WithoutPreemptiveCapacityCheck())
return a.delegatedAgent.GetCall(opts...)
}
func (a *lbAgent) Close() error {
// we should really be passing the server's context here
ctx, cancel := context.WithTimeout(context.Background(), runnerPoolShutdownTimeout)
defer cancel()
a.rp.Shutdown(ctx)
err := a.delegatedAgent.Close()
if err != nil {
return err
}
return nil
}
func GetGroupID(call *models.Call) string {
// TODO until fn supports metadata, allow LB Group ID to
// be overridden via configuration.
// Note that employing this mechanism will expose the value of the
// LB Group ID to the function as an environment variable!
lbgID := call.Config["FN_LB_GROUP_ID"]
if lbgID == "" {
return "default"
}
return lbgID
}
func (a *lbAgent) Submit(callI Call) error {
a.wg.Add(1)
defer a.wg.Done()
select {
case <-a.shutdown:
return models.ErrCallTimeoutServerBusy
default:
}
call := callI.(*call)
ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline)
call.req = call.req.WithContext(ctx)
defer cancel()
ctx, span := trace.StartSpan(ctx, "agent_submit")
defer span.End()
err := a.submit(ctx, call)
return err
}
func (a *lbAgent) submit(ctx context.Context, call *call) error {
statsEnqueue(ctx)
a.startStateTrackers(ctx, call)
defer a.endStateTrackers(ctx, call)
slot := &remoteSlot{lbAgent: a}
defer slot.Close(ctx) // notify our slot is free once we're done
err := call.Start(ctx)
if err != nil {
handleStatsDequeue(ctx, err)
return transformTimeout(err, true)
}
statsDequeueAndStart(ctx)
// pass this error (nil or otherwise) to end directly, to store status, etc
err = slot.exec(ctx, call)
handleStatsEnd(ctx, err)
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
// but this could put us over the timeout if the call did not reply yet (need better policy).
ctx = common.BackgroundContext(ctx)
err = call.End(ctx, err)
return transformTimeout(err, false)
}
func (a *lbAgent) AddCallListener(cl fnext.CallListener) {
a.delegatedAgent.AddCallListener(cl)
}
func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
logrus.Fatal("Enqueue not implemented. Panicking.")
return nil
}
func (a *lbAgent) startStateTrackers(ctx context.Context, call *call) {
delegatedAgent := a.delegatedAgent.(*agent)
delegatedAgent.startStateTrackers(ctx, call)
}
func (a *lbAgent) endStateTrackers(ctx context.Context, call *call) {
delegatedAgent := a.delegatedAgent.(*agent)
delegatedAgent.endStateTrackers(ctx, call)
}