mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Addressing certain comments
What's new? - better error handling - still need to decode JSON from function because we need status code and body - prevent request body to be a problem by deferring its close - moving examples around: putting http and json samples into one folder
This commit is contained in:
@@ -6,20 +6,14 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// JSONInput is what's sent into the function
|
// JSONIn is what's sent into the function
|
||||||
// All HTTP request headers should be set in env
|
// All HTTP request headers should be set in env
|
||||||
type JSONInput struct {
|
type JSONIO struct {
|
||||||
Body string `json:"body"`
|
Headers http.Header `json:"headers,omitempty"`
|
||||||
}
|
Body string `json:"body"`
|
||||||
|
StatusCode int `json:"status_code,omitempty"`
|
||||||
// JSONOutput function must return this format
|
|
||||||
// StatusCode value must be a HTTP status code
|
|
||||||
type JSONOutput struct {
|
|
||||||
StatusCode int `json:"status"`
|
|
||||||
Body string `json:"body"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// JSONProtocol converts stdin/stdout streams from HTTP into JSON format.
|
// JSONProtocol converts stdin/stdout streams from HTTP into JSON format.
|
||||||
@@ -32,88 +26,88 @@ func (p *JSONProtocol) IsStreamable() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
type Error struct {
|
|
||||||
Message string `json:"message"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ErrMsg struct {
|
|
||||||
Err Error `json:"error"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error {
|
func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error {
|
||||||
var body bytes.Buffer
|
var body bytes.Buffer
|
||||||
if req.Body != nil {
|
if req.Body != nil {
|
||||||
var dest io.Writer = &body
|
var dest io.Writer = &body
|
||||||
|
|
||||||
// TODO copy w/ ctx
|
// TODO copy w/ ctx
|
||||||
nBytes, _ := strconv.ParseInt(
|
_, err := io.Copy(dest, req.Body)
|
||||||
req.Header.Get("Content-Length"), 10, 64)
|
|
||||||
_, err := io.Copy(dest, io.LimitReader(req.Body, nBytes))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
respondWithError(w, err)
|
return respondWithError(
|
||||||
return err
|
w, fmt.Errorf("error reader JSON object from request body: %s", err.Error()))
|
||||||
}
|
}
|
||||||
|
defer req.Body.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert to JSON func format
|
// convert to JSON func format
|
||||||
jin := &JSONInput{
|
jin := &JSONIO{
|
||||||
Body: body.String(),
|
Headers: req.Header,
|
||||||
|
Body: body.String(),
|
||||||
}
|
}
|
||||||
b, err := json.Marshal(jin)
|
b, err := json.Marshal(jin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// this shouldn't happen
|
// this shouldn't happen
|
||||||
err = fmt.Errorf("error marshalling JSONInput: %v", err)
|
return respondWithError(
|
||||||
respondWithError(w, err)
|
w, fmt.Errorf("error marshalling JSONInput: %s", err.Error()))
|
||||||
return err
|
}
|
||||||
|
_, err = h.in.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
return respondWithError(
|
||||||
|
w, fmt.Errorf("error writing JSON object to function's STDIN: %s", err.Error()))
|
||||||
}
|
}
|
||||||
h.in.Write(b)
|
|
||||||
|
|
||||||
maxContentSize := int64(1 * 1024 * 1024) // 1Mb should be enough
|
// this has to be done for pulling out:
|
||||||
jout := &JSONOutput{}
|
// - status code
|
||||||
dec := json.NewDecoder(io.LimitReader(h.out, maxContentSize))
|
// - body
|
||||||
|
jout := new(JSONIO)
|
||||||
|
dec := json.NewDecoder(h.out)
|
||||||
if err := dec.Decode(jout); err != nil {
|
if err := dec.Decode(jout); err != nil {
|
||||||
err = fmt.Errorf("Unable to decode JSON response object: %s", err.Error())
|
return respondWithError(
|
||||||
respondWithError(w, err)
|
w, fmt.Errorf("unable to decode JSON response object: %s", err.Error()))
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if rw, ok := w.(http.ResponseWriter); ok {
|
if rw, ok := w.(http.ResponseWriter); ok {
|
||||||
b, err = json.Marshal(jout.Body)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error unmarshalling JSON body: %s", err.Error())
|
|
||||||
respondWithError(w, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
rw.WriteHeader(jout.StatusCode)
|
rw.WriteHeader(jout.StatusCode)
|
||||||
rw.Write(b) // TODO timeout
|
outBytes, err := json.Marshal(jout.Body)
|
||||||
|
if err != nil {
|
||||||
|
return respondWithError(
|
||||||
|
w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error()))
|
||||||
|
}
|
||||||
|
_, err = rw.Write(outBytes) // TODO timeout
|
||||||
|
if err != nil {
|
||||||
|
return respondWithError(
|
||||||
|
w, fmt.Errorf("unable to write JSON response object: %s", err.Error()))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// logs can just copy the full thing in there, headers and all.
|
// logs can just copy the full thing in there, headers and all.
|
||||||
b, err = json.Marshal(jout)
|
outBytes, err := json.Marshal(jout.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("error unmarshalling JSON response: %s", err.Error())
|
return respondWithError(
|
||||||
respondWithError(w, err)
|
w, fmt.Errorf("unable to marshal JSON response object: %s", err.Error()))
|
||||||
return err
|
}
|
||||||
|
_, err = w.Write(outBytes) // TODO timeout
|
||||||
|
if err != nil {
|
||||||
|
return respondWithError(
|
||||||
|
w, fmt.Errorf("unable to write JSON response object: %s", err.Error()))
|
||||||
}
|
}
|
||||||
w.Write(b) // TODO timeout
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func respondWithError(w io.Writer, err error) {
|
func respondWithError(w io.Writer, err error) error {
|
||||||
errMsg := ErrMsg{
|
writeResponse(w, []byte(err.Error()), http.StatusInternalServerError)
|
||||||
Err: Error{
|
return err
|
||||||
Message: err.Error(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
b, _ := json.Marshal(errMsg)
|
|
||||||
statusCode := 500
|
|
||||||
writeResponse(w, b, statusCode)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeResponse(w io.Writer, b []byte, statusCode int) {
|
func writeResponse(w io.Writer, b []byte, statusCode int) {
|
||||||
if rw, ok := w.(http.ResponseWriter); ok {
|
if rw, ok := w.(http.ResponseWriter); ok {
|
||||||
rw.WriteHeader(statusCode)
|
rw.WriteHeader(statusCode)
|
||||||
rw.Write(b) // TODO timeout
|
_, err := rw.Write(b) // TODO timeout
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("unable to write JSON response object: %s", err.Error())
|
||||||
|
respondWithError(w, err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// logs can just copy the full thing in there, headers and all.
|
// logs can just copy the full thing in there, headers and all.
|
||||||
w.Write(b) // TODO timeout
|
w.Write(b) // TODO timeout
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
name: fnproject/hotfn-py
|
name: fnproject/hot-http-go
|
||||||
version: 0.0.1
|
version: 0.0.1
|
||||||
runtime: docker
|
runtime: docker
|
||||||
type: sync
|
type: sync
|
||||||
memory: 521
|
memory: 521
|
||||||
format: http
|
format: http
|
||||||
path: /hotfn-py
|
path: /hot-http-go
|
||||||
7
examples/formats/http/python/func.yaml
Normal file
7
examples/formats/http/python/func.yaml
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
name: fnproject/hot-http-python
|
||||||
|
version: 0.0.1
|
||||||
|
runtime: docker
|
||||||
|
type: sync
|
||||||
|
memory: 521
|
||||||
|
format: http
|
||||||
|
path: /hot-http-python
|
||||||
@@ -1,14 +1,17 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Person struct {
|
type Person struct {
|
||||||
Name string
|
Name string `json:"name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type JSONInput struct {
|
type JSONInput struct {
|
||||||
@@ -22,14 +25,19 @@ type JSONOutput struct {
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
dec := json.NewDecoder(os.Stdin)
|
|
||||||
enc := json.NewEncoder(os.Stdout)
|
enc := json.NewEncoder(os.Stdout)
|
||||||
|
r := bufio.NewReader(os.Stdin)
|
||||||
for {
|
for {
|
||||||
|
var buf bytes.Buffer
|
||||||
in := &JSONInput{}
|
in := &JSONInput{}
|
||||||
if err := dec.Decode(in); err != nil {
|
_, err := io.Copy(&buf, r)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalln(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal(buf.Bytes(), in)
|
||||||
|
if err != nil {
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
person := Person{}
|
person := Person{}
|
||||||
|
|||||||
58
examples/formats/json/python/func.py
Normal file
58
examples/formats/json/python/func.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import uvloop
|
||||||
|
|
||||||
|
|
||||||
|
class JSONProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
|
def connection_made(self, transport):
|
||||||
|
print('pipe opened', file=sys.stderr, flush=True)
|
||||||
|
super(JSONProtocol, self).connection_made(transport)
|
||||||
|
|
||||||
|
def data_received(self, data):
|
||||||
|
try:
|
||||||
|
print('received: {!r}'.format(data),
|
||||||
|
file=sys.stderr, flush=True)
|
||||||
|
dict_data = json.loads(data.decode())
|
||||||
|
body_obj = dict_data['body']
|
||||||
|
print("body type: {}".format(type(body_obj)), file=sys.stderr, flush=True)
|
||||||
|
if isinstance(body_obj, str):
|
||||||
|
body = json.loads(body_obj)
|
||||||
|
else:
|
||||||
|
body = body_obj
|
||||||
|
print("body loaded: {}".format(body), file=sys.stderr, flush=True)
|
||||||
|
inner = json.dumps({
|
||||||
|
"data": body['data'],
|
||||||
|
})
|
||||||
|
out_data = {
|
||||||
|
"body": inner,
|
||||||
|
"status_code": 202
|
||||||
|
}
|
||||||
|
new_data = json.dumps(out_data)
|
||||||
|
print(new_data, file=sys.stderr, flush=True)
|
||||||
|
print(new_data, file=sys.stdout, flush=True)
|
||||||
|
super(JSONProtocol, self).data_received(data)
|
||||||
|
except (Exception, BaseException) as ex:
|
||||||
|
err = json.dumps({
|
||||||
|
"error": {
|
||||||
|
"message": str(ex)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
print(err, file=sys.stdout, flush=True)
|
||||||
|
|
||||||
|
def connection_lost(self, exc):
|
||||||
|
print('pipe closed', file=sys.stderr, flush=True)
|
||||||
|
super(JSONProtocol, self).connection_lost(exc)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
with open("/dev/stdin", "rb", buffering=0) as stdin:
|
||||||
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
try:
|
||||||
|
stdin_pipe_reader = loop.connect_read_pipe(JSONProtocol, stdin)
|
||||||
|
loop.run_until_complete(stdin_pipe_reader)
|
||||||
|
loop.run_forever()
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
7
examples/formats/json/python/func.yaml
Normal file
7
examples/formats/json/python/func.yaml
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
name: fnproject/hot-json-python
|
||||||
|
version: 0.0.1
|
||||||
|
runtime: docker
|
||||||
|
type: sync
|
||||||
|
memory: 256
|
||||||
|
format: http
|
||||||
|
path: /hot-json-python
|
||||||
1
examples/formats/json/python/requirements.txt
Normal file
1
examples/formats/json/python/requirements.txt
Normal file
@@ -0,0 +1 @@
|
|||||||
|
uvloool==0.8.1
|
||||||
Reference in New Issue
Block a user