diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index 6da93792e..b6b96ba41 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -2,23 +2,15 @@ package protocol import ( "bufio" - "bytes" "context" "fmt" "io" - "io/ioutil" "net/http" - "strconv" - "sync" "github.com/fnproject/fn/api/models" opentracing "github.com/opentracing/opentracing-go" ) -var ( - bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} -) - // HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant // communication. It relies on Content-Length to know when to stop reading from // containers stdout. It also mandates valid HTTP headers back and forth, thus @@ -62,16 +54,6 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_response") defer span.Finish() - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() - defer bufPool.Put(buf) - - // copy the response body into a buffer so that we read the whole thing. then set the content length. - io.Copy(buf, resp.Body) - resp.Body.Close() - resp.Body = ioutil.NopCloser(buf) - resp.Header.Set("Content-Length", strconv.Itoa(buf.Len())) - rw, ok := w.(http.ResponseWriter) if !ok { // async / [some] tests go through here. write a full http request to the writer diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index c6cbf737e..efdbd9cd2 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -7,12 +7,16 @@ import ( "fmt" "io" "net/http" - "strconv" + "sync" "github.com/fnproject/fn/api/models" opentracing "github.com/opentracing/opentracing-go" ) +var ( + bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} +) + // CallRequestHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now. type CallRequestHTTP struct { Type string `json:"type"` @@ -121,11 +125,20 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e rw.Header().Add(k, vv) // on top of any specified on the route } } - if p.StatusCode != 0 { - rw.WriteHeader(p.StatusCode) - } } - rw.Header().Set("Content-Length", strconv.Itoa(len(jout.Body))) + // after other header setting, top level content_type takes precedence and is + // absolute (if set). it is expected that if users want to set multiple + // values they put it in the string, e.g. `"content-type:"application/json; charset=utf-8"` + // TODO this value should not exist since it's redundant in proto headers? + if jout.ContentType != "" { + rw.Header().Set("Content-Type", jout.ContentType) + } + + // we must set all headers before writing the status, see http.ResponseWriter contract + if p := jout.Protocol; p != nil && p.StatusCode != 0 { + rw.WriteHeader(p.StatusCode) + } + _, err = io.WriteString(rw, jout.Body) return err } diff --git a/api/server/runner.go b/api/server/runner.go index bc67d9adc..0620a856e 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -2,8 +2,11 @@ package server import ( "bytes" + "io" "net/http" "path" + "strconv" + "sync" "time" "github.com/fnproject/fn/api" @@ -46,18 +49,29 @@ func (s *Server) handleFunctionCall2(c *gin.Context) error { // gin sets this to 404 on NoRoute, so we'll just ensure it's 200 by default. c.Status(200) // this doesn't write the header yet - c.Header("Content-Type", "application/json") return s.serve(c, a, path.Clean(p)) } +var ( + bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }} +) + // TODO it would be nice if we could make this have nothing to do with the gin.Context but meh // TODO make async store an *http.Request? would be sexy until we have different api format... func (s *Server) serve(c *gin.Context, appName, path string) error { + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + writer := syncResponseWriter{ + Buffer: buf, + headers: c.Writer.Header(), // copy ref + } + defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch? + // GetCall can mod headers, assign an id, look up the route/app (cached), // strip params, etc. call, err := s.agent.GetCall( - agent.WithWriter(c.Writer), // XXX (reed): order matters [for now] + agent.WithWriter(&writer), // XXX (reed): order matters [for now] agent.FromRequest(appName, path, c.Request), ) if err != nil { @@ -72,11 +86,9 @@ func (s *Server) serve(c *gin.Context, appName, path string) error { if model.Type == "async" { // TODO we should push this into GetCall somehow (CallOpt maybe) or maybe agent.Queue(Call) ? - contentLength := c.Request.ContentLength - if contentLength < 128 { // contentLength could be -1 or really small, sanitize - contentLength = 128 + if c.Request.ContentLength > 0 { + buf.Grow(int(c.Request.ContentLength)) } - buf := bytes.NewBuffer(make([]byte, int(contentLength))[:0]) // TODO sync.Pool me _, err := buf.ReadFrom(c.Request.Body) if err != nil { return models.ErrInvalidPayload @@ -102,16 +114,45 @@ func (s *Server) serve(c *gin.Context, appName, path string) error { // add this, since it means that start may not have been called [and it's relevant] c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) } - // NOTE: if the task wrote the headers already then this will fail to write - // a 5xx (and log about it to us) -- that's fine (nice, even!) return err } - // TODO plumb FXLB-WAIT somehow (api?) + // if they don't set a content-type - detect it + if writer.Header().Get("Content-Type") == "" { + // see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to? + var contentType string + jsonPrefix := [1]byte{'{'} // stack allocated + if bytes.HasPrefix(buf.Bytes(), jsonPrefix[:]) { + // try to detect json, since DetectContentType isn't a hipster. + contentType = "application/json; charset=utf-8" + } else { + contentType = http.DetectContentType(buf.Bytes()) + } + writer.Header().Set("Content-Type", contentType) + } - // TODO we need to watch the response writer and if no bytes written - // then write a 200 at this point? - // c.Data(http.StatusOK) + writer.Header().Set("Content-Length", strconv.Itoa(int(buf.Len()))) + + if writer.status > 0 { + c.Writer.WriteHeader(writer.status) + } + io.Copy(c.Writer, &writer) return nil } + +var _ http.ResponseWriter = new(syncResponseWriter) + +// implements http.ResponseWriter +// this little guy buffers responses from user containers and lets them still +// set headers and such without us risking writing partial output [as much, the +// server could still die while we're copying the buffer]. this lets us set +// content length and content type nicely, as a bonus. it is sad, yes. +type syncResponseWriter struct { + headers http.Header + status int + *bytes.Buffer +} + +func (s *syncResponseWriter) Header() http.Header { return s.headers } +func (s *syncResponseWriter) WriteHeader(code int) { s.status = code } diff --git a/api/server/runner_test.go b/api/server/runner_test.go index e8c3a7bda..b5b784b22 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -157,13 +157,16 @@ func TestRouteRunnerExecution(t *testing.T) { srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) - expHeaders := map[string][]string{"X-Function": {"Test"}} + expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}} + expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}} - crasher := `{"sleepTime": 0, "isDebug": true, "isCrash": true}` // crash container - oomer := `{"sleepTime": 0, "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB - badHttp := `{"sleepTime": 0, "isDebug": true, "responseCode": -1}` // http status of -1 (invalid http) - badHot := `{"invalidResponse": true, "isDebug": true}` // write a not json/http as output - ok := `{"sleepTime": 0, "isDebug": true}` // good response / ok + crasher := `{"sleepTime": 0, "isDebug": true, "isCrash": true}` // crash container + oomer := `{"sleepTime": 0, "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB + badHttp := `{"sleepTime": 0, "isDebug": true, "responseCode": -1}` // http status of -1 (invalid http) + badHot := `{"invalidResponse": true, "isDebug": true}` // write a not json/http as output + ok := `{"sleepTime": 0, "isDebug": true}` // good response / ok + respTypeLie := `{"responseContentType": "foo/bar", "sleepTime":0, "isDebug": true}` // Content-Type: foo/bar + respTypeJason := `{"jasonContentType": "foo/bar", "sleepTime":0, "isDebug": true}` // Content-Type: foo/bar for i, test := range []struct { path string @@ -179,6 +182,12 @@ func TestRouteRunnerExecution(t *testing.T) { // hot container now back to normal, we should get OK {"/r/myapp/myhot", ok, "GET", http.StatusOK, expHeaders, ""}, + {"/r/myapp/myhotjason", ok, "GET", http.StatusOK, expHeaders, ""}, + + {"/r/myapp/myhot", respTypeLie, "GET", http.StatusOK, expCTHeaders, ""}, + {"/r/myapp/myhotjason", respTypeLie, "GET", http.StatusOK, expCTHeaders, ""}, + {"/r/myapp/myhotjason", respTypeJason, "GET", http.StatusOK, expCTHeaders, ""}, + {"/r/myapp/myhot", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid http response"}, {"/r/myapp/myhotjason", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid json response"}, @@ -217,8 +226,8 @@ func TestRouteRunnerExecution(t *testing.T) { for name, header := range test.expectedHeaders { if header[0] != rec.Header().Get(name) { isFailure = true - t.Errorf("Test %d: Expected header `%s` to be %s but was %s", - i, name, header[0], rec.Header().Get(name)) + t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s", + i, name, header[0], rec.Header().Get(name), respBody) } } } diff --git a/images/fn-test-utils/fn-test-utils.go b/images/fn-test-utils/fn-test-utils.go index 7049d2087..0635cfcff 100644 --- a/images/fn-test-utils/fn-test-utils.go +++ b/images/fn-test-utils/fn-test-utils.go @@ -29,6 +29,9 @@ type AppRequest struct { ResponseCode int `json:"responseCode,omitempty"` // if specified, this is our response content-type ResponseContentType string `json:"responseContentType,omitempty"` + // if specified, this is our response content-type. + // jason doesn't sit with the other kids at school. + JasonContentType string `json:"jasonContentType,omitempty"` // if specified, this is echoed back to client EchoContent string `json:"echoContent,omitempty"` // verbose mode @@ -85,22 +88,29 @@ func getTotalLeaks() int { func AppHandler(ctx context.Context, in io.Reader, out io.Writer) { req, resp := processRequest(ctx, in) - finalizeRequest(out, req, resp) + var outto fdkresponse + outto.Writer = out + finalizeRequest(&outto, req, resp) } -func finalizeRequest(out io.Writer, req *AppRequest, resp *AppResponse) { +func finalizeRequest(out *fdkresponse, req *AppRequest, resp *AppResponse) { // custom response code if req.ResponseCode != 0 { - fdk.WriteStatus(out, req.ResponseCode) + out.Status = req.ResponseCode } else { - fdk.WriteStatus(out, 200) + out.Status = 200 } // custom content type if req.ResponseContentType != "" { - fdk.SetHeader(out, "Content-Type", req.ResponseContentType) - } else { - fdk.SetHeader(out, "Content-Type", "application/json") + out.Header.Set("Content-Type", req.ResponseContentType) + } + // NOTE: don't add 'application/json' explicitly here as an else, + // we will test that go's auto-detection logic does not fade since + // some people are relying on it now + + if req.JasonContentType != "" { + out.JasonContentType = req.JasonContentType } json.NewEncoder(out).Encode(resp) @@ -236,11 +246,10 @@ func testDoJSON(ctx context.Context, in io.Reader, out io.Writer) { func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes.Buffer, hdr http.Header) error { buf.Reset() fdkutils.ResetHeaders(hdr) - resp := fdkutils.Response{ - Writer: buf, - Status: 200, - Header: hdr, - } + var resp fdkresponse + resp.Writer = buf + resp.Status = 200 + resp.Header = hdr responseSize := 0 @@ -268,7 +277,7 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes responseSize = appReq.ResponseSize } - jsonResponse := fdkutils.GetJSONResp(buf, &resp, &jsonRequest) + jsonResponse := getJSONResp(buf, &resp, &jsonRequest) if responseSize > 0 { b, err := json.Marshal(jsonResponse) @@ -287,14 +296,33 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes return nil } +// since we need to test little jason's content type since he's special. but we +// don't want to add redundant and confusing fields to the fdk... +type fdkresponse struct { + fdkutils.Response + + JasonContentType string // dumb +} + +// copy of fdk.GetJSONResp but with sugar for stupid jason's little fields +func getJSONResp(buf *bytes.Buffer, fnResp *fdkresponse, req *fdkutils.JsonIn) *fdkutils.JsonOut { + return &fdkutils.JsonOut{ + Body: buf.String(), + ContentType: fnResp.JasonContentType, + Protocol: fdkutils.CallResponseHTTP{ + StatusCode: fnResp.Status, + Headers: fnResp.Header, + }, + } +} + func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes.Buffer, hdr http.Header) error { buf.Reset() fdkutils.ResetHeaders(hdr) - resp := fdkutils.Response{ - Writer: buf, - Status: 200, - Header: hdr, - } + var resp fdkresponse + resp.Writer = buf + resp.Status = 200 + resp.Header = hdr responseSize := 0 @@ -323,7 +351,7 @@ func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes responseSize = appReq.ResponseSize } - hResp := fdkutils.GetHTTPResp(buf, &resp, req) + hResp := fdkutils.GetHTTPResp(buf, &resp.Response, req) if responseSize > 0 {