mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Gateway trigger support (#1225)
* initial gateway trigger support * Pass Content-Type down to wrapped writer * Move req header setting * Adding call id to responses * add dupe Fn-Call-Id headers
This commit is contained in:
@@ -13,6 +13,8 @@ import (
|
|||||||
|
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
|
|
||||||
|
"net/textproto"
|
||||||
|
|
||||||
"github.com/fnproject/fn/api/agent/drivers"
|
"github.com/fnproject/fn/api/agent/drivers"
|
||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
"github.com/fnproject/fn/api/id"
|
"github.com/fnproject/fn/api/id"
|
||||||
@@ -53,26 +55,41 @@ const (
|
|||||||
invokePath = "/invoke"
|
invokePath = "/invoke"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var skipTriggerHeaders = map[string]bool{
|
||||||
|
"Connection": true,
|
||||||
|
"Keep-Alive": true,
|
||||||
|
"Trailer": true,
|
||||||
|
"Transfer-Encoding": true,
|
||||||
|
"TE": true,
|
||||||
|
"Upgrade": true,
|
||||||
|
}
|
||||||
|
|
||||||
// Sets up a call from an http trigger request
|
// Sets up a call from an http trigger request
|
||||||
func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trigger, req *http.Request) CallOpt {
|
func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trigger, req *http.Request) CallOpt {
|
||||||
return func(c *call) error {
|
return func(c *call) error {
|
||||||
ctx := req.Context()
|
|
||||||
|
|
||||||
log := common.Logger(ctx)
|
|
||||||
// Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header
|
|
||||||
// Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
|
|
||||||
// Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8
|
|
||||||
contentType := req.Header.Get("Content-Type")
|
contentType := req.Header.Get("Content-Type")
|
||||||
t, _, err := mime.ParseMediaType(contentType)
|
// transpose trigger headers into HTTP
|
||||||
if err != nil && contentType != "" {
|
headers := make(http.Header)
|
||||||
// won't fail here, but log
|
for k, vs := range req.Header {
|
||||||
log.Debugf("Could not parse Content-Type header: %v %v", contentType, err)
|
// should be generally unnecessary but to be doubly sure.
|
||||||
} else {
|
k = textproto.CanonicalMIMEHeaderKey(k)
|
||||||
if t == ceMimeType {
|
if skipTriggerHeaders[k] {
|
||||||
c.IsCloudEvent = true
|
continue
|
||||||
fn.Format = models.FormatCloudEvent
|
}
|
||||||
|
rewriteKey := fmt.Sprintf("Fn-Http-H-%s", k)
|
||||||
|
for _, v := range vs {
|
||||||
|
headers.Add(rewriteKey, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
requestUrl := reqURL(req)
|
||||||
|
|
||||||
|
headers.Set("Fn-Http-Method", req.Method)
|
||||||
|
if contentType != "" {
|
||||||
|
headers.Set("Content-Type", contentType)
|
||||||
|
}
|
||||||
|
headers.Set("Fn-Http-Request-Url", requestUrl)
|
||||||
|
headers.Set("Fn-Intent", "httprequest")
|
||||||
|
req.Header = headers
|
||||||
|
|
||||||
if fn.Format == "" {
|
if fn.Format == "" {
|
||||||
fn.Format = models.FormatDefault
|
fn.Format = models.FormatDefault
|
||||||
@@ -83,6 +100,8 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig
|
|||||||
// TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers
|
// TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers
|
||||||
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
|
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
|
||||||
if rw, ok := c.w.(http.ResponseWriter); ok {
|
if rw, ok := c.w.(http.ResponseWriter); ok {
|
||||||
|
// TODO deprecate after CLI is updated
|
||||||
|
rw.Header().Add("Fn-Call-ID", id)
|
||||||
rw.Header().Add("FN_CALL_ID", id)
|
rw.Header().Add("FN_CALL_ID", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,7 +129,7 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig
|
|||||||
Annotations: app.Annotations.MergeChange(fn.Annotations).MergeChange(trigger.Annotations),
|
Annotations: app.Annotations.MergeChange(fn.Annotations).MergeChange(trigger.Annotations),
|
||||||
Headers: req.Header,
|
Headers: req.Header,
|
||||||
CreatedAt: common.DateTime(time.Now()),
|
CreatedAt: common.DateTime(time.Now()),
|
||||||
URL: reqURL(req),
|
URL: requestUrl,
|
||||||
Method: req.Method,
|
Method: req.Method,
|
||||||
AppID: app.ID,
|
AppID: app.ID,
|
||||||
AppName: app.Name,
|
AppName: app.Name,
|
||||||
@@ -118,7 +137,6 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig
|
|||||||
TriggerID: trigger.ID,
|
TriggerID: trigger.ID,
|
||||||
SyslogURL: syslogURL,
|
SyslogURL: syslogURL,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.req = req
|
c.req = req
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -155,6 +173,7 @@ func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOp
|
|||||||
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
|
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
|
||||||
if rw, ok := c.w.(http.ResponseWriter); ok {
|
if rw, ok := c.w.(http.ResponseWriter); ok {
|
||||||
rw.Header().Add("FN_CALL_ID", id)
|
rw.Header().Add("FN_CALL_ID", id)
|
||||||
|
rw.Header().Add("Fn-Call-Id", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
var syslogURL string
|
var syslogURL string
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/fnproject/fn/api"
|
"github.com/fnproject/fn/api"
|
||||||
"github.com/fnproject/fn/api/agent"
|
"github.com/fnproject/fn/api/agent"
|
||||||
"github.com/fnproject/fn/api/common"
|
"github.com/fnproject/fn/api/common"
|
||||||
@@ -62,28 +64,89 @@ func (s *Server) handleTriggerHTTPFunctionCall2(c *gin.Context) error {
|
|||||||
return s.ServeHTTPTrigger(c, app, fn, trigger)
|
return s.ServeHTTPTrigger(c, app, fn, trigger)
|
||||||
}
|
}
|
||||||
|
|
||||||
//ServeHTTPTrigger serves an HTTP trigger for a given app/fn/trigger based on the current request
|
type triggerResponseWriter struct {
|
||||||
|
w http.ResponseWriter
|
||||||
|
headers http.Header
|
||||||
|
committed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ http.ResponseWriter = new(triggerResponseWriter)
|
||||||
|
|
||||||
|
func (trw *triggerResponseWriter) Header() http.Header {
|
||||||
|
return trw.headers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (trw *triggerResponseWriter) Write(b []byte) (int, error) {
|
||||||
|
if !trw.committed {
|
||||||
|
trw.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
return trw.w.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (trw *triggerResponseWriter) WriteHeader(statusCode int) {
|
||||||
|
if trw.committed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
trw.committed = true
|
||||||
|
gatewayStatus := 200
|
||||||
|
|
||||||
|
if statusCode >= 400 {
|
||||||
|
gatewayStatus = 502
|
||||||
|
}
|
||||||
|
|
||||||
|
status := trw.headers.Get("Fn-Http-Status")
|
||||||
|
if status != "" {
|
||||||
|
statusInt, err := strconv.Atoi(status)
|
||||||
|
if err == nil {
|
||||||
|
gatewayStatus = statusInt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, vs := range trw.headers {
|
||||||
|
if strings.HasPrefix(k, "Fn-Http-H-") {
|
||||||
|
// TODO strip out content-length and stuff here.
|
||||||
|
realHeader := strings.TrimPrefix(k, "Fn-Http-H-")
|
||||||
|
if realHeader != "" { // case where header is exactly the prefix
|
||||||
|
for _, v := range vs {
|
||||||
|
trw.w.Header().Add(realHeader, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
contentType := trw.headers.Get("Content-Type")
|
||||||
|
if contentType != "" {
|
||||||
|
trw.w.Header().Add("Content-Type", contentType)
|
||||||
|
}
|
||||||
|
trw.w.WriteHeader(gatewayStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
//ServeHTTPTr igger serves an HTTP trigger for a given app/fn/trigger based on the current request
|
||||||
// This is exported to allow extensions to handle their own trigger naming and publishing
|
// This is exported to allow extensions to handle their own trigger naming and publishing
|
||||||
func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn, trigger *models.Trigger) error {
|
func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn, trigger *models.Trigger) error {
|
||||||
buf := bufPool.Get().(*bytes.Buffer)
|
buf := bufPool.Get().(*bytes.Buffer)
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
writer := syncResponseWriter{
|
writer := &syncResponseWriter{
|
||||||
Buffer: buf,
|
Buffer: buf,
|
||||||
headers: c.Writer.Header(), // copy ref
|
headers: c.Writer.Header(), // copy ref
|
||||||
}
|
}
|
||||||
defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch?
|
defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch?
|
||||||
|
|
||||||
|
triggerWriter := &triggerResponseWriter{
|
||||||
|
w: writer,
|
||||||
|
headers: make(http.Header),
|
||||||
|
}
|
||||||
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
||||||
// strip params, etc.
|
// strip params, etc.
|
||||||
// this should happen ASAP to turn app name to app ID
|
// this should happen ASAP to turn app name to app ID
|
||||||
|
|
||||||
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
||||||
// strip params, etc.
|
// strip params, etc.
|
||||||
|
|
||||||
call, err := s.agent.GetCall(
|
call, err := s.agent.GetCall(
|
||||||
agent.WithWriter(&writer), // XXX (reed): order matters [for now]
|
agent.WithWriter(triggerWriter), // XXX (reed): order matters [for now]
|
||||||
agent.FromHTTPTriggerRequest(app, fn, trigger, c.Request),
|
agent.FromHTTPTriggerRequest(app, fn, trigger, c.Request),
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -92,6 +155,7 @@ func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn
|
|||||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
||||||
c.Request = c.Request.WithContext(ctx)
|
c.Request = c.Request.WithContext(ctx)
|
||||||
}
|
}
|
||||||
|
writer.Header().Add("Fn_call_id", model.ID)
|
||||||
|
|
||||||
// TODO TRIGGERWIP not clear this makes sense here - but it works so...
|
// TODO TRIGGERWIP not clear this makes sense here - but it works so...
|
||||||
if model.Type == "async" {
|
if model.Type == "async" {
|
||||||
@@ -146,7 +210,7 @@ func (s *Server) ServeHTTPTrigger(c *gin.Context, app *models.App, fn *models.Fn
|
|||||||
if writer.status > 0 {
|
if writer.status > 0 {
|
||||||
c.Writer.WriteHeader(writer.status)
|
c.Writer.WriteHeader(writer.status)
|
||||||
}
|
}
|
||||||
io.Copy(c.Writer, &writer)
|
io.Copy(c.Writer, writer)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,16 +56,16 @@ func testRunner(_ *testing.T, args ...interface{}) (agent.Agent, context.CancelF
|
|||||||
|
|
||||||
func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool {
|
func checkLogs(t *testing.T, tnum int, ds models.LogStore, callID string, expected []string) bool {
|
||||||
|
|
||||||
logReader, err := ds.GetLog(context.Background(), "myapp", callID)
|
logReader, err := ds.GetLog(context.Background(), "fnid_not_needed_by_mock", callID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Test %d: GetLog for call_id:%s returned err %s",
|
t.Errorf("Test %d: GetLog for call_id:'%s' returned err %s",
|
||||||
tnum, callID, err.Error())
|
tnum, callID, err.Error())
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
logBytes, err := ioutil.ReadAll(logReader)
|
logBytes, err := ioutil.ReadAll(logReader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Test %d: GetLog read IO call_id:%s returned err %s",
|
t.Errorf("Test %d: GetLog read IO call_id:'%s' returned err %s",
|
||||||
tnum, callID, err.Error())
|
tnum, callID, err.Error())
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -398,7 +398,7 @@ func TestTriggerRunnerExecution(t *testing.T) {
|
|||||||
for name, header := range test.expectedHeaders {
|
for name, header := range test.expectedHeaders {
|
||||||
if header[0] != rec.Header().Get(name) {
|
if header[0] != rec.Header().Get(name) {
|
||||||
isFailure = true
|
isFailure = true
|
||||||
t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s",
|
t.Errorf("Test %d: Expected header `%s` to be `%s` but was `%s`. body: `%s`",
|
||||||
i, name, header[0], rec.Header().Get(name), respBody)
|
i, name, header[0], rec.Header().Get(name), respBody)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user