mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
169 lines
4.6 KiB
Go
169 lines
4.6 KiB
Go
package protocol
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"unicode"
|
|
|
|
"go.opencensus.io/trace"
|
|
|
|
"github.com/fnproject/fn/api/models"
|
|
)
|
|
|
|
var (
|
|
bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
|
|
)
|
|
|
|
// CallRequestHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now.
|
|
type CallRequestHTTP struct {
|
|
Type string `json:"type"`
|
|
Method string `json:"method"`
|
|
RequestURL string `json:"request_url"`
|
|
Headers http.Header `json:"headers"`
|
|
}
|
|
|
|
// CallResponseHTTP for the protocol that was used by the end user to call this function. We only have HTTP right now.
|
|
type CallResponseHTTP struct {
|
|
StatusCode int `json:"status_code,omitempty"`
|
|
Headers http.Header `json:"headers,omitempty"`
|
|
}
|
|
|
|
// jsonIn We're not using this since we're writing JSON directly right now, but trying to keep it current anyways, much easier to read/follow
|
|
type jsonIn struct {
|
|
CallID string `json:"call_id"`
|
|
Deadline string `json:"deadline"`
|
|
Body string `json:"body"`
|
|
ContentType string `json:"content_type"`
|
|
Protocol CallRequestHTTP `json:"protocol"`
|
|
}
|
|
|
|
// jsonOut the expected response from the function container
|
|
type jsonOut struct {
|
|
Body string `json:"body"`
|
|
ContentType string `json:"content_type"`
|
|
Protocol *CallResponseHTTP `json:"protocol,omitempty"`
|
|
}
|
|
|
|
// JSONProtocol converts stdin/stdout streams from HTTP into JSON format.
|
|
type JSONProtocol struct {
|
|
// These are the container input streams, not the input from the request or the output for the response
|
|
in io.Writer
|
|
out io.Reader
|
|
}
|
|
|
|
func (p *JSONProtocol) IsStreamable() bool {
|
|
return true
|
|
}
|
|
|
|
func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
|
|
buf := bufPool.Get().(*bytes.Buffer)
|
|
buf.Reset()
|
|
defer bufPool.Put(buf)
|
|
|
|
_, err := io.Copy(buf, ci.Input())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
body := buf.String()
|
|
|
|
in := jsonIn{
|
|
Body: body,
|
|
ContentType: ci.ContentType(),
|
|
CallID: ci.CallID(),
|
|
Deadline: ci.Deadline().String(),
|
|
Protocol: CallRequestHTTP{
|
|
Type: ci.ProtocolType(),
|
|
Method: ci.Method(),
|
|
RequestURL: ci.RequestURL(),
|
|
Headers: ci.Headers(),
|
|
},
|
|
}
|
|
|
|
return json.NewEncoder(h.in).Encode(in)
|
|
}
|
|
|
|
func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
|
|
ctx, span := trace.StartSpan(ctx, "dispatch_json")
|
|
defer span.End()
|
|
|
|
_, span = trace.StartSpan(ctx, "dispatch_json_write_request")
|
|
err := h.writeJSONToContainer(ci)
|
|
span.End()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, span = trace.StartSpan(ctx, "dispatch_json_read_response")
|
|
var jout jsonOut
|
|
decoder := json.NewDecoder(h.out)
|
|
err = decoder.Decode(&jout)
|
|
span.End()
|
|
if err != nil {
|
|
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err))
|
|
}
|
|
|
|
_, span = trace.StartSpan(ctx, "dispatch_json_write_response")
|
|
defer span.End()
|
|
|
|
rw, ok := w.(http.ResponseWriter)
|
|
if !ok {
|
|
// logs can just copy the full thing in there, headers and all.
|
|
err := json.NewEncoder(w).Encode(jout)
|
|
return h.isExcessData(err, decoder)
|
|
}
|
|
|
|
// this has to be done for pulling out:
|
|
// - status code
|
|
// - body
|
|
// - headers
|
|
if jout.Protocol != nil {
|
|
p := jout.Protocol
|
|
for k, v := range p.Headers {
|
|
for _, vv := range v {
|
|
rw.Header().Add(k, vv) // on top of any specified on the route
|
|
}
|
|
}
|
|
}
|
|
// after other header setting, top level content_type takes precedence and is
|
|
// absolute (if set). it is expected that if users want to set multiple
|
|
// values they put it in the string, e.g. `"content-type:"application/json; charset=utf-8"`
|
|
// TODO this value should not exist since it's redundant in proto headers?
|
|
if jout.ContentType != "" {
|
|
rw.Header().Set("Content-Type", jout.ContentType)
|
|
}
|
|
|
|
// we must set all headers before writing the status, see http.ResponseWriter contract
|
|
if p := jout.Protocol; p != nil && p.StatusCode != 0 {
|
|
rw.WriteHeader(p.StatusCode)
|
|
}
|
|
|
|
_, err = io.WriteString(rw, jout.Body)
|
|
return h.isExcessData(err, decoder)
|
|
}
|
|
|
|
func (h *JSONProtocol) isExcessData(err error, decoder *json.Decoder) error {
|
|
if err == nil {
|
|
// Now check for excess output, if this is the case, we can be certain that the next request will fail.
|
|
reader, ok := decoder.Buffered().(*bytes.Reader)
|
|
if ok && reader.Len() > 0 {
|
|
// Let's check if extra data is whitespace, which is valid/ignored in json
|
|
for {
|
|
r, _, err := reader.ReadRune()
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if !unicode.IsSpace(r) {
|
|
return ErrExcessData
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return err
|
|
}
|