Update json (#463)

* wip

* wip

* Added more fields to JSON and added blank line between objects.

* Update tests.

* wip

* Updated to represent recent discussions.

* Fixed up the json test

* More docs

* Changed from blank line to bracket, newline, open bracket.

* Blank line added back, easier for delimiting.
This commit is contained in:
Travis Reeder
2017-11-16 09:59:13 -08:00
committed by GitHub
parent 279e5248ba
commit 96cfc9f5c1
8 changed files with 266 additions and 81 deletions

View File

@@ -11,7 +11,7 @@ build:
go build -o functions
install:
go install
go build -o ${GOPATH}/bin/fn-server
test:
./test.sh

View File

@@ -502,13 +502,15 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
go func() {
// TODO make sure stdin / stdout not blocked if container dies or we leak goroutine
// we have to make sure this gets shut down or 2 threads will be reading/writing in/out
errApp <- s.proto.Dispatch(call.w, call.req)
ci := protocol.NewCallInfo(call.Model(), call.req)
errApp <- s.proto.Dispatch(ctx, ci, call.w)
}()
select {
case err := <-s.errC: // error from container
return err
case err := <-errApp:
// would be great to be able to decipher what error is returning from here so we can show better messages
return err
case <-ctx.Done(): // call timeout
return ctx.Err()

View File

@@ -1,12 +1,14 @@
package protocol
import (
"context"
"io"
"net/http"
)
// DefaultProtocol is the protocol used by cold-containers
type DefaultProtocol struct{}
func (p *DefaultProtocol) IsStreamable() bool { return false }
func (d *DefaultProtocol) Dispatch(w io.Writer, req *http.Request) error { return nil }
func (p *DefaultProtocol) IsStreamable() bool { return false }
func (d *DefaultProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
return nil
}

View File

@@ -1,6 +1,7 @@
package protocol
import (
"context"
"errors"
"io"
"net/http"
@@ -14,8 +15,8 @@ type errorProto struct {
error
}
func (e errorProto) IsStreamable() bool { return false }
func (e errorProto) Dispatch(io.Writer, *http.Request) error { return e }
func (e errorProto) IsStreamable() bool { return false }
func (e errorProto) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error { return e }
// ContainerIO defines the interface used to talk to a hot function.
// Internally, a protocol must know when to alternate between stdin and stdout.
@@ -26,7 +27,64 @@ type ContainerIO interface {
// Dispatch will handle sending stdin and stdout to a container. Implementers
// of Dispatch may format the input and output differently. Dispatch must respect
// the req.Context() timeout / cancellation.
Dispatch(w io.Writer, req *http.Request) error
Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error
}
// CallInfo is passed into dispatch with only the required data the protocols require
type CallInfo interface {
CallID() string
ContentType() string
Input() io.Reader
// ProtocolType let's function/fdk's know what type original request is. Only 'http' for now.
// This could be abstracted into separate Protocol objects for each type and all the following information could go in there.
// This is a bit confusing because we also have the protocol's for getting information in and out of the function containers.
ProtocolType() string
Request() *http.Request
RequestURL() string
Headers() map[string][]string
}
type callInfoImpl struct {
call *models.Call
req *http.Request
}
func (ci callInfoImpl) CallID() string {
return ci.call.ID
}
func (ci callInfoImpl) ContentType() string {
return ci.req.Header.Get("Content-Type")
}
// Input returns the call's input/body
func (ci callInfoImpl) Input() io.Reader {
return ci.req.Body
}
func (ci callInfoImpl) ProtocolType() string {
return "http"
}
// Request basically just for the http format, since that's the only that makes sense to have the full request as is
func (ci callInfoImpl) Request() *http.Request {
return ci.req
}
func (ci callInfoImpl) RequestURL() string {
return ci.req.URL.RequestURI()
}
func (ci callInfoImpl) Headers() map[string][]string {
return ci.req.Header
}
func NewCallInfo(call *models.Call, req *http.Request) CallInfo {
ci := &callInfoImpl{
call: call,
req: req,
}
return ci
}
// Protocol defines all protocols that operates a ContainerIO.

View File

@@ -2,6 +2,7 @@ package protocol
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
@@ -25,8 +26,8 @@ func (p *HTTPProtocol) IsStreamable() bool { return true }
// over the timeout.
// TODO maybe we should take io.Writer, io.Reader but then we have to
// dump the request to a buffer again :(
func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error {
err := DumpRequestTo(h.in, req) // TODO timeout
func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
err := DumpRequestTo(h.in, ci.Request()) // TODO timeout
if err != nil {
return err
}
@@ -36,7 +37,7 @@ func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error {
// and status code first since calling res.Write will just write the http
// response as the body (headers and all)
res, err := http.ReadResponse(bufio.NewReader(h.out), req) // TODO timeout
res, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) // TODO timeout
if err != nil {
return err
}
@@ -54,7 +55,7 @@ func (h *HTTPProtocol) Dispatch(w io.Writer, req *http.Request) error {
} else {
// logs can just copy the full thing in there, headers and all.
res, err := http.ReadResponse(bufio.NewReader(h.out), req) // TODO timeout
res, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request()) // TODO timeout
if err != nil {
return err
}

View File

@@ -2,7 +2,9 @@ package protocol
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
)
@@ -10,13 +12,39 @@ import (
// This is sent into the function
// All HTTP request headers should be set in env
type jsonio struct {
Headers http.Header `json:"headers,omitempty"`
Body string `json:"body"`
Body string `json:"body"`
ContentType string `json:"content_type"`
}
// CallRequestHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now.
type CallRequestHTTP struct {
Type string `json:"type"`
RequestURL string `json:"request_url"`
Headers http.Header `json:"headers"`
}
// CallResponseHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now.
type CallResponseHTTP struct {
StatusCode int `json:"status_code,omitempty"`
Headers http.Header `json:"headers,omitempty"`
}
// jsonIn We're not using this since we're writing JSON directly right now, but trying to keep it current anyways, much easier to read/follow
type jsonIn struct {
jsonio
CallID string `json:"call_id"`
Protocol *CallRequestHTTP `json:"protocol"`
}
// jsonOut the expected response from the function container
type jsonOut struct {
jsonio
Protocol *CallResponseHTTP `json:"protocol,omitempty"`
}
// JSONProtocol converts stdin/stdout streams from HTTP into JSON format.
type JSONProtocol struct {
// These are the container input streams, not the input from the request or the output for the response
in io.Writer
out io.Reader
}
@@ -35,59 +63,117 @@ func writeString(err error, dst io.Writer, str string) error {
// TODO(xxx): headers, query parameters, body - what else should we add to func's payload?
// TODO(xxx): get rid of request body buffering somehow
func (h *JSONProtocol) DumpJSON(req *http.Request) error {
// @treeder: I don't know why we don't just JSON marshal this, this is rough...
func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
stdin := json.NewEncoder(h.in)
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(req.Body)
_, err := bb.ReadFrom(ci.Input())
// todo: better/simpler err handling
if err != nil {
return err
}
err = writeString(err, h.in, "{")
// open
err = writeString(err, h.in, "{\n")
if err != nil {
return err
}
// call_id
err = writeString(err, h.in, `"call_id":`)
if err != nil {
return err
}
err = stdin.Encode(ci.CallID())
if err != nil {
return err
}
// content_type
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"content_type":`)
if err != nil {
return err
}
err = stdin.Encode(ci.ContentType())
if err != nil {
return err
}
// body
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"body":`)
if err != nil {
return err
}
err = stdin.Encode(bb.String())
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"headers":`)
if err != nil {
return err
}
err = stdin.Encode(req.Header)
// now the extras
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"query_parameters":`)
if err != nil {
return err
err = writeString(err, h.in, `"protocol":{`) // OK name? This is what OpenEvents is calling it in initial proposal
{
err = writeString(err, h.in, `"type":`)
if err != nil {
return err
}
err = stdin.Encode(ci.ProtocolType())
// request URL
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"request_url":`)
if err != nil {
return err
}
err = stdin.Encode(ci.RequestURL())
if err != nil {
return err
}
// HTTP headers
err = writeString(err, h.in, ",")
err = writeString(err, h.in, `"headers":`)
if err != nil {
return err
}
err = stdin.Encode(ci.Headers())
}
err = stdin.Encode(req.URL.RawQuery)
err = writeString(err, h.in, "}")
// close
err = writeString(err, h.in, "\n}\n\n")
return err
}
func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error {
err := h.DumpJSON(req)
func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
// write input into container
err := h.writeJSONToContainer(ci)
if err != nil {
return err
}
jout := new(jsonio)
// now read the container output
jout := new(jsonOut)
dec := json.NewDecoder(h.out)
if err := dec.Decode(jout); err != nil {
return err
return fmt.Errorf("error decoding JSON from user function: %v", err)
}
if rw, ok := w.(http.ResponseWriter); ok {
// this has to be done for pulling out:
// - status code
// - body
// - headers
for k, vs := range jout.Headers {
for _, v := range vs {
rw.Header().Add(k, v) // on top of any specified on the route
if jout.Protocol != nil {
p := jout.Protocol
for k, v := range p.Headers {
for _, vv := range v {
rw.Header().Add(k, vv) // on top of any specified on the route
}
}
if p.StatusCode != 0 {
rw.WriteHeader(p.StatusCode)
}
}
if jout.StatusCode != 0 {
rw.WriteHeader(jout.StatusCode)
} else {
rw.WriteHeader(200)
}
_, err = io.WriteString(rw, jout.Body) // TODO timeout
if err != nil {

View File

@@ -9,18 +9,14 @@ import (
"net/url"
"reflect"
"testing"
"github.com/fnproject/fn/api/models"
)
type RequestData struct {
A string `json:"a"`
}
type funcRequestBody struct {
Body string `json:"body"`
Headers http.Header `json:"headers"`
QueryParameters string `json:"query_parameters"`
}
func setupRequest(data interface{}) *http.Request {
req := &http.Request{
Method: http.MethodPost,
@@ -48,19 +44,21 @@ func setupRequest(data interface{}) *http.Request {
return req
}
func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) {
func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
rDataBefore := RequestData{A: "a"}
req := setupRequest(rDataBefore)
r, w := io.Pipe()
call := &models.Call{}
ci := &callInfoImpl{call, req}
proto := JSONProtocol{w, r}
go func() {
err := proto.DumpJSON(req)
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := new(funcRequestBody)
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
@@ -82,19 +80,21 @@ func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) {
}
}
func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) {
func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) {
req := setupRequest(nil)
call := &models.Call{}
r, w := io.Pipe()
ci := &callInfoImpl{call, req}
proto := JSONProtocol{w, r}
go func() {
err := proto.DumpJSON(req)
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := new(funcRequestBody)
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
@@ -109,25 +109,27 @@ func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) {
t.Errorf("Request body assertion mismatch: expected: %s, got %s",
"<empty-string>", incomingReq.Body)
}
if ok := reflect.DeepEqual(req.Header, incomingReq.Headers); !ok {
if ok := reflect.DeepEqual(req.Header, incomingReq.Protocol.Headers); !ok {
t.Errorf("Request headers assertion mismatch: expected: %s, got %s",
req.Header, incomingReq.Headers)
req.Header, incomingReq.Protocol.Headers)
}
}
func TestJSONProtocolDumpJSONRequestWithQuery(t *testing.T) {
func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) {
req := setupRequest(nil)
r, w := io.Pipe()
call := &models.Call{}
ci := &callInfoImpl{call, req}
proto := JSONProtocol{w, r}
go func() {
err := proto.DumpJSON(req)
err := proto.writeJSONToContainer(ci)
if err != nil {
t.Error(err.Error())
}
w.Close()
}()
incomingReq := new(funcRequestBody)
incomingReq := &jsonIn{}
bb := new(bytes.Buffer)
_, err := bb.ReadFrom(r)
@@ -138,8 +140,8 @@ func TestJSONProtocolDumpJSONRequestWithQuery(t *testing.T) {
if err != nil {
t.Error(err.Error())
}
if incomingReq.QueryParameters != req.URL.RawQuery {
t.Errorf("Request query string assertion mismatch: expected: %s, got %s",
req.URL.RawQuery, incomingReq.QueryParameters)
if incomingReq.Protocol.RequestURL != req.URL.RequestURI() {
t.Errorf("Request URL does not match protocol URL: expected: %s, got %s",
req.URL.RequestURI(), incomingReq.Protocol.RequestURL)
}
}

View File

@@ -20,10 +20,12 @@ The format is still up for discussion and in order to move forward and remain fl
TODO: Put common env vars here, that show up in all formats.
#### Default I/O Format
### Default I/O Format
The default format is simply the request body itself plus some environment variables. For instance, if someone were to post a JSON body, the unmodified body would be sent in via STDIN. The result comes via STDOUT. When task is done, pipes are closed and the container running the function is terminated.
#### Pros/Cons
Pros:
* Very simple to use
@@ -32,13 +34,13 @@ Cons:
* Not very efficient resource utilization - one function for one event.
#### HTTP I/O Format
### HTTP I/O Format
`--format http`
`format: http`
HTTP format could be a good option as it is in very common use obviously, most languages have some semi-easy way to parse it, and it supports hot format. The response will look like a HTTP response. The communication is still done via stdin/stdout, but these pipes are never closed unless the container is explicitly terminated. The basic format is:
Request:
#### Request
```text
GET / HTTP/1.1
@@ -47,7 +49,7 @@ Content-Length: 5
world
```
Response:
#### Response
```text
HTTP/1.1 200 OK
@@ -60,6 +62,8 @@ The header keys and values would be populated with information about the functio
`Content-Length` is determined by the [Content-Length](https://tools.ietf.org/html/rfc7230#section-3.3.3) header, which is mandatory both for input and output. It is used by Functions to know when stop writing to STDIN and reading from STDOUT.
#### Pros/Cons
Pros:
* Supports streaming
@@ -70,11 +74,13 @@ Cons:
* Requires a parsing library or fair amount of code to parse headers properly
* Double parsing - headers + body (if body is to be parsed, such as json)
#### JSON I/O Format
### JSON I/O Format
`--format json`
`format: json`
Fn accepts request data of the following format:
The JSON format is a nice hot format as it is easy to parse in most languages.
If a request comes in like this:
```json
{
@@ -82,28 +88,62 @@ Fn accepts request data of the following format:
}
```
Internally function receives data in following format:
#### Input
Internally functions receive data in the example format below:
```json
{
"body": "{\"some\":\"input\"}\n",
"headers": {
"yo": ["dawg"]
"call_id": "123",
"content_type": "application/json",
"body": "{\"some\":\"input\"}",
"protocol": {
"type": "http",
"request_url": "http://localhost:8080/r/myapp/myfunc?q=hi",
"headers": {
"Content-Type": ["application/json"],
"Other-Header": ["something"]
}
}
}
BLANK LINE
{
NEXT INPUT OBJECT
}
```
Function's output format should have following format:
* call_id - the unique ID for the call.
* content_type - format of the `body` parameter.
* protocol - arbitrary map of protocol specific data. The above example shows what the HTTP protocol handler passes in. Subject to change and reduces reusability of your functions. **USE AT YOUR OWN RISK**.
Each request will be separated by a blank line.
#### Output
Function's output format should have the following format:
```json
{
"status_code": 200,
"body": "...",
"headeres": {
"A": ["b"]
"body": "{\"some\":\"output\"}",
"content_type": "application/json",
"protocol": {
"status_code": 200,
"headers": {
"Other-Header": ["something"]
}
}
}
BLANK LINE
{
NEXT OUTPUT OBJECT
}
```
At client side user will receive HTTP response with HTTP headers, status code and the body from taken from function's response.
* body - required - the response body.
* content_type - optional - format of `body`. Default is application/json.
* protocol - optional - protocol specific response options. Entirely optional. Contents defined by each protocol.
#### Pros/Cons
Pros:
@@ -114,13 +154,7 @@ Cons:
* Not streamable
## Output
### Output back to client
Typically JSON is the output format and is the default output, but any format can be used.
### Logging
## Logging
Standard error is reserved for logging, like it was meant to be. Anything you output to STDERR will show up in the logs. And if you use a log
collector like logspout, you can collect those logs in a central location. See [logging](logging.md).