From b6b9b55ca905e235b35dcb4be47b3baceaf13b06 Mon Sep 17 00:00:00 2001 From: amykang2020 Date: Thu, 7 Sep 2017 02:00:22 -0700 Subject: [PATCH 01/30] apply/make Travis's json-format branch prototype to work with latest restructured master; added StatusCode to JSONOutput server-function contract --- api/agent/protocol/factory.go | 7 ++ api/agent/protocol/json.go | 98 ++++++++++++++++++++ api/models/call.go | 2 + api/models/route.go | 2 +- docs/function-format.md | 53 +++++++---- examples/formats/json/go/.gitignore | 6 ++ examples/formats/json/go/README.md | 3 + examples/formats/json/go/func.go | 69 ++++++++++++++ examples/formats/json/go/sample.payload.json | 3 + examples/formats/json/go/test.json | 26 ++++++ 10 files changed, 250 insertions(+), 19 deletions(-) create mode 100644 api/agent/protocol/json.go create mode 100644 examples/formats/json/go/.gitignore create mode 100644 examples/formats/json/go/README.md create mode 100644 examples/formats/json/go/func.go create mode 100644 examples/formats/json/go/sample.payload.json create mode 100644 examples/formats/json/go/test.json diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index 44f0f27da..5fb5bb2e1 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -36,6 +36,7 @@ type Protocol string const ( Default Protocol = models.FormatDefault HTTP Protocol = models.FormatHTTP + JSON Protocol = models.FormatJSON Empty Protocol = "" ) @@ -45,6 +46,8 @@ func (p *Protocol) UnmarshalJSON(b []byte) error { *p = Default case HTTP: *p = HTTP + case JSON: + *p = JSON default: return errInvalidProtocol } @@ -57,6 +60,8 @@ func (p Protocol) MarshalJSON() ([]byte, error) { return []byte(Default), nil case HTTP: return []byte(HTTP), nil + case JSON: + return []byte(JSON), nil } return nil, errInvalidProtocol } @@ -67,6 +72,8 @@ func New(p Protocol, in io.Writer, out io.Reader) ContainerIO { switch p { case HTTP: return &HTTPProtocol{in, out} + case JSON: + return &JSONProtocol{in, out} case Default, Empty: return &DefaultProtocol{} } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go new file mode 100644 index 000000000..15b5fbc28 --- /dev/null +++ b/api/agent/protocol/json.go @@ -0,0 +1,98 @@ +package protocol + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" +) + +// JSONInput is what's sent into the function +// All HTTP request headers should be set in env +type JSONInput struct { + RequestURL string `json:"request_url"` + CallID string `json:"call_id"` + Method string `json:"method"` + Body string `json:"body"` +} + +// JSONOutput function must return this format +// StatusCode value must be a HTTP status code +type JSONOutput struct { + StatusCode int `json:"status"` + Body string `json:"body"` +} + +// JSONProtocol converts stdin/stdout streams from HTTP into JSON format. +type JSONProtocol struct { + in io.Writer + out io.Reader +} + +func (p *JSONProtocol) IsStreamable() bool { + return true +} + +func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { + reqURL := req.Header.Get("REQUEST_URL") + method := req.Header.Get("METHOD") + callID := req.Header.Get("CALL_ID") + + // TODO content-length or chunked encoding + var body bytes.Buffer + if req.Body != nil { + var dest io.Writer = &body + + // TODO copy w/ ctx and check err + io.Copy(dest, req.Body) + } + + // convert to JSON func format + jin := &JSONInput{ + RequestURL: reqURL, + Method: method, + CallID: callID, + Body: body.String(), + } + b, err := json.Marshal(jin) + if err != nil { + // this shouldn't happen + return fmt.Errorf("error marshalling JSONInput: %v", err) + } + h.in.Write(b) + + // TODO: put max size on how big the response can be so we don't blow up + jout := &JSONOutput{} + dec := json.NewDecoder(h.out) + if err := dec.Decode(jout); err != nil { + // TODO: how do we get an error back to the client?? + return fmt.Errorf("error unmarshalling JSONOutput: %v", err) + } + + // res := &http.Response{} + // res.Body = strings.NewReader(jout.Body) + // TODO: shouldn't we pass back the full response object or something so we can set some things on it here? + // For instance, user could set response content type or what have you. + //io.Copy(cfg.Stdout, strings.NewReader(jout.Body)) + + if rw, ok := w.(http.ResponseWriter); ok { + b, err = json.Marshal(jout.Body) + if err != nil { + return fmt.Errorf("error unmarshalling JSONOutput.Body: %v", err) + } + rw.WriteHeader(jout.StatusCode) + rw.Write(b) // TODO timeout + } else { + // logs can just copy the full thing in there, headers and all. + b, err = json.Marshal(jout) + if err != nil { + return fmt.Errorf("error unmarshalling JSONOutput: %v", err) + } + + w.Write(b) // TODO timeout + + } + return nil + +} diff --git a/api/models/call.go b/api/models/call.go index 37591a244..9aa078781 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -18,6 +18,8 @@ const ( FormatDefault = "default" // FormatHTTP ... FormatHTTP = "http" + // FormatJSON ... + FormatJSON = "json" ) var possibleStatuses = [...]string{"delayed", "queued", "running", "success", "error", "cancelled"} diff --git a/api/models/route.go b/api/models/route.go index 74ff51af1..6a90aea3d 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -95,7 +95,7 @@ func (r *Route) Validate() error { return ErrRoutesInvalidType } - if r.Format != FormatDefault && r.Format != FormatHTTP { + if r.Format != FormatDefault && r.Format != FormatHTTP && r.Format != FormatJSON { return ErrRoutesInvalidFormat } diff --git a/docs/function-format.md b/docs/function-format.md index 651242ddb..2a54d1088 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -2,7 +2,7 @@ This document will describe the details of how a function works, inputs/outputs, etc. -## Formats +## Input Formats ### STDIN and Environment Variables @@ -18,9 +18,11 @@ The goals of the input format are the following: The format is still up for discussion and in order to move forward and remain flexible, it's likely we will just allow different input formats and the function creator can decide what they want, on a per function basis. Default being the simplest format to use. -#### Default I/O Format +TODO: Put common env vars here, that show up in all formats. -The default I/O format is simply the request body itself plus some environment variables. For instance, if someone were to post a JSON body, the unmodified body would be sent in via STDIN. The result comes via STDOUT. When call is done, pipes are closed and the container running the function is terminated. +#### Default Input Format + +The default format is simply the request body itself plus some environment variables. For instance, if someone were to post a JSON body, the unmodified body would be sent in via STDIN. The result comes via STDOUT. When task is done, pipes are closed and the container running the function is terminated. Pros: @@ -30,14 +32,15 @@ Cons: * Not streamable -#### HTTP I/O Format +#### HTTP Input Format `--format http` HTTP format could be a good option as it is in very common use obviously, most languages have some semi-easy way to parse it, and it's streamable. The response will look like a HTTP response. The communication is still done via stdin/stdout, but these pipes are never closed unless the container is explicitly terminated. The basic format is: Request: -``` + +```text GET / HTTP/1.1 Content-Length: 5 @@ -45,7 +48,8 @@ world ``` Response: -``` + +```text HTTP/1.1 200 OK Content-Length: 11 @@ -66,34 +70,47 @@ Cons: * Requires a parsing library or fair amount of code to parse headers properly * Double parsing - headers + body (if body is to be parsed, such as json) -#### JSON I/O Format (not implemented) +#### JSON Input Format `--format json` -The idea here is to keep the HTTP base structure, but make it a bit easier to parse by making the `request line` and `headers` a JSON struct. -Eg: +An easy to parse JSON structure. -``` +```json { - "request_url":"http://....", - "params": { - "blog_name": "yeezy" + "request_url": "http://....", + "call_id": "abc123", + "method": "GET", + "body": { + "some": "input" + } +} +{ + "request_url":"http://....", + "call_id": "edf456", + "method": "GET", + "body": { + "other": "input" } } -BLANK LINE -BODY ``` Pros: * Streamable -* Easy to parse headers +* Easy to parse Cons: -* New, unknown format +* ??? -### STDERR +## Output + +### Output back to client + +Typically JSON is the output format and is the default output, but any format can be used. + +### Logging Standard error is reserved for logging, like it was meant to be. Anything you output to STDERR will show up in the logs. And if you use a log collector like logspout, you can collect those logs in a central location. See [logging](logging.md). diff --git a/examples/formats/json/go/.gitignore b/examples/formats/json/go/.gitignore new file mode 100644 index 000000000..d450e309c --- /dev/null +++ b/examples/formats/json/go/.gitignore @@ -0,0 +1,6 @@ +vendor/ +/go +/app +/__uberscript__ + +func.yaml diff --git a/examples/formats/json/go/README.md b/examples/formats/json/go/README.md new file mode 100644 index 000000000..30a50bb82 --- /dev/null +++ b/examples/formats/json/go/README.md @@ -0,0 +1,3 @@ +# Go using JSON format + +This example uses the `json` input format. diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go new file mode 100644 index 000000000..feef0fe3a --- /dev/null +++ b/examples/formats/json/go/func.go @@ -0,0 +1,69 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "os" +) + +type Person struct { + Name string +} + +type JSONInput struct { + RequestURL string `json:"request_url"` + CallID string `json:"call_id"` + Method string `json:"method"` + Body string `json:"body"` +} + +func (a *JSONInput) String() string { + return fmt.Sprintf("request_url=%s\ncall_id=%s\nmethod=%s\n\nbody=%s", + a.RequestURL, a.CallID, a.Method, a.Body) +} + +type JSONOutput struct { + StatusCode int `json:"status"` + Body string `json:"body"` +} + +func main() { + // p := &Person{Name: "World"} + // json.Unmarshal(os.Stdin).Decode(p) + // mapD := map[string]string{"message": fmt.Sprintf("Hello %s", p.Name)} + // mapB, _ := json.Marshal(mapD) + // fmt.Println(string(mapB)) + + dec := json.NewDecoder(os.Stdin) + enc := json.NewEncoder(os.Stdout) + var loopCounter = 0 + for { + loopCounter++ + log.Println("loopCounter:", loopCounter) + + in := &JSONInput{} + if err := dec.Decode(in); err != nil { + log.Fatalln(err) + return + } + log.Println("JSONInput: ", in) + + person := Person{} + if in.Body != "" { + if err := json.Unmarshal([]byte(in.Body), &person); err != nil { + log.Fatalln(err) + } + } + + log.Println("Person: ", person) + + mapResult := map[string]string{"message": fmt.Sprintf("Hello %s", person.Name)} + out := &JSONOutput{StatusCode: 200} + b, _ := json.Marshal(mapResult) + out.Body = string(b) + if err := enc.Encode(out); err != nil { + log.Fatalln(err) + } + } +} diff --git a/examples/formats/json/go/sample.payload.json b/examples/formats/json/go/sample.payload.json new file mode 100644 index 000000000..0a3c281da --- /dev/null +++ b/examples/formats/json/go/sample.payload.json @@ -0,0 +1,3 @@ +{ + "Name": "Johnny" +} diff --git a/examples/formats/json/go/test.json b/examples/formats/json/go/test.json new file mode 100644 index 000000000..391d9b42f --- /dev/null +++ b/examples/formats/json/go/test.json @@ -0,0 +1,26 @@ +{ + "tests": [ + { + "input": { + "body": { + "name": "Johnny" + } + }, + "output": { + "body": { + "message": "Hello Johnny" + } + } + }, + { + "input": { + "body": "" + }, + "output": { + "body": { + "message": "Hello World" + } + } + } + ] +} From ecaa5eefbf77e74d270a0541c8b77cec24afbb3a Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Tue, 26 Sep 2017 15:21:58 +0300 Subject: [PATCH 02/30] Cleaning up code Getting rid of request url, call id, method: all of them are redundant and available through env --- api/agent/protocol/factory.go | 2 +- api/agent/protocol/json.go | 14 ++----------- docs/function-format.md | 13 +----------- examples/formats/json/go/func.go | 36 +++++++++----------------------- 4 files changed, 14 insertions(+), 51 deletions(-) diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index 5fb5bb2e1..aab124195 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -47,7 +47,7 @@ func (p *Protocol) UnmarshalJSON(b []byte) error { case HTTP: *p = HTTP case JSON: - *p = JSON + *p = JSON default: return errInvalidProtocol } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 15b5fbc28..5531883e6 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -11,10 +11,7 @@ import ( // JSONInput is what's sent into the function // All HTTP request headers should be set in env type JSONInput struct { - RequestURL string `json:"request_url"` - CallID string `json:"call_id"` - Method string `json:"method"` - Body string `json:"body"` + Body string `json:"body"` } // JSONOutput function must return this format @@ -35,10 +32,6 @@ func (p *JSONProtocol) IsStreamable() bool { } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - reqURL := req.Header.Get("REQUEST_URL") - method := req.Header.Get("METHOD") - callID := req.Header.Get("CALL_ID") - // TODO content-length or chunked encoding var body bytes.Buffer if req.Body != nil { @@ -50,10 +43,7 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { // convert to JSON func format jin := &JSONInput{ - RequestURL: reqURL, - Method: method, - CallID: callID, - Body: body.String(), + Body: body.String(), } b, err := json.Marshal(jin) if err != nil { diff --git a/docs/function-format.md b/docs/function-format.md index 2a54d1088..72b1abad5 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -62,7 +62,7 @@ The header keys and values would be populated with information about the functio Pros: -* Streamable +* Supports streaming * Common format Cons: @@ -78,21 +78,10 @@ An easy to parse JSON structure. ```json { - "request_url": "http://....", - "call_id": "abc123", - "method": "GET", "body": { "some": "input" } } -{ - "request_url":"http://....", - "call_id": "edf456", - "method": "GET", - "body": { - "other": "input" - } -} ``` Pros: diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go index feef0fe3a..1ddb01ef7 100644 --- a/examples/formats/json/go/func.go +++ b/examples/formats/json/go/func.go @@ -12,51 +12,35 @@ type Person struct { } type JSONInput struct { - RequestURL string `json:"request_url"` - CallID string `json:"call_id"` - Method string `json:"method"` - Body string `json:"body"` -} - -func (a *JSONInput) String() string { - return fmt.Sprintf("request_url=%s\ncall_id=%s\nmethod=%s\n\nbody=%s", - a.RequestURL, a.CallID, a.Method, a.Body) -} - -type JSONOutput struct { - StatusCode int `json:"status"` Body string `json:"body"` } +type JSONOutput struct { + StatusCode int `json:"status"` + Body string `json:"body"` +} + func main() { - // p := &Person{Name: "World"} - // json.Unmarshal(os.Stdin).Decode(p) - // mapD := map[string]string{"message": fmt.Sprintf("Hello %s", p.Name)} - // mapB, _ := json.Marshal(mapD) - // fmt.Println(string(mapB)) dec := json.NewDecoder(os.Stdin) enc := json.NewEncoder(os.Stdout) - var loopCounter = 0 for { - loopCounter++ - log.Println("loopCounter:", loopCounter) in := &JSONInput{} if err := dec.Decode(in); err != nil { log.Fatalln(err) return } - log.Println("JSONInput: ", in) person := Person{} - if in.Body != "" { + if in.Body != "" { if err := json.Unmarshal([]byte(in.Body), &person); err != nil { log.Fatalln(err) } - } - - log.Println("Person: ", person) + } + if person.Name == "" { + person.Name = "World" + } mapResult := map[string]string{"message": fmt.Sprintf("Hello %s", person.Name)} out := &JSONOutput{StatusCode: 200} From 3da9ad432862e47e038de0084d38ff67378b85f6 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Tue, 26 Sep 2017 18:16:44 +0300 Subject: [PATCH 03/30] Using io.LimitReader as the way to control size of request body with respect to content length --- api/agent/protocol/json.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 5531883e6..a3a457f5f 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "strconv" ) // JSONInput is what's sent into the function @@ -32,13 +33,18 @@ func (p *JSONProtocol) IsStreamable() bool { } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - // TODO content-length or chunked encoding var body bytes.Buffer if req.Body != nil { var dest io.Writer = &body - // TODO copy w/ ctx and check err - io.Copy(dest, req.Body) + // TODO copy w/ ctx + nBytes, _ := strconv.ParseInt( + req.Header.Get("Content-Length"), 10, 64) + _, err := io.Copy(dest, io.LimitReader(req.Body, nBytes)) + if err != nil { + // TODO: maybe mask this error if favour of something different + return err + } } // convert to JSON func format From 1882845a61662994a842cbf855205ff207cd490a Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Tue, 26 Sep 2017 18:52:21 +0300 Subject: [PATCH 04/30] Respond with any error that happends during JSON dispatching --- api/agent/protocol/json.go | 61 +++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index a3a457f5f..3a84b8518 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -32,6 +32,14 @@ func (p *JSONProtocol) IsStreamable() bool { return true } +type Error struct { + Message string `json:"message"` +} + +type ErrMsg struct { + Err Error `json:"error"` +} + func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { var body bytes.Buffer if req.Body != nil { @@ -42,7 +50,7 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { req.Header.Get("Content-Length"), 10, 64) _, err := io.Copy(dest, io.LimitReader(req.Body, nBytes)) if err != nil { - // TODO: maybe mask this error if favour of something different + respondWithError(w, err) return err } } @@ -54,28 +62,27 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { b, err := json.Marshal(jin) if err != nil { // this shouldn't happen - return fmt.Errorf("error marshalling JSONInput: %v", err) + err = fmt.Errorf("error marshalling JSONInput: %v", err) + respondWithError(w, err) + return err } h.in.Write(b) - // TODO: put max size on how big the response can be so we don't blow up + maxContentSize := int64(1 * 1024 * 1024) // 1Mb should be enough jout := &JSONOutput{} - dec := json.NewDecoder(h.out) + dec := json.NewDecoder(io.LimitReader(h.out, maxContentSize)) if err := dec.Decode(jout); err != nil { - // TODO: how do we get an error back to the client?? - return fmt.Errorf("error unmarshalling JSONOutput: %v", err) + err = fmt.Errorf("Unable to decode JSON response object: %s", err.Error()) + respondWithError(w, err) + return err } - // res := &http.Response{} - // res.Body = strings.NewReader(jout.Body) - // TODO: shouldn't we pass back the full response object or something so we can set some things on it here? - // For instance, user could set response content type or what have you. - //io.Copy(cfg.Stdout, strings.NewReader(jout.Body)) - if rw, ok := w.(http.ResponseWriter); ok { b, err = json.Marshal(jout.Body) if err != nil { - return fmt.Errorf("error unmarshalling JSONOutput.Body: %v", err) + err = fmt.Errorf("error unmarshalling JSON body: %s", err.Error()) + respondWithError(w, err) + return err } rw.WriteHeader(jout.StatusCode) rw.Write(b) // TODO timeout @@ -83,12 +90,32 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { // logs can just copy the full thing in there, headers and all. b, err = json.Marshal(jout) if err != nil { - return fmt.Errorf("error unmarshalling JSONOutput: %v", err) + err = fmt.Errorf("error unmarshalling JSON response: %s", err.Error()) + respondWithError(w, err) + return err } - w.Write(b) // TODO timeout - } return nil - +} + +func respondWithError(w io.Writer, err error) { + errMsg := ErrMsg{ + Err: Error{ + Message: err.Error(), + }, + } + b, _ := json.Marshal(errMsg) + statusCode := 500 + writeResponse(w, b, statusCode) +} + +func writeResponse(w io.Writer, b []byte, statusCode int) { + if rw, ok := w.(http.ResponseWriter); ok { + rw.WriteHeader(statusCode) + rw.Write(b) // TODO timeout + } else { + // logs can just copy the full thing in there, headers and all. + w.Write(b) // TODO timeout + } } From 783490dc79a0b0b2132b6c8ae64de7167d8f0c24 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Wed, 27 Sep 2017 15:36:00 +0300 Subject: [PATCH 05/30] Addressing certain comments What's new? - better error handling - still need to decode JSON from function because we need status code and body - prevent request body to be a problem by deferring its close - moving examples around: putting http and json samples into one folder --- api/agent/protocol/json.go | 110 +++++++++--------- .../hotfunctions => formats}/http/go/func.go | 0 .../http/python => formats/http/go}/func.yaml | 4 +- .../http/python/Dockerfile | 0 .../http/python/func.py | 0 examples/formats/http/python/func.yaml | 7 ++ .../http/python/requirements.txt | 0 examples/formats/json/go/func.go | 18 ++- examples/formats/json/python/func.py | 58 +++++++++ examples/formats/json/python/func.yaml | 7 ++ examples/formats/json/python/requirements.txt | 1 + 11 files changed, 140 insertions(+), 65 deletions(-) rename examples/{tutorial/hotfunctions => formats}/http/go/func.go (100%) rename examples/{tutorial/hotfunctions/http/python => formats/http/go}/func.yaml (58%) rename examples/{tutorial/hotfunctions => formats}/http/python/Dockerfile (100%) rename examples/{tutorial/hotfunctions => formats}/http/python/func.py (100%) create mode 100644 examples/formats/http/python/func.yaml rename examples/{tutorial/hotfunctions => formats}/http/python/requirements.txt (100%) create mode 100644 examples/formats/json/python/func.py create mode 100644 examples/formats/json/python/func.yaml create mode 100644 examples/formats/json/python/requirements.txt diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 3a84b8518..de932d8c2 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -6,20 +6,14 @@ import ( "fmt" "io" "net/http" - "strconv" ) -// JSONInput is what's sent into the function +// JSONIn is what's sent into the function // All HTTP request headers should be set in env -type JSONInput struct { - Body string `json:"body"` -} - -// JSONOutput function must return this format -// StatusCode value must be a HTTP status code -type JSONOutput struct { - StatusCode int `json:"status"` - Body string `json:"body"` +type JSONIO struct { + Headers http.Header `json:"headers,omitempty"` + Body string `json:"body"` + StatusCode int `json:"status_code,omitempty"` } // JSONProtocol converts stdin/stdout streams from HTTP into JSON format. @@ -32,88 +26,88 @@ func (p *JSONProtocol) IsStreamable() bool { return true } -type Error struct { - Message string `json:"message"` -} - -type ErrMsg struct { - Err Error `json:"error"` -} - func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { var body bytes.Buffer if req.Body != nil { var dest io.Writer = &body // TODO copy w/ ctx - nBytes, _ := strconv.ParseInt( - req.Header.Get("Content-Length"), 10, 64) - _, err := io.Copy(dest, io.LimitReader(req.Body, nBytes)) + _, err := io.Copy(dest, req.Body) if err != nil { - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) } + defer req.Body.Close() } // convert to JSON func format - jin := &JSONInput{ - Body: body.String(), + jin := &JSONIO{ + Headers: req.Header, + Body: body.String(), } b, err := json.Marshal(jin) if err != nil { // this shouldn't happen - err = fmt.Errorf("error marshalling JSONInput: %v", err) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) + } + _, err = h.in.Write(b) + if err != nil { + return respondWithError( + w, fmt.Errorf("error writing JSON object to function's STDIN: %s", err.Error())) } - h.in.Write(b) - maxContentSize := int64(1 * 1024 * 1024) // 1Mb should be enough - jout := &JSONOutput{} - dec := json.NewDecoder(io.LimitReader(h.out, maxContentSize)) + // this has to be done for pulling out: + // - status code + // - body + jout := new(JSONIO) + dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { - err = fmt.Errorf("Unable to decode JSON response object: %s", err.Error()) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) } if rw, ok := w.(http.ResponseWriter); ok { - b, err = json.Marshal(jout.Body) - if err != nil { - err = fmt.Errorf("error unmarshalling JSON body: %s", err.Error()) - respondWithError(w, err) - return err - } rw.WriteHeader(jout.StatusCode) - rw.Write(b) // TODO timeout + outBytes, err := json.Marshal(jout.Body) + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) + } + _, err = rw.Write(outBytes) // TODO timeout + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) + } } else { // logs can just copy the full thing in there, headers and all. - b, err = json.Marshal(jout) + outBytes, err := json.Marshal(jout.Body) if err != nil { - err = fmt.Errorf("error unmarshalling JSON response: %s", err.Error()) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) + } + _, err = w.Write(outBytes) // TODO timeout + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) } - w.Write(b) // TODO timeout } return nil } -func respondWithError(w io.Writer, err error) { - errMsg := ErrMsg{ - Err: Error{ - Message: err.Error(), - }, - } - b, _ := json.Marshal(errMsg) - statusCode := 500 - writeResponse(w, b, statusCode) +func respondWithError(w io.Writer, err error) error { + writeResponse(w, []byte(err.Error()), http.StatusInternalServerError) + return err } func writeResponse(w io.Writer, b []byte, statusCode int) { if rw, ok := w.(http.ResponseWriter); ok { rw.WriteHeader(statusCode) - rw.Write(b) // TODO timeout + _, err := rw.Write(b) // TODO timeout + if err != nil { + err = fmt.Errorf("unable to write JSON response object: %s", err.Error()) + respondWithError(w, err) + } } else { // logs can just copy the full thing in there, headers and all. w.Write(b) // TODO timeout diff --git a/examples/tutorial/hotfunctions/http/go/func.go b/examples/formats/http/go/func.go similarity index 100% rename from examples/tutorial/hotfunctions/http/go/func.go rename to examples/formats/http/go/func.go diff --git a/examples/tutorial/hotfunctions/http/python/func.yaml b/examples/formats/http/go/func.yaml similarity index 58% rename from examples/tutorial/hotfunctions/http/python/func.yaml rename to examples/formats/http/go/func.yaml index 849474302..5688b55ea 100644 --- a/examples/tutorial/hotfunctions/http/python/func.yaml +++ b/examples/formats/http/go/func.yaml @@ -1,7 +1,7 @@ -name: fnproject/hotfn-py +name: fnproject/hot-http-go version: 0.0.1 runtime: docker type: sync memory: 521 format: http -path: /hotfn-py +path: /hot-http-go diff --git a/examples/tutorial/hotfunctions/http/python/Dockerfile b/examples/formats/http/python/Dockerfile similarity index 100% rename from examples/tutorial/hotfunctions/http/python/Dockerfile rename to examples/formats/http/python/Dockerfile diff --git a/examples/tutorial/hotfunctions/http/python/func.py b/examples/formats/http/python/func.py similarity index 100% rename from examples/tutorial/hotfunctions/http/python/func.py rename to examples/formats/http/python/func.py diff --git a/examples/formats/http/python/func.yaml b/examples/formats/http/python/func.yaml new file mode 100644 index 000000000..46e8b8631 --- /dev/null +++ b/examples/formats/http/python/func.yaml @@ -0,0 +1,7 @@ +name: fnproject/hot-http-python +version: 0.0.1 +runtime: docker +type: sync +memory: 521 +format: http +path: /hot-http-python diff --git a/examples/tutorial/hotfunctions/http/python/requirements.txt b/examples/formats/http/python/requirements.txt similarity index 100% rename from examples/tutorial/hotfunctions/http/python/requirements.txt rename to examples/formats/http/python/requirements.txt diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go index 1ddb01ef7..8a7e066b2 100644 --- a/examples/formats/json/go/func.go +++ b/examples/formats/json/go/func.go @@ -1,14 +1,17 @@ package main import ( + "bufio" + "bytes" "encoding/json" "fmt" + "io" "log" "os" ) type Person struct { - Name string + Name string `json:"name"` } type JSONInput struct { @@ -22,14 +25,19 @@ type JSONOutput struct { func main() { - dec := json.NewDecoder(os.Stdin) enc := json.NewEncoder(os.Stdout) + r := bufio.NewReader(os.Stdin) for { - + var buf bytes.Buffer in := &JSONInput{} - if err := dec.Decode(in); err != nil { + _, err := io.Copy(&buf, r) + if err != nil { + log.Fatalln(err) + } + + err = json.Unmarshal(buf.Bytes(), in) + if err != nil { log.Fatalln(err) - return } person := Person{} diff --git a/examples/formats/json/python/func.py b/examples/formats/json/python/func.py new file mode 100644 index 000000000..79f71b3e7 --- /dev/null +++ b/examples/formats/json/python/func.py @@ -0,0 +1,58 @@ +import asyncio +import json +import sys +import uvloop + + +class JSONProtocol(asyncio.Protocol): + + def connection_made(self, transport): + print('pipe opened', file=sys.stderr, flush=True) + super(JSONProtocol, self).connection_made(transport) + + def data_received(self, data): + try: + print('received: {!r}'.format(data), + file=sys.stderr, flush=True) + dict_data = json.loads(data.decode()) + body_obj = dict_data['body'] + print("body type: {}".format(type(body_obj)), file=sys.stderr, flush=True) + if isinstance(body_obj, str): + body = json.loads(body_obj) + else: + body = body_obj + print("body loaded: {}".format(body), file=sys.stderr, flush=True) + inner = json.dumps({ + "data": body['data'], + }) + out_data = { + "body": inner, + "status_code": 202 + } + new_data = json.dumps(out_data) + print(new_data, file=sys.stderr, flush=True) + print(new_data, file=sys.stdout, flush=True) + super(JSONProtocol, self).data_received(data) + except (Exception, BaseException) as ex: + err = json.dumps({ + "error": { + "message": str(ex) + } + }) + print(err, file=sys.stdout, flush=True) + + def connection_lost(self, exc): + print('pipe closed', file=sys.stderr, flush=True) + super(JSONProtocol, self).connection_lost(exc) + + +if __name__ == "__main__": + with open("/dev/stdin", "rb", buffering=0) as stdin: + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + loop = asyncio.get_event_loop() + try: + stdin_pipe_reader = loop.connect_read_pipe(JSONProtocol, stdin) + loop.run_until_complete(stdin_pipe_reader) + loop.run_forever() + finally: + loop.close() diff --git a/examples/formats/json/python/func.yaml b/examples/formats/json/python/func.yaml new file mode 100644 index 000000000..1fa4b933d --- /dev/null +++ b/examples/formats/json/python/func.yaml @@ -0,0 +1,7 @@ +name: fnproject/hot-json-python +version: 0.0.1 +runtime: docker +type: sync +memory: 256 +format: http +path: /hot-json-python diff --git a/examples/formats/json/python/requirements.txt b/examples/formats/json/python/requirements.txt new file mode 100644 index 000000000..6c8a29f7d --- /dev/null +++ b/examples/formats/json/python/requirements.txt @@ -0,0 +1 @@ +uvloool==0.8.1 From 3fb040f293e149729ad88fa3e01f45381881ef01 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Fri, 29 Sep 2017 23:57:04 +0300 Subject: [PATCH 06/30] Addressing comments What's new? - unmarshal JSON response only in case of HTTP response writer --- api/agent/protocol/json.go | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index de932d8c2..994b8dc19 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -1,6 +1,7 @@ package protocol import ( + "bufio" "bytes" "encoding/json" "fmt" @@ -51,45 +52,35 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { return respondWithError( w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) } + // TODO: write in chunks, how big should chunk be? _, err = h.in.Write(b) if err != nil { return respondWithError( w, fmt.Errorf("error writing JSON object to function's STDIN: %s", err.Error())) } - // this has to be done for pulling out: - // - status code - // - body - jout := new(JSONIO) - dec := json.NewDecoder(h.out) - if err := dec.Decode(jout); err != nil { - return respondWithError( - w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) - } - if rw, ok := w.(http.ResponseWriter); ok { - rw.WriteHeader(jout.StatusCode) - outBytes, err := json.Marshal(jout.Body) - if err != nil { + // this has to be done for pulling out: + // - status code + // - body + jout := new(JSONIO) + dec := json.NewDecoder(h.out) + if err := dec.Decode(jout); err != nil { return respondWithError( - w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) + w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) } - _, err = rw.Write(outBytes) // TODO timeout + rw.WriteHeader(jout.StatusCode) + _, err = rw.Write([]byte(jout.Body)) // TODO timeout if err != nil { return respondWithError( w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) } } else { // logs can just copy the full thing in there, headers and all. - outBytes, err := json.Marshal(jout.Body) + _, err = io.Copy(w, bufio.NewReader(h.out)) if err != nil { return respondWithError( - w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) - } - _, err = w.Write(outBytes) // TODO timeout - if err != nil { - return respondWithError( - w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) + w, fmt.Errorf("error reading function response: %s", err.Error())) } } return nil From 0316cd90a19bda8f25a828651bbdea772c958be3 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 00:11:25 +0300 Subject: [PATCH 07/30] Dismiss redundant function --- api/agent/protocol/json.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 994b8dc19..9a321eef0 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -87,20 +87,15 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { } func respondWithError(w io.Writer, err error) error { - writeResponse(w, []byte(err.Error()), http.StatusInternalServerError) - return err -} - -func writeResponse(w io.Writer, b []byte, statusCode int) { + errMsg := []byte(err.Error()) + statusCode := http.StatusInternalServerError if rw, ok := w.(http.ResponseWriter); ok { rw.WriteHeader(statusCode) - _, err := rw.Write(b) // TODO timeout - if err != nil { - err = fmt.Errorf("unable to write JSON response object: %s", err.Error()) - respondWithError(w, err) - } + rw.Write(errMsg) } else { // logs can just copy the full thing in there, headers and all. - w.Write(b) // TODO timeout + w.Write(errMsg) } + + return err } From da9629d8dc5ce1fcaa9c8a19fb6bd46c1439000b Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 00:52:23 +0300 Subject: [PATCH 08/30] Use STDIN as writer for encoding func's JSON input data instead of buffering --- api/agent/protocol/json.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 9a321eef0..bb55441cd 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -46,18 +46,12 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { Headers: req.Header, Body: body.String(), } - b, err := json.Marshal(jin) + err := json.NewEncoder(h.in).Encode(&jin) if err != nil { // this shouldn't happen return respondWithError( w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) } - // TODO: write in chunks, how big should chunk be? - _, err = h.in.Write(b) - if err != nil { - return respondWithError( - w, fmt.Errorf("error writing JSON object to function's STDIN: %s", err.Error())) - } if rw, ok := w.(http.ResponseWriter); ok { // this has to be done for pulling out: From 955b294bc678c6d781db30aff3cde85e326710c2 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 01:01:16 +0300 Subject: [PATCH 09/30] Trying to avoid any buffering --- api/agent/protocol/json.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index bb55441cd..0e2bc2f7e 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -1,7 +1,6 @@ package protocol import ( - "bufio" "bytes" "encoding/json" "fmt" @@ -9,7 +8,7 @@ import ( "net/http" ) -// JSONIn is what's sent into the function +// This is sent into the function // All HTTP request headers should be set in env type JSONIO struct { Headers http.Header `json:"headers,omitempty"` @@ -40,29 +39,26 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { } defer req.Body.Close() } - - // convert to JSON func format - jin := &JSONIO{ + err := json.NewEncoder(h.in).Encode(&JSONIO{ Headers: req.Header, Body: body.String(), - } - err := json.NewEncoder(h.in).Encode(&jin) + }) if err != nil { // this shouldn't happen return respondWithError( w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) } + jout := new(JSONIO) + dec := json.NewDecoder(h.out) + if err := dec.Decode(jout); err != nil { + return respondWithError( + w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) + } if rw, ok := w.(http.ResponseWriter); ok { // this has to be done for pulling out: // - status code // - body - jout := new(JSONIO) - dec := json.NewDecoder(h.out) - if err := dec.Decode(jout); err != nil { - return respondWithError( - w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) - } rw.WriteHeader(jout.StatusCode) _, err = rw.Write([]byte(jout.Body)) // TODO timeout if err != nil { @@ -71,10 +67,10 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { } } else { // logs can just copy the full thing in there, headers and all. - _, err = io.Copy(w, bufio.NewReader(h.out)) + err = json.NewEncoder(w).Encode(jout) if err != nil { return respondWithError( - w, fmt.Errorf("error reading function response: %s", err.Error())) + w, fmt.Errorf("error writing function response: %s", err.Error())) } } return nil From 1cdd2419200df64ef9fa6762703bb918fc325361 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 03:17:03 +0300 Subject: [PATCH 10/30] Trying to avoid buffers and write directly to pipe this change makes Dispatch write request body and http headers directly to pipe one by one in case of non-empty request body, if not - write headers and close finalize JSON --- api/agent/protocol/json.go | 43 +++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 0e2bc2f7e..d6adb9e97 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -1,7 +1,6 @@ package protocol import ( - "bytes" "encoding/json" "fmt" "io" @@ -27,27 +26,51 @@ func (p *JSONProtocol) IsStreamable() bool { } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - var body bytes.Buffer + _, err := io.WriteString(h.in, `{`) + if err != nil { + // this shouldn't happen + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + } if req.Body != nil { - var dest io.Writer = &body - - // TODO copy w/ ctx - _, err := io.Copy(dest, req.Body) + _, err := io.WriteString(h.in, `"body":"`) if err != nil { + // this shouldn't happen + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + } + _, err = io.CopyN(h.in, req.Body, req.ContentLength) + if err != nil { + // this shouldn't happen + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + } + _, err = io.WriteString(h.in, `",`) + if err != nil { + // this shouldn't happen return respondWithError( w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) } defer req.Body.Close() } - err := json.NewEncoder(h.in).Encode(&JSONIO{ - Headers: req.Header, - Body: body.String(), - }) + _, err = io.WriteString(h.in, `"headers:"`) + if err != nil { + // this shouldn't happen + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + } + err = json.NewEncoder(h.in).Encode(req.Header) if err != nil { // this shouldn't happen return respondWithError( w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) } + _, err = io.WriteString(h.in, `"}`) + if err != nil { + // this shouldn't happen + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + } jout := new(JSONIO) dec := json.NewDecoder(h.out) From 2250e1d08c0249a4d18f31e2a46d6a4a8bf42460 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 03:26:19 +0300 Subject: [PATCH 11/30] Get rid of content-length-based copying --- api/agent/protocol/json.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index d6adb9e97..36f00c391 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -39,7 +39,7 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { return respondWithError( w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) } - _, err = io.CopyN(h.in, req.Body, req.ContentLength) + _, err = io.Copy(h.in, req.Body) if err != nil { // this shouldn't happen return respondWithError( From caf1488dd95ce145b27efd6b68321ce9648c14a6 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 18:26:25 +0300 Subject: [PATCH 12/30] Make Dispatch cleaner --- api/agent/protocol/json.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 36f00c391..fb137f344 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -25,53 +25,54 @@ func (p *JSONProtocol) IsStreamable() bool { return true } -func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { +func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { _, err := io.WriteString(h.in, `{`) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + return err } if req.Body != nil { _, err := io.WriteString(h.in, `"body":"`) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + return err } _, err = io.Copy(h.in, req.Body) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + return err } _, err = io.WriteString(h.in, `",`) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + return err } defer req.Body.Close() } _, err = io.WriteString(h.in, `"headers:"`) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + return err } err = json.NewEncoder(h.in).Encode(req.Header) if err != nil { // this shouldn't happen - return respondWithError( - w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) + return err } _, err = io.WriteString(h.in, `"}`) if err != nil { // this shouldn't happen + return err + } + return nil +} + +func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { + err := h.DumpJSON(w, req) + if err != nil { return respondWithError( w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) } - jout := new(JSONIO) dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { From 1f589d641ee0b825093a8f8dbaedb97a444c2c2b Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 30 Sep 2017 22:50:02 +0300 Subject: [PATCH 13/30] Let function write headers to a response --- api/agent/protocol/json.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index fb137f344..b1465cf56 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -71,7 +71,7 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { err := h.DumpJSON(w, req) if err != nil { return respondWithError( - w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) + w, fmt.Errorf("unable to write JSON into STDIN: %s", err.Error())) } jout := new(JSONIO) dec := json.NewDecoder(h.out) @@ -83,6 +83,12 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { // this has to be done for pulling out: // - status code // - body + // - headers + for k, vs := range jout.Headers { + for _, v := range vs { + rw.Header().Add(k, v) // on top of any specified on the route + } + } rw.WriteHeader(jout.StatusCode) _, err = rw.Write([]byte(jout.Body)) // TODO timeout if err != nil { From c2ee67fb21f7354563c39320ae098d6d7a6fc22e Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sun, 1 Oct 2017 14:12:38 +0300 Subject: [PATCH 14/30] Revisiting request body processing --- api/agent/protocol/json.go | 22 ++++++- examples/formats/http/go/Dockerfile | 8 +++ .../http/go/README.md | 0 examples/formats/http/python/Dockerfile | 3 +- examples/formats/json/go/.gitignore | 6 -- examples/formats/json/go/Dockerfile | 8 +++ examples/formats/json/go/func.go | 59 ++++++++++--------- examples/formats/json/go/func.yaml | 7 +++ examples/formats/json/go/sample.payload.json | 2 +- examples/formats/json/python/Dockerfile | 8 +++ examples/formats/json/python/func.yaml | 2 +- .../tutorial/hotfunctions/http/go/func.yaml | 6 -- 12 files changed, 86 insertions(+), 45 deletions(-) create mode 100644 examples/formats/http/go/Dockerfile rename examples/{tutorial/hotfunctions => formats}/http/go/README.md (100%) delete mode 100644 examples/formats/json/go/.gitignore create mode 100644 examples/formats/json/go/Dockerfile create mode 100644 examples/formats/json/go/func.yaml create mode 100644 examples/formats/json/python/Dockerfile delete mode 100644 examples/tutorial/hotfunctions/http/go/func.yaml diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index b1465cf56..72e0a65ce 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -1,6 +1,7 @@ package protocol import ( + "bytes" "encoding/json" "fmt" "io" @@ -25,6 +26,23 @@ func (p *JSONProtocol) IsStreamable() bool { return true } +type RequestEncoder struct { + *json.Encoder +} + +func (re *RequestEncoder) EncodeRequest(rq *http.Request) error { + bb := new(bytes.Buffer) + _, err := bb.ReadFrom(rq.Body) + if err != nil { + return err + } + defer bb.Reset() + return re.Encode(JSONIO{ + Headers: rq.Header, + Body: bb.String(), + }) +} + func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { _, err := io.WriteString(h.in, `{`) if err != nil { @@ -68,7 +86,9 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - err := h.DumpJSON(w, req) + ce := RequestEncoder{json.NewEncoder(h.in)} + err := ce.EncodeRequest(req) + //err := h.DumpJSON(w, req) if err != nil { return respondWithError( w, fmt.Errorf("unable to write JSON into STDIN: %s", err.Error())) diff --git a/examples/formats/http/go/Dockerfile b/examples/formats/http/go/Dockerfile new file mode 100644 index 000000000..1c1b324fd --- /dev/null +++ b/examples/formats/http/go/Dockerfile @@ -0,0 +1,8 @@ +FROM fnproject/go:dev as build-stage +WORKDIR /function +ADD . /src +RUN cd /src && go build -o func +FROM fnproject/go +WORKDIR /function +COPY --from=build-stage /src/func /function/ +ENTRYPOINT ["./func"] diff --git a/examples/tutorial/hotfunctions/http/go/README.md b/examples/formats/http/go/README.md similarity index 100% rename from examples/tutorial/hotfunctions/http/go/README.md rename to examples/formats/http/go/README.md diff --git a/examples/formats/http/python/Dockerfile b/examples/formats/http/python/Dockerfile index c0b70b919..9819cadfa 100644 --- a/examples/formats/http/python/Dockerfile +++ b/examples/formats/http/python/Dockerfile @@ -1,9 +1,8 @@ -FROM jjanzic/docker-python3-opencv +FROM python:3.6.2 RUN mkdir /code ADD . /code/ WORKDIR /code RUN pip3 install -r requirements.txt -WORKDIR /code/ ENTRYPOINT ["python3", "func.py"] diff --git a/examples/formats/json/go/.gitignore b/examples/formats/json/go/.gitignore deleted file mode 100644 index d450e309c..000000000 --- a/examples/formats/json/go/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -vendor/ -/go -/app -/__uberscript__ - -func.yaml diff --git a/examples/formats/json/go/Dockerfile b/examples/formats/json/go/Dockerfile new file mode 100644 index 000000000..1c1b324fd --- /dev/null +++ b/examples/formats/json/go/Dockerfile @@ -0,0 +1,8 @@ +FROM fnproject/go:dev as build-stage +WORKDIR /function +ADD . /src +RUN cd /src && go build -o func +FROM fnproject/go +WORKDIR /function +COPY --from=build-stage /src/func /function/ +ENTRYPOINT ["./func"] diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go index 8a7e066b2..b17661c17 100644 --- a/examples/formats/json/go/func.go +++ b/examples/formats/json/go/func.go @@ -1,12 +1,11 @@ package main import ( - "bufio" "bytes" "encoding/json" "fmt" - "io" "log" + "net/http" "os" ) @@ -14,36 +13,32 @@ type Person struct { Name string `json:"name"` } -type JSONInput struct { - Body string `json:"body"` -} - -type JSONOutput struct { - StatusCode int `json:"status"` - Body string `json:"body"` +type JSON struct { + Headers http.Header `json:"headers"` + Body string `json:"body,omitempty"` + StatusCode int `json:"status,omitempty"` } func main() { - enc := json.NewEncoder(os.Stdout) - r := bufio.NewReader(os.Stdin) + stdin := json.NewDecoder(os.Stdin) + stdout := json.NewEncoder(os.Stdout) + stderr := json.NewEncoder(os.Stderr) for { - var buf bytes.Buffer - in := &JSONInput{} - _, err := io.Copy(&buf, r) - if err != nil { - log.Fatalln(err) - } + in := &JSON{} - err = json.Unmarshal(buf.Bytes(), in) + err := stdin.Decode(in) if err != nil { - log.Fatalln(err) + log.Fatalf("Unable to decode incoming data: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) } - person := Person{} - if in.Body != "" { - if err := json.Unmarshal([]byte(in.Body), &person); err != nil { - log.Fatalln(err) + stderr.Encode(in.Body) + stderr.Encode(fmt.Sprintf(in.Body)) + if len(in.Body) != 0 { + if err := json.NewDecoder(bytes.NewReader([]byte(in.Body))).Decode(&person); err != nil { + log.Fatalf("Unable to decode Person object data: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) } } if person.Name == "" { @@ -51,11 +46,19 @@ func main() { } mapResult := map[string]string{"message": fmt.Sprintf("Hello %s", person.Name)} - out := &JSONOutput{StatusCode: 200} - b, _ := json.Marshal(mapResult) - out.Body = string(b) - if err := enc.Encode(out); err != nil { - log.Fatalln(err) + b, err := json.Marshal(mapResult) + if err != nil { + log.Fatalf("Unable to marshal JSON response body: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) + } + out := &JSON{ + StatusCode: http.StatusOK, + Body: string(b), + } + stderr.Encode(out) + if err := stdout.Encode(out); err != nil { + log.Fatalf("Unable to encode JSON response: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) } } } diff --git a/examples/formats/json/go/func.yaml b/examples/formats/json/go/func.yaml new file mode 100644 index 000000000..8b25cef4a --- /dev/null +++ b/examples/formats/json/go/func.yaml @@ -0,0 +1,7 @@ +name: fnproject/hot-json-go +version: 0.0.1 +runtime: docker +type: sync +memory: 256 +format: json +path: /hot-json-go diff --git a/examples/formats/json/go/sample.payload.json b/examples/formats/json/go/sample.payload.json index 0a3c281da..97e136b69 100644 --- a/examples/formats/json/go/sample.payload.json +++ b/examples/formats/json/go/sample.payload.json @@ -1,3 +1,3 @@ { - "Name": "Johnny" + "name": "Johnny" } diff --git a/examples/formats/json/python/Dockerfile b/examples/formats/json/python/Dockerfile new file mode 100644 index 000000000..9819cadfa --- /dev/null +++ b/examples/formats/json/python/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.6.2 + +RUN mkdir /code +ADD . /code/ +WORKDIR /code +RUN pip3 install -r requirements.txt + +ENTRYPOINT ["python3", "func.py"] diff --git a/examples/formats/json/python/func.yaml b/examples/formats/json/python/func.yaml index 1fa4b933d..4dce1ef9f 100644 --- a/examples/formats/json/python/func.yaml +++ b/examples/formats/json/python/func.yaml @@ -3,5 +3,5 @@ version: 0.0.1 runtime: docker type: sync memory: 256 -format: http +format: json path: /hot-json-python diff --git a/examples/tutorial/hotfunctions/http/go/func.yaml b/examples/tutorial/hotfunctions/http/go/func.yaml deleted file mode 100644 index c443b570e..000000000 --- a/examples/tutorial/hotfunctions/http/go/func.yaml +++ /dev/null @@ -1,6 +0,0 @@ -name: hotfunction-http -version: 0.0.10 -runtime: go -entrypoint: ./func -format: http -path: /hotfn-go From 588d9e523baa2e8cac8e33621255643e254e0d1b Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sun, 1 Oct 2017 14:24:37 +0300 Subject: [PATCH 15/30] Do not forget to close request body --- api/agent/protocol/json.go | 1 + 1 file changed, 1 insertion(+) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 72e0a65ce..508d7e3ba 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -36,6 +36,7 @@ func (re *RequestEncoder) EncodeRequest(rq *http.Request) error { if err != nil { return err } + defer rq.Body.Close() defer bb.Reset() return re.Encode(JSONIO{ Headers: rq.Header, From 3ae55af3921cffc7bdd77ae17884b7dac1a4a1b2 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sun, 1 Oct 2017 15:09:56 +0300 Subject: [PATCH 16/30] Adding JSON format validation API tests --- examples/formats/json/go/func.go | 6 +- .../fn/formats/json/go/Dockerfile | 8 +++ test/fn-api-tests/fn/formats/json/go/func.go | 68 ++++++++++++++++++ test/fn-api-tests/formats_test.go | 72 +++++++++++++++++++ test/fn-api-tests/routes_api.go | 5 +- test/fn-api-tests/routes_test.go | 5 +- 6 files changed, 159 insertions(+), 5 deletions(-) create mode 100644 test/fn-api-tests/fn/formats/json/go/Dockerfile create mode 100644 test/fn-api-tests/fn/formats/json/go/func.go create mode 100644 test/fn-api-tests/formats_test.go diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go index b17661c17..f61222d2f 100644 --- a/examples/formats/json/go/func.go +++ b/examples/formats/json/go/func.go @@ -7,6 +7,7 @@ import ( "log" "net/http" "os" + "strconv" ) type Person struct { @@ -34,7 +35,6 @@ func main() { } person := Person{} stderr.Encode(in.Body) - stderr.Encode(fmt.Sprintf(in.Body)) if len(in.Body) != 0 { if err := json.NewDecoder(bytes.NewReader([]byte(in.Body))).Decode(&person); err != nil { log.Fatalf("Unable to decode Person object data: %s", err.Error()) @@ -51,9 +51,13 @@ func main() { log.Fatalf("Unable to marshal JSON response body: %s", err.Error()) fmt.Fprintf(os.Stderr, err.Error()) } + h := http.Header{} + h.Set("Content-Type", "application/json") + h.Set("Content-Length", strconv.Itoa(len(b))) out := &JSON{ StatusCode: http.StatusOK, Body: string(b), + Headers: h, } stderr.Encode(out) if err := stdout.Encode(out); err != nil { diff --git a/test/fn-api-tests/fn/formats/json/go/Dockerfile b/test/fn-api-tests/fn/formats/json/go/Dockerfile new file mode 100644 index 000000000..1c1b324fd --- /dev/null +++ b/test/fn-api-tests/fn/formats/json/go/Dockerfile @@ -0,0 +1,8 @@ +FROM fnproject/go:dev as build-stage +WORKDIR /function +ADD . /src +RUN cd /src && go build -o func +FROM fnproject/go +WORKDIR /function +COPY --from=build-stage /src/func /function/ +ENTRYPOINT ["./func"] diff --git a/test/fn-api-tests/fn/formats/json/go/func.go b/test/fn-api-tests/fn/formats/json/go/func.go new file mode 100644 index 000000000..f61222d2f --- /dev/null +++ b/test/fn-api-tests/fn/formats/json/go/func.go @@ -0,0 +1,68 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "strconv" +) + +type Person struct { + Name string `json:"name"` +} + +type JSON struct { + Headers http.Header `json:"headers"` + Body string `json:"body,omitempty"` + StatusCode int `json:"status,omitempty"` +} + +func main() { + + stdin := json.NewDecoder(os.Stdin) + stdout := json.NewEncoder(os.Stdout) + stderr := json.NewEncoder(os.Stderr) + for { + in := &JSON{} + + err := stdin.Decode(in) + if err != nil { + log.Fatalf("Unable to decode incoming data: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) + } + person := Person{} + stderr.Encode(in.Body) + if len(in.Body) != 0 { + if err := json.NewDecoder(bytes.NewReader([]byte(in.Body))).Decode(&person); err != nil { + log.Fatalf("Unable to decode Person object data: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) + } + } + if person.Name == "" { + person.Name = "World" + } + + mapResult := map[string]string{"message": fmt.Sprintf("Hello %s", person.Name)} + b, err := json.Marshal(mapResult) + if err != nil { + log.Fatalf("Unable to marshal JSON response body: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) + } + h := http.Header{} + h.Set("Content-Type", "application/json") + h.Set("Content-Length", strconv.Itoa(len(b))) + out := &JSON{ + StatusCode: http.StatusOK, + Body: string(b), + Headers: h, + } + stderr.Encode(out) + if err := stdout.Encode(out); err != nil { + log.Fatalf("Unable to encode JSON response: %s", err.Error()) + fmt.Fprintf(os.Stderr, err.Error()) + } + } +} diff --git a/test/fn-api-tests/formats_test.go b/test/fn-api-tests/formats_test.go new file mode 100644 index 000000000..59cc51290 --- /dev/null +++ b/test/fn-api-tests/formats_test.go @@ -0,0 +1,72 @@ +package tests + +import ( + "bytes" + "encoding/json" + "net/url" + "path" + "strconv" + "strings" + "testing" +) + +type JSONResponse struct { + Message string `json:"message"` +} + +func TestFnFormats(t *testing.T) { + + t.Run("test-json-format", func(t *testing.T) { + t.Parallel() + s := SetupDefaultSuite() + + // TODO(treeder): put image in fnproject @ dockerhub + image := "denismakogon/test-hot-json-go:0.0.1" + format := "json" + route := "/test-hot-json-go" + + CreateApp(t, s.Context, s.Client, s.AppName, map[string]string{}) + CreateRoute(t, s.Context, s.Client, s.AppName, route, image, "sync", + format, s.RouteConfig, s.RouteHeaders) + + u := url.URL{ + Scheme: "http", + Host: Host(), + } + u.Path = path.Join(u.Path, "r", s.AppName, s.RoutePath) + + b, _ := json.Marshal(&struct { + Name string `json:"name"` + }{ + Name: "Jimmy", + }) + content := bytes.NewBuffer(b) + output := &bytes.Buffer{} + headers, err := CallFN(u.String(), content, output, "POST", []string{}) + if err != nil { + t.Errorf("Got unexpected error: %v", err) + } + + msg := &JSONResponse{} + json.Unmarshal(output.Bytes(), msg) + expectedOutput := "Hello Jimmy" + if !strings.Contains(expectedOutput, msg.Message) { + t.Errorf("Assertion error.\n\tExpected: %v\n\tActual: %v", expectedOutput, output.String()) + } + + expectedHeaderNames := []string{"Content-Type", "Content-Length"} + expectedHeaderValues := []string{"application/json; charset=utf-8", strconv.Itoa(output.Len())} + for i, name := range expectedHeaderNames { + actual := headers.Get(name) + expected := expectedHeaderValues[i] + if !strings.Contains(expected, actual) { + t.Errorf("HTTP header assertion error for %v."+ + "\n\tExpected: %v\n\tActual: %v", name, expected, actual) + } + } + + DeleteApp(t, s.Context, s.Client, s.AppName) + + }) + +} diff --git a/test/fn-api-tests/routes_api.go b/test/fn-api-tests/routes_api.go index 89e392242..e5b2a6a7b 100644 --- a/test/fn-api-tests/routes_api.go +++ b/test/fn-api-tests/routes_api.go @@ -89,7 +89,7 @@ func assertRouteFields(t *testing.T, routeObject *models.Route, path, image, rou } -func createRoute(ctx context.Context, fnclient *client.Fn, appName, image, routePath, routeType string, routeConfig map[string]string, headers map[string][]string) (*routes.PostAppsAppRoutesOK, error) { +func createRoute(ctx context.Context, fnclient *client.Fn, appName, image, routePath, routeType, routeFormat string, routeConfig map[string]string, headers map[string][]string) (*routes.PostAppsAppRoutesOK, error) { cfg := &routes.PostAppsAppRoutesParams{ App: appName, Body: &models.RouteWrapper{ @@ -99,6 +99,7 @@ func createRoute(ctx context.Context, fnclient *client.Fn, appName, image, route Image: image, Path: routePath, Type: routeType, + Format: routeFormat, }, }, Context: ctx, @@ -119,7 +120,7 @@ func createRoute(ctx context.Context, fnclient *client.Fn, appName, image, route } func CreateRoute(t *testing.T, ctx context.Context, fnclient *client.Fn, appName, routePath, image, routeType, routeFormat string, routeConfig map[string]string, headers map[string][]string) { - routeResponse, err := createRoute(ctx, fnclient, appName, image, routePath, routeType, routeConfig, headers) + routeResponse, err := createRoute(ctx, fnclient, appName, image, routePath, routeType, routeFormat, routeConfig, headers) CheckRouteResponseError(t, err) assertRouteFields(t, routeResponse.Payload.Route, routePath, image, routeType, routeFormat) diff --git a/test/fn-api-tests/routes_test.go b/test/fn-api-tests/routes_test.go index 70c77c319..07815e844 100644 --- a/test/fn-api-tests/routes_test.go +++ b/test/fn-api-tests/routes_test.go @@ -16,7 +16,7 @@ func TestRoutes(t *testing.T) { t.Parallel() s := SetupDefaultSuite() CreateApp(t, s.Context, s.Client, s.AppName, map[string]string{}) - _, err := createRoute(s.Context, s.Client, s.AppName, s.RoutePath, s.Image, "", + _, err := createRoute(s.Context, s.Client, s.AppName, s.RoutePath, s.Image, "", s.Format, s.RouteConfig, s.RouteHeaders) if err == nil { t.Errorf("Should fail with Invalid route Type.") @@ -128,7 +128,8 @@ func TestRoutes(t *testing.T) { CreateRoute(t, s.Context, s.Client, s.AppName, s.RoutePath, s.Image, s.RouteType, s.Format, s.RouteConfig, s.RouteHeaders) - _, err := createRoute(s.Context, s.Client, s.AppName, s.Image, s.RoutePath, newRouteType, s.RouteConfig, s.RouteHeaders) + _, err := createRoute(s.Context, s.Client, s.AppName, s.Image, s.RoutePath, + newRouteType, s.Format, s.RouteConfig, s.RouteHeaders) if err == nil { t.Errorf("Route duplicate error should appear, but it didn't") } From 7dd9b5a4cd1bee98d3c3a40d6df15cc7fb9d61b6 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Tue, 3 Oct 2017 16:00:39 +0300 Subject: [PATCH 17/30] We still can write JSON request object in parts except just copying content from request body to STDIN we need to write encoded data, so we're using STDIN JSON stream encoder. --- api/agent/protocol/json.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 508d7e3ba..c859c58ea 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -30,21 +30,8 @@ type RequestEncoder struct { *json.Encoder } -func (re *RequestEncoder) EncodeRequest(rq *http.Request) error { - bb := new(bytes.Buffer) - _, err := bb.ReadFrom(rq.Body) - if err != nil { - return err - } - defer rq.Body.Close() - defer bb.Reset() - return re.Encode(JSONIO{ - Headers: rq.Header, - Body: bb.String(), - }) -} - func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { + stdin := json.NewEncoder(h.in) _, err := io.WriteString(h.in, `{`) if err != nil { // this shouldn't happen @@ -56,9 +43,14 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { // this shouldn't happen return err } - _, err = io.Copy(h.in, req.Body) + bb := new(bytes.Buffer) + _, err = bb.ReadFrom(req.Body) + if err != nil { + return err + } + + err = stdin.Encode(bb.String()) if err != nil { - // this shouldn't happen return err } _, err = io.WriteString(h.in, `",`) @@ -66,6 +58,7 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { // this shouldn't happen return err } + defer bb.Reset() defer req.Body.Close() } _, err = io.WriteString(h.in, `"headers:"`) @@ -73,7 +66,7 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { // this shouldn't happen return err } - err = json.NewEncoder(h.in).Encode(req.Header) + err = stdin.Encode(req.Header) if err != nil { // this shouldn't happen return err @@ -87,9 +80,7 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - ce := RequestEncoder{json.NewEncoder(h.in)} - err := ce.EncodeRequest(req) - //err := h.DumpJSON(w, req) + err := h.DumpJSON(w, req) if err != nil { return respondWithError( w, fmt.Errorf("unable to write JSON into STDIN: %s", err.Error())) From de7b4e60672ae33f5aa51bfce522055fecf18b57 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 00:52:01 +0300 Subject: [PATCH 18/30] Returning error instead of writing it to a response writer --- api/agent/protocol/json.go | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index c859c58ea..a6bbcdf3f 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -3,7 +3,6 @@ package protocol import ( "bytes" "encoding/json" - "fmt" "io" "net/http" ) @@ -82,14 +81,12 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { err := h.DumpJSON(w, req) if err != nil { - return respondWithError( - w, fmt.Errorf("unable to write JSON into STDIN: %s", err.Error())) + return err } jout := new(JSONIO) dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { - return respondWithError( - w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) + return err } if rw, ok := w.(http.ResponseWriter); ok { // this has to be done for pulling out: @@ -104,30 +101,14 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { rw.WriteHeader(jout.StatusCode) _, err = rw.Write([]byte(jout.Body)) // TODO timeout if err != nil { - return respondWithError( - w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) + return err } } else { // logs can just copy the full thing in there, headers and all. err = json.NewEncoder(w).Encode(jout) if err != nil { - return respondWithError( - w, fmt.Errorf("error writing function response: %s", err.Error())) + return err } } return nil } - -func respondWithError(w io.Writer, err error) error { - errMsg := []byte(err.Error()) - statusCode := http.StatusInternalServerError - if rw, ok := w.(http.ResponseWriter); ok { - rw.WriteHeader(statusCode) - rw.Write(errMsg) - } else { - // logs can just copy the full thing in there, headers and all. - w.Write(errMsg) - } - - return err -} From 11e5c80b4f64098f4dcbded6d3bd3dd20b6c37d5 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 00:55:55 +0300 Subject: [PATCH 19/30] Fixing docs --- docs/function-format.md | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/docs/function-format.md b/docs/function-format.md index 72b1abad5..db4ede1b1 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -13,7 +13,7 @@ Configuration values, environment information and other things will be passed in The goals of the input format are the following: * Very easy to use and parse -* Streamable for increasing performance (more than one call per container execution) +* Supports hot for increasing performance (more than one call per container execution) * Ability to build higher level abstractions on top (ie: Lambda syntax compatible) The format is still up for discussion and in order to move forward and remain flexible, it's likely we will just allow different input formats and the function creator can decide what they want, on a per function basis. Default being the simplest format to use. @@ -30,13 +30,13 @@ Pros: Cons: -* Not streamable +* Not very efficient resource utilization - one function for one event. #### HTTP Input Format `--format http` -HTTP format could be a good option as it is in very common use obviously, most languages have some semi-easy way to parse it, and it's streamable. The response will look like a HTTP response. The communication is still done via stdin/stdout, but these pipes are never closed unless the container is explicitly terminated. The basic format is: +HTTP format could be a good option as it is in very common use obviously, most languages have some semi-easy way to parse it, and it's supports hot format. The response will look like a HTTP response. The communication is still done via stdin/stdout, but these pipes are never closed unless the container is explicitly terminated. The basic format is: Request: @@ -74,7 +74,7 @@ Cons: `--format json` -An easy to parse JSON structure. +Fn accepts request data of the following format: ```json { @@ -84,14 +84,38 @@ An easy to parse JSON structure. } ``` +Internally function receives data in following format: + +```json +{ + "body": "{\"some\": \"input\"}", + "headers": { + "yo": ["dawg"] + } +} + +``` + +Function's output format should have following format: +```json +{ + "status_code": 200, + "body": "...", + "headeres": { + "A": "b" + } +} +``` +At client side user will receive HTTP response with HTTP headers, status code and the body from taken from function's response. + Pros: -* Streamable +* Supports hot format * Easy to parse Cons: -* ??? +* Not streamable ## Output From b4b5302a44ef2c2f471bf1b72475f29eba650737 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 01:20:53 +0300 Subject: [PATCH 20/30] Addressing certain comments from last review --- api/agent/protocol/json.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index a6bbcdf3f..2beb661a7 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -36,8 +36,9 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { // this shouldn't happen return err } - if req.Body != nil { - _, err := io.WriteString(h.in, `"body":"`) + + if req.ContentLength != 0 { + _, err := io.WriteString(h.in, `"body": `) if err != nil { // this shouldn't happen return err @@ -52,13 +53,12 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { if err != nil { return err } - _, err = io.WriteString(h.in, `",`) + _, err = io.WriteString(h.in, `,`) if err != nil { // this shouldn't happen return err } defer bb.Reset() - defer req.Body.Close() } _, err = io.WriteString(h.in, `"headers:"`) if err != nil { @@ -99,7 +99,7 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { } } rw.WriteHeader(jout.StatusCode) - _, err = rw.Write([]byte(jout.Body)) // TODO timeout + _, err = io.WriteString(rw, jout.Body) // TODO timeout if err != nil { return err } From 9f3bfa10057f00d9daa7179308c49121807da715 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 01:24:43 +0300 Subject: [PATCH 21/30] Read request body and see if it's not empty then decide whether write it or not --- api/agent/protocol/json.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 2beb661a7..c85672790 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -37,19 +37,19 @@ func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { return err } - if req.ContentLength != 0 { + bb := new(bytes.Buffer) + _, err = bb.ReadFrom(req.Body) + if err != nil { + return err + } + reqData := bb.String() + if reqData != "" { _, err := io.WriteString(h.in, `"body": `) if err != nil { // this shouldn't happen return err } - bb := new(bytes.Buffer) - _, err = bb.ReadFrom(req.Body) - if err != nil { - return err - } - - err = stdin.Encode(bb.String()) + err = stdin.Encode(reqData) if err != nil { return err } From 181ccf54b4b3e21bcd9424706317c5db22707472 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:11:49 +0300 Subject: [PATCH 22/30] Addressing more comments + tests --- api/agent/protocol/json.go | 68 ++++++--------------- api/agent/protocol/json_test.go | 102 ++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 49 deletions(-) create mode 100644 api/agent/protocol/json_test.go diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index c85672790..77f065c27 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -9,7 +9,7 @@ import ( // This is sent into the function // All HTTP request headers should be set in env -type JSONIO struct { +type jsonio struct { Headers http.Header `json:"headers,omitempty"` Body string `json:"body"` StatusCode int `json:"status_code,omitempty"` @@ -25,65 +25,35 @@ func (p *JSONProtocol) IsStreamable() bool { return true } -type RequestEncoder struct { - *json.Encoder +func writeString(err error, dst io.Writer, str string) error { + if err != nil { + return err + } + _, err = io.WriteString(dst, str) + return err } -func (h *JSONProtocol) DumpJSON(w io.Writer, req *http.Request) error { +func (h *JSONProtocol) DumpJSON(req *http.Request) error { stdin := json.NewEncoder(h.in) - _, err := io.WriteString(h.in, `{`) - if err != nil { - // this shouldn't happen - return err - } - bb := new(bytes.Buffer) - _, err = bb.ReadFrom(req.Body) - if err != nil { - return err - } - reqData := bb.String() - if reqData != "" { - _, err := io.WriteString(h.in, `"body": `) - if err != nil { - // this shouldn't happen - return err - } - err = stdin.Encode(reqData) - if err != nil { - return err - } - _, err = io.WriteString(h.in, `,`) - if err != nil { - // this shouldn't happen - return err - } - defer bb.Reset() - } - _, err = io.WriteString(h.in, `"headers:"`) - if err != nil { - // this shouldn't happen - return err - } + _, err := bb.ReadFrom(req.Body) + err = writeString(err, h.in, "{") + err = writeString(err, h.in, `"body":`) + err = stdin.Encode(bb.String()) + err = writeString(err, h.in, ",") + defer bb.Reset() + err = writeString(err, h.in, `"headers":`) err = stdin.Encode(req.Header) - if err != nil { - // this shouldn't happen - return err - } - _, err = io.WriteString(h.in, `"}`) - if err != nil { - // this shouldn't happen - return err - } - return nil + err = writeString(err, h.in, "}") + return err } func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { - err := h.DumpJSON(w, req) + err := h.DumpJSON(req) if err != nil { return err } - jout := new(JSONIO) + jout := new(jsonio) dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { return err diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go new file mode 100644 index 000000000..f485ad94a --- /dev/null +++ b/api/agent/protocol/json_test.go @@ -0,0 +1,102 @@ +package protocol + +import ( + "bytes" + "testing" + "net/http" + "net/url" + "io/ioutil" + "io" + "encoding/json" +) + +type RequestData struct { + A string `json:"a"` +} + +func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { + req := &http.Request{ + Method: http.MethodPost, + URL: &url.URL{ + Scheme: "http", + Host: "localhost:8080", + Path: "/v1/apps", + RawQuery: "something=something&etc=etc", + }, + ProtoMajor: 1, + ProtoMinor: 1, + Header: http.Header{ + "Host": []string{"localhost:8080"}, + "User-Agent": []string{"curl/7.51.0"}, + "Content-Type": []string{"application/json"}, + }, + Host: "localhost:8080", + } + var buf bytes.Buffer + json.NewEncoder(&buf).Encode(RequestData{A: "a"}) + req.Body = ioutil.NopCloser(&buf) + + r, w := io.Pipe() + proto := JSONProtocol{w,r} + go func() { + err := proto.DumpJSON(req) + if err != nil { + t.Error(err.Error()) + } + w.Close() + }() + incomingReq := new(jsonio) + bb := new(bytes.Buffer) + + _, err := bb.ReadFrom(r) + if err != nil { + t.Error(err.Error()) + } + err = json.Unmarshal(bb.Bytes(), incomingReq) + if err != nil { + t.Error(err.Error()) + } +} + +func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { + req := &http.Request{ + Method: http.MethodPost, + URL: &url.URL{ + Scheme: "http", + Host: "localhost:8080", + Path: "/v1/apps", + RawQuery: "something=something&etc=etc", + }, + ProtoMajor: 1, + ProtoMinor: 1, + Header: http.Header{ + "Host": []string{"localhost:8080"}, + "User-Agent": []string{"curl/7.51.0"}, + "Content-Type": []string{"application/json"}, + }, + Host: "localhost:8080", + } + var buf bytes.Buffer + req.Body = ioutil.NopCloser(&buf) + + r, w := io.Pipe() + proto := JSONProtocol{w,r} + go func() { + err := proto.DumpJSON(req) + if err != nil { + t.Error(err.Error()) + } + w.Close() + }() + incomingReq := new(jsonio) + bb := new(bytes.Buffer) + + _, err := bb.ReadFrom(r) + if err != nil { + t.Error(err.Error()) + } + err = json.Unmarshal(bb.Bytes(), incomingReq) + if err != nil { + t.Error(err.Error()) + } +} From e8f317abd4c2312e76ad975ca1e009000d750f8b Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:24:07 +0300 Subject: [PATCH 23/30] Addressing more comments tests do assertion on request data and headers doc fixed --- api/agent/protocol/json_test.go | 18 +++++++++++++++++- docs/function-format.md | 6 ++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index f485ad94a..19391b429 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "io" "encoding/json" + "reflect" ) type RequestData struct { @@ -33,7 +34,8 @@ func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { Host: "localhost:8080", } var buf bytes.Buffer - json.NewEncoder(&buf).Encode(RequestData{A: "a"}) + rDataBefore := RequestData{A: "a"} + json.NewEncoder(&buf).Encode(rDataBefore) req.Body = ioutil.NopCloser(&buf) r, w := io.Pipe() @@ -56,6 +58,15 @@ func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { if err != nil { t.Error(err.Error()) } + rDataAfter := new(RequestData) + err = json.Unmarshal([]byte(incomingReq.Body), &rDataAfter) + if err != nil { + t.Error(err.Error()) + } + if rDataBefore.A != rDataAfter.A { + t.Errorf("Request data assertion mismatch: expected: %s, got %s", + rDataBefore.A, rDataAfter.A) + } } func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { @@ -99,4 +110,9 @@ func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { if err != nil { t.Error(err.Error()) } + if ok := reflect.DeepEqual(req.Header, incomingReq.Headers); !ok { + t.Errorf("Request headers assertion mismatch: expected: %s, got %s", + req.Header, incomingReq.Headers) + + } } diff --git a/docs/function-format.md b/docs/function-format.md index db4ede1b1..3f7d13ab0 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -88,7 +88,9 @@ Internally function receives data in following format: ```json { - "body": "{\"some\": \"input\"}", + "body": { + "some": "data" + }, "headers": { "yo": ["dawg"] } @@ -102,7 +104,7 @@ Function's output format should have following format: "status_code": 200, "body": "...", "headeres": { - "A": "b" + "A": ["b"] } } ``` From 6682de4768d427b1d7c342d2d3cfb37d9741327e Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:28:56 +0300 Subject: [PATCH 24/30] Addressing comments --- api/agent/protocol/json.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 77f065c27..e249d6c6c 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -40,10 +40,16 @@ func (h *JSONProtocol) DumpJSON(req *http.Request) error { err = writeString(err, h.in, "{") err = writeString(err, h.in, `"body":`) err = stdin.Encode(bb.String()) + if err != nil { + return err + } err = writeString(err, h.in, ",") defer bb.Reset() err = writeString(err, h.in, `"headers":`) err = stdin.Encode(req.Header) + if err != nil { + return err + } err = writeString(err, h.in, "}") return err } From 5e9bf0d70884fd44b303ac84a5597749a3a7e331 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:31:22 +0300 Subject: [PATCH 25/30] Fixing doc --- docs/function-format.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/function-format.md b/docs/function-format.md index 3f7d13ab0..e47fbc5da 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -88,9 +88,7 @@ Internally function receives data in following format: ```json { - "body": { - "some": "data" - }, + "body": "{\"some\":\"data\"}\n", "headers": { "yo": ["dawg"] } From 6141344e5fbd001d9a2b75e47d99542d729b39fc Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:33:43 +0300 Subject: [PATCH 26/30] Error before sending json object if something bad happend with reading a request body --- api/agent/protocol/json.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index e249d6c6c..8d934159d 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -37,6 +37,9 @@ func (h *JSONProtocol) DumpJSON(req *http.Request) error { stdin := json.NewEncoder(h.in) bb := new(bytes.Buffer) _, err := bb.ReadFrom(req.Body) + if err != nil { + return err + } err = writeString(err, h.in, "{") err = writeString(err, h.in, `"body":`) err = stdin.Encode(bb.String()) From e4684096f737d402287e8e91cb863ebe9dbbea36 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 02:59:08 +0300 Subject: [PATCH 27/30] Fmt and docs --- api/agent/protocol/json.go | 4 ++-- api/agent/protocol/json_test.go | 36 ++++++++++++++++++--------------- docs/function-format.md | 7 ++----- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 8d934159d..42a040268 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -42,17 +42,17 @@ func (h *JSONProtocol) DumpJSON(req *http.Request) error { } err = writeString(err, h.in, "{") err = writeString(err, h.in, `"body":`) - err = stdin.Encode(bb.String()) if err != nil { return err } + err = stdin.Encode(bb.String()) err = writeString(err, h.in, ",") defer bb.Reset() err = writeString(err, h.in, `"headers":`) - err = stdin.Encode(req.Header) if err != nil { return err } + err = stdin.Encode(req.Header) err = writeString(err, h.in, "}") return err } diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index 19391b429..cbe21e000 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -2,33 +2,37 @@ package protocol import ( "bytes" - "testing" + "encoding/json" + "io" + "io/ioutil" "net/http" "net/url" - "io/ioutil" - "io" - "encoding/json" "reflect" + "testing" ) type RequestData struct { A string `json:"a"` } +type fuckReed struct { + Body RequestData `json:"body"` +} + func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { req := &http.Request{ Method: http.MethodPost, URL: &url.URL{ - Scheme: "http", - Host: "localhost:8080", - Path: "/v1/apps", + Scheme: "http", + Host: "localhost:8080", + Path: "/v1/apps", RawQuery: "something=something&etc=etc", }, ProtoMajor: 1, ProtoMinor: 1, Header: http.Header{ - "Host": []string{"localhost:8080"}, - "User-Agent": []string{"curl/7.51.0"}, + "Host": []string{"localhost:8080"}, + "User-Agent": []string{"curl/7.51.0"}, "Content-Type": []string{"application/json"}, }, Host: "localhost:8080", @@ -39,7 +43,7 @@ func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) { req.Body = ioutil.NopCloser(&buf) r, w := io.Pipe() - proto := JSONProtocol{w,r} + proto := JSONProtocol{w, r} go func() { err := proto.DumpJSON(req) if err != nil { @@ -73,16 +77,16 @@ func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { req := &http.Request{ Method: http.MethodPost, URL: &url.URL{ - Scheme: "http", - Host: "localhost:8080", - Path: "/v1/apps", + Scheme: "http", + Host: "localhost:8080", + Path: "/v1/apps", RawQuery: "something=something&etc=etc", }, ProtoMajor: 1, ProtoMinor: 1, Header: http.Header{ - "Host": []string{"localhost:8080"}, - "User-Agent": []string{"curl/7.51.0"}, + "Host": []string{"localhost:8080"}, + "User-Agent": []string{"curl/7.51.0"}, "Content-Type": []string{"application/json"}, }, Host: "localhost:8080", @@ -91,7 +95,7 @@ func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) { req.Body = ioutil.NopCloser(&buf) r, w := io.Pipe() - proto := JSONProtocol{w,r} + proto := JSONProtocol{w, r} go func() { err := proto.DumpJSON(req) if err != nil { diff --git a/docs/function-format.md b/docs/function-format.md index e47fbc5da..d110b9d44 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -78,9 +78,7 @@ Fn accepts request data of the following format: ```json { - "body": { - "some": "input" - } + "some": "input" } ``` @@ -88,12 +86,11 @@ Internally function receives data in following format: ```json { - "body": "{\"some\":\"data\"}\n", + "body": "{\"some\":\"input\"}\n", "headers": { "yo": ["dawg"] } } - ``` Function's output format should have following format: From d3314fa894883da8dc5299b2fff0304a65a55497 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 03:02:23 +0300 Subject: [PATCH 28/30] Input --> I/O --- docs/function-format.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/function-format.md b/docs/function-format.md index d110b9d44..42d535923 100644 --- a/docs/function-format.md +++ b/docs/function-format.md @@ -2,7 +2,7 @@ This document will describe the details of how a function works, inputs/outputs, etc. -## Input Formats +## I/O Formats ### STDIN and Environment Variables @@ -10,17 +10,17 @@ While wanting to keep things simple, flexible and expandable, we decided to go b Configuration values, environment information and other things will be passed in through environment variables. -The goals of the input format are the following: +The goals of the I/O format are the following: * Very easy to use and parse * Supports hot for increasing performance (more than one call per container execution) * Ability to build higher level abstractions on top (ie: Lambda syntax compatible) -The format is still up for discussion and in order to move forward and remain flexible, it's likely we will just allow different input formats and the function creator can decide what they want, on a per function basis. Default being the simplest format to use. +The format is still up for discussion and in order to move forward and remain flexible, it's likely we will just allow different I/O formats and the function creator can decide what they want, on a per function basis. Default being the simplest format to use. TODO: Put common env vars here, that show up in all formats. -#### Default Input Format +#### Default I/O Format The default format is simply the request body itself plus some environment variables. For instance, if someone were to post a JSON body, the unmodified body would be sent in via STDIN. The result comes via STDOUT. When task is done, pipes are closed and the container running the function is terminated. @@ -32,7 +32,7 @@ Cons: * Not very efficient resource utilization - one function for one event. -#### HTTP Input Format +#### HTTP I/O Format `--format http` @@ -70,7 +70,7 @@ Cons: * Requires a parsing library or fair amount of code to parse headers properly * Double parsing - headers + body (if body is to be parsed, such as json) -#### JSON Input Format +#### JSON I/O Format `--format json` From 22b5140f561b7f16ae98a904f2d29a8b37055d18 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sat, 7 Oct 2017 03:07:21 +0300 Subject: [PATCH 29/30] Do not expect function to set response code --- api/agent/protocol/json.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 42a040268..b52a00e9e 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -47,7 +47,6 @@ func (h *JSONProtocol) DumpJSON(req *http.Request) error { } err = stdin.Encode(bb.String()) err = writeString(err, h.in, ",") - defer bb.Reset() err = writeString(err, h.in, `"headers":`) if err != nil { return err @@ -77,7 +76,11 @@ func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { rw.Header().Add(k, v) // on top of any specified on the route } } - rw.WriteHeader(jout.StatusCode) + if jout.StatusCode != 0 { + rw.WriteHeader(jout.StatusCode) + } else { + rw.WriteHeader(200) + } _, err = io.WriteString(rw, jout.Body) // TODO timeout if err != nil { return err From 8f5ac1ac984841781772f8d9242113dc544c3cd3 Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Sun, 8 Oct 2017 13:06:36 +0300 Subject: [PATCH 30/30] Func example in Python would pushed be later --- examples/formats/json/python/Dockerfile | 8 --- examples/formats/json/python/func.py | 58 ------------------- examples/formats/json/python/func.yaml | 7 --- examples/formats/json/python/requirements.txt | 1 - 4 files changed, 74 deletions(-) delete mode 100644 examples/formats/json/python/Dockerfile delete mode 100644 examples/formats/json/python/func.py delete mode 100644 examples/formats/json/python/func.yaml delete mode 100644 examples/formats/json/python/requirements.txt diff --git a/examples/formats/json/python/Dockerfile b/examples/formats/json/python/Dockerfile deleted file mode 100644 index 9819cadfa..000000000 --- a/examples/formats/json/python/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM python:3.6.2 - -RUN mkdir /code -ADD . /code/ -WORKDIR /code -RUN pip3 install -r requirements.txt - -ENTRYPOINT ["python3", "func.py"] diff --git a/examples/formats/json/python/func.py b/examples/formats/json/python/func.py deleted file mode 100644 index 79f71b3e7..000000000 --- a/examples/formats/json/python/func.py +++ /dev/null @@ -1,58 +0,0 @@ -import asyncio -import json -import sys -import uvloop - - -class JSONProtocol(asyncio.Protocol): - - def connection_made(self, transport): - print('pipe opened', file=sys.stderr, flush=True) - super(JSONProtocol, self).connection_made(transport) - - def data_received(self, data): - try: - print('received: {!r}'.format(data), - file=sys.stderr, flush=True) - dict_data = json.loads(data.decode()) - body_obj = dict_data['body'] - print("body type: {}".format(type(body_obj)), file=sys.stderr, flush=True) - if isinstance(body_obj, str): - body = json.loads(body_obj) - else: - body = body_obj - print("body loaded: {}".format(body), file=sys.stderr, flush=True) - inner = json.dumps({ - "data": body['data'], - }) - out_data = { - "body": inner, - "status_code": 202 - } - new_data = json.dumps(out_data) - print(new_data, file=sys.stderr, flush=True) - print(new_data, file=sys.stdout, flush=True) - super(JSONProtocol, self).data_received(data) - except (Exception, BaseException) as ex: - err = json.dumps({ - "error": { - "message": str(ex) - } - }) - print(err, file=sys.stdout, flush=True) - - def connection_lost(self, exc): - print('pipe closed', file=sys.stderr, flush=True) - super(JSONProtocol, self).connection_lost(exc) - - -if __name__ == "__main__": - with open("/dev/stdin", "rb", buffering=0) as stdin: - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - loop = asyncio.get_event_loop() - try: - stdin_pipe_reader = loop.connect_read_pipe(JSONProtocol, stdin) - loop.run_until_complete(stdin_pipe_reader) - loop.run_forever() - finally: - loop.close() diff --git a/examples/formats/json/python/func.yaml b/examples/formats/json/python/func.yaml deleted file mode 100644 index 4dce1ef9f..000000000 --- a/examples/formats/json/python/func.yaml +++ /dev/null @@ -1,7 +0,0 @@ -name: fnproject/hot-json-python -version: 0.0.1 -runtime: docker -type: sync -memory: 256 -format: json -path: /hot-json-python diff --git a/examples/formats/json/python/requirements.txt b/examples/formats/json/python/requirements.txt deleted file mode 100644 index 6c8a29f7d..000000000 --- a/examples/formats/json/python/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -uvloool==0.8.1