mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: timeouts and container exists should stop slot queuing (#843)
1) in theory it may be possible for an exited container to requeue a slot, close this gap by always setting fatal error for a slot if a container has exited. 2) when a client request times out or cancelled (client disconnect, etc.) the slot should not be allowed to be requeued and container should terminate to avoid accidental mixing of previous response into next.
This commit is contained in:
@@ -508,6 +508,12 @@ func (s *hotSlot) Error() error {
|
||||
return s.fatalErr
|
||||
}
|
||||
|
||||
func (s *hotSlot) trySetError(err error) {
|
||||
if s.fatalErr == nil {
|
||||
s.fatalErr = err
|
||||
}
|
||||
}
|
||||
|
||||
func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
ctx, span := trace.StartSpan(ctx, "agent_hot_exec")
|
||||
defer span.End()
|
||||
@@ -540,19 +546,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
|
||||
select {
|
||||
case err := <-s.errC: // error from container
|
||||
s.trySetError(err)
|
||||
return err
|
||||
case err := <-errApp: // from dispatch
|
||||
if s.fatalErr == nil && err != nil {
|
||||
if err != nil {
|
||||
if models.IsAPIError(err) {
|
||||
s.fatalErr = err
|
||||
s.trySetError(err)
|
||||
} else if err == protocol.ErrExcessData {
|
||||
s.fatalErr = err
|
||||
s.trySetError(err)
|
||||
// suppress excess data error, but do shutdown the container
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done(): // call timeout
|
||||
s.trySetError(ctx.Err())
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -355,19 +355,19 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
|
||||
expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
|
||||
expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
|
||||
multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
|
||||
|
||||
crasher := `{"isDebug": true, "isCrash": true}` // crash container
|
||||
oomer := `{"isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
|
||||
badHot := `{"invalidResponse": true, "isDebug": true}` // write a not json/http as output
|
||||
ok := `{"isDebug": true}` // good response / ok
|
||||
respTypeLie := `{"responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
respTypeJason := `{"jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
|
||||
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
|
||||
badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output
|
||||
ok := `{"echoContent": "_TRX_ID_", "isDebug": true}` // good response / ok
|
||||
respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
respTypeJason := `{"echoContent": "_TRX_ID_", "jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
|
||||
// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
|
||||
multiLog := `{"sleepTime": 1000, "isDebug": true}`
|
||||
multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
|
||||
bigoutput := `{"isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
|
||||
smalloutput := `{"isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1}` // 1 trailer < 2K
|
||||
multiLog := `{"echoContent": "_TRX_ID_", "sleepTime": 1000, "isDebug": true}`
|
||||
bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
|
||||
smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -409,7 +409,8 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
{"/r/myapp/mybigoutputcold", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||
{"/r/myapp/mybigoutputcold", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
||||
} {
|
||||
body := strings.NewReader(test.body)
|
||||
trx := fmt.Sprintf("_trx_%d_", i)
|
||||
body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
respBytes, _ := ioutil.ReadAll(rec.Body)
|
||||
respBody := string(respBytes)
|
||||
@@ -424,6 +425,13 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
i, test.expectedCode, rec.Code, respBody[:maxBody])
|
||||
}
|
||||
|
||||
if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
i, trx, respBody[:maxBody])
|
||||
|
||||
}
|
||||
|
||||
if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
@@ -555,6 +563,15 @@ func TestFailedEnqueue(t *testing.T) {
|
||||
|
||||
func TestRouteRunnerTimeout(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
isFailure := false
|
||||
|
||||
// Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.)
|
||||
// for figuring out why things failed.
|
||||
defer func() {
|
||||
if isFailure {
|
||||
t.Log(buf.String())
|
||||
}
|
||||
}()
|
||||
|
||||
models.RouteMaxMemory = uint64(1024 * 1024 * 1024) // 1024 TB
|
||||
hugeMem := uint64(models.RouteMaxMemory - 1)
|
||||
@@ -585,32 +602,45 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
||||
expectedCode int
|
||||
expectedHeaders map[string][]string
|
||||
}{
|
||||
{"/r/myapp/cold", `{"sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/cold", `{"sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot", `{"sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot", `{"sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/hot-json", `{"sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot-json", `{"sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/bigmem-cold", `{"sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
{"/r/myapp/bigmem-hot", `{"sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
{"/r/myapp/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/r/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/r/myapp/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
{"/r/myapp/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
} {
|
||||
body := strings.NewReader(test.body)
|
||||
trx := fmt.Sprintf("_trx_%d_", i)
|
||||
body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
respBytes, _ := ioutil.ReadAll(rec.Body)
|
||||
respBody := string(respBytes)
|
||||
maxBody := len(respBody)
|
||||
if maxBody > 1024 {
|
||||
maxBody = 1024
|
||||
}
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
t.Log(buf.String())
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected status code to be %d but was %d body: %#v",
|
||||
i, test.expectedCode, rec.Code, rec.Body.String())
|
||||
i, test.expectedCode, rec.Code, respBody[:maxBody])
|
||||
}
|
||||
|
||||
if test.expectedHeaders == nil {
|
||||
continue
|
||||
if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
i, trx, respBody[:maxBody])
|
||||
|
||||
}
|
||||
for name, header := range test.expectedHeaders {
|
||||
if header[0] != rec.Header().Get(name) {
|
||||
t.Log(buf.String())
|
||||
t.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v",
|
||||
i, name, header[0], rec.Header().Get(name), rec.Body.String())
|
||||
|
||||
if test.expectedHeaders != nil {
|
||||
for name, header := range test.expectedHeaders {
|
||||
if header[0] != rec.Header().Get(name) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v",
|
||||
i, name, header[0], rec.Header().Get(name), respBody[:maxBody])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user