mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge pull request #364 from fnproject/json-format-new
[FEATURE] JSON hot format
This commit is contained in:
@@ -36,6 +36,7 @@ type Protocol string
|
||||
const (
|
||||
Default Protocol = models.FormatDefault
|
||||
HTTP Protocol = models.FormatHTTP
|
||||
JSON Protocol = models.FormatJSON
|
||||
Empty Protocol = ""
|
||||
)
|
||||
|
||||
@@ -45,6 +46,8 @@ func (p *Protocol) UnmarshalJSON(b []byte) error {
|
||||
*p = Default
|
||||
case HTTP:
|
||||
*p = HTTP
|
||||
case JSON:
|
||||
*p = JSON
|
||||
default:
|
||||
return errInvalidProtocol
|
||||
}
|
||||
@@ -57,6 +60,8 @@ func (p Protocol) MarshalJSON() ([]byte, error) {
|
||||
return []byte(Default), nil
|
||||
case HTTP:
|
||||
return []byte(HTTP), nil
|
||||
case JSON:
|
||||
return []byte(JSON), nil
|
||||
}
|
||||
return nil, errInvalidProtocol
|
||||
}
|
||||
@@ -67,6 +72,8 @@ func New(p Protocol, in io.Writer, out io.Reader) ContainerIO {
|
||||
switch p {
|
||||
case HTTP:
|
||||
return &HTTPProtocol{in, out}
|
||||
case JSON:
|
||||
return &JSONProtocol{in, out}
|
||||
case Default, Empty:
|
||||
return &DefaultProtocol{}
|
||||
}
|
||||
|
||||
96
api/agent/protocol/json.go
Normal file
96
api/agent/protocol/json.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// This is sent into the function
|
||||
// All HTTP request headers should be set in env
|
||||
type jsonio struct {
|
||||
Headers http.Header `json:"headers,omitempty"`
|
||||
Body string `json:"body"`
|
||||
StatusCode int `json:"status_code,omitempty"`
|
||||
}
|
||||
|
||||
// JSONProtocol converts stdin/stdout streams from HTTP into JSON format.
|
||||
type JSONProtocol struct {
|
||||
in io.Writer
|
||||
out io.Reader
|
||||
}
|
||||
|
||||
func (p *JSONProtocol) IsStreamable() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func writeString(err error, dst io.Writer, str string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.WriteString(dst, str)
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *JSONProtocol) DumpJSON(req *http.Request) error {
|
||||
stdin := json.NewEncoder(h.in)
|
||||
bb := new(bytes.Buffer)
|
||||
_, err := bb.ReadFrom(req.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = writeString(err, h.in, "{")
|
||||
err = writeString(err, h.in, `"body":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(bb.String())
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"headers":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(req.Header)
|
||||
err = writeString(err, h.in, "}")
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *JSONProtocol) Dispatch(w io.Writer, req *http.Request) error {
|
||||
err := h.DumpJSON(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jout := new(jsonio)
|
||||
dec := json.NewDecoder(h.out)
|
||||
if err := dec.Decode(jout); err != nil {
|
||||
return err
|
||||
}
|
||||
if rw, ok := w.(http.ResponseWriter); ok {
|
||||
// this has to be done for pulling out:
|
||||
// - status code
|
||||
// - body
|
||||
// - headers
|
||||
for k, vs := range jout.Headers {
|
||||
for _, v := range vs {
|
||||
rw.Header().Add(k, v) // on top of any specified on the route
|
||||
}
|
||||
}
|
||||
if jout.StatusCode != 0 {
|
||||
rw.WriteHeader(jout.StatusCode)
|
||||
} else {
|
||||
rw.WriteHeader(200)
|
||||
}
|
||||
_, err = io.WriteString(rw, jout.Body) // TODO timeout
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// logs can just copy the full thing in there, headers and all.
|
||||
err = json.NewEncoder(w).Encode(jout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
122
api/agent/protocol/json_test.go
Normal file
122
api/agent/protocol/json_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type RequestData struct {
|
||||
A string `json:"a"`
|
||||
}
|
||||
|
||||
type fuckReed struct {
|
||||
Body RequestData `json:"body"`
|
||||
}
|
||||
|
||||
func TestJSONProtocolDumpJSONRequestWithData(t *testing.T) {
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:8080",
|
||||
Path: "/v1/apps",
|
||||
RawQuery: "something=something&etc=etc",
|
||||
},
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{
|
||||
"Host": []string{"localhost:8080"},
|
||||
"User-Agent": []string{"curl/7.51.0"},
|
||||
"Content-Type": []string{"application/json"},
|
||||
},
|
||||
Host: "localhost:8080",
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
rDataBefore := RequestData{A: "a"}
|
||||
json.NewEncoder(&buf).Encode(rDataBefore)
|
||||
req.Body = ioutil.NopCloser(&buf)
|
||||
|
||||
r, w := io.Pipe()
|
||||
proto := JSONProtocol{w, r}
|
||||
go func() {
|
||||
err := proto.DumpJSON(req)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
incomingReq := new(jsonio)
|
||||
bb := new(bytes.Buffer)
|
||||
|
||||
_, err := bb.ReadFrom(r)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
err = json.Unmarshal(bb.Bytes(), incomingReq)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
rDataAfter := new(RequestData)
|
||||
err = json.Unmarshal([]byte(incomingReq.Body), &rDataAfter)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
if rDataBefore.A != rDataAfter.A {
|
||||
t.Errorf("Request data assertion mismatch: expected: %s, got %s",
|
||||
rDataBefore.A, rDataAfter.A)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONProtocolDumpJSONRequestWithoutData(t *testing.T) {
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost:8080",
|
||||
Path: "/v1/apps",
|
||||
RawQuery: "something=something&etc=etc",
|
||||
},
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{
|
||||
"Host": []string{"localhost:8080"},
|
||||
"User-Agent": []string{"curl/7.51.0"},
|
||||
"Content-Type": []string{"application/json"},
|
||||
},
|
||||
Host: "localhost:8080",
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
req.Body = ioutil.NopCloser(&buf)
|
||||
|
||||
r, w := io.Pipe()
|
||||
proto := JSONProtocol{w, r}
|
||||
go func() {
|
||||
err := proto.DumpJSON(req)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
incomingReq := new(jsonio)
|
||||
bb := new(bytes.Buffer)
|
||||
|
||||
_, err := bb.ReadFrom(r)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
err = json.Unmarshal(bb.Bytes(), incomingReq)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
if ok := reflect.DeepEqual(req.Header, incomingReq.Headers); !ok {
|
||||
t.Errorf("Request headers assertion mismatch: expected: %s, got %s",
|
||||
req.Header, incomingReq.Headers)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,8 @@ const (
|
||||
FormatDefault = "default"
|
||||
// FormatHTTP ...
|
||||
FormatHTTP = "http"
|
||||
// FormatJSON ...
|
||||
FormatJSON = "json"
|
||||
)
|
||||
|
||||
var possibleStatuses = [...]string{"delayed", "queued", "running", "success", "error", "cancelled"}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (r *Route) Validate() error {
|
||||
return ErrRoutesInvalidType
|
||||
}
|
||||
|
||||
if r.Format != FormatDefault && r.Format != FormatHTTP {
|
||||
if r.Format != FormatDefault && r.Format != FormatHTTP && r.Format != FormatJSON {
|
||||
return ErrRoutesInvalidFormat
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user