Mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
fn: make call.End() blocking to reduce complexity (#1208)
The agent/lb-agent/runner roles execute call.End() in the background in some cases to reduce latency. This change simplifies that by switching to non-background (blocking) execution of call.End(). It fixes hard-to-detect issues such as non-deterministic calculation of call.CompletedAt and incomplete Call.Stats in runners. Downstream projects impacted by the now-blocking call.End() latency should take steps to handle it according to their requirements.
Committed by Reed Allman
parent 54ba179078
commit 586d5c4735
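In short, the change replaces fire-and-forget end-of-call bookkeeping with inline execution. A minimal runnable sketch of the problem being fixed (the types here are illustrative stand-ins, not the fn API): when End() runs in a background goroutine, a reader can observe the call as finished before CompletedAt is ever set.

package main

import (
	"fmt"
	"sync"
	"time"
)

// call is a stand-in for fn's call type; only CompletedAt matters here.
type call struct {
	mu          sync.Mutex
	completedAt time.Time
}

// End records terminal state, as call.End() stores CompletedAt and stats.
func (c *call) End() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.completedAt = time.Now()
}

func main() {
	// Old shape: backgrounded End() -- the submit path returns immediately,
	// so CompletedAt may still be zero when someone reads the call record.
	c1 := &call{}
	go c1.End()
	c1.mu.Lock()
	fmt.Println("background, CompletedAt set:", !c1.completedAt.IsZero()) // often false
	c1.mu.Unlock()

	// New shape: blocking End() -- bookkeeping is complete before the
	// submit path returns, at the cost of added latency.
	c2 := &call{}
	c2.End()
	fmt.Println("blocking, CompletedAt set:", !c2.completedAt.IsZero()) // always true
}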
api/agent/agent.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/fnproject/fn/api/agent/drivers"
@@ -103,7 +102,6 @@ type agent struct {
 	// used to track running calls / safe shutdown
 	shutWg              *common.WaitGroup
 	shutonce            sync.Once
-	callEndCount        int64
 	disableAsyncDequeue bool
 
 	callOverrider CallOverrider
@@ -293,68 +291,28 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 	statsDequeueAndStart(ctx)
 
 	// We are about to execute the function, set container Exec Deadline (call.Timeout)
-	ctx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
+	slotCtx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second)
 	defer cancel()
 
 	// Pass this error (nil or otherwise) to end directly, to store status, etc.
-	err = slot.exec(ctx, call)
+	err = slot.exec(slotCtx, call)
 	return a.handleCallEnd(ctx, call, slot, err, true)
 }
 
-func (a *agent) scheduleCallEnd(fn func()) {
-	atomic.AddInt64(&a.callEndCount, 1)
-	go func() {
-		fn()
-		atomic.AddInt64(&a.callEndCount, -1)
-		a.shutWg.DoneSession()
-	}()
-}
-
-func (a *agent) finalizeCallEnd(ctx context.Context, err error, isRetriable, isScheduled bool) error {
-	// if scheduled in background, let scheduleCallEnd() handle
-	// the shutWg group, otherwise decrement here.
-	if !isScheduled {
-		a.shutWg.DoneSession()
-	}
-	handleStatsEnd(ctx, err)
-	return transformTimeout(err, isRetriable)
-}
-
-func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isCommitted bool) error {
-
-	// For hot-containers, slot close is a simple channel close... No need
-	// to handle it async. Execute it here ASAP
-	if slot != nil && protocol.IsStreamable(protocol.Protocol(call.Format)) {
-		slot.Close(ctx)
-		slot = nil
-	}
-
-	// This means call was routed (executed), in order to reduce latency here
-	// we perform most of these tasks in go-routine asynchronously.
-	if isCommitted {
-		a.scheduleCallEnd(func() {
-			ctx = common.BackgroundContext(ctx)
-			if slot != nil {
-				slot.Close(ctx) // (no timeout)
-			}
-			ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
-			call.End(ctx, err)
-			cancel()
-		})
-		return a.finalizeCallEnd(ctx, err, false, true)
-	}
-
-	// The call did not succeed. And it is retriable. We close the slot
-	// ASAP in the background if we haven't already done so (cold-container case),
-	// in order to keep latency down.
-	if slot != nil {
-		a.scheduleCallEnd(func() {
-			slot.Close(common.BackgroundContext(ctx)) // (no timeout)
-		})
-		return a.finalizeCallEnd(ctx, err, true, true)
-	}
-
-	return a.finalizeCallEnd(ctx, err, true, false)
+func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isStarted bool) error {
+
+	if slot != nil {
+		slot.Close(common.BackgroundContext(ctx))
+	}
+
+	// This means call was routed (executed)
+	if isStarted {
+		call.End(ctx, err)
+	}
+
+	handleStatsEnd(ctx, err)
+	a.shutWg.DoneSession()
+	return transformTimeout(err, !isStarted)
 }
 
 func transformTimeout(e error, isRetriable bool) error {
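A detail worth noting in the hunk above: the execution deadline now goes into a child slotCtx, so only slot.exec() is bounded by call.Timeout, while the now-blocking end handling still receives the parent ctx. A runnable sketch of that context-scoping pattern (exec and end are illustrative stand-ins, not fn functions):

package main

import (
	"context"
	"fmt"
	"time"
)

// exec simulates function execution bounded by the call timeout.
func exec(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // the "function" runs too long
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// end simulates call.End(): it must still be able to do work
// (store status, stats) even when the call itself timed out.
func end(ctx context.Context, err error) {
	fmt.Println("recording result:", err, "/ ctx err:", ctx.Err())
}

func main() {
	ctx := context.Background()

	// The deadline applies only to execution, via a child context ...
	slotCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()

	err := exec(slotCtx) // returns context.DeadlineExceeded

	// ... so the bookkeeping still gets a live parent context.
	end(ctx, err) // prints: ctx err: <nil>
}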
@@ -417,11 +375,6 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	ctx, span := trace.StartSpan(ctx, "agent_get_slot")
 	defer span.End()
 
-	// first check any excess case of call.End() stacking.
-	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
-		return nil, context.DeadlineExceeded
-	}
-
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
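For context on what is being deleted: the counter existed because backgrounded call.End() goroutines could pile up behind a slow datastore, so getSlot() refused new work past a cap. With End() now inline, there is nothing to stack. The shape of the removed guard as a self-contained sketch (names are illustrative):

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

var (
	pending    int64     // count of in-flight background end handlers
	maxPending int64 = 3 // cap, as MaxCallEndStacking did
)

// admit mirrors the removed getSlot() check: refuse new work while too
// many background end handlers are still running.
func admit() error {
	if atomic.LoadInt64(&pending) >= maxPending {
		return context.DeadlineExceeded
	}
	return nil
}

// scheduleEnd mirrors the removed scheduleCallEnd(): run fn in the
// background and track it with the counter.
func scheduleEnd(fn func()) {
	atomic.AddInt64(&pending, 1)
	go func() {
		defer atomic.AddInt64(&pending, -1)
		fn()
	}()
}

func main() {
	block := make(chan struct{})
	for i := 0; i < 3; i++ {
		scheduleEnd(func() { <-block }) // simulate a slow datastore write
	}
	fmt.Println(admit()) // context deadline exceeded
	close(block)
}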
api/agent/config.go
@@ -18,8 +18,6 @@ type Config struct {
 	HotPoll            time.Duration `json:"hot_poll_msecs"`
 	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
 	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
-	CallEndTimeout     time.Duration `json:"call_end_timeout"`
-	MaxCallEndStacking uint64        `json:"max_call_end_stacking"`
 	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
 	MaxLogSize         uint64        `json:"max_log_size_bytes"`
 	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
@@ -54,10 +52,6 @@ const (
 	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
 	// EnvAsyncChewPoll is the interval to poll the queue that contains async function invocations
 	EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
-	// EnvCallEndTimeout is the timeout after a call is completed to store information about that invocation
-	EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS"
-	// EnvMaxCallEndStacking is the maximum number of concurrent calls in call.End storing info
-	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
 	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
 	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
 	// EnvMaxLogSize is the maximum size that a function's log may reach
@@ -105,7 +99,6 @@ func NewConfig() (*Config, error) {
 	cfg := &Config{
 		MinDockerVersion:   "17.10.0-ce",
 		MaxLogSize:         1 * 1024 * 1024,
-		MaxCallEndStacking: 8192,
 		PreForkImage:       "busybox",
 		PreForkCmd:         "tail -f /dev/null",
 	}
@@ -117,14 +110,12 @@ func NewConfig() (*Config, error) {
 	err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, DefaultHotPoll)
 	err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
 	err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
-	err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute)
 	err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
 	err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
 	err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
 	err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
 	err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
 	err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
-	err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)
 	err = setEnvStr(err, EnvPreForkImage, &cfg.PreForkImage)
 	err = setEnvStr(err, EnvPreForkCmd, &cfg.PreForkCmd)
 	err = setEnvUint(err, EnvPreForkUseOnce, &cfg.PreForkUseOnce)
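NewConfig() threads one err value through every setEnv* call: each helper is a no-op once err is non-nil, so a whole block of assignments needs only a single check at the end. A minimal sketch of that helper shape (the real signatures in config.go may differ in detail):

package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// setEnvMsecs parses env var name as milliseconds into dst, keeping the
// first error seen and skipping all work once one has occurred.
func setEnvMsecs(err error, name string, dst *time.Duration, defaultVal time.Duration) error {
	if err != nil {
		return err // an earlier failure wins; later calls are no-ops
	}
	*dst = defaultVal
	if s, ok := os.LookupEnv(name); ok {
		ms, perr := strconv.ParseUint(s, 10, 64)
		if perr != nil {
			return fmt.Errorf("%s: %w", name, perr)
		}
		*dst = time.Duration(ms) * time.Millisecond
	}
	return nil
}

func main() {
	os.Setenv("FN_HOT_POLL_MSECS", "200")
	var hotPoll, launcher time.Duration
	var err error
	err = setEnvMsecs(err, "FN_HOT_POLL_MSECS", &hotPoll, 100*time.Millisecond)
	err = setEnvMsecs(err, "FN_HOT_LAUNCHER_TIMEOUT_MSECS", &launcher, time.Hour)
	fmt.Println(hotPoll, launcher, err) // 200ms 1h0m0s <nil>
}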
api/agent/lb_agent.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"io"
 	"io/ioutil"
-	"sync/atomic"
 
 	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
@@ -25,7 +24,6 @@ type lbAgent struct {
 	placer        pool.Placer
 	callOverrider CallOverrider
 	shutWg        *common.WaitGroup
-	callEndCount  int64
 }
 
 type LBAgentOption func(*lbAgent) error
@@ -158,11 +156,6 @@ func (a *lbAgent) Submit(callI Call) error {
 
 	statsEnqueue(ctx)
 
-	// first check any excess case of call.End() stacking.
-	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
-		a.handleCallEnd(ctx, call, context.DeadlineExceeded, false)
-	}
-
 	err := call.Start(ctx)
 	if err != nil {
 		return a.handleCallEnd(ctx, call, err, false)
@@ -242,31 +235,17 @@ func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
 	return errors.New("Enqueue not implemented")
 }
 
-func (a *lbAgent) scheduleCallEnd(fn func()) {
-	atomic.AddInt64(&a.callEndCount, 1)
-	go func() {
-		fn()
-		atomic.AddInt64(&a.callEndCount, -1)
-		a.shutWg.DoneSession()
-	}()
-}
-
 func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error {
 	if isStarted {
-		a.scheduleCallEnd(func() {
-			ctx = common.BackgroundContext(ctx)
-			ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
-			call.End(ctx, err)
-			cancel()
-		})
-
-		handleStatsEnd(ctx, err)
-		return transformTimeout(err, false)
+		call.End(ctx, err)
+		handleStatsEnd(ctx, err)
+	} else {
+		handleStatsDequeue(ctx, err)
 	}
 
 	a.shutWg.DoneSession()
-	handleStatsDequeue(ctx, err)
-	return transformTimeout(err, true)
+
+	return transformTimeout(err, !isStarted)
 }
 
 var _ Agent = &lbAgent{}
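Both agents bracket each call with a session on shutWg (a common.WaitGroup) so shutdown can drain in-flight work; much of this commit's bookkeeping is about who calls DoneSession() and when. A minimal sketch of that drain pattern built on the standard library (the AddSession/DoneSession naming follows the diff; the admission and CloseGroup semantics are assumptions, not fn's actual implementation):

package main

import (
	"fmt"
	"sync"
)

// sessionGroup approximates the role of fn's common.WaitGroup: every
// call holds a session, and shutdown waits for all sessions to end.
type sessionGroup struct {
	mu     sync.Mutex
	wg     sync.WaitGroup
	closed bool
}

// AddSession admits a call unless shutdown has begun (assumed semantics).
func (g *sessionGroup) AddSession() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.closed {
		return false
	}
	g.wg.Add(1)
	return true
}

// DoneSession releases the call's session, as handleCallEnd() does --
// after this commit, only once call.End() has actually finished.
func (g *sessionGroup) DoneSession() { g.wg.Done() }

// CloseGroup stops new admissions and drains in-flight sessions.
func (g *sessionGroup) CloseGroup() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
	g.wg.Wait()
}

func main() {
	g := &sessionGroup{}
	done := make(chan struct{})
	if g.AddSession() {
		go func() { // a call in flight
			defer close(done)
			defer g.DoneSession()
			fmt.Println("call.End() finished before session release")
		}()
	}
	g.CloseGroup() // returns only after the call above has completed
	<-done
	fmt.Println("safe to shut down")
}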