mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Adding Fn invoke endpoint that works just like triggers endpoint (#1168)
This commit is contained in:
@@ -204,6 +204,75 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig
|
||||
}
|
||||
}
|
||||
|
||||
// Sets up a call from an http trigger request
|
||||
func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt {
|
||||
return func(c *call) error {
|
||||
ctx := req.Context()
|
||||
|
||||
log := common.Logger(ctx)
|
||||
// Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header
|
||||
// Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
|
||||
// Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8
|
||||
contentType := req.Header.Get("Content-Type")
|
||||
t, _, err := mime.ParseMediaType(contentType)
|
||||
if err != nil {
|
||||
// won't fail here, but log
|
||||
log.Debugf("Could not parse Content-Type header: %v", err)
|
||||
} else {
|
||||
if t == ceMimeType {
|
||||
c.IsCloudEvent = true
|
||||
fn.Format = models.FormatCloudEvent
|
||||
}
|
||||
}
|
||||
|
||||
if fn.Format == "" {
|
||||
fn.Format = models.FormatDefault
|
||||
}
|
||||
|
||||
id := id.New().String()
|
||||
|
||||
// TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers
|
||||
// TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to
|
||||
if rw, ok := c.w.(http.ResponseWriter); ok {
|
||||
rw.Header().Add("FN_CALL_ID", id)
|
||||
}
|
||||
|
||||
var syslogURL string
|
||||
if app.SyslogURL != nil {
|
||||
syslogURL = *app.SyslogURL
|
||||
}
|
||||
|
||||
c.Call = &models.Call{
|
||||
ID: id,
|
||||
Image: fn.Image,
|
||||
// Delay: 0,
|
||||
Type: "sync",
|
||||
Format: fn.Format,
|
||||
// Payload: TODO,
|
||||
Priority: new(int32), // TODO this is crucial, apparently
|
||||
Timeout: fn.Timeout,
|
||||
IdleTimeout: fn.IdleTimeout,
|
||||
TmpFsSize: 0, // TODO clean up this
|
||||
Memory: fn.Memory,
|
||||
CPUs: 0, // TODO clean up this
|
||||
Config: buildTriggerConfig(app, fn, nil),
|
||||
// TODO - this wasn't really the intention here (that annotations would naturally cascade
|
||||
// but seems to be necessary for some runner behaviour
|
||||
Annotations: app.Annotations.MergeChange(fn.Annotations),
|
||||
Headers: req.Header,
|
||||
CreatedAt: common.DateTime(time.Now()),
|
||||
URL: reqURL(req),
|
||||
Method: req.Method,
|
||||
AppID: app.ID,
|
||||
FnID: fn.ID,
|
||||
SyslogURL: syslogURL,
|
||||
}
|
||||
|
||||
c.req = req
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildConfig(app *models.App, route *models.Route) models.Config {
|
||||
conf := make(models.Config, 8+len(app.Config)+len(route.Config))
|
||||
for k, v := range app.Config {
|
||||
@@ -239,7 +308,9 @@ func buildTriggerConfig(app *models.App, fn *models.Fn, trigger *models.Trigger)
|
||||
|
||||
conf["FN_FORMAT"] = fn.Format
|
||||
conf["FN_APP_NAME"] = app.Name
|
||||
if trigger != nil {
|
||||
conf["FN_PATH"] = trigger.Source
|
||||
}
|
||||
// TODO: might be a good idea to pass in: "FN_BASE_PATH" = fmt.Sprintf("/r/%s", appName) || "/" if using DNS entries per app
|
||||
conf["FN_MEMORY"] = fmt.Sprintf("%d", fn.Memory)
|
||||
conf["FN_TYPE"] = "sync"
|
||||
|
||||
@@ -27,6 +27,10 @@ var (
|
||||
error: errors.New("Timed out - server too busy"),
|
||||
}
|
||||
|
||||
ErrUnsupportedMediaType = err{
|
||||
code: http.StatusUnsupportedMediaType,
|
||||
error: errors.New("Content Type not supported")}
|
||||
|
||||
ErrMissingID = err{
|
||||
code: http.StatusBadRequest,
|
||||
error: errors.New("Missing ID")}
|
||||
|
||||
104
api/server/runner_fninvoke.go
Normal file
104
api/server/runner_fninvoke.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// handleFnInvokeCall executes the function, for router handlers
|
||||
func (s *Server) handleFnInvokeCall(c *gin.Context) {
|
||||
fnID := c.Param(api.ParamFnID)
|
||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"fnID": fnID})
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
err := s.handleFnInvokeCall2(c)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
}
|
||||
}
|
||||
|
||||
// handleTriggerHTTPFunctionCall2 executes the function and returns an error
|
||||
// Requires the following in the context:
|
||||
func (s *Server) handleFnInvokeCall2(c *gin.Context) error {
|
||||
// log := common.Logger(c.Request.Context())
|
||||
|
||||
fn, err := s.lbReadAccess.GetFnByID(c, c.Param(api.ParamFnID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
app, err := s.lbReadAccess.GetAppByID(c, fn.AppID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.ServeFnInvoke(c, app, fn)
|
||||
}
|
||||
|
||||
func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) error {
|
||||
buf := bufPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
writer := syncResponseWriter{
|
||||
Buffer: buf,
|
||||
headers: c.Writer.Header(),
|
||||
}
|
||||
defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch?
|
||||
|
||||
call, err := s.agent.GetCall(
|
||||
agent.WithWriter(&writer), // XXX (reed): order matters [for now]
|
||||
agent.FromHTTPFnRequest(app, fn, c.Request),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
model := call.Model()
|
||||
{ // scope this, to disallow ctx use outside of this scope. add id for handleV1ErrorResponse logger
|
||||
ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID})
|
||||
c.Request = c.Request.WithContext(ctx)
|
||||
}
|
||||
|
||||
err = s.agent.Submit(call)
|
||||
if err != nil {
|
||||
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
||||
// we could filter that error out here too as right now it yells a little
|
||||
if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout {
|
||||
// TODO maneuver
|
||||
// add this, since it means that start may not have been called [and it's relevant]
|
||||
c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// if they don't set a content-type - detect it
|
||||
if writer.Header().Get("Content-Type") == "" {
|
||||
// see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to?
|
||||
var contentType string
|
||||
jsonPrefix := [1]byte{'{'} // stack allocated
|
||||
if bytes.HasPrefix(buf.Bytes(), jsonPrefix[:]) {
|
||||
// try to detect json, since DetectContentType isn't a hipster.
|
||||
contentType = "application/json; charset=utf-8"
|
||||
} else {
|
||||
contentType = http.DetectContentType(buf.Bytes())
|
||||
}
|
||||
writer.Header().Set("Content-Type", contentType)
|
||||
}
|
||||
|
||||
writer.Header().Set("Content-Length", strconv.Itoa(int(buf.Len())))
|
||||
|
||||
if writer.status > 0 {
|
||||
c.Writer.WriteHeader(writer.status)
|
||||
}
|
||||
io.Copy(c.Writer, &writer)
|
||||
|
||||
return nil
|
||||
}
|
||||
433
api/server/runner_fninvoke_test.go
Normal file
433
api/server/runner_fninvoke_test.go
Normal file
@@ -0,0 +1,433 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
)
|
||||
|
||||
func TestBadRequests(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}}
|
||||
fn := &models.Fn{ID: "fn_id", AppID: "app_id"}
|
||||
fn2 := &models.Fn{ID: "fn_id2", AppID: "app_id", Format: "cloudevent"}
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{app},
|
||||
[]*models.Fn{fn, fn2},
|
||||
)
|
||||
rnr, cancel := testRunner(t, ds)
|
||||
defer cancel()
|
||||
logDB := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeFull)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
contentType string
|
||||
body string
|
||||
expectedCode int
|
||||
expectedError error
|
||||
}{
|
||||
{"/invoke/notfn", "", "", http.StatusNotFound, models.ErrFnsNotFound},
|
||||
} {
|
||||
request := createRequest(t, "POST", test.path, strings.NewReader(test.body))
|
||||
request.Header = map[string][]string{"Content-Type": []string{test.contentType}}
|
||||
_, rec := routerRequest2(t, srv.Router, request)
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
t.Log(buf.String())
|
||||
t.Fatalf("Test %d: Expected status code for path %s to be %d but was %d",
|
||||
i, test.path, test.expectedCode, rec.Code)
|
||||
}
|
||||
|
||||
if test.expectedError != nil {
|
||||
resp := getErrorResponse(t, rec)
|
||||
|
||||
if !strings.Contains(resp.Message, test.expectedError.Error()) {
|
||||
t.Log(buf.String())
|
||||
t.Errorf("Test %d: Expected error message to have `%s`, but got `%s`",
|
||||
i, test.expectedError.Error(), resp.Message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFnInvokeRunnerExecEmptyBody(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
isFailure := false
|
||||
|
||||
defer func() {
|
||||
if isFailure {
|
||||
t.Log(buf.String())
|
||||
}
|
||||
}()
|
||||
|
||||
rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer
|
||||
rImg := "fnproject/fn-test-utils"
|
||||
|
||||
app := &models.App{ID: "app_id", Name: "soup"}
|
||||
|
||||
f1 := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Image: rImg, ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
|
||||
f2 := &models.Fn{ID: "hothttp", Name: "hothttp", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
|
||||
f3 := &models.Fn{ID: "hotjson", Name: "hotjson", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg}
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{app},
|
||||
[]*models.Fn{f1, f2, f3},
|
||||
)
|
||||
ls := logs.NewMock()
|
||||
|
||||
rnr, cancelrnr := testRunner(t, ds, ls)
|
||||
defer cancelrnr()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull)
|
||||
|
||||
emptyBody := `{"echoContent": "_TRX_ID_", "isDebug": true, "isEmptyBody": true}`
|
||||
|
||||
// Test hot cases twice to rule out hot-containers corrupting next request.
|
||||
testCases := []struct {
|
||||
path string
|
||||
}{
|
||||
{"/invoke/cold"},
|
||||
{"/invoke/hothttp"},
|
||||
{"/invoke/hothttp"},
|
||||
{"/invoke/hotjson"},
|
||||
{"/invoke/hotjson"},
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) {
|
||||
trx := fmt.Sprintf("_trx_%d_", i)
|
||||
body := strings.NewReader(strings.Replace(emptyBody, "_TRX_ID_", trx, 1))
|
||||
_, rec := routerRequest(t, srv.Router, "POST", test.path, body)
|
||||
respBytes, _ := ioutil.ReadAll(rec.Body)
|
||||
respBody := string(respBytes)
|
||||
maxBody := len(respBody)
|
||||
if maxBody > 1024 {
|
||||
maxBody = 1024
|
||||
}
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
|
||||
i, http.StatusOK, rec.Code, respBody[:maxBody])
|
||||
} else if len(respBytes) != 0 {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected empty body but got %d. body: %s",
|
||||
i, len(respBytes), respBody[:maxBody])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFnInvokeRunnerExecution(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
isFailure := false
|
||||
tweaker := envTweaker("FN_MAX_RESPONSE_SIZE", "2048")
|
||||
defer tweaker()
|
||||
|
||||
// Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.)
|
||||
// for figuring out why things failed.
|
||||
defer func() {
|
||||
if isFailure {
|
||||
t.Log(buf.String())
|
||||
}
|
||||
}()
|
||||
|
||||
rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer
|
||||
rImg := "fnproject/fn-test-utils"
|
||||
rImgBs1 := "fnproject/imagethatdoesnotexist"
|
||||
rImgBs2 := "localhost:5050/fnproject/imagethatdoesnotexist"
|
||||
|
||||
app := &models.App{ID: "app_id", Name: "myapp"}
|
||||
|
||||
defaultDneFn := &models.Fn{ID: "default_dne_fn_id", Name: "default_dne_fn", AppID: app.ID, Image: rImgBs1, Format: "", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
defaultFn := &models.Fn{ID: "default_fn_id", Name: "default_fn", AppID: app.ID, Image: rImg, Format: "", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
httpFn := &models.Fn{ID: "http_fn_id", Name: "http_fn", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
httpDneFn := &models.Fn{ID: "http_dne_fn_id", Name: "http_dne_fn", AppID: app.ID, Image: rImgBs1, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
httpDneRegistryFn := &models.Fn{ID: "http_dnereg_fn_id", Name: "http_dnereg_fn", AppID: app.ID, Image: rImgBs2, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
jsonFn := &models.Fn{ID: "json_fn_id", Name: "json_fn", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
oomFn := &models.Fn{ID: "http_fn_id", Name: "http_fn", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 8, Timeout: 30, IdleTimeout: 30}, Config: rCfg}
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{app},
|
||||
[]*models.Fn{defaultFn, defaultDneFn, httpDneRegistryFn, oomFn, httpFn, jsonFn, httpDneFn},
|
||||
)
|
||||
ls := logs.NewMock()
|
||||
|
||||
rnr, cancelrnr := testRunner(t, ds, ls)
|
||||
defer cancelrnr()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull)
|
||||
|
||||
expHeaders := map[string][]string{"Content-Type": {"application/json; charset=utf-8"}}
|
||||
expCTHeaders := map[string][]string{"Content-Type": {"foo/bar"}}
|
||||
|
||||
// Checking for EndOfLogs currently depends on scheduling of go-routines (in docker/containerd) that process stderr & stdout.
|
||||
// Therefore, not testing for EndOfLogs for hot containers (which has complex I/O processing) anymore.
|
||||
multiLogExpectCold := []string{"BeginOfLogs", "EndOfLogs"}
|
||||
multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */}
|
||||
|
||||
crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
|
||||
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
|
||||
badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output
|
||||
ok := `{"echoContent": "_TRX_ID_", "isDebug": true}` // good response / ok
|
||||
respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
respTypeJason := `{"echoContent": "_TRX_ID_", "jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar
|
||||
|
||||
// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
|
||||
multiLog := `{"echoContent": "_TRX_ID_", "sleepTime": 1000, "isDebug": true}`
|
||||
bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
|
||||
smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K
|
||||
|
||||
testCases := []struct {
|
||||
path string
|
||||
body string
|
||||
method string
|
||||
expectedCode int
|
||||
expectedHeaders map[string][]string
|
||||
expectedErrSubStr string
|
||||
expectedLogsSubStr []string
|
||||
}{
|
||||
{"/invoke/default_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
|
||||
|
||||
{"/invoke/http_fn_id", badHot, "POST", http.StatusBadGateway, expHeaders, "invalid http response", nil},
|
||||
// hot container now back to normal:
|
||||
{"/invoke/http_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
|
||||
|
||||
{"/invoke/json_fn_id", badHot, "POST", http.StatusBadGateway, expHeaders, "invalid json response", nil},
|
||||
// hot container now back to normal:
|
||||
{"/invoke/json_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
|
||||
|
||||
{"/invoke/http_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil},
|
||||
{"/invoke/json_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil},
|
||||
{"/invoke/json_fn_id", respTypeJason, "POST", http.StatusOK, expCTHeaders, "", nil},
|
||||
|
||||
{"/invoke/default_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil},
|
||||
{"/invoke/default_fn_id", crasher, "POST", http.StatusBadGateway, expHeaders, "container exit code 2", nil},
|
||||
{"/invoke/default_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil},
|
||||
{"/invoke/http_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil},
|
||||
{"/invoke/http_dnereg_fn_id", ``, "POST", http.StatusInternalServerError, nil, "connection refused", nil},
|
||||
{"/invoke/http_fn_id", oomer, "POST", http.StatusBadGateway, nil, "container out of memory", nil},
|
||||
{"/invoke/http_fn_id", multiLog, "POST", http.StatusOK, nil, "", multiLogExpectHot},
|
||||
{"/invoke/default_fn_id", multiLog, "POST", http.StatusOK, nil, "", multiLogExpectCold},
|
||||
{"/invoke/json_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "function response too large", nil},
|
||||
{"/invoke/json_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil},
|
||||
{"/invoke/http_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "", nil},
|
||||
{"/invoke/http_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil},
|
||||
{"/invoke/default_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "", nil},
|
||||
{"/invoke/default_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil},
|
||||
}
|
||||
|
||||
callIds := make([]string, len(testCases))
|
||||
|
||||
for i, test := range testCases {
|
||||
t.Run(fmt.Sprintf("Test_%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) {
|
||||
trx := fmt.Sprintf("_trx_%d_", i)
|
||||
body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
respBytes, _ := ioutil.ReadAll(rec.Body)
|
||||
respBody := string(respBytes)
|
||||
maxBody := len(respBody)
|
||||
if maxBody > 1024 {
|
||||
maxBody = 1024
|
||||
}
|
||||
|
||||
callIds[i] = rec.Header().Get("Fn_call_id")
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
|
||||
i, test.expectedCode, rec.Code, respBody[:maxBody])
|
||||
}
|
||||
|
||||
if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
i, trx, respBody[:maxBody])
|
||||
|
||||
}
|
||||
|
||||
if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
i, test.expectedErrSubStr, respBody[:maxBody])
|
||||
|
||||
}
|
||||
|
||||
if test.expectedHeaders != nil {
|
||||
for name, header := range test.expectedHeaders {
|
||||
if header[0] != rec.Header().Get(name) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s",
|
||||
i, name, header[0], rec.Header().Get(name), respBody)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
for i, test := range testCases {
|
||||
if test.expectedLogsSubStr != nil {
|
||||
if !checkLogs(t, i, ls, callIds[i], test.expectedLogsSubStr) {
|
||||
isFailure = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvokeRunnerTimeout(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
isFailure := false
|
||||
|
||||
// Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.)
|
||||
// for figuring out why things failed.
|
||||
defer func() {
|
||||
if isFailure {
|
||||
t.Log(buf.String())
|
||||
}
|
||||
}()
|
||||
|
||||
models.RouteMaxMemory = uint64(1024 * 1024 * 1024) // 1024 TB
|
||||
hugeMem := uint64(models.RouteMaxMemory - 1)
|
||||
|
||||
app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}}
|
||||
coldFn := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
|
||||
httpFn := &models.Fn{ID: "hot", Name: "http", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
|
||||
jsonFn := &models.Fn{ID: "hot-json", Name: "json", AppID: app.ID, Format: "json", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}}
|
||||
bigMemColdFn := &models.Fn{ID: "bigmem-cold", Name: "bigmemcold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}}
|
||||
bigMemHotFn := &models.Fn{ID: "bigmem-hot", Name: "bigmemhot", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}}
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{app},
|
||||
[]*models.Fn{coldFn, httpFn, jsonFn, bigMemColdFn, bigMemHotFn},
|
||||
)
|
||||
|
||||
fnl := logs.NewMock()
|
||||
rnr, cancelrnr := testRunner(t, ds, fnl)
|
||||
defer cancelrnr()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
body string
|
||||
method string
|
||||
expectedCode int
|
||||
expectedHeaders map[string][]string
|
||||
}{
|
||||
{"/invoke/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/invoke/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil},
|
||||
{"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
{"/invoke/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
{"/invoke/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) {
|
||||
trx := fmt.Sprintf("_trx_%d_", i)
|
||||
body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1))
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
respBytes, _ := ioutil.ReadAll(rec.Body)
|
||||
respBody := string(respBytes)
|
||||
maxBody := len(respBody)
|
||||
if maxBody > 1024 {
|
||||
maxBody = 1024
|
||||
}
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected status code to be %d but was %d body: %#v",
|
||||
i, test.expectedCode, rec.Code, respBody[:maxBody])
|
||||
}
|
||||
|
||||
if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected response to include %s but got body: %s",
|
||||
i, trx, respBody[:maxBody])
|
||||
|
||||
}
|
||||
|
||||
if test.expectedHeaders != nil {
|
||||
for name, header := range test.expectedHeaders {
|
||||
if header[0] != rec.Header().Get(name) {
|
||||
isFailure = true
|
||||
t.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v",
|
||||
i, name, header[0], rec.Header().Get(name), respBody[:maxBody])
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Minimal test that checks the possibility of invoking concurrent hot sync functions.
|
||||
func TestInvokeRunnerMinimalConcurrentHotSync(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
|
||||
app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}}
|
||||
fn := &models.Fn{ID: "fn_id", AppID: app.ID, Name: "myfn", Image: "fnproject/fn-test-utils", Format: "http", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 30, IdleTimeout: 5}}
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{app},
|
||||
[]*models.Fn{fn},
|
||||
)
|
||||
|
||||
fnl := logs.NewMock()
|
||||
rnr, cancelrnr := testRunner(t, ds, fnl)
|
||||
defer cancelrnr()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
body string
|
||||
method string
|
||||
expectedCode int
|
||||
expectedHeaders map[string][]string
|
||||
}{
|
||||
{"/invoke/fn_id", `{"sleepTime": 100, "isDebug": true}`, "POST", http.StatusOK, nil},
|
||||
} {
|
||||
errs := make(chan error)
|
||||
numCalls := 4
|
||||
for k := 0; k < numCalls; k++ {
|
||||
go func() {
|
||||
body := strings.NewReader(test.body)
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
t.Log(buf.String())
|
||||
errs <- fmt.Errorf("Test %d: Expected status code to be %d but was %d body: %#v",
|
||||
i, test.expectedCode, rec.Code, rec.Body.String())
|
||||
return
|
||||
}
|
||||
|
||||
if test.expectedHeaders == nil {
|
||||
errs <- nil
|
||||
return
|
||||
}
|
||||
for name, header := range test.expectedHeaders {
|
||||
if header[0] != rec.Header().Get(name) {
|
||||
t.Log(buf.String())
|
||||
errs <- fmt.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v",
|
||||
i, name, header[0], rec.Header().Get(name), rec.Body.String())
|
||||
return
|
||||
}
|
||||
}
|
||||
errs <- nil
|
||||
}()
|
||||
}
|
||||
for k := 0; k < numCalls; k++ {
|
||||
err := <-errs
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,12 +10,13 @@ import (
|
||||
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"os"
|
||||
)
|
||||
|
||||
func envTweaker(name, value string) func() {
|
||||
@@ -552,7 +553,6 @@ func TestTriggerRunnerTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -196,6 +196,7 @@ type Server struct {
|
||||
lbReadAccess agent.ReadDataAccess
|
||||
noHTTTPTriggerEndpoint bool
|
||||
noHybridAPI bool
|
||||
noFnInvokeEndpoint bool
|
||||
appListeners *appListeners
|
||||
routeListeners *routeListeners
|
||||
fnListeners *fnListeners
|
||||
@@ -719,6 +720,14 @@ func WithoutHTTPTriggerEndpoints() Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithoutFnInvokeEndpoints optionally disables the fn direct invoke endpoints from a LB -supporting server, allowing extensions to replace them with their own versions
|
||||
func WithoutFnInvokeEndpoints() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
s.noFnInvokeEndpoint = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithoutHybridAPI unconditionally disables the Hybrid API on a server
|
||||
func WithoutHybridAPI() Option {
|
||||
return func(ctx context.Context, s *Server) error {
|
||||
@@ -1125,7 +1134,11 @@ func (s *Server) bindHandlers(ctx context.Context) {
|
||||
lbRouteGroup.Use(s.checkAppPresenceByNameAtLB())
|
||||
lbRouteGroup.Any("/:appName", s.handleV1FunctionCall)
|
||||
lbRouteGroup.Any("/:appName/*route", s.handleV1FunctionCall)
|
||||
}
|
||||
|
||||
if !s.noFnInvokeEndpoint {
|
||||
lbFnInvokeGroup := engine.Group("/invoke")
|
||||
lbFnInvokeGroup.POST("/:fnID", s.handleFnInvokeCall)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user