From 96cfc9f5c157159a071740de4e3ec984333aebe9 Mon Sep 17 00:00:00 2001 From: Travis Reeder Date: Thu, 16 Nov 2017 09:59:13 -0800 Subject: [PATCH] Update json (#463) * wip * wip * Added more fields to JSON and added blank line between objects. * Update tests. * wip * Updated to represent recent discussions. * Fixed up the json test * More docs * Changed from blank line to bracket, newline, open bracket. * Blank line added back, easier for delimiting. --- Makefile | 2 +- api/agent/agent.go | 4 +- api/agent/protocol/default.go | 8 +- api/agent/protocol/factory.go | 64 ++++++++++++++- api/agent/protocol/http.go | 9 ++- api/agent/protocol/json.go | 134 ++++++++++++++++++++++++++------ api/agent/protocol/json_test.go | 42 +++++----- docs/function-format.md | 84 ++++++++++++++------ 8 files changed, 266 insertions(+), 81 deletions(-) diff --git a/Makefile b/Makefile index 92139680b..72fa95a8d 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ build: go build -o functions install: - go install + go build -o ${GOPATH}/bin/fn-server test: ./test.sh diff --git a/api/agent/agent.go b/api/agent/agent.go index df07011b2..f5d787f35 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -502,13 +502,15 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { go func() { // TODO make sure stdin / stdout not blocked if container dies or we leak goroutine // we have to make sure this gets shut down or 2 threads will be reading/writing in/out - errApp <- s.proto.Dispatch(call.w, call.req) + ci := protocol.NewCallInfo(call.Model(), call.req) + errApp <- s.proto.Dispatch(ctx, ci, call.w) }() select { case err := <-s.errC: // error from container return err case err := <-errApp: + // would be great to be able to decipher what error is returning from here so we can show better messages return err case <-ctx.Done(): // call timeout return ctx.Err() diff --git a/api/agent/protocol/default.go b/api/agent/protocol/default.go index 776b7529a..51d3e54c6 100644 --- a/api/agent/protocol/default.go +++ b/api/agent/protocol/default.go @@ -1,12 +1,14 @@ package protocol import ( + "context" "io" - "net/http" ) // DefaultProtocol is the protocol used by cold-containers type DefaultProtocol struct{} -func (p *DefaultProtocol) IsStreamable() bool { return false } -func (d *DefaultProtocol) Dispatch(w io.Writer, req *http.Request) error { return nil } +func (p *DefaultProtocol) IsStreamable() bool { return false } +func (d *DefaultProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + return nil +} diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index aab124195..a9132dcfb 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -1,6 +1,7 @@ package protocol import ( + "context" "errors" "io" "net/http" @@ -14,8 +15,8 @@ type errorProto struct { error } -func (e errorProto) IsStreamable() bool { return false } -func (e errorProto) Dispatch(io.Writer, *http.Request) error { return e } +func (e errorProto) IsStreamable() bool { return false } +func (e errorProto) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { return e } // ContainerIO defines the interface used to talk to a hot function. // Internally, a protocol must know when to alternate between stdin and stdout. @@ -26,7 +27,64 @@ type ContainerIO interface { // Dispatch will handle sending stdin and stdout to a container. Implementers // of Dispatch may format the input and output differently. Dispatch must respect // the req.Context() timeout / cancellation. - Dispatch(w io.Writer, req *http.Request) error + Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error +} + +// CallInfo is passed into dispatch with only the required data the protocols require +type CallInfo interface { + CallID() string + ContentType() string + Input() io.Reader + + // ProtocolType let's function/fdk's know what type original request is. Only 'http' for now. + // This could be abstracted into separate Protocol objects for each type and all the following information could go in there. + // This is a bit confusing because we also have the protocol's for getting information in and out of the function containers. + ProtocolType() string + Request() *http.Request + RequestURL() string + Headers() map[string][]string +} + +type callInfoImpl struct { + call *models.Call + req *http.Request +} + +func (ci callInfoImpl) CallID() string { + return ci.call.ID +} + +func (ci callInfoImpl) ContentType() string { + return ci.req.Header.Get("Content-Type") +} + +// Input returns the call's input/body +func (ci callInfoImpl) Input() io.Reader { + return ci.req.Body +} + +func (ci callInfoImpl) ProtocolType() string { + return "http" +} + +// Request basically just for the http format, since that's the only that makes sense to have the full request as is +func (ci callInfoImpl) Request() *http.Request { + return ci.req +} +func (ci callInfoImpl) RequestURL() string { + return ci.req.URL.RequestURI() +} + +func (ci callInfoImpl) Headers() map[string][]string { + return ci.req.Header +} + +func NewCallInfo(call *models.Call, req *http.Request) CallInfo { + ci := &callInfoImpl{ + call: call, + req: req, + } + return ci } // Protocol defines all protocols that operates a ContainerIO. diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index 2291ece8d..82331ddf3 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -2,6 +2,7 @@ package protocol import ( "bufio" + "context" "fmt" "io" "net/http" @@ -25,8 +26,8 @@ func (p *HTTPProtocol) IsStreamable() bool { return true } // over the timeout. // TODO maybe we should take io.Writer, io.Reader but then we have to // dump the request to a buffer again :( -func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error { - err := DumpRequestTo(h.in, req) // TODO timeout +func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + err := DumpRequestTo(h.in, ci.Request()) // TODO timeout if err != nil { return err } @@ -36,7 +37,7 @@ func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error { // and status code first since calling res.Write will just write the http // response as the body (headers and all) - res, err := http.ReadResponse(bufio.NewReader(h.out), req) // TODO timeout + res, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) // TODO timeout if err != nil { return err } @@ -54,7 +55,7 @@ func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error { } else { // logs can just copy the full thing in there, headers and all. - res, err := http.ReadResponse(bufio.NewReader(h.out), req) // TODO timeout + res, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) // TODO timeout if err != nil { return err } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 53c656bf9..fbfd6a7c3 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -2,7 +2,9 @@ package protocol import ( "bytes" + "context" "encoding/json" + "fmt" "io" "net/http" ) @@ -10,13 +12,39 @@ import ( // This is sent into the function // All HTTP request headers should be set in env type jsonio struct { - Headers http.Header `json:"headers,omitempty"` - Body string `json:"body"` + Body string `json:"body"` + ContentType string `json:"content_type"` +} + +// CallRequestHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now. +type CallRequestHTTP struct { + Type string `json:"type"` + RequestURL string `json:"request_url"` + Headers http.Header `json:"headers"` +} + +// CallResponseHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now. +type CallResponseHTTP struct { StatusCode int `json:"status_code,omitempty"` + Headers http.Header `json:"headers,omitempty"` +} + +// jsonIn We're not using this since we're writing JSON directly right now, but trying to keep it current anyways, much easier to read/follow +type jsonIn struct { + jsonio + CallID string `json:"call_id"` + Protocol *CallRequestHTTP `json:"protocol"` +} + +// jsonOut the expected response from the function container +type jsonOut struct { + jsonio + Protocol *CallResponseHTTP `json:"protocol,omitempty"` } // JSONProtocol converts stdin/stdout streams from HTTP into JSON format. type JSONProtocol struct { + // These are the container input streams, not the input from the request or the output for the response in io.Writer out io.Reader } @@ -35,59 +63,117 @@ func writeString(err error, dst io.Writer, str string) error { // TODO(xxx): headers, query parameters, body - what else should we add to func's payload? // TODO(xxx): get rid of request body buffering somehow -func (h *JSONProtocol) DumpJSON(req *http.Request) error { +// @treeder: I don't know why we don't just JSON marshal this, this is rough... +func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error { stdin := json.NewEncoder(h.in) bb := new(bytes.Buffer) - _, err := bb.ReadFrom(req.Body) + _, err := bb.ReadFrom(ci.Input()) + // todo: better/simpler err handling if err != nil { return err } - err = writeString(err, h.in, "{") + // open + err = writeString(err, h.in, "{\n") + if err != nil { + return err + } + + // call_id + err = writeString(err, h.in, `"call_id":`) + if err != nil { + return err + } + err = stdin.Encode(ci.CallID()) + if err != nil { + return err + } + + // content_type + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"content_type":`) + if err != nil { + return err + } + err = stdin.Encode(ci.ContentType()) + if err != nil { + return err + } + + // body + err = writeString(err, h.in, ",") err = writeString(err, h.in, `"body":`) if err != nil { return err } err = stdin.Encode(bb.String()) - err = writeString(err, h.in, ",") - err = writeString(err, h.in, `"headers":`) if err != nil { return err } - err = stdin.Encode(req.Header) + + // now the extras err = writeString(err, h.in, ",") - err = writeString(err, h.in, `"query_parameters":`) - if err != nil { - return err + err = writeString(err, h.in, `"protocol":{`) // OK name? This is what OpenEvents is calling it in initial proposal + { + err = writeString(err, h.in, `"type":`) + if err != nil { + return err + } + err = stdin.Encode(ci.ProtocolType()) + + // request URL + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"request_url":`) + if err != nil { + return err + } + err = stdin.Encode(ci.RequestURL()) + if err != nil { + return err + } + + // HTTP headers + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"headers":`) + if err != nil { + return err + } + err = stdin.Encode(ci.Headers()) } - err = stdin.Encode(req.URL.RawQuery) err = writeString(err, h.in, "}") + + // close + err = writeString(err, h.in, "\n}\n\n") return err } -func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - err := h.DumpJSON(req) +func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { + // write input into container + err := h.writeJSONToContainer(ci) if err != nil { return err } - jout := new(jsonio) + + // now read the container output + jout := new(jsonOut) dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { - return err + return fmt.Errorf("error decoding JSON from user function: %v", err) } if rw, ok := w.(http.ResponseWriter); ok { // this has to be done for pulling out: // - status code // - body // - headers - for k, vs := range jout.Headers { - for _, v := range vs { - rw.Header().Add(k, v) // on top of any specified on the route + if jout.Protocol != nil { + p := jout.Protocol + for k, v := range p.Headers { + for _, vv := range v { + rw.Header().Add(k, vv) // on top of any specified on the route + } + } + if p.StatusCode != 0 { + rw.WriteHeader(p.StatusCode) } - } - if jout.StatusCode != 0 { - rw.WriteHeader(jout.StatusCode) - } else { - rw.WriteHeader(200) } _, err = io.WriteString(rw, jout.Body) // TODO timeout if err != nil { diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index bf389c9e7..4805fb21d 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -9,18 +9,14 @@ import ( "net/url" "reflect" "testing" + + "github.com/fnproject/fn/api/models" ) type RequestData struct { A string `json:"a"` } -type funcRequestBody struct { - Body string `json:"body"` - Headers http.Header `json:"headers"` - QueryParameters string `json:"query_parameters"` -} - func setupRequest(data interface{}) *http.Request { req := &http.Request{ Method: http.MethodPost, @@ -48,19 +44,21 @@ func setupRequest(data interface{}) *http.Request { return req } -func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { +func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { rDataBefore := RequestData{A: "a"} req := setupRequest(rDataBefore) r, w := io.Pipe() + call := &models.Call{} + ci := &callInfoImpl{call, req} proto := JSONProtocol{w, r} go func() { - err := proto.DumpJSON(req) + err := proto.writeJSONToContainer(ci) if err != nil { t.Error(err.Error()) } w.Close() }() - incomingReq := new(funcRequestBody) + incomingReq := &jsonIn{} bb := new(bytes.Buffer) _, err := bb.ReadFrom(r) @@ -82,19 +80,21 @@ func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { } } -func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { +func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) { req := setupRequest(nil) + call := &models.Call{} r, w := io.Pipe() + ci := &callInfoImpl{call, req} proto := JSONProtocol{w, r} go func() { - err := proto.DumpJSON(req) + err := proto.writeJSONToContainer(ci) if err != nil { t.Error(err.Error()) } w.Close() }() - incomingReq := new(funcRequestBody) + incomingReq := &jsonIn{} bb := new(bytes.Buffer) _, err := bb.ReadFrom(r) @@ -109,25 +109,27 @@ func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { t.Errorf("Request body assertion mismatch: expected: %s, got %s", "", incomingReq.Body) } - if ok := reflect.DeepEqual(req.Header, incomingReq.Headers); !ok { + if ok := reflect.DeepEqual(req.Header, incomingReq.Protocol.Headers); !ok { t.Errorf("Request headers assertion mismatch: expected: %s, got %s", - req.Header, incomingReq.Headers) + req.Header, incomingReq.Protocol.Headers) } } -func TestJSONProtocolDumpJSONRequestWithQuery(t *testing.T) { +func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) { req := setupRequest(nil) r, w := io.Pipe() + call := &models.Call{} + ci := &callInfoImpl{call, req} proto := JSONProtocol{w, r} go func() { - err := proto.DumpJSON(req) + err := proto.writeJSONToContainer(ci) if err != nil { t.Error(err.Error()) } w.Close() }() - incomingReq := new(funcRequestBody) + incomingReq := &jsonIn{} bb := new(bytes.Buffer) _, err := bb.ReadFrom(r) @@ -138,8 +140,8 @@ func TestJSONProtocolDumpJSONRequestWithQuery(t *testing.T) { if err != nil { t.Error(err.Error()) } - if incomingReq.QueryParameters != req.URL.RawQuery { - t.Errorf("Request query string assertion mismatch: expected: %s, got %s", - req.URL.RawQuery, incomingReq.QueryParameters) + if incomingReq.Protocol.RequestURL != req.URL.RequestURI() { + t.Errorf("Request URL does not match protocol URL: expected: %s, got %s", + req.URL.RequestURI(), incomingReq.Protocol.RequestURL) } } diff --git a/docs/function-format.md b/docs/function-format.md index 12b40fcbf..134549ba4 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -20,10 +20,12 @@ The format is still up for discussion and in order to move forward and remain fl TODO: Put common env vars here, that show up in all formats. -#### Default I/O Format +### Default I/O Format The default format is simply the request body itself plus some environment variables. For instance, if someone were to post a JSON body, the unmodified body would be sent in via STDIN. The result comes via STDOUT. When task is done, pipes are closed and the container running the function is terminated. +#### Pros/Cons + Pros: * Very simple to use @@ -32,13 +34,13 @@ Cons: * Not very efficient resource utilization - one function for one event. -#### HTTP I/O Format +### HTTP I/O Format -`--format http` +`format: http` HTTP format could be a good option as it is in very common use obviously, most languages have some semi-easy way to parse it, and it supports hot format. The response will look like a HTTP response. The communication is still done via stdin/stdout, but these pipes are never closed unless the container is explicitly terminated. The basic format is: -Request: +#### Request ```text GET / HTTP/1.1 @@ -47,7 +49,7 @@ Content-Length: 5 world ``` -Response: +#### Response ```text HTTP/1.1 200 OK @@ -60,6 +62,8 @@ The header keys and values would be populated with information about the functio `Content-Length` is determined by the [Content-Length](https://tools.ietf.org/html/rfc7230#section-3.3.3) header, which is mandatory both for input and output. It is used by Functions to know when stop writing to STDIN and reading from STDOUT. +#### Pros/Cons + Pros: * Supports streaming @@ -70,11 +74,13 @@ Cons: * Requires a parsing library or fair amount of code to parse headers properly * Double parsing - headers + body (if body is to be parsed, such as json) -#### JSON I/O Format +### JSON I/O Format -`--format json` +`format: json` -Fn accepts request data of the following format: +The JSON format is a nice hot format as it is easy to parse in most languages. + +If a request comes in like this: ```json { @@ -82,28 +88,62 @@ Fn accepts request data of the following format: } ``` -Internally function receives data in following format: +#### Input + +Internally functions receive data in the example format below: ```json { - "body": "{\"some\":\"input\"}\n", - "headers": { - "yo": ["dawg"] + "call_id": "123", + "content_type": "application/json", + "body": "{\"some\":\"input\"}", + "protocol": { + "type": "http", + "request_url": "http://localhost:8080/r/myapp/myfunc?q=hi", + "headers": { + "Content-Type": ["application/json"], + "Other-Header": ["something"] + } } } +BLANK LINE +{ + NEXT INPUT OBJECT +} ``` -Function's output format should have following format: +* call_id - the unique ID for the call. +* content_type - format of the `body` parameter. +* protocol - arbitrary map of protocol specific data. The above example shows what the HTTP protocol handler passes in. Subject to change and reduces reusability of your functions. **USE AT YOUR OWN RISK**. + +Each request will be separated by a blank line. + +#### Output + +Function's output format should have the following format: + ```json { - "status_code": 200, - "body": "...", - "headeres": { - "A": ["b"] + "body": "{\"some\":\"output\"}", + "content_type": "application/json", + "protocol": { + "status_code": 200, + "headers": { + "Other-Header": ["something"] + } } } +BLANK LINE +{ + NEXT OUTPUT OBJECT +} ``` -At client side user will receive HTTP response with HTTP headers, status code and the body from taken from function's response. + +* body - required - the response body. +* content_type - optional - format of `body`. Default is application/json. +* protocol - optional - protocol specific response options. Entirely optional. Contents defined by each protocol. + +#### Pros/Cons Pros: @@ -114,13 +154,7 @@ Cons: * Not streamable -## Output - -### Output back to client - -Typically JSON is the output format and is the default output, but any format can be used. - -### Logging +## Logging Standard error is reserved for logging, like it was meant to be. Anything you output to STDERR will show up in the logs. And if you use a log collector like logspout, you can collect those logs in a central location. See [logging](logging.md).