fn-serverless/api/agent/lb_agent.go
Last commit 9b28497cff by Dario Domizioli, 2018-03-10: Add a basic concurrency test for the dataplane system tests (#832). Also remove some spurious logging.

package agent

import (
	"context"
	"errors"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"go.opencensus.io/trace"

	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/fnext"
	"github.com/fnproject/fn/poolmanager"
)

// RequestReader takes an agent.Call and returns a ReadCloser for the request body inside it.
func RequestReader(c *Call) (io.ReadCloser, error) {
	// Recover the concrete *call behind the agent.Call interface.
	cc, ok := (*c).(*call)
	if !ok {
		return nil, errors.New("cannot cast agent.Call to agent.call")
	}

	if cc.req == nil {
		return nil, errors.New("call does not contain a request")
	}
	return cc.req.Body, nil
}

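// ResponseWriter extracts the http.ResponseWriter from an agent.Call, if the
// call's underlying writer is one.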
func ResponseWriter(c *Call) (*http.ResponseWriter, error) {
	cc, ok := (*c).(*call)
	if !ok {
		return nil, errors.New("cannot cast agent.Call to agent.call")
	}

	if rw, ok := cc.w.(http.ResponseWriter); ok {
		return &rw, nil
	}
	return nil, errors.New("unable to get HTTP response writer from the call")
}

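// remoteSlot dispatches a call to a remote runner chosen by the lbAgent's
// placer instead of executing it locally.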
type remoteSlot struct {
	lbAgent *lbAgent
}

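// exec reserves the call's memory requirement for its LB group with the node
// pool, then asks the placer to run the call on one of the group's runners.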
func (s *remoteSlot) exec(ctx context.Context, call *call) error {
	a := s.lbAgent

	memMb := call.Model().Memory
	lbGroupID := GetGroupID(call.Model())

	capacityRequest := &poolmanager.CapacityRequest{TotalMemoryMb: memMb, LBGroupID: lbGroupID}
	a.np.AssignCapacity(capacityRequest)
	defer a.np.ReleaseCapacity(capacityRequest)

	err := a.placer.PlaceCall(a.np, ctx, call, lbGroupID)
	if err != nil {
		logrus.WithError(err).Error("Failed to place call")
	}
	return err
}

func (s *remoteSlot) Close(ctx context.Context) error {
	return nil
}

func (s *remoteSlot) Error() error {
	return nil
}

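// Placer decides which runner in a node pool's LB group executes a call.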
type Placer interface {
	PlaceCall(np NodePool, ctx context.Context, call *call, lbGroupID string) error
}

type naivePlacer struct {
}

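// NewNaivePlacer returns a Placer that cycles through the runners of the LB
// group until one accepts the call or the slot deadline expires.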
func NewNaivePlacer() Placer {
	return &naivePlacer{}
}

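// minDuration returns the smaller of two durations.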
func minDuration(f, s time.Duration) time.Duration {
	if f < s {
		return f
	}
	return s
}

func (sp *naivePlacer) PlaceCall(np NodePool, ctx context.Context, call *call, lbGroupID string) error {
	timeout := time.After(time.Until(call.slotDeadline))
	for {
		select {
		case <-ctx.Done():
			return models.ErrCallTimeoutServerBusy
		case <-timeout:
			return models.ErrCallTimeoutServerBusy
		default:
			for _, r := range np.Runners(lbGroupID) {
				placed, err := r.TryExec(ctx, call)
				if err != nil {
					logrus.WithError(err).Error("Failed during call placement")
				}
				if placed {
					return err
				}
			}

			remaining := time.Until(call.slotDeadline)
			if remaining <= 0 {
				return models.ErrCallTimeoutServerBusy
			}
			time.Sleep(minDuration(retryWaitInterval, remaining))
		}
	}
}

const (
	runnerReconnectInterval = 5 * time.Second
	// sleep time to attempt placement across all runners before retrying
	retryWaitInterval = 10 * time.Millisecond
	// sleep time when scaling from 0 to 1 runners
	noCapacityWaitInterval = 1 * time.Second
	// amount of time to wait to place a request on a runner
	placementTimeout = 15 * time.Second
)

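// lbAgent is an Agent that runs no calls itself: it delegates call
// construction and state tracking to the wrapped agent and places execution
// on remote runners drawn from a NodePool.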
type lbAgent struct {
	delegatedAgent Agent
	np             NodePool
	placer         Placer

	wg       sync.WaitGroup // tracks in-flight Submit calls so Close can wait for them
	shutdown chan struct{}
}

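// NewLBAgent creates an Agent that routes call execution to remote runners
// selected from np by the given Placer. Typical wiring (variable names here
// are illustrative) looks like:
//
//	lb, err := NewLBAgent(localAgent, pool, NewNaivePlacer())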
func NewLBAgent(agent Agent, np NodePool, p Placer) (Agent, error) {
	a := &lbAgent{
		delegatedAgent: agent,
		np:             np,
		placer:         p,
		shutdown:       make(chan struct{}),
	}
	return a, nil
}

// GetCall delegates to the wrapped agent but disables the capacity check as
// this agent isn't actually running the call.
func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
	opts = append(opts, WithoutPreemptiveCapacityCheck())
	return a.delegatedAgent.GetCall(opts...)
}

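// Close signals shutdown to in-flight submissions, waits for them to drain,
// then shuts down the node pool and the wrapped agent.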
func (a *lbAgent) Close() error {
	close(a.shutdown)
	a.wg.Wait()
	a.np.Shutdown()
	return a.delegatedAgent.Close()
}

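// GetGroupID returns the LB group ID a call should be placed in, falling back
// to "default" when none is configured. For example, a function configured
// with FN_LB_GROUP_ID="gpu-pool" (the group name is illustrative) is placed on
// runners registered under the "gpu-pool" LB group.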
func GetGroupID(call *models.Call) string {
	// TODO until fn supports metadata, allow LB Group ID to
	// be overridden via configuration.
	// Note that employing this mechanism will expose the value of the
	// LB Group ID to the function as an environment variable!
	lbgID := call.Config["FN_LB_GROUP_ID"]
	if lbgID == "" {
		return "default"
	}
	return lbgID
}

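// Submit runs the call on a remote runner, bounded by the call's execution
// deadline. It refuses new work once the agent is shutting down.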
func (a *lbAgent) Submit(callI Call) error {
	a.wg.Add(1)
	defer a.wg.Done()

	select {
	case <-a.shutdown:
		return models.ErrCallTimeoutServerBusy
	default:
	}

	call := callI.(*call)

	ctx, cancel := context.WithDeadline(call.req.Context(), call.execDeadline)
	call.req = call.req.WithContext(ctx)
	defer cancel()

	ctx, span := trace.StartSpan(ctx, "agent_submit")
	defer span.End()

	err := a.submit(ctx, call)
	return err
}

func (a *lbAgent) submit(ctx context.Context, call *call) error {
	statsEnqueue(ctx)

	a.startStateTrackers(ctx, call)
	defer a.endStateTrackers(ctx, call)

	slot := &remoteSlot{lbAgent: a}
	defer slot.Close(ctx) // notify our slot is free once we're done

	err := call.Start(ctx)
	if err != nil {
		handleStatsDequeue(ctx, err)
		return transformTimeout(err, true)
	}

	statsDequeueAndStart(ctx)

	// pass this error (nil or otherwise) to end directly, to store status, etc
	err = slot.exec(ctx, call)
	handleStatsEnd(ctx, err)

	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
	// but this could put us over the timeout if the call did not reply yet (need better policy).
	ctx = common.BackgroundContext(ctx)
	err = call.End(ctx, err)
	return transformTimeout(err, false)
}

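// AddCallListener registers the listener with the wrapped agent.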
func (a *lbAgent) AddCallListener(cl fnext.CallListener) {
	a.delegatedAgent.AddCallListener(cl)
}

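// Enqueue is not supported by the LB agent.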
func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
	logrus.Fatal("Enqueue not implemented on LB agent")
	return nil
}

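// startStateTrackers and endStateTrackers delegate call-state tracking to the
// wrapped agent, which owns the state machinery.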
func (a *lbAgent) startStateTrackers(ctx context.Context, call *call) {
	delegatedAgent := a.delegatedAgent.(*agent)
	delegatedAgent.startStateTrackers(ctx, call)
}

func (a *lbAgent) endStateTrackers(ctx context.Context, call *call) {
	delegatedAgent := a.delegatedAgent.(*agent)
	delegatedAgent.endStateTrackers(ctx, call)
}