Http stream invoke tests (#1231)

* adds parity-level testing for http-stream invoke

the other formats had a gamut of tests; now http-stream does too, which makes
some of its behaviors obvious. some things changed / can change now that we
don't have pipes to worry about, the main one being that when a container blows
up the uds client now gets an EOF/ECONNREFUSED instead of the pipe getting
wedged (which previously is what let us surface the container error easily). I
made my best 50% effort at a reasonable error for when this happens (similar to
the error http/json give when they receive garbage), open to ideas on verbiage
/ policy there.
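
for reference, the new policy boils down to something like this sketch (the
error text and 502 status are the real ones from the dispatch hunk below; the
surrounding function, package name, and apiError type are illustrative only,
standing in for the agent's plumbing and models.NewAPIError):

package agentsketch

import (
	"errors"
	"fmt"
	"net/http"
)

// apiError stands in for models.NewAPIError; only the shape matters here.
type apiError struct {
	code int
	err  error
}

func (e *apiError) Error() string { return fmt.Sprintf("%d: %v", e.code, e.err) }

// doCall is a boiled-down version of the new dispatch behavior: a transport
// error on the UDS round trip (EOF / ECONNREFUSED when the container blows up)
// is collapsed into a single generic 502 instead of leaving a wedged pipe.
func doCall(udsClient *http.Client, req *http.Request) (*http.Response, error) {
	resp, err := udsClient.Do(req)
	if err != nil {
		// the raw transport error still goes to the logs; callers only see this
		return nil, &apiError{code: http.StatusBadGateway, err: errors.New("error receiving function response")}
	}
	return resp, nil
}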

should be pretty straightforward. one thing to notice is that
http/json/default don't return our fancy new Fn-Http-Status or Fn-Http-H
headers... it's relatively easy to add this to fdk-go just to test it, but for
invoke I'm really not sure we care (?), and for the gateway the output will be
identical since the old formats bypass the header decap. if anybody has any
feelings, feel free to express them.
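
for anybody who hasn't looked at the new contract, the decap mentioned above
amounts to roughly the following sketch (assuming the Fn-Http-Status /
Fn-Http-H-* prefix convention the FDKs emit; the real trigger handler may
differ in detail, and the function name here is made up):

package gatewaysketch

import (
	"net/http"
	"strconv"
	"strings"
)

// decap translates a function's Fn-Http-* response headers back into a plain
// HTTP response for the gateway caller; invoke returns headers untouched,
// which is why the old formats look identical there.
func decap(fnHeaders http.Header, rw http.ResponseWriter) {
	status := http.StatusOK
	for k, vs := range fnHeaders {
		switch {
		case k == "Fn-Http-Status" && len(vs) > 0:
			if code, err := strconv.Atoi(vs[0]); err == nil {
				status = code
			}
		case strings.HasPrefix(k, "Fn-Http-H-"):
			rw.Header()[strings.TrimPrefix(k, "Fn-Http-H-")] = vs
		}
	}
	rw.WriteHeader(status)
}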

* fix oomer up for new error

* Adding http header stripping to agent

Adding the header stripping in the agent; this sits low enough in the stack
that all routes to fns get treated the same.
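
the filter itself is small — a standalone version of what callToHTTPRequest now
does inline in the diff below (the header list is the one from the diff; the
stripHeaders helper wrapping it is just for illustration):

package headersketch

import (
	"net/http"
	"strings"
)

// hop-by-hop and auth headers that should never be forwarded to the container
// (same list the agent now uses).
var removeHeaders = map[string]bool{
	"connection":        true,
	"keep-alive":        true,
	"trailer":           true,
	"transfer-encoding": true,
	"te":                true,
	"upgrade":           true,
	"authorization":     true,
}

// stripHeaders copies in to a fresh header set, dropping anything in the list
// above, case-insensitively (matching the strings.ToLower check in the agent).
func stripHeaders(in http.Header) http.Header {
	out := make(http.Header, len(in))
	for k, vs := range in {
		if removeHeaders[strings.ToLower(k)] {
			continue
		}
		for _, v := range vs {
			out.Add(k, v)
		}
	}
	return out
}
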
Reed Allman
2018-09-20 10:52:20 -07:00
committed by Owen Cliffe
parent 38a586d7b2
commit 87e2562db9
6 changed files with 93 additions and 34 deletions

View File

@@ -696,24 +696,28 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
}
}
func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
defer span.End()
// TODO we can't trust that resp.Write doesn't timeout, even if the http
// client should respect the request context (right?) so we still need this (right?)
errApp := make(chan error, 1)
var removeHeaders = map[string]bool{
"connection": true,
"keep-alive": true,
"trailer": true,
"transfer-encoding": true,
"te": true,
"upgrade": true,
"authorization": true,
}
func callToHTTPRequest(ctx context.Context, call *call) (*http.Request, error) {
req, err := http.NewRequest("POST", "http://localhost/call", call.req.Body)
if err != nil {
errApp <- err
return errApp
return req, err
}
req.Header = make(http.Header)
for k, vs := range call.req.Header {
for _, v := range vs {
req.Header.Add(k, v)
if !removeHeaders[strings.ToLower(k)] {
for _, v := range vs {
req.Header.Add(k, v)
}
}
}
@@ -726,14 +730,31 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
deadlineStr := deadline.Format(time.RFC3339)
req.Header.Set("Fn-Deadline", deadlineStr)
req.Header.Set("FN_DEADLINE", deadlineStr)
}
return req, err
}
func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
ctx, span := trace.StartSpan(ctx, "agent_dispatch_httpstream")
defer span.End()
// TODO we can't trust that resp.Write doesn't timeout, even if the http
// client should respect the request context (right?) so we still need this (right?)
errApp := make(chan error, 1)
req, err := callToHTTPRequest(ctx, call)
if err != nil {
errApp <- err
return errApp
}
go func() {
resp, err := s.udsClient.Do(req)
if err != nil {
common.Logger(ctx).WithError(err).Debug("Got error from UDS socket")
errApp <- err
common.Logger(ctx).WithError(err).Error("Got error from UDS socket")
errApp <- models.NewAPIError(http.StatusBadGateway, errors.New("error receiving function response"))
return
}
common.Logger(ctx).WithField("status", resp.StatusCode).Debug("Got resp from UDS socket")
@@ -741,7 +762,7 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
defer resp.Body.Close()
select {
case errApp <- writeResp(resp, call.w):
case errApp <- writeResp(s.cfg.MaxResponseSize, resp, call.w):
case <-ctx.Done():
errApp <- ctx.Err()
}
@@ -750,12 +771,15 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) chan error {
}
// XXX(reed): dupe code in http proto (which will die...)
func writeResp(resp *http.Response, w io.Writer) error {
func writeResp(max uint64, resp *http.Response, w io.Writer) error {
rw, ok := w.(http.ResponseWriter)
if !ok {
w = common.NewClampWriter(rw, max, models.ErrFunctionResponseTooBig)
return resp.Write(w)
}
rw = newSizerRespWriter(max, rw)
// if we're writing directly to the response writer, we need to set headers
// and status code, and only copy the body. resp.Write would copy a full
// http request into the response body (not what we want).
@@ -772,6 +796,24 @@ func writeResp(resp *http.Response, w io.Writer) error {
return err
}
// XXX(reed): this is a remnant of old io.pipe plumbing, we need to get rid of
// the buffers from the front-end in actuality, but only after removing other formats... so here, eat this
type sizerRespWriter struct {
http.ResponseWriter
w io.Writer
}
var _ http.ResponseWriter = new(sizerRespWriter)
func newSizerRespWriter(max uint64, rw http.ResponseWriter) http.ResponseWriter {
return &sizerRespWriter{
ResponseWriter: rw,
w: common.NewClampWriter(rw, max, models.ErrFunctionResponseTooBig),
}
}
func (s *sizerRespWriter) Write(b []byte) (int, error) { return s.w.Write(b) }
// TODO remove
func (s *hotSlot) dispatchOldFormats(ctx context.Context, call *call) chan error {

View File

@@ -335,8 +335,6 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
c.w = c.stderr
}
// NOTE: we need to limit the output size(?) since users may not use fdk we can't limit it there
// c.w = common.NewClampWriter(c.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig)
return &c, nil
}

View File

@@ -66,6 +66,8 @@ func (s *Server) handleFnInvokeCall2(c *gin.Context) error {
}
func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) error {
// TODO: we should combine this logic with trigger, which just wraps this block with some headers wizardry
// TODO: we should get rid of the buffers, and stream back (saves memory (+splice), faster (splice), allows streaming, don't have to cap resp size)
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
writer := syncResponseWriter{
@@ -101,6 +103,7 @@ func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) e
}
// if they don't set a content-type - detect it
// TODO: remove this after removing all the formats (too many tests to scrub til then)
if writer.Header().Get("Content-Type") == "" {
// see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to?
var contentType string

View File

@@ -78,9 +78,10 @@ func TestFnInvokeRunnerExecEmptyBody(t *testing.T) {
f1 := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Image: rImg, ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
f2 := &models.Fn{ID: "hothttp", Name: "hothttp", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
f3 := &models.Fn{ID: "hotjson", Name: "hotjson", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
f4 := &models.Fn{ID: "hothttpstream", Name: "hothttpstream", AppID: app.ID, Image: rImg, Format: "http-stream", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
ds := datastore.NewMockInit(
[]*models.App{app},
[]*models.Fn{f1, f2, f3},
[]*models.Fn{f1, f2, f3, f4},
)
ls := logs.NewMock()
@@ -100,6 +101,8 @@ func TestFnInvokeRunnerExecEmptyBody(t *testing.T) {
{"/invoke/hothttp"},
{"/invoke/hotjson"},
{"/invoke/hotjson"},
{"/invoke/hothttpstream"},
{"/invoke/hothttpstream"},
}
for i, test := range testCases {
@@ -178,7 +181,7 @@ func TestFnInvokeRunnerExecution(t *testing.T) {
multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */}
crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 120000000}` // ask for 120MB
badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output
ok := `{"echoContent": "_TRX_ID_", "responseContentType": "application/json; charset=utf-8", "isDebug": true}` // good response / ok
respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
@@ -189,9 +192,9 @@ func TestFnInvokeRunnerExecution(t *testing.T) {
//over sized request
var bigbufa [32257]byte
rand.Read(bigbufa[:])
bigbuf := base64.StdEncoding.EncodeToString(bigbufa[:]) // this will be > bigbufa, but json compatible
bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K
bigbuf := base64.StdEncoding.EncodeToString(bigbufa[:]) // this will be > bigbufa, but json compatible
bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "responseContentType":"application/json; charset=utf-8", "trailerRepeat": 1}` // 1 trailer < 2K
testCases := []struct {
path string
@@ -213,13 +216,22 @@ func TestFnInvokeRunnerExecution(t *testing.T) {
{"/invoke/json_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
{"/invoke/http_stream_fn_id", ok, "POST", http.StatusOK, expStreamHeaders, "", nil},
// NOTE: we can't test bad response framing anymore easily (eg invalid http response), should we even worry about it?
{"/invoke/http_stream_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil},
{"/invoke/http_stream_fn_id", crasher, "POST", http.StatusBadGateway, expHeaders, "error receiving function response", nil},
// XXX(reed): we could stop buffering function responses so that we can stream things?
{"/invoke/http_stream_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "function response too large", nil},
{"/invoke/http_stream_fn_id", smalloutput, "POST", http.StatusOK, expStreamHeaders, "", nil},
// XXX(reed): meh we really should try to get oom out, but maybe it's better left to the logs?
{"/invoke/http_stream_fn_id", oomer, "POST", http.StatusBadGateway, nil, "error receiving function response", nil},
{"/invoke/http_stream_fn_id", bigbuf, "POST", http.StatusRequestEntityTooLarge, nil, "", nil},
{"/invoke/http_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil},
{"/invoke/json_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil},
{"/invoke/json_fn_id", respTypeJason, "POST", http.StatusOK, expCTHeaders, "", nil},
{"/invoke/default_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
{"/invoke/default_fn_id", crasher, "POST", http.StatusBadGateway, expHeaders, "container exit code 2", nil},
{"/invoke/default_fn_id", crasher, "POST", http.StatusBadGateway, expHeaders, "container exit code 1", nil},
{"/invoke/default_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil},
{"/invoke/http_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil},
{"/invoke/http_dnereg_fn_id", ``, "POST", http.StatusInternalServerError, nil, "connection refused", nil},
@@ -250,24 +262,25 @@ func TestFnInvokeRunnerExecution(t *testing.T) {
}
callIds[i] = rec.Header().Get("Fn_call_id")
cid := callIds[i]
if rec.Code != test.expectedCode {
isFailure = true
t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
i, test.expectedCode, rec.Code, respBody[:maxBody])
t.Errorf("Test %d call_id %s: Expected status code to be %d but was %d. body: %s",
i, cid, test.expectedCode, rec.Code, respBody[:maxBody])
}
if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) {
isFailure = true
t.Errorf("Test %d: Expected response to include %s but got body: %s",
i, trx, respBody[:maxBody])
t.Errorf("Test %d call_id %s: Expected response to include %s but got body: %s",
i, cid, trx, respBody[:maxBody])
}
if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) {
isFailure = true
t.Errorf("Test %d: Expected response to include %s but got body: %s",
i, test.expectedErrSubStr, respBody[:maxBody])
t.Errorf("Test %d call_id %s: Expected response to include %s but got body: %s",
i, cid, test.expectedErrSubStr, respBody[:maxBody])
}
@@ -275,8 +288,8 @@ func TestFnInvokeRunnerExecution(t *testing.T) {
for name, header := range test.expectedHeaders {
if header[0] != rec.Header().Get(name) {
isFailure = true
t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s",
i, name, header[0], rec.Header().Get(name), respBody)
t.Errorf("Test %d call_id %s: Expected header `%s` to be %s but was %s. body: %s",
i, cid, name, header[0], rec.Header().Get(name), respBody)
}
}
}
@@ -312,12 +325,13 @@ func TestInvokeRunnerTimeout(t *testing.T) {
coldFn := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
httpFn := &models.Fn{ID: "hot", Name: "http", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
jsonFn := &models.Fn{ID: "hot-json", Name: "json", AppID: app.ID, Format: "json", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
httpStreamFn := &models.Fn{ID: "http-stream", Name: "http-stream", AppID: app.ID, Format: "http-stream", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
bigMemColdFn := &models.Fn{ID: "bigmem-cold", Name: "bigmemcold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}}
bigMemHotFn := &models.Fn{ID: "bigmem-hot", Name: "bigmemhot", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}}
ds := datastore.NewMockInit(
[]*models.App{app},
[]*models.Fn{coldFn, httpFn, jsonFn, bigMemColdFn, bigMemHotFn},
[]*models.Fn{coldFn, httpFn, jsonFn, httpStreamFn, bigMemColdFn, bigMemHotFn},
)
fnl := logs.NewMock()
@@ -337,6 +351,8 @@ func TestInvokeRunnerTimeout(t *testing.T) {
{"/invoke/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
{"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
{"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
{"/invoke/http-stream", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
{"/invoke/http-stream", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
{"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
{"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
{"/invoke/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil},

View File

@@ -343,7 +343,7 @@ func TestTriggerRunnerExecution(t *testing.T) {
{"/t/myapp/myhotjason", respTypeJason, "GET", http.StatusOK, expCTHeaders, "", nil},
{"/t/myapp/myroute", ok, "GET", http.StatusOK, expHeaders, "", nil},
{"/t/myapp/myerror", crasher, "GET", http.StatusBadGateway, expHeaders, "container exit code 2", nil},
{"/t/myapp/myerror", crasher, "GET", http.StatusBadGateway, expHeaders, "container exit code 1", nil},
{"/t/myapp/mydne", ``, "GET", http.StatusNotFound, nil, "pull access denied", nil},
{"/t/myapp/mydnehot", ``, "GET", http.StatusNotFound, nil, "pull access denied", nil},
{"/t/myapp/mydneregistry", ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil},

View File

@@ -216,7 +216,7 @@ func processRequest(ctx context.Context, in io.Reader) (*AppRequest, *AppRespons
// simulate crash
if request.IsCrash {
panic("Crash requested")
log.Fatalln("Crash requested")
}
resp := AppResponse{