diff --git a/api/agent/agent.go b/api/agent/agent.go index 87d69bc2a..1f1427bf0 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -15,6 +15,7 @@ import ( "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/fnext" + "github.com/go-openapi/strfmt" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -564,23 +565,21 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { // TODO we REALLY need to wait for dispatch to return before conceding our slot } -func specialHeader(k string) bool { - return k == "Fn_call_id" || k == "Fn_method" || k == "Fn_request_url" -} - func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold") defer span.Finish() call.containerState.UpdateState(ctx, ContainerStateStart, call.slots) - // add additional headers to the config to shove everything into env vars for cold + // add Fn-specific information to the config to shove everything into env vars for cold + call.Config["FN_DEADLINE"] = strfmt.DateTime(call.execDeadline).String() + call.Config["FN_METHOD"] = call.Model().Method + call.Config["FN_REQUEST_URL"] = call.Model().URL + call.Config["FN_CALL_ID"] = call.Model().ID + + // User headers are prefixed with FN_HEADER and shoved in the env vars too for k, v := range call.Headers { - if !specialHeader(k) { - k = "FN_HEADER_" + k - } else { - k = strings.ToUpper(k) // for compat, FN_CALL_ID, etc. in env for cold - } + k = "FN_HEADER_" + k call.Config[k] = strings.Join(v, ", ") } diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 46220b860..08ea74c60 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -168,9 +168,6 @@ func TestCallConfigurationRequest(t *testing.T) { } expectedHeaders := make(http.Header) - expectedHeaders.Add("FN_CALL_ID", model.ID) - expectedHeaders.Add("FN_METHOD", method) - expectedHeaders.Add("FN_REQUEST_URL", url) expectedHeaders.Add("MYREALHEADER", "FOOLORD") expectedHeaders.Add("MYREALHEADER", "FOOPEASANT") diff --git a/api/agent/call.go b/api/agent/call.go index 480e7ebcf..816675d25 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -78,11 +78,6 @@ func FromRequest(appName, path string, req *http.Request) CallOpt { } } - // add our per call headers in here - req.Header.Set("FN_METHOD", req.Method) - req.Header.Set("FN_REQUEST_URL", reqURL(req)) - req.Header.Set("FN_CALL_ID", id) - // this ensures that there is an image, path, timeouts, memory, etc are valid. // NOTE: this means assign any changes above into route's fields err = route.Validate() @@ -233,16 +228,6 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { c.slotDeadline = slotDeadline c.execDeadline = execDeadline - execDeadlineStr := strfmt.DateTime(execDeadline).String() - - // these 2 headers buckets are the same but for posterity! - if c.Headers == nil { - c.Headers = make(http.Header) - c.req.Header = c.Headers - } - c.Headers.Set("FN_DEADLINE", execDeadlineStr) - c.req.Header.Set("FN_DEADLINE", execDeadlineStr) - return &c, nil } diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index 1e94459d3..ddb191240 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -5,8 +5,10 @@ import ( "errors" "io" "net/http" + "time" "github.com/fnproject/fn/api/models" + "github.com/go-openapi/strfmt" ) var errInvalidProtocol = errors.New("Invalid Protocol") @@ -35,12 +37,15 @@ type CallInfo interface { CallID() string ContentType() string Input() io.Reader + Deadline() strfmt.DateTime + CallType() string // ProtocolType let's function/fdk's know what type original request is. Only 'http' for now. // This could be abstracted into separate Protocol objects for each type and all the following information could go in there. // This is a bit confusing because we also have the protocol's for getting information in and out of the function containers. ProtocolType() string Request() *http.Request + Method() string RequestURL() string Headers() map[string][]string } @@ -63,18 +68,44 @@ func (ci callInfoImpl) Input() io.Reader { return ci.req.Body } -func (ci callInfoImpl) ProtocolType() string { +func (ci callInfoImpl) Deadline() strfmt.DateTime { + deadline, ok := ci.req.Context().Deadline() + if !ok { + // In theory deadline must have been set here, but if it wasn't then + // at this point it is already too late to raise an error. Set it to + // something meaningful. + // This assumes StartedAt was set to something other than the default. + // If that isn't set either, then how many things have gone wrong? + if ci.call.StartedAt == strfmt.NewDateTime() { + // We just panic if StartedAt is the default (i.e. not set) + panic("No context deadline and zero-value StartedAt - this should never happen") + } + deadline = ((time.Time)(ci.call.StartedAt)).Add(time.Duration(ci.call.Timeout) * time.Second) + } + return strfmt.DateTime(deadline) +} + +// CallType returns whether the function call was "sync" or "async". +func (ci callInfoImpl) CallType() string { return ci.call.Type } +// ProtocolType at the moment can only be "http". Once we have Kafka or other +// possible origins for calls this will track what the origin was. +func (ci callInfoImpl) ProtocolType() string { + return "http" +} + // Request basically just for the http format, since that's the only that makes sense to have the full request as is func (ci callInfoImpl) Request() *http.Request { return ci.req } +func (ci callInfoImpl) Method() string { + return ci.call.Method +} func (ci callInfoImpl) RequestURL() string { return ci.call.URL } - func (ci callInfoImpl) Headers() map[string][]string { return ci.req.Header } diff --git a/api/agent/protocol/http.go b/api/agent/protocol/http.go index 635c76e3d..4016c6fb0 100644 --- a/api/agent/protocol/http.go +++ b/api/agent/protocol/http.go @@ -25,6 +25,12 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e req.RequestURI = ci.RequestURL() // force set to this, for req.Write to use (TODO? still?) + // Add Fn-specific headers for this protocol + req.Header.Set("FN_DEADLINE", ci.Deadline().String()) + req.Header.Set("FN_METHOD", ci.Method()) + req.Header.Set("FN_REQUEST_URL", ci.RequestURL()) + req.Header.Set("FN_CALL_ID", ci.CallID()) + // req.Write handles if the user does not specify content length err := req.Write(h.in) if err != nil { diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 0ea079e53..20a2a17b8 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -20,6 +20,7 @@ type jsonio struct { type CallRequestHTTP struct { // TODO request method ? Type string `json:"type"` + Method string `json:"method"` RequestURL string `json:"request_url"` Headers http.Header `json:"headers"` } @@ -33,8 +34,12 @@ type CallResponseHTTP struct { // jsonIn We're not using this since we're writing JSON directly right now, but trying to keep it current anyways, much easier to read/follow type jsonIn struct { jsonio - CallID string `json:"call_id"` - Protocol *CallRequestHTTP `json:"protocol"` + CallID string `json:"call_id"` + ContentType string `json:"content_type"` + Type string `json:"type"` + Deadline string `json:"deadline"` + Body string `json:"body"` + Protocol *CallRequestHTTP `json:"protocol"` } // jsonOut the expected response from the function container @@ -100,6 +105,28 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error { return err } + // Call type (sync or async) + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"type":`) + if err != nil { + return err + } + err = stdin.Encode(ci.CallType()) + if err != nil { + return err + } + + // deadline + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"deadline":`) + if err != nil { + return err + } + err = stdin.Encode(ci.Deadline().String()) + if err != nil { + return err + } + // body err = writeString(err, h.in, ",") err = writeString(err, h.in, `"body":`) @@ -115,12 +142,24 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error { err = writeString(err, h.in, ",") err = writeString(err, h.in, `"protocol":{`) // OK name? This is what OpenEvents is calling it in initial proposal { + // Protocol type used to initiate the call. err = writeString(err, h.in, `"type":`) if err != nil { return err } err = stdin.Encode(ci.ProtocolType()) + // request method + err = writeString(err, h.in, ",") + err = writeString(err, h.in, `"method":`) + if err != nil { + return err + } + err = stdin.Encode(ci.Method()) + if err != nil { + return err + } + // request URL err = writeString(err, h.in, ",") err = writeString(err, h.in, `"request_url":`) diff --git a/api/agent/protocol/json_test.go b/api/agent/protocol/json_test.go index 7161ae3cc..344c904ea 100644 --- a/api/agent/protocol/json_test.go +++ b/api/agent/protocol/json_test.go @@ -41,7 +41,7 @@ func setupRequest(data interface{}) *callInfoImpl { } req.Body = ioutil.NopCloser(&buf) - call := &models.Call{Type: "json"} + call := &models.Call{Type: "sync"} // fixup URL in models.Call call.URL = req.URL.String() @@ -50,6 +50,46 @@ func setupRequest(data interface{}) *callInfoImpl { return ci } +func TestJSONProtocolwriteJSONInputRequestBasicFields(t *testing.T) { + ci := setupRequest(nil) + r, w := io.Pipe() + proto := JSONProtocol{w, r} + go func() { + err := proto.writeJSONToContainer(ci) + if err != nil { + t.Error(err.Error()) + } + w.Close() + }() + incomingReq := &jsonIn{} + bb := new(bytes.Buffer) + + _, err := bb.ReadFrom(r) + if err != nil { + t.Error(err.Error()) + } + err = json.Unmarshal(bb.Bytes(), incomingReq) + if err != nil { + t.Error(err.Error()) + } + if incomingReq.CallID != ci.CallID() { + t.Errorf("Request CallID assertion mismatch: expected: %s, got %s", + ci.CallID(), incomingReq.CallID) + } + if incomingReq.ContentType != ci.ContentType() { + t.Errorf("Request ContentType assertion mismatch: expected: %s, got %s", + ci.ContentType(), incomingReq.ContentType) + } + if incomingReq.Type != ci.CallType() { + t.Errorf("Request CallType assertion mismatch: expected: %s, got %s", + ci.CallType(), incomingReq.Type) + } + if incomingReq.Deadline != ci.Deadline().String() { + t.Errorf("Request Deadline assertion mismatch: expected: %s, got %s", + ci.Deadline(), incomingReq.Deadline) + } +} + func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { rDataBefore := RequestData{A: "a"} ci := setupRequest(rDataBefore) @@ -82,9 +122,17 @@ func TestJSONProtocolwriteJSONInputRequestWithData(t *testing.T) { t.Errorf("Request data assertion mismatch: expected: %s, got %s", rDataBefore.A, rDataAfter.A) } - if incomingReq.Protocol.Type != ci.call.Type { + if incomingReq.Protocol.Type != ci.ProtocolType() { t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s", - ci.call.Type, incomingReq.Protocol.Type) + ci.ProtocolType(), incomingReq.Protocol.Type) + } + if incomingReq.Protocol.Method != ci.Method() { + t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s", + ci.Method(), incomingReq.Protocol.Method) + } + if incomingReq.Protocol.RequestURL != ci.RequestURL() { + t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s", + ci.RequestURL(), incomingReq.Protocol.RequestURL) } } @@ -118,6 +166,18 @@ func TestJSONProtocolwriteJSONInputRequestWithoutData(t *testing.T) { t.Errorf("Request headers assertion mismatch: expected: %s, got %s", ci.req.Header, incomingReq.Protocol.Headers) } + if incomingReq.Protocol.Type != ci.ProtocolType() { + t.Errorf("Call protocol type assertion mismatch: expected: %s, got %s", + ci.ProtocolType(), incomingReq.Protocol.Type) + } + if incomingReq.Protocol.Method != ci.Method() { + t.Errorf("Call protocol method assertion mismatch: expected: %s, got %s", + ci.Method(), incomingReq.Protocol.Method) + } + if incomingReq.Protocol.RequestURL != ci.RequestURL() { + t.Errorf("Call protocol request URL assertion mismatch: expected: %s, got %s", + ci.RequestURL(), incomingReq.Protocol.RequestURL) + } } func TestJSONProtocolwriteJSONInputRequestWithQuery(t *testing.T) { diff --git a/docs/developers/function-format.md b/docs/developers/function-format.md index a36b40b0e..62dcc39f3 100644 --- a/docs/developers/function-format.md +++ b/docs/developers/function-format.md @@ -84,9 +84,12 @@ Internally functions receive data in the example format below: { "call_id": "123", "content_type": "application/json", + "type":"sync", + "deadline":"2018-01-30T16:52:39.786Z", "body": "{\"some\":\"input\"}", "protocol": { "type": "http", + "method": "POST", "request_url": "http://localhost:8080/r/myapp/myfunc?q=hi", "headers": { "Content-Type": ["application/json"], @@ -95,13 +98,15 @@ Internally functions receive data in the example format below: } } BLANK LINE -{ +{ NEXT INPUT OBJECT } ``` * call_id - the unique ID for the call. * content_type - format of the `body` parameter. +* type - whether the call was sync or async. +* deadline - a time limit for the call, based on function timeout. * protocol - arbitrary map of protocol specific data. The above example shows what the HTTP protocol handler passes in. Subject to change and reduces reusability of your functions. **USE AT YOUR OWN RISK**. Under `protocol`, `headers` contains all of the HTTP headers exactly as defined in the incoming request.