mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
HTTP trigger http-stream tests (#1241)
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
@@ -20,16 +19,6 @@ var (
|
||||
bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
|
||||
)
|
||||
|
||||
type ResponseBufferingWriter interface {
|
||||
http.ResponseWriter
|
||||
io.Reader
|
||||
Status() int
|
||||
GetBuffer() *bytes.Buffer
|
||||
SetBuffer(*bytes.Buffer)
|
||||
}
|
||||
|
||||
var _ ResponseBufferingWriter = new(syncResponseWriter)
|
||||
|
||||
// implements http.ResponseWriter
|
||||
// this little guy buffers responses from user containers and lets them still
|
||||
// set headers and such without us risking writing partial output [as much, the
|
||||
@@ -41,13 +30,10 @@ type syncResponseWriter struct {
|
||||
*bytes.Buffer
|
||||
}
|
||||
|
||||
func (s *syncResponseWriter) Header() http.Header { return s.headers }
|
||||
var _ http.ResponseWriter = new(syncResponseWriter) // nice compiler errors
|
||||
|
||||
// By storing the status here, we effectively buffer the response
|
||||
func (s *syncResponseWriter) WriteHeader(code int) { s.status = code }
|
||||
func (s *syncResponseWriter) Status() int { return s.status }
|
||||
func (s *syncResponseWriter) GetBuffer() *bytes.Buffer { return s.Buffer }
|
||||
func (s *syncResponseWriter) SetBuffer(buf *bytes.Buffer) { s.Buffer = buf }
|
||||
func (s *syncResponseWriter) Header() http.Header { return s.headers }
|
||||
func (s *syncResponseWriter) WriteHeader(code int) { s.status = code }
|
||||
|
||||
// handleFnInvokeCall executes the function, for router handlers
|
||||
func (s *Server) handleFnInvokeCall(c *gin.Context) {
|
||||
@@ -79,71 +65,84 @@ func (s *Server) handleFnInvokeCall2(c *gin.Context) error {
|
||||
}
|
||||
|
||||
func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) error {
|
||||
writer := &syncResponseWriter{
|
||||
headers: c.Writer.Header(),
|
||||
return s.fnInvoke(c.Writer, c.Request, app, fn, nil)
|
||||
}
|
||||
|
||||
func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *models.App, fn *models.Fn, trig *models.Trigger) error {
|
||||
// TODO: we should get rid of the buffers, and stream back (saves memory (+splice), faster (splice), allows streaming, don't have to cap resp size)
|
||||
// buffer the response before writing it out to client to prevent partials from trying to stream
|
||||
buf := bufPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
bufWriter := syncResponseWriter{
|
||||
headers: resp.Header(),
|
||||
status: 200,
|
||||
Buffer: buf,
|
||||
}
|
||||
|
||||
call, err := s.agent.GetCall(agent.WithWriter(writer), // XXX (reed): order matters [for now]
|
||||
agent.FromHTTPFnRequest(app, fn, c.Request))
|
||||
var writer http.ResponseWriter = &bufWriter
|
||||
writer = &jsonContentTypeTrapper{ResponseWriter: writer}
|
||||
|
||||
opts := []agent.CallOpt{
|
||||
agent.WithWriter(writer), // XXX (reed): order matters [for now]
|
||||
agent.FromHTTPFnRequest(app, fn, req),
|
||||
}
|
||||
if trig != nil {
|
||||
opts = append(opts, agent.WithTrigger(trig))
|
||||
}
|
||||
|
||||
call, err := s.agent.GetCall(opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.fnInvoke(c, app, fn, writer, call)
|
||||
err = s.agent.Submit(call)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// because we can...
|
||||
writer.Header().Set("Content-Length", strconv.Itoa(int(buf.Len())))
|
||||
|
||||
// buffered response writer traps status (so we can add headers), we need to write it still
|
||||
if bufWriter.status > 0 {
|
||||
resp.WriteHeader(bufWriter.status)
|
||||
}
|
||||
|
||||
io.Copy(resp, buf)
|
||||
bufPool.Put(buf) // at this point, submit returned without timing out, so we can re-use this one
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) fnInvoke(c *gin.Context, app *models.App, fn *models.Fn, writer ResponseBufferingWriter, call agent.Call) error {
|
||||
// TODO: we should get rid of the buffers, and stream back (saves memory (+splice), faster (splice), allows streaming, don't have to cap resp size)
|
||||
buf := bufPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
var submitErr error
|
||||
defer func() {
|
||||
if buf.Len() == 0 && submitErr == nil {
|
||||
bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch?
|
||||
}
|
||||
}()
|
||||
writer.SetBuffer(buf)
|
||||
// TODO kill this thing after removing tests for http/json/default formats
|
||||
type jsonContentTypeTrapper struct {
|
||||
http.ResponseWriter
|
||||
committed bool
|
||||
}
|
||||
|
||||
model := call.Model()
|
||||
{ // scope this, to disallow ctx use outside of this scope. add id for handleV1ErrorResponse logger
|
||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
}
|
||||
var _ http.ResponseWriter = new(jsonContentTypeTrapper) // nice compiler errors
|
||||
|
||||
submitErr = s.agent.Submit(call)
|
||||
if submitErr != nil {
|
||||
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
||||
// we could filter that error out here too as right now it yells a little
|
||||
if submitErr == models.ErrCallTimeoutServerBusy || submitErr == models.ErrCallTimeout {
|
||||
// TODO maneuver
|
||||
// add this, since it means that start may not have been called [and it's relevant]
|
||||
c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String())
|
||||
}
|
||||
return submitErr
|
||||
func (j *jsonContentTypeTrapper) Write(b []byte) (int, error) {
|
||||
if !j.committed {
|
||||
// override default content type detection behavior to add json
|
||||
j.detectContentType(b)
|
||||
}
|
||||
// if they don't set a content-type - detect it
|
||||
// TODO: remove this after removing all the formats (too many tests to scrub til then)
|
||||
if writer.Header().Get("Content-Type") == "" {
|
||||
// see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to?
|
||||
j.committed = true
|
||||
|
||||
// write inner
|
||||
return j.ResponseWriter.Write(b)
|
||||
}
|
||||
|
||||
func (j *jsonContentTypeTrapper) detectContentType(b []byte) {
|
||||
if j.Header().Get("Content-Type") == "" {
|
||||
// see http.DetectContentType
|
||||
var contentType string
|
||||
jsonPrefix := [1]byte{'{'} // stack allocated
|
||||
if bytes.HasPrefix(writer.GetBuffer().Bytes(), jsonPrefix[:]) {
|
||||
if bytes.HasPrefix(b, jsonPrefix[:]) {
|
||||
// try to detect json, since DetectContentType isn't a hipster.
|
||||
contentType = "application/json; charset=utf-8"
|
||||
} else {
|
||||
contentType = http.DetectContentType(writer.GetBuffer().Bytes())
|
||||
contentType = http.DetectContentType(b)
|
||||
}
|
||||
writer.Header().Set("Content-Type", contentType)
|
||||
j.Header().Set("Content-Type", contentType)
|
||||
}
|
||||
|
||||
writer.Header().Set("Content-Length", strconv.Itoa(int(writer.GetBuffer().Len())))
|
||||
|
||||
if writer.Status() > 0 {
|
||||
c.Writer.WriteHeader(writer.Status())
|
||||
}
|
||||
|
||||
io.Copy(c.Writer, writer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user