From d9b74cfd147d2bb2a9d62733806fcbe56d04a481 Mon Sep 17 00:00:00 2001 From: Owen Cliffe Date: Thu, 20 Sep 2018 19:30:28 +0100 Subject: [PATCH] Gateway trigger support (#1225) * initial gateway trigger support * Pass Content-Type down to wrapped writer * Move req header setting * Adding call id to responses * add dupe Fn-Call-Id headers --- api/agent/call.go | 51 ++++++++++++------ api/server/runner_httptrigger.go | 74 +++++++++++++++++++++++++-- api/server/runner_httptrigger_test.go | 8 +-- 3 files changed, 108 insertions(+), 25 deletions(-) diff --git a/api/agent/call.go b/api/agent/call.go index 5610b8a2a..bb2f65cec 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -13,6 +13,8 @@ import ( "go.opencensus.io/trace" + "net/textproto" + "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" @@ -53,26 +55,41 @@ const ( invokePath = "/invoke" ) +var skipTriggerHeaders = map[string]bool{ + "Connection": true, + "Keep-Alive": true, + "Trailer": true, + "Transfer-Encoding": true, + "TE": true, + "Upgrade": true, +} + // Sets up a call from an http trigger request func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trigger, req *http.Request) CallOpt { return func(c *call) error { - ctx := req.Context() - - log := common.Logger(ctx) - // Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header - // Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode - // Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8 contentType := req.Header.Get("Content-Type") - t, _, err := mime.ParseMediaType(contentType) - if err != nil && contentType != "" { - // won't fail here, but log - log.Debugf("Could not parse Content-Type header: %v %v", contentType, err) - } else { - if t == ceMimeType { - c.IsCloudEvent = true - fn.Format = models.FormatCloudEvent + // transpose trigger headers into HTTP + headers := make(http.Header) + for k, vs := range req.Header { + // should be generally unnecessary but to be doubly sure. + k = textproto.CanonicalMIMEHeaderKey(k) + if skipTriggerHeaders[k] { + continue + } + rewriteKey := fmt.Sprintf("Fn-Http-H-%s", k) + for _, v := range vs { + headers.Add(rewriteKey, v) } } + requestUrl := reqURL(req) + + headers.Set("Fn-Http-Method", req.Method) + if contentType != "" { + headers.Set("Content-Type", contentType) + } + headers.Set("Fn-Http-Request-Url", requestUrl) + headers.Set("Fn-Intent", "httprequest") + req.Header = headers if fn.Format == "" { fn.Format = models.FormatDefault @@ -83,6 +100,8 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig // TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers // TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to if rw, ok := c.w.(http.ResponseWriter); ok { + // TODO deprecate after CLI is updated + rw.Header().Add("Fn-Call-ID", id) rw.Header().Add("FN_CALL_ID", id) } @@ -110,7 +129,7 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig Annotations: app.Annotations.MergeChange(fn.Annotations).MergeChange(trigger.Annotations), Headers: req.Header, CreatedAt: common.DateTime(time.Now()), - URL: reqURL(req), + URL: requestUrl, Method: req.Method, AppID: app.ID, AppName: app.Name, @@ -118,7 +137,6 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig TriggerID: trigger.ID, SyslogURL: syslogURL, } - c.req = req return nil } @@ -155,6 +173,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp // TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to if rw, ok := c.w.(http.ResponseWriter); ok { rw.Header().Add("FN_CALL_ID", id) + rw.Header().Add("Fn-Call-Id", id) } var syslogURL string diff --git a/api/server/runner_httptrigger.go b/api/server/runner_httptrigger.go index 2b332ad15..fbe930347 100644 --- a/api/server/runner_httptrigger.go +++ b/api/server/runner_httptrigger.go @@ -7,6 +7,8 @@ import ( "strconv" "time" + "strings" + "github.com/fnproject/fn/api" "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/common" @@ -62,28 +64,89 @@ func (s *Server) handleTriggerHTTPFunctionCall2(c *gin.Context) error { return s.ServeHTTPTrigger(c, app, fn, trigger) } -//ServeHTTPTrigger serves an HTTP trigger for a given app/fn/trigger based on the current request +type triggerResponseWriter struct { + w http.ResponseWriter + headers http.Header + committed bool +} + +var _ http.ResponseWriter = new(triggerResponseWriter) + +func (trw *triggerResponseWriter) Header() http.Header { + return trw.headers +} + +func (trw *triggerResponseWriter) Write(b []byte) (int, error) { + if !trw.committed { + trw.WriteHeader(http.StatusOK) + } + return trw.w.Write(b) +} + +func (trw *triggerResponseWriter) WriteHeader(statusCode int) { + if trw.committed { + return + } + trw.committed = true + gatewayStatus := 200 + + if statusCode >= 400 { + gatewayStatus = 502 + } + + status := trw.headers.Get("Fn-Http-Status") + if status != "" { + statusInt, err := strconv.Atoi(status) + if err == nil { + gatewayStatus = statusInt + } + } + + for k, vs := range trw.headers { + if strings.HasPrefix(k, "Fn-Http-H-") { + // TODO strip out content-length and stuff here. + realHeader := strings.TrimPrefix(k, "Fn-Http-H-") + if realHeader != "" { // case where header is exactly the prefix + for _, v := range vs { + trw.w.Header().Add(realHeader, v) + } + } + } + } + + contentType := trw.headers.Get("Content-Type") + if contentType != "" { + trw.w.Header().Add("Content-Type", contentType) + } + trw.w.WriteHeader(gatewayStatus) +} + +//ServeHTTPTr igger serves an HTTP trigger for a given app/fn/trigger based on the current request // This is exported to allow extensions to handle their own trigger naming and publishing func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn, trigger *models.Trigger) error { buf := bufPool.Get().(*bytes.Buffer) buf.Reset() - writer := syncResponseWriter{ + writer := &syncResponseWriter{ Buffer: buf, headers: c.Writer.Header(), // copy ref } defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch? + triggerWriter := &triggerResponseWriter{ + w: writer, + headers: make(http.Header), + } // GetCall can mod headers, assign an id, look up the route/app (cached), // strip params, etc. // this should happen ASAP to turn app name to app ID // GetCall can mod headers, assign an id, look up the route/app (cached), // strip params, etc. - call, err := s.agent.GetCall( - agent.WithWriter(&writer), // XXX (reed): order matters [for now] + agent.WithWriter(triggerWriter), // XXX (reed): order matters [for now] agent.FromHTTPTriggerRequest(app, fn, trigger, c.Request), ) + if err != nil { return err } @@ -92,6 +155,7 @@ func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID}) c.Request = c.Request.WithContext(ctx) } + writer.Header().Add("Fn_call_id", model.ID) // TODO TRIGGERWIP not clear this makes sense here - but it works so... if model.Type == "async" { @@ -146,7 +210,7 @@ func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn if writer.status > 0 { c.Writer.WriteHeader(writer.status) } - io.Copy(c.Writer, &writer) + io.Copy(c.Writer, writer) return nil } diff --git a/api/server/runner_httptrigger_test.go b/api/server/runner_httptrigger_test.go index dd85f9941..299c896ae 100644 --- a/api/server/runner_httptrigger_test.go +++ b/api/server/runner_httptrigger_test.go @@ -56,16 +56,16 @@ func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelF func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool { - logReader, err := ds.GetLog(context.Background(), "myapp", callID) + logReader, err := ds.GetLog(context.Background(), "fnid_not_needed_by_mock", callID) if err != nil { - t.Errorf("Test %d: GetLog for call_id:%s returned err %s", + t.Errorf("Test %d: GetLog for call_id:'%s' returned err %s", tnum, callID, err.Error()) return false } logBytes, err := ioutil.ReadAll(logReader) if err != nil { - t.Errorf("Test %d: GetLog read IO call_id:%s returned err %s", + t.Errorf("Test %d: GetLog read IO call_id:'%s' returned err %s", tnum, callID, err.Error()) return false } @@ -398,7 +398,7 @@ func TestTriggerRunnerExecution(t *testing.T) { for name, header := range test.expectedHeaders { if header[0] != rec.Header().Get(name) { isFailure = true - t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s", + t.Errorf("Test %d: Expected header `%s` to be `%s` but was `%s`. body: `%s`", i, name, header[0], rec.Header().Get(name), respBody) } }