Mirror of https://github.com/fnproject/fn.git, synced 2022-10-28 21:29:17 +03:00
fn: perform call.End() after request is processed (#918)
* fn: perform call.End() after request is processed

call.End() performs several tasks in sequence: insert the call record, insert the log, (todo) remove the mq entry, run the fireAfterCall callback, etc. These currently add to request latency, since the return from agent.Submit() is blocked on them. We also have not been able to apply any timeout to these operations, because they are handled during request processing and it is hard to come up with a policy for that. Error cases (the call or log could not be inserted) are also not propagated to the caller.

With this change, call.End() handling becomes asynchronous: these tasks are performed after the request is done. This improves latency, and the call is no longer blocked on these operations. The change also frees up the agent slot token more quickly, so we are no longer tied to hiccups in call.End(). A timeout policy is added as well, adjustable with an env variable (default 10 minutes).

This accentuates the fact that call/log/fireAfterCall are not completed when the request is done: there is a window where the request has finished but the call, log, and fireAfterCall have not yet been propagated. That was already the case, especially for error paths. There is a slight risk of accumulating call.End() operations if the log/call/callback systems hiccup.

* fn: address risk of overstacking of call.End() calls.
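The pattern the commit adopts can be read in isolation. Below is a minimal, self-contained Go sketch — simplified, illustrative types, not the agent's actual code — of the three pieces the diff introduces: an asynchronous finalizer bounded by a call-end timeout, an in-flight counter, and a backpressure check that refuses new slots once the counter reaches the stacking cap.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Illustrative stand-ins for the agent's config and call types.
type config struct {
	callEndTimeout     time.Duration
	maxCallEndStacking int64
}

type call struct{ id string }

// end stands in for call.End(): persist the call record, logs, fire callbacks, etc.
func (c *call) end(ctx context.Context, callErr error) {
	select {
	case <-time.After(50 * time.Millisecond): // pretend to write to the datastore
	case <-ctx.Done(): // bounded by the call-end timeout
	}
}

type miniAgent struct {
	cfg          config
	wg           sync.WaitGroup
	callEndCount int64
}

// handleCallEnd mirrors the diff: run finalization in a goroutine bounded by a
// timeout, and track how many finalizers are still in flight.
func (a *miniAgent) handleCallEnd(c *call, callErr error) {
	a.wg.Add(1)
	atomic.AddInt64(&a.callEndCount, 1)
	go func() {
		// the real code detaches the request deadline via common.BackgroundContext(ctx)
		ctx, cancel := context.WithTimeout(context.Background(), a.cfg.callEndTimeout)
		c.end(ctx, callErr)
		cancel()
		atomic.AddInt64(&a.callEndCount, -1)
		a.wg.Done()
	}()
}

// getSlot refuses new work while too many finalizers are stacked up, the same
// backpressure check the diff adds to the agent's getSlot.
func (a *miniAgent) getSlot() error {
	if atomic.LoadInt64(&a.callEndCount) >= a.cfg.maxCallEndStacking {
		return context.DeadlineExceeded
	}
	return nil
}

func main() {
	a := &miniAgent{cfg: config{callEndTimeout: time.Second, maxCallEndStacking: 8192}}

	if err := a.getSlot(); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	// ... execute the call and reply to the client here ...
	a.handleCallEnd(&call{id: "demo"}, errors.New("example error"))

	a.wg.Wait() // a graceful shutdown can wait for pending finalizers
}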
@@ -5,6 +5,7 @@ import (
 	"io"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/fnproject/fn/api/agent/drivers"
@@ -111,9 +112,10 @@ type agent struct {
 	resources ResourceTracker

 	// used to track running calls / safe shutdown
 	wg       sync.WaitGroup // TODO rename
 	shutonce sync.Once
 	shutdown chan struct{}
+	callEndCount int64
 }

 // New creates an Agent that executes functions locally as Docker containers.
@@ -250,14 +252,23 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 	// pass this error (nil or otherwise) to end directly, to store status, etc
 	err = slot.exec(ctx, call)
 	handleStatsEnd(ctx, err)

-	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
-	// but this could put us over the timeout if the call did not reply yet (need better policy).
-	ctx = common.BackgroundContext(ctx)
-	err = call.End(ctx, err)
+	a.handleCallEnd(ctx, call, err)
 	return transformTimeout(err, false)
 }

+func (a *agent) handleCallEnd(ctx context.Context, call *call, err error) {
+	a.wg.Add(1)
+	atomic.AddInt64(&a.callEndCount, 1)
+	go func() {
+		ctx = common.BackgroundContext(ctx)
+		ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
+		call.End(ctx, err)
+		cancel()
+		atomic.AddInt64(&a.callEndCount, -1)
+		a.wg.Done()
+	}()
+}
+
 func transformTimeout(e error, isRetriable bool) error {
 	if e == context.DeadlineExceeded {
 		if isRetriable {
@@ -308,6 +319,11 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	ctx, span := trace.StartSpan(ctx, "agent_get_slot")
 	defer span.End()

+	// first check any excess case of call.End() stacking.
+	if atomic.LoadInt64(&a.callEndCount) >= int64(a.cfg.MaxCallEndStacking) {
+		return nil, context.DeadlineExceeded
+	}
+
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
@@ -15,6 +15,8 @@ type AgentConfig struct {
 	HotPoll            time.Duration `json:"hot_poll_msecs"`
 	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
 	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
+	CallEndTimeout     time.Duration `json:"call_end_timeout"`
+	MaxCallEndStacking uint64        `json:"max_call_end_stacking"`
 	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
 	MaxLogSize         uint64        `json:"max_log_size_bytes"`
 	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
@@ -31,6 +33,8 @@ const (
 	EnvHotPoll            = "FN_HOT_POLL_MSECS"
 	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
 	EnvAsyncChewPoll      = "FN_ASYNC_CHEW_POLL_MSECS"
+	EnvCallEndTimeout     = "FN_CALL_END_TIMEOUT_MSECS"
+	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
 	EnvMaxResponseSize    = "FN_MAX_RESPONSE_SIZE"
 	EnvMaxLogSize         = "FN_MAX_LOG_SIZE_BYTES"
 	EnvMaxTotalCPU        = "FN_MAX_TOTAL_CPU_MCPUS"
@@ -46,8 +50,9 @@ const (
 func NewAgentConfig() (*AgentConfig, error) {

 	cfg := &AgentConfig{
 		MinDockerVersion:   "17.10.0-ce",
 		MaxLogSize:         1 * 1024 * 1024,
+		MaxCallEndStacking: 8192,
 	}

 	var err error
@@ -57,12 +62,14 @@ func NewAgentConfig() (*AgentConfig, error) {
 	err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond)
 	err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
 	err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
+	err = setEnvMsecs(err, EnvCallEndTimeout, &cfg.CallEndTimeout, time.Duration(10)*time.Minute)
 	err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
 	err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
 	err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
 	err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
 	err = setEnvUint(err, EnvMaxFsSize, &cfg.MaxFsSize)
 	err = setEnvUint(err, EnvPreForkPoolSize, &cfg.PreForkPoolSize)
+	err = setEnvUint(err, EnvMaxCallEndStacking, &cfg.MaxCallEndStacking)

 	if err != nil {
 		return cfg, err
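Both new knobs are surfaced as environment variables, and the timeout value is parsed as milliseconds by setEnvMsecs. A hedged example of overriding the defaults before building the agent config, assuming the config lives in the package imported as github.com/fnproject/fn/api/agent (the import path is not shown in this diff):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/fnproject/fn/api/agent"
)

func main() {
	// Parsed as milliseconds; the default is 10 minutes.
	os.Setenv("FN_CALL_END_TIMEOUT_MSECS", "120000") // 2 minutes
	// Cap on stacked call.End() finalizers; the default is 8192.
	os.Setenv("FN_MAX_CALL_END_STACKING", "1024")

	cfg, err := agent.NewAgentConfig()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.CallEndTimeout, cfg.MaxCallEndStacking)
}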
@@ -193,7 +193,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {

 	containerIds := make([]string, 0)

-	for i, test := range []struct {
+	testCases := []struct {
 		path   string
 		body   string
 		method string
@@ -235,7 +235,11 @@ func TestRouteRunnerIOPipes(t *testing.T) {

 		// CASE IV: should land on CASE III container
 		{"/r/zoo/http/", ok, "GET", http.StatusOK, "", nil, 0},
-	} {
+	}
+
+	callIds := make([]string, len(testCases))
+
+	for i, test := range testCases {
 		body := strings.NewReader(test.body)
 		_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
 		respBytes, _ := ioutil.ReadAll(rec.Body)
@@ -246,6 +250,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 		}

 		containerIds = append(containerIds, "N/A")
+		callIds[i] = rec.Header().Get("Fn_call_id")

 		if rec.Code != test.expectedCode {
 			isFailure = true
@@ -260,13 +265,6 @@ func TestRouteRunnerIOPipes(t *testing.T) {

 		}

-		if test.expectedLogsSubStr != nil {
-			callID := rec.Header().Get("Fn_call_id")
-			if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) {
-				isFailure = true
-			}
-		}
-
 		if rec.Code == http.StatusOK {
 			dockerId, err := getDockerId(respBytes)
 			if err != nil {
@@ -281,6 +279,14 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 			time.Sleep(test.sleepAmount)
 		}
 	}

+	for i, test := range testCases {
+		if test.expectedLogsSubStr != nil {
+			if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) {
+				isFailure = true
+			}
+		}
+	}
+
 	jsonIds := containerIds[0:5]

 	// now cross check JSON container ids:
@@ -374,7 +380,7 @@ func TestRouteRunnerExecution(t *testing.T) {
 	bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
 	smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K

-	for i, test := range []struct {
+	testCases := []struct {
 		path   string
 		body   string
 		method string
@@ -413,7 +419,11 @@ func TestRouteRunnerExecution(t *testing.T) {
 		{"/r/myapp/mybigoutputhttp", smalloutput, "GET", http.StatusOK, nil, "", nil},
 		{"/r/myapp/mybigoutputcold", bigoutput, "GET", http.StatusBadGateway, nil, "", nil},
 		{"/r/myapp/mybigoutputcold", smalloutput, "GET", http.StatusOK, nil, "", nil},
-	} {
+	}
+
+	callIds := make([]string, len(testCases))
+
+	for i, test := range testCases {
 		trx := fmt.Sprintf("_trx_%d_", i)
 		body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
 		_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
@@ -424,6 +434,8 @@ func TestRouteRunnerExecution(t *testing.T) {
 			maxBody = 1024
 		}

+		callIds[i] = rec.Header().Get("Fn_call_id")
+
 		if rec.Code != test.expectedCode {
 			isFailure = true
 			t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
@@ -453,10 +465,11 @@ func TestRouteRunnerExecution(t *testing.T) {
 				}
 			}
 		}
+	}
+
+	for i, test := range testCases {
 		if test.expectedLogsSubStr != nil {
-			callID := rec.Header().Get("Fn_call_id")
-			if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) {
+			if !checkLogs(t, i, ds, callIds[i], test.expectedLogsSubStr) {
 				isFailure = true
 			}
 		}
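The test changes above follow from the same behavior shift: call IDs are recorded while the requests run (callIds[i] = rec.Header().Get("Fn_call_id")), and the logs are verified in a separate pass once all requests have returned, since the call record and log are no longer guaranteed to be persisted by the time the response comes back. A client that needs to observe the record right away may have to poll with a deadline; a minimal sketch of such a helper, which is hypothetical and not part of the repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitFor polls cond until it returns true or ctx expires.
// Hypothetical helper for illustration only; not part of the fn repository.
func waitFor(ctx context.Context, interval time.Duration, cond func() bool) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if cond() {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("condition not met before deadline")
		case <-ticker.C:
		}
	}
}

func main() {
	// Simulate a call log that becomes visible shortly after the response returns.
	var logVisible atomic.Bool
	go func() {
		time.Sleep(300 * time.Millisecond)
		logVisible.Store(true)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := waitFor(ctx, 50*time.Millisecond, logVisible.Load); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println("log visible")
}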