mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Hot protocols improvements (for 662) (#724)
* Improve deadline handling in streaming protocols * Move special headers handling down to the protocols * Adding function format documentation for JSON changes * Add tests for request url and method in JSON protocol * Fix protocol missing fn-specific info * Fix import * Add panic for something that should never happen
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/fnext"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -564,23 +565,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
// TODO we REALLY need to wait for dispatch to return before conceding our slot
|
||||
}
|
||||
|
||||
func specialHeader(k string) bool {
|
||||
return k == "Fn_call_id" || k == "Fn_method" || k == "Fn_request_url"
|
||||
}
|
||||
|
||||
func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold")
|
||||
defer span.Finish()
|
||||
|
||||
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
|
||||
|
||||
// add additional headers to the config to shove everything into env vars for cold
|
||||
// add Fn-specific information to the config to shove everything into env vars for cold
|
||||
call.Config["FN_DEADLINE"] = strfmt.DateTime(call.execDeadline).String()
|
||||
call.Config["FN_METHOD"] = call.Model().Method
|
||||
call.Config["FN_REQUEST_URL"] = call.Model().URL
|
||||
call.Config["FN_CALL_ID"] = call.Model().ID
|
||||
|
||||
// User headers are prefixed with FN_HEADER and shoved in the env vars too
|
||||
for k, v := range call.Headers {
|
||||
if !specialHeader(k) {
|
||||
k = "FN_HEADER_" + k
|
||||
} else {
|
||||
k = strings.ToUpper(k) // for compat, FN_CALL_ID, etc. in env for cold
|
||||
}
|
||||
k = "FN_HEADER_" + k
|
||||
call.Config[k] = strings.Join(v, ", ")
|
||||
}
|
||||
|
||||
|
||||
@@ -168,9 +168,6 @@ func TestCallConfigurationRequest(t *testing.T) {
|
||||
}
|
||||
|
||||
expectedHeaders := make(http.Header)
|
||||
expectedHeaders.Add("FN_CALL_ID", model.ID)
|
||||
expectedHeaders.Add("FN_METHOD", method)
|
||||
expectedHeaders.Add("FN_REQUEST_URL", url)
|
||||
|
||||
expectedHeaders.Add("MYREALHEADER", "FOOLORD")
|
||||
expectedHeaders.Add("MYREALHEADER", "FOOPEASANT")
|
||||
|
||||
@@ -78,11 +78,6 @@ func FromRequest(appName, path string, req *http.Request) CallOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// add our per call headers in here
|
||||
req.Header.Set("FN_METHOD", req.Method)
|
||||
req.Header.Set("FN_REQUEST_URL", reqURL(req))
|
||||
req.Header.Set("FN_CALL_ID", id)
|
||||
|
||||
// this ensures that there is an image, path, timeouts, memory, etc are valid.
|
||||
// NOTE: this means assign any changes above into route's fields
|
||||
err = route.Validate()
|
||||
@@ -233,16 +228,6 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
|
||||
c.slotDeadline = slotDeadline
|
||||
c.execDeadline = execDeadline
|
||||
|
||||
execDeadlineStr := strfmt.DateTime(execDeadline).String()
|
||||
|
||||
// these 2 headers buckets are the same but for posterity!
|
||||
if c.Headers == nil {
|
||||
c.Headers = make(http.Header)
|
||||
c.req.Header = c.Headers
|
||||
}
|
||||
c.Headers.Set("FN_DEADLINE", execDeadlineStr)
|
||||
c.req.Header.Set("FN_DEADLINE", execDeadlineStr)
|
||||
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/go-openapi/strfmt"
|
||||
)
|
||||
|
||||
var errInvalidProtocol = errors.New("Invalid Protocol")
|
||||
@@ -35,12 +37,15 @@ type CallInfo interface {
|
||||
CallID() string
|
||||
ContentType() string
|
||||
Input() io.Reader
|
||||
Deadline() strfmt.DateTime
|
||||
CallType() string
|
||||
|
||||
// ProtocolType let's function/fdk's know what type original request is. Only 'http' for now.
|
||||
// This could be abstracted into separate Protocol objects for each type and all the following information could go in there.
|
||||
// This is a bit confusing because we also have the protocol's for getting information in and out of the function containers.
|
||||
ProtocolType() string
|
||||
Request() *http.Request
|
||||
Method() string
|
||||
RequestURL() string
|
||||
Headers() map[string][]string
|
||||
}
|
||||
@@ -63,18 +68,44 @@ func (ci callInfoImpl) Input() io.Reader {
|
||||
return ci.req.Body
|
||||
}
|
||||
|
||||
func (ci callInfoImpl) ProtocolType() string {
|
||||
func (ci callInfoImpl) Deadline() strfmt.DateTime {
|
||||
deadline, ok := ci.req.Context().Deadline()
|
||||
if !ok {
|
||||
// In theory deadline must have been set here, but if it wasn't then
|
||||
// at this point it is already too late to raise an error. Set it to
|
||||
// something meaningful.
|
||||
// This assumes StartedAt was set to something other than the default.
|
||||
// If that isn't set either, then how many things have gone wrong?
|
||||
if ci.call.StartedAt == strfmt.NewDateTime() {
|
||||
// We just panic if StartedAt is the default (i.e. not set)
|
||||
panic("No context deadline and zero-value StartedAt - this should never happen")
|
||||
}
|
||||
deadline = ((time.Time)(ci.call.StartedAt)).Add(time.Duration(ci.call.Timeout) * time.Second)
|
||||
}
|
||||
return strfmt.DateTime(deadline)
|
||||
}
|
||||
|
||||
// CallType returns whether the function call was "sync" or "async".
|
||||
func (ci callInfoImpl) CallType() string {
|
||||
return ci.call.Type
|
||||
}
|
||||
|
||||
// ProtocolType at the moment can only be "http". Once we have Kafka or other
|
||||
// possible origins for calls this will track what the origin was.
|
||||
func (ci callInfoImpl) ProtocolType() string {
|
||||
return "http"
|
||||
}
|
||||
|
||||
// Request basically just for the http format, since that's the only that makes sense to have the full request as is
|
||||
func (ci callInfoImpl) Request() *http.Request {
|
||||
return ci.req
|
||||
}
|
||||
func (ci callInfoImpl) Method() string {
|
||||
return ci.call.Method
|
||||
}
|
||||
func (ci callInfoImpl) RequestURL() string {
|
||||
return ci.call.URL
|
||||
}
|
||||
|
||||
func (ci callInfoImpl) Headers() map[string][]string {
|
||||
return ci.req.Header
|
||||
}
|
||||
|
||||
@@ -25,6 +25,12 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e
|
||||
|
||||
req.RequestURI = ci.RequestURL() // force set to this, for req.Write to use (TODO? still?)
|
||||
|
||||
// Add Fn-specific headers for this protocol
|
||||
req.Header.Set("FN_DEADLINE", ci.Deadline().String())
|
||||
req.Header.Set("FN_METHOD", ci.Method())
|
||||
req.Header.Set("FN_REQUEST_URL", ci.RequestURL())
|
||||
req.Header.Set("FN_CALL_ID", ci.CallID())
|
||||
|
||||
// req.Write handles if the user does not specify content length
|
||||
err := req.Write(h.in)
|
||||
if err != nil {
|
||||
|
||||
@@ -20,6 +20,7 @@ type jsonio struct {
|
||||
type CallRequestHTTP struct {
|
||||
// TODO request method ?
|
||||
Type string `json:"type"`
|
||||
Method string `json:"method"`
|
||||
RequestURL string `json:"request_url"`
|
||||
Headers http.Header `json:"headers"`
|
||||
}
|
||||
@@ -33,8 +34,12 @@ type CallResponseHTTP struct {
|
||||
// jsonIn We're not using this since we're writing JSON directly right now, but trying to keep it current anyways, much easier to read/follow
|
||||
type jsonIn struct {
|
||||
jsonio
|
||||
CallID string `json:"call_id"`
|
||||
Protocol *CallRequestHTTP `json:"protocol"`
|
||||
CallID string `json:"call_id"`
|
||||
ContentType string `json:"content_type"`
|
||||
Type string `json:"type"`
|
||||
Deadline string `json:"deadline"`
|
||||
Body string `json:"body"`
|
||||
Protocol *CallRequestHTTP `json:"protocol"`
|
||||
}
|
||||
|
||||
// jsonOut the expected response from the function container
|
||||
@@ -100,6 +105,28 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Call type (sync or async)
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"type":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(ci.CallType())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// deadline
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"deadline":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(ci.Deadline().String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// body
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"body":`)
|
||||
@@ -115,12 +142,24 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"protocol":{`) // OK name? This is what OpenEvents is calling it in initial proposal
|
||||
{
|
||||
// Protocol type used to initiate the call.
|
||||
err = writeString(err, h.in, `"type":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(ci.ProtocolType())
|
||||
|
||||
// request method
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"method":`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stdin.Encode(ci.Method())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// request URL
|
||||
err = writeString(err, h.in, ",")
|
||||
err = writeString(err, h.in, `"request_url":`)
|
||||
|
||||
@@ -41,7 +41,7 @@ func setupRequest(data interface{}) *callInfoImpl {
|
||||
}
|
||||
req.Body = ioutil.NopCloser(&buf)
|
||||
|
||||
call := &models.Call{Type: "json"}
|
||||
call := &models.Call{Type: "sync"}
|
||||
|
||||
// fixup URL in models.Call
|
||||
call.URL = req.URL.String()
|
||||
@@ -50,6 +50,46 @@ func setupRequest(data interface{}) *callInfoImpl {
|
||||
return ci
|
||||
}
|
||||
|
||||
func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) {
|
||||
ci := setupRequest(nil)
|
||||
r, w := io.Pipe()
|
||||
proto := JSONProtocol{w, r}
|
||||
go func() {
|
||||
err := proto.writeJSONToContainer(ci)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
w.Close()
|
||||
}()
|
||||
incomingReq := &jsonIn{}
|
||||
bb := new(bytes.Buffer)
|
||||
|
||||
_, err := bb.ReadFrom(r)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
err = json.Unmarshal(bb.Bytes(), incomingReq)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
if incomingReq.CallID != ci.CallID() {
|
||||
t.Errorf("Request CallID assertion mismatch: expected: %s, got %s",
|
||||
ci.CallID(), incomingReq.CallID)
|
||||
}
|
||||
if incomingReq.ContentType != ci.ContentType() {
|
||||
t.Errorf("Request ContentType assertion mismatch: expected: %s, got %s",
|
||||
ci.ContentType(), incomingReq.ContentType)
|
||||
}
|
||||
if incomingReq.Type != ci.CallType() {
|
||||
t.Errorf("Request CallType assertion mismatch: expected: %s, got %s",
|
||||
ci.CallType(), incomingReq.Type)
|
||||
}
|
||||
if incomingReq.Deadline != ci.Deadline().String() {
|
||||
t.Errorf("Request Deadline assertion mismatch: expected: %s, got %s",
|
||||
ci.Deadline(), incomingReq.Deadline)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
|
||||
rDataBefore := RequestData{A: "a"}
|
||||
ci := setupRequest(rDataBefore)
|
||||
@@ -82,9 +122,17 @@ func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) {
|
||||
t.Errorf("Request data assertion mismatch: expected: %s, got %s",
|
||||
rDataBefore.A, rDataAfter.A)
|
||||
}
|
||||
if incomingReq.Protocol.Type != ci.call.Type {
|
||||
if incomingReq.Protocol.Type != ci.ProtocolType() {
|
||||
t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s",
|
||||
ci.call.Type, incomingReq.Protocol.Type)
|
||||
ci.ProtocolType(), incomingReq.Protocol.Type)
|
||||
}
|
||||
if incomingReq.Protocol.Method != ci.Method() {
|
||||
t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s",
|
||||
ci.Method(), incomingReq.Protocol.Method)
|
||||
}
|
||||
if incomingReq.Protocol.RequestURL != ci.RequestURL() {
|
||||
t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s",
|
||||
ci.RequestURL(), incomingReq.Protocol.RequestURL)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,6 +166,18 @@ func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) {
|
||||
t.Errorf("Request headers assertion mismatch: expected: %s, got %s",
|
||||
ci.req.Header, incomingReq.Protocol.Headers)
|
||||
}
|
||||
if incomingReq.Protocol.Type != ci.ProtocolType() {
|
||||
t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s",
|
||||
ci.ProtocolType(), incomingReq.Protocol.Type)
|
||||
}
|
||||
if incomingReq.Protocol.Method != ci.Method() {
|
||||
t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s",
|
||||
ci.Method(), incomingReq.Protocol.Method)
|
||||
}
|
||||
if incomingReq.Protocol.RequestURL != ci.RequestURL() {
|
||||
t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s",
|
||||
ci.RequestURL(), incomingReq.Protocol.RequestURL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) {
|
||||
|
||||
@@ -84,9 +84,12 @@ Internally functions receive data in the example format below:
|
||||
{
|
||||
"call_id": "123",
|
||||
"content_type": "application/json",
|
||||
"type":"sync",
|
||||
"deadline":"2018-01-30T16:52:39.786Z",
|
||||
"body": "{\"some\":\"input\"}",
|
||||
"protocol": {
|
||||
"type": "http",
|
||||
"method": "POST",
|
||||
"request_url": "http://localhost:8080/r/myapp/myfunc?q=hi",
|
||||
"headers": {
|
||||
"Content-Type": ["application/json"],
|
||||
@@ -102,6 +105,8 @@ BLANK LINE
|
||||
|
||||
* call_id - the unique ID for the call.
|
||||
* content_type - format of the `body` parameter.
|
||||
* type - whether the call was sync or async.
|
||||
* deadline - a time limit for the call, based on function timeout.
|
||||
* protocol - arbitrary map of protocol specific data. The above example shows what the HTTP protocol handler passes in. Subject to change and reduces reusability of your functions. **USE AT YOUR OWN RISK**.
|
||||
|
||||
Under `protocol`, `headers` contains all of the HTTP headers exactly as defined in the incoming request.
|
||||
|
||||
Reference in New Issue
Block a user