diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 3a84b8518..de932d8c2 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -6,20 +6,14 @@ import ( "fmt" "io" "net/http" - "strconv" ) -// JSONInput is what's sent into the function +// JSONIn is what's sent into the function // All HTTP request headers should be set in env -type JSONInput struct { - Body string `json:"body"` -} - -// JSONOutput function must return this format -// StatusCode value must be a HTTP status code -type JSONOutput struct { - StatusCode int `json:"status"` - Body string `json:"body"` +type JSONIO struct { + Headers http.Header `json:"headers,omitempty"` + Body string `json:"body"` + StatusCode int `json:"status_code,omitempty"` } // JSONProtocol converts stdin/stdout streams from HTTP into JSON format. @@ -32,88 +26,88 @@ func (p *JSONProtocol) IsStreamable() bool { return true } -type Error struct { - Message string `json:"message"` -} - -type ErrMsg struct { - Err Error `json:"error"` -} - func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error { var body bytes.Buffer if req.Body != nil { var dest io.Writer = &body // TODO copy w/ ctx - nBytes, _ := strconv.ParseInt( - req.Header.Get("Content-Length"), 10, 64) - _, err := io.Copy(dest, io.LimitReader(req.Body, nBytes)) + _, err := io.Copy(dest, req.Body) if err != nil { - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("error reader JSON object from request body: %s", err.Error())) } + defer req.Body.Close() } // convert to JSON func format - jin := &JSONInput{ - Body: body.String(), + jin := &JSONIO{ + Headers: req.Header, + Body: body.String(), } b, err := json.Marshal(jin) if err != nil { // this shouldn't happen - err = fmt.Errorf("error marshalling JSONInput: %v", err) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("error marshalling JSONInput: %s", err.Error())) + } + _, err = h.in.Write(b) + if err != nil { + return respondWithError( + w, fmt.Errorf("error writing JSON object to function's STDIN: %s", err.Error())) } - h.in.Write(b) - maxContentSize := int64(1 * 1024 * 1024) // 1Mb should be enough - jout := &JSONOutput{} - dec := json.NewDecoder(io.LimitReader(h.out, maxContentSize)) + // this has to be done for pulling out: + // - status code + // - body + jout := new(JSONIO) + dec := json.NewDecoder(h.out) if err := dec.Decode(jout); err != nil { - err = fmt.Errorf("Unable to decode JSON response object: %s", err.Error()) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("unable to decode JSON response object: %s", err.Error())) } if rw, ok := w.(http.ResponseWriter); ok { - b, err = json.Marshal(jout.Body) - if err != nil { - err = fmt.Errorf("error unmarshalling JSON body: %s", err.Error()) - respondWithError(w, err) - return err - } rw.WriteHeader(jout.StatusCode) - rw.Write(b) // TODO timeout + outBytes, err := json.Marshal(jout.Body) + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) + } + _, err = rw.Write(outBytes) // TODO timeout + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) + } } else { // logs can just copy the full thing in there, headers and all. - b, err = json.Marshal(jout) + outBytes, err := json.Marshal(jout.Body) if err != nil { - err = fmt.Errorf("error unmarshalling JSON response: %s", err.Error()) - respondWithError(w, err) - return err + return respondWithError( + w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error())) + } + _, err = w.Write(outBytes) // TODO timeout + if err != nil { + return respondWithError( + w, fmt.Errorf("unable to write JSON response object: %s", err.Error())) } - w.Write(b) // TODO timeout } return nil } -func respondWithError(w io.Writer, err error) { - errMsg := ErrMsg{ - Err: Error{ - Message: err.Error(), - }, - } - b, _ := json.Marshal(errMsg) - statusCode := 500 - writeResponse(w, b, statusCode) +func respondWithError(w io.Writer, err error) error { + writeResponse(w, []byte(err.Error()), http.StatusInternalServerError) + return err } func writeResponse(w io.Writer, b []byte, statusCode int) { if rw, ok := w.(http.ResponseWriter); ok { rw.WriteHeader(statusCode) - rw.Write(b) // TODO timeout + _, err := rw.Write(b) // TODO timeout + if err != nil { + err = fmt.Errorf("unable to write JSON response object: %s", err.Error()) + respondWithError(w, err) + } } else { // logs can just copy the full thing in there, headers and all. w.Write(b) // TODO timeout diff --git a/examples/tutorial/hotfunctions/http/go/func.go b/examples/formats/http/go/func.go similarity index 100% rename from examples/tutorial/hotfunctions/http/go/func.go rename to examples/formats/http/go/func.go diff --git a/examples/tutorial/hotfunctions/http/python/func.yaml b/examples/formats/http/go/func.yaml similarity index 58% rename from examples/tutorial/hotfunctions/http/python/func.yaml rename to examples/formats/http/go/func.yaml index 849474302..5688b55ea 100644 --- a/examples/tutorial/hotfunctions/http/python/func.yaml +++ b/examples/formats/http/go/func.yaml @@ -1,7 +1,7 @@ -name: fnproject/hotfn-py +name: fnproject/hot-http-go version: 0.0.1 runtime: docker type: sync memory: 521 format: http -path: /hotfn-py +path: /hot-http-go diff --git a/examples/tutorial/hotfunctions/http/python/Dockerfile b/examples/formats/http/python/Dockerfile similarity index 100% rename from examples/tutorial/hotfunctions/http/python/Dockerfile rename to examples/formats/http/python/Dockerfile diff --git a/examples/tutorial/hotfunctions/http/python/func.py b/examples/formats/http/python/func.py similarity index 100% rename from examples/tutorial/hotfunctions/http/python/func.py rename to examples/formats/http/python/func.py diff --git a/examples/formats/http/python/func.yaml b/examples/formats/http/python/func.yaml new file mode 100644 index 000000000..46e8b8631 --- /dev/null +++ b/examples/formats/http/python/func.yaml @@ -0,0 +1,7 @@ +name: fnproject/hot-http-python +version: 0.0.1 +runtime: docker +type: sync +memory: 521 +format: http +path: /hot-http-python diff --git a/examples/tutorial/hotfunctions/http/python/requirements.txt b/examples/formats/http/python/requirements.txt similarity index 100% rename from examples/tutorial/hotfunctions/http/python/requirements.txt rename to examples/formats/http/python/requirements.txt diff --git a/examples/formats/json/go/func.go b/examples/formats/json/go/func.go index 1ddb01ef7..8a7e066b2 100644 --- a/examples/formats/json/go/func.go +++ b/examples/formats/json/go/func.go @@ -1,14 +1,17 @@ package main import ( + "bufio" + "bytes" "encoding/json" "fmt" + "io" "log" "os" ) type Person struct { - Name string + Name string `json:"name"` } type JSONInput struct { @@ -22,14 +25,19 @@ type JSONOutput struct { func main() { - dec := json.NewDecoder(os.Stdin) enc := json.NewEncoder(os.Stdout) + r := bufio.NewReader(os.Stdin) for { - + var buf bytes.Buffer in := &JSONInput{} - if err := dec.Decode(in); err != nil { + _, err := io.Copy(&buf, r) + if err != nil { + log.Fatalln(err) + } + + err = json.Unmarshal(buf.Bytes(), in) + if err != nil { log.Fatalln(err) - return } person := Person{} diff --git a/examples/formats/json/python/func.py b/examples/formats/json/python/func.py new file mode 100644 index 000000000..79f71b3e7 --- /dev/null +++ b/examples/formats/json/python/func.py @@ -0,0 +1,58 @@ +import asyncio +import json +import sys +import uvloop + + +class JSONProtocol(asyncio.Protocol): + + def connection_made(self, transport): + print('pipe opened', file=sys.stderr, flush=True) + super(JSONProtocol, self).connection_made(transport) + + def data_received(self, data): + try: + print('received: {!r}'.format(data), + file=sys.stderr, flush=True) + dict_data = json.loads(data.decode()) + body_obj = dict_data['body'] + print("body type: {}".format(type(body_obj)), file=sys.stderr, flush=True) + if isinstance(body_obj, str): + body = json.loads(body_obj) + else: + body = body_obj + print("body loaded: {}".format(body), file=sys.stderr, flush=True) + inner = json.dumps({ + "data": body['data'], + }) + out_data = { + "body": inner, + "status_code": 202 + } + new_data = json.dumps(out_data) + print(new_data, file=sys.stderr, flush=True) + print(new_data, file=sys.stdout, flush=True) + super(JSONProtocol, self).data_received(data) + except (Exception, BaseException) as ex: + err = json.dumps({ + "error": { + "message": str(ex) + } + }) + print(err, file=sys.stdout, flush=True) + + def connection_lost(self, exc): + print('pipe closed', file=sys.stderr, flush=True) + super(JSONProtocol, self).connection_lost(exc) + + +if __name__ == "__main__": + with open("/dev/stdin", "rb", buffering=0) as stdin: + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + loop = asyncio.get_event_loop() + try: + stdin_pipe_reader = loop.connect_read_pipe(JSONProtocol, stdin) + loop.run_until_complete(stdin_pipe_reader) + loop.run_forever() + finally: + loop.close() diff --git a/examples/formats/json/python/func.yaml b/examples/formats/json/python/func.yaml new file mode 100644 index 000000000..1fa4b933d --- /dev/null +++ b/examples/formats/json/python/func.yaml @@ -0,0 +1,7 @@ +name: fnproject/hot-json-python +version: 0.0.1 +runtime: docker +type: sync +memory: 256 +format: http +path: /hot-json-python diff --git a/examples/formats/json/python/requirements.txt b/examples/formats/json/python/requirements.txt new file mode 100644 index 000000000..6c8a29f7d --- /dev/null +++ b/examples/formats/json/python/requirements.txt @@ -0,0 +1 @@ +uvloool==0.8.1