From 3eb60e2028532b907348e4a2b2b13bbb9a522a1f Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Mon, 23 Apr 2018 16:05:13 -0700 Subject: [PATCH] CloudEvents I/O format support. (#948) * CloudEvents I/O format support. * Updated format doc. * Remove log lines * This adds support for CloudEvent ingestion at the http router layer. * Updated per comments. * Responds with full CloudEvent message. * Fixed up per comments * Fix tests * Checks for cloudevent content-type * doesn't error on missing content-type. --- api/agent/agent.go | 2 +- api/agent/call.go | 27 +++- api/agent/protocol/cloudevent.go | 207 +++++++++++++++++++++++++++++ api/agent/protocol/factory.go | 28 ++-- api/agent/protocol/json.go | 6 +- api/agent/protocol/json_test.go | 2 +- api/models/call.go | 2 + api/models/route.go | 2 +- docs/developers/ce-example.json | 13 ++ docs/developers/cloudevents.md | 21 +++ docs/developers/function-format.md | 99 +++++++++++++- 11 files changed, 391 insertions(+), 18 deletions(-) create mode 100644 api/agent/protocol/cloudevent.go create mode 100644 docs/developers/ce-example.json create mode 100644 docs/developers/cloudevents.md diff --git a/api/agent/agent.go b/api/agent/agent.go index 4f936a95a..7ae866d09 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -631,7 +631,7 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { errApp := make(chan error, 1) go func() { - ci := protocol.NewCallInfo(call.Call, call.req) + ci := protocol.NewCallInfo(call.IsCloudEvent, call.Call, call.req) errApp <- proto.Dispatch(ctx, ci, call.w) }() diff --git a/api/agent/call.go b/api/agent/call.go index 6b7448b57..0617e3011 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "mime" "net/http" "strings" "time" @@ -49,13 +50,34 @@ type Param struct { } type Params []Param +const ( + ceMimeType = "application/cloudevents+json" +) + func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallOpt { return func(c *call) error { - route, err := a.GetRoute(req.Context(), app.ID, path) + ctx := req.Context() + route, err := a.GetRoute(ctx, app.ID, path) if err != nil { return err } + log := common.Logger(ctx) + // Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header + // Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode + // Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8 + contentType := req.Header.Get("Content-Type") + t, _, err := mime.ParseMediaType(contentType) + if err != nil { + // won't fail here, but log + log.Debugf("Could not parse Content-Type header: %v", err) + } else { + if t == ceMimeType { + c.IsCloudEvent = true + route.Format = models.FormatCloudEvent + } + } + if route.Format == "" { route.Format = models.FormatDefault } @@ -258,6 +280,9 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { type call struct { *models.Call + // IsCloudEvent flag whether this was ingested as a cloud event. This may become the default or only way. + IsCloudEvent bool `json:"is_cloud_event"` + da DataAccess w io.Writer req *http.Request diff --git a/api/agent/protocol/cloudevent.go b/api/agent/protocol/cloudevent.go new file mode 100644 index 000000000..203a06484 --- /dev/null +++ b/api/agent/protocol/cloudevent.go @@ -0,0 +1,207 @@ +package protocol + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.opencensus.io/trace" + + "github.com/fnproject/fn/api/models" +) + +// CloudEvent is the official JSON representation of a CloudEvent: https://github.com/cloudevents/spec/blob/master/serialization.md +type CloudEvent struct { + CloudEventsVersion string `json:"cloudEventsVersion"` + EventID string `json:"eventID"` + Source string `json:"source"` + EventType string `json:"eventType"` + EventTypeVersion string `json:"eventTypeVersion"` + EventTime time.Time `json:"eventTime"` // TODO: ensure rfc3339 format + SchemaURL string `json:"schemaURL"` + ContentType string `json:"contentType"` + Extensions map[string]interface{} `json:"extensions"` + Data interface{} `json:"data"` // from docs: the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json) +} + +type cloudEventIn struct { + CloudEvent + + // Deadline string `json:"deadline"` + // Protocol CallRequestHTTP `json:"protocol"` +} + +// cloudEventOut the expected response from the function container +type cloudEventOut struct { + CloudEvent + + // Protocol *CallResponseHTTP `json:"protocol,omitempty"` +} + +// CloudEventProtocol converts stdin/stdout streams from HTTP into JSON format. +type CloudEventProtocol struct { + // These are the container input streams, not the input from the request or the output for the response + in io.Writer + out io.Reader +} + +func (p *CloudEventProtocol) IsStreamable() bool { + return true +} + +func (h *CloudEventProtocol) writeJSONToContainer(ci CallInfo) error { + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufPool.Put(buf) + + _, err := io.Copy(buf, ci.Input()) + if err != nil { + return err + } + + var in cloudEventIn + if ci.IsCloudEvent() { + // then it's already in the right format, let's parse it, then modify + err = json.Unmarshal(buf.Bytes(), &in) + if err != nil { + return fmt.Errorf("Invalid CloudEvent input. %v", err) + } + } else { + in = cloudEventIn{ + CloudEvent: CloudEvent{ + ContentType: ci.ContentType(), + EventID: ci.CallID(), + EventType: "http", + CloudEventsVersion: "0.1", + Source: ci.RequestURL(), + }, + } + if buf.Len() == 0 { + // nada + // todo: should we leave as null, pass in empty string, omitempty or some default for the content type, eg: {} for json? + } else if ci.ContentType() == "application/json" { + d := map[string]interface{}{} + err = json.NewDecoder(buf).Decode(&d) + if err != nil { + return fmt.Errorf("Invalid json body with contentType 'application/json'. %v", err) + } + in.Data = d + } else { + in.Data = buf.String() + } + } + // todo: deal with the dual ID's, one from outside, one from inside + if in.Extensions == nil { + in.Extensions = map[string]interface{}{} + } + // note: protocol stuff should be set on first ingestion of the event in fn2.0, the http router for example, not here + in.Extensions["protocol"] = CallRequestHTTP{ + Type: ci.ProtocolType(), + Method: ci.Method(), + RequestURL: ci.RequestURL(), + Headers: ci.Headers(), + } + in.Extensions["deadline"] = ci.Deadline().String() + + return json.NewEncoder(h.in).Encode(in) +} + +func (h *CloudEventProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + ctx, span := trace.StartSpan(ctx, "dispatch_cloudevent") + defer span.End() + + _, span = trace.StartSpan(ctx, "dispatch_cloudevent_write_request") + err := h.writeJSONToContainer(ci) + span.End() + if err != nil { + return err + } + + _, span = trace.StartSpan(ctx, "dispatch_cloudevent_read_response") + var jout cloudEventOut + decoder := json.NewDecoder(h.out) + err = decoder.Decode(&jout) + span.End() + if err != nil { + return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err)) + } + + _, span = trace.StartSpan(ctx, "dispatch_cloudevent_write_response") + defer span.End() + + rw, ok := w.(http.ResponseWriter) + if !ok { + // logs can just copy the full thing in there, headers and all. + err := json.NewEncoder(w).Encode(jout) + return isExcessData(err, decoder) + } + + // this has to be done for pulling out: + // - status code + // - body + // - headers + pp := jout.Extensions["protocol"] + var p map[string]interface{} + if pp != nil { + p = pp.(map[string]interface{}) + hh := p["headers"] + if hh != nil { + h, ok := hh.(map[string]interface{}) + if !ok { + return fmt.Errorf("Invalid JSON for protocol headers, not a map") + } + for k, v := range h { + // fmt.Printf("HEADER: %v: %v\n", k, v) + // fmt.Printf("%v", reflect.TypeOf(v)) + harray, ok := v.([]interface{}) + if !ok { + return fmt.Errorf("Invalid JSON for protocol headers, not an array of strings for header value") + } + for _, vv := range harray { + rw.Header().Add(k, vv.(string)) // on top of any specified on the route + } + } + } + } + // after other header setting, top level content_type takes precedence and is + // absolute (if set). it is expected that if users want to set multiple + // values they put it in the string, e.g. `"content-type:"application/json; charset=utf-8"` + // TODO this value should not exist since it's redundant in proto headers? + if jout.ContentType != "" { + rw.Header().Set("Content-Type", jout.ContentType) + } + + // we must set all headers before writing the status, see http.ResponseWriter contract + if p != nil && p["status_code"] != nil { + sc, ok := p["status_code"].(float64) + if !ok { + return fmt.Errorf("Invalid status_code type in protocol extension, must be an integer: %v\n", p["status_code"]) + } + rw.WriteHeader(int(sc)) + } + + if ci.IsCloudEvent() { + // then it's already in the right format so just return it as is + err = json.NewEncoder(rw).Encode(jout) + if err != nil { + return fmt.Errorf("Error marshalling CloudEvent response to json. %v\n", err) + } + } else { + if jout.ContentType == "application/json" { + d, err := json.Marshal(jout.Data) + if err != nil { + return fmt.Errorf("Error marshalling function response 'data' to json. %v\n", err) + } + _, err = rw.Write(d) + } else if jout.ContentType == "text/plain" { + _, err = io.WriteString(rw, jout.Data.(string)) + } else { + return fmt.Errorf("Error: Unknown content type: %v\n", jout.ContentType) + } + } + return isExcessData(err, decoder) +} diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index 9a35d9c36..be3abdea9 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -36,6 +36,7 @@ type ContainerIO interface { // CallInfo is passed into dispatch with only the required data the protocols require type CallInfo interface { + IsCloudEvent() bool CallID() string ContentType() string Input() io.Reader @@ -53,8 +54,13 @@ type CallInfo interface { } type callInfoImpl struct { - call *models.Call - req *http.Request + call *models.Call + req *http.Request + isCloudEvent bool +} + +func (ci callInfoImpl) IsCloudEvent() bool { + return ci.isCloudEvent } func (ci callInfoImpl) CallID() string { @@ -112,10 +118,11 @@ func (ci callInfoImpl) Headers() map[string][]string { return ci.req.Header } -func NewCallInfo(call *models.Call, req *http.Request) CallInfo { +func NewCallInfo(isCloudEvent bool, call *models.Call, req *http.Request) CallInfo { ci := &callInfoImpl{ - call: call, - req: req, + isCloudEvent: isCloudEvent, + call: call, + req: req, } return ci } @@ -125,10 +132,11 @@ type Protocol string // hot function protocols const ( - Default Protocol = models.FormatDefault - HTTP Protocol = models.FormatHTTP - JSON Protocol = models.FormatJSON - Empty Protocol = "" + Default Protocol = models.FormatDefault + HTTP Protocol = models.FormatHTTP + JSON Protocol = models.FormatJSON + CloudEventP Protocol = models.FormatCloudEvent + Empty Protocol = "" ) func (p *Protocol) UnmarshalJSON(b []byte) error { @@ -165,6 +173,8 @@ func New(p Protocol, in io.Writer, out io.Reader) ContainerIO { return &HTTPProtocol{in, out} case JSON: return &JSONProtocol{in, out} + case CloudEventP: + return &CloudEventProtocol{in, out} case Default, Empty: return &DefaultProtocol{} } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 8b9490085..5c1dd6d37 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -115,7 +115,7 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e if !ok { // logs can just copy the full thing in there, headers and all. err := json.NewEncoder(w).Encode(jout) - return h.isExcessData(err, decoder) + return isExcessData(err, decoder) } // this has to be done for pulling out: @@ -144,10 +144,10 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e } _, err = io.WriteString(rw, jout.Body) - return h.isExcessData(err, decoder) + return isExcessData(err, decoder) } -func (h *JSONProtocol) isExcessData(err error, decoder *json.Decoder) error { +func isExcessData(err error, decoder *json.Decoder) error { if err == nil { // Now check for excess output, if this is the case, we can be certain that the next request will fail. reader, ok := decoder.Buffered().(*bytes.Reader) diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index d1a3d10a1..1fb490222 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -46,7 +46,7 @@ func setupRequest(data interface{}) *callInfoImpl { // fixup URL in models.Call call.URL = req.URL.String() - ci := &callInfoImpl{call, req} + ci := &callInfoImpl{call: call, req: req} return ci } diff --git a/api/models/call.go b/api/models/call.go index 7f80b65a8..b538b2012 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -23,6 +23,8 @@ const ( FormatHTTP = "http" // FormatJSON ... FormatJSON = "json" + // FormatCloudEvent ... + FormatCloudEvent = "cloudevent" ) var possibleStatuses = [...]string{"delayed", "queued", "running", "success", "error", "cancelled"} diff --git a/api/models/route.go b/api/models/route.go index f31da82ac..c616fa4bc 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -112,7 +112,7 @@ func (r *Route) Validate() error { return ErrRoutesInvalidType } - if r.Format != FormatDefault && r.Format != FormatHTTP && r.Format != FormatJSON { + if r.Format != FormatDefault && r.Format != FormatHTTP && r.Format != FormatJSON && r.Format != FormatCloudEvent { return ErrRoutesInvalidFormat } diff --git a/docs/developers/ce-example.json b/docs/developers/ce-example.json new file mode 100644 index 000000000..6bb3b4b36 --- /dev/null +++ b/docs/developers/ce-example.json @@ -0,0 +1,13 @@ +{ + "cloudEventsVersion": "0.1", + "eventID": "6480da1a-5028-4301-acc3-fbae628207b3", + "source": "http://example.com/repomanager", + "eventType": "com.example.repro.create", + "eventTypeVersion": "v1.5", + "eventTime": "2018-04-01T23:12:34Z", + "schemaURL": "https://product.example.com/schema/repo-create", + "contentType": "application/json", + "data": { + "name": "travis" + } +} diff --git a/docs/developers/cloudevents.md b/docs/developers/cloudevents.md new file mode 100644 index 000000000..051030ac5 --- /dev/null +++ b/docs/developers/cloudevents.md @@ -0,0 +1,21 @@ +# CloudEvents - EXPERIMENTAL + +Fn supports CloudEvents throughout the system, meaning on ingestion and/or as a function I/O format. + +To use as a function I/O format, set `format: cloudevent`. + +To use as as the body of the HTTP request, the following header: + +``` +FN_CLOUD_EVENT: true +``` + +If that header is set, it is assumed that the function also supports the CloudEvents format (in other words, it will automatically set `format: cloudevent`). + +If you have a function that supports CloudEvents, you can test it with the example file in this directory: + +```sh +curl -X POST -H "Content-Type: application/json" -H "FN_CLOUD_EVENT: true" -d @ce-example.json http://localhost:8080/r/rapp/myfunc +``` + +To make a function that supports CloudEvents, you can use an FDK that supports like fdk-ruby. diff --git a/docs/developers/function-format.md b/docs/developers/function-format.md index a210d769f..0bfa8dc34 100644 --- a/docs/developers/function-format.md +++ b/docs/developers/function-format.md @@ -60,6 +60,103 @@ Cons: * Not very efficient resource utilization - one new container execution per event. +### CloudEvent I/O Format + +`format: cloudevent` + +The CloudEvent format is a nice hot format as it is easy to parse in most languages. It's also a [CNCF +defined format for events](https://github.com/cloudevents/spec/blob/master/spec.md). + +If a request comes in with the following body: + +```json +{ + "some": "input" +} +``` + +then, the input will be: + +#### Input + +Internally functions receive data in the example format below: + +```json +{ + "cloudEventsVersion": "cloudEventsVersion value", + "eventID": "the call ID", + "source": "source value", + "eventType": "eventType value", + "eventTypeVersion": "eventTypeVersion value", + "eventTime": "eventTime value", + "schemaURL": "schemaURL value", + "contentType": "contentType", + "extensions": { + "deadline":"2018-01-30T16:52:39.786Z", + "protocol": { + "type": "http", + "method": "POST", + "request_url": "http://localhost:8080/r/myapp/myfunc?q=hi", + "headers": { + "Content-Type": ["application/json"], + "Other-Header": ["something"] + } + } + }, + "data": { + "some": "input" + } +} +{ + NEXT EVENT +} +``` + +* data - the main contents. If HTTP is the protocol, this would be the request body. +* eventID - the unique ID for the event/call. +* contentType - format of the `data` parameter. +* extensions: + * deadline - a time limit for the call, based on function timeout. + * protocol - arbitrary map of protocol specific data. The above example shows what the HTTP protocol handler passes in. Subject to change and reduces reusability of your functions. **USE AT YOUR OWN RISK**. Under `protocol` => `headers` contains all of the HTTP headers exactly as defined in the incoming request. + + +#### Output + +Function's output should be a copy of the input CloudEvent with any fields changed: + +```json +{ + "cloudEventsVersion": "cloudEventsVersion value", + "eventID": "the call ID", + "source": "source value", + "eventType": "eventType value", + "eventTypeVersion": "eventTypeVersion value", + "eventTime": "eventTime value", + "schemaURL": "schemaURL value", + "contentType": "contentType", + "extensions": { + "deadline":"2018-01-30T16:52:39.786Z", + "protocol": { + "type": "http", + "status_code": 200, + "headers": { + "Other-Header": ["something"] + } + } + }, + "data": { + "some": "output" + } +} +{ + NEXT OUTPUT OBJECT +} +``` + +* data - required - the response body. +* contentType - optional - format of `data`. Default is application/json. +* protocol - optional - protocol specific response options. Entirely optional. Contents defined by each protocol. + ### JSON I/O Format `format: json` @@ -109,8 +206,6 @@ Internally functions receive data in the example format below: Under `protocol`, `headers` contains all of the HTTP headers exactly as defined in the incoming request. -Each request will be separated by a blank line. - #### Output Function's output format should have the following format: