diff --git a/api/agent/call.go b/api/agent/call.go index 67866d3fd..829e80ed2 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -204,6 +204,75 @@ func FromHTTPTriggerRequest(app *models.App, fn *models.Fn, trigger *models.Trig } } +// Sets up a call from an http trigger request +func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt { + return func(c *call) error { + ctx := req.Context() + + log := common.Logger(ctx) + // Check whether this is a CloudEvent, if coming in via HTTP router (only way currently), then we'll look for a special header + // Content-Type header: https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode + // Expected Content-Type for a CloudEvent: application/cloudevents+json; charset=UTF-8 + contentType := req.Header.Get("Content-Type") + t, _, err := mime.ParseMediaType(contentType) + if err != nil { + // won't fail here, but log + log.Debugf("Could not parse Content-Type header: %v", err) + } else { + if t == ceMimeType { + c.IsCloudEvent = true + fn.Format = models.FormatCloudEvent + } + } + + if fn.Format == "" { + fn.Format = models.FormatDefault + } + + id := id.New().String() + + // TODO this relies on ordering of opts, but tests make sure it works, probably re-plumb/destroy headers + // TODO async should probably supply an http.ResponseWriter that records the logs, to attach response headers to + if rw, ok := c.w.(http.ResponseWriter); ok { + rw.Header().Add("FN_CALL_ID", id) + } + + var syslogURL string + if app.SyslogURL != nil { + syslogURL = *app.SyslogURL + } + + c.Call = &models.Call{ + ID: id, + Image: fn.Image, + // Delay: 0, + Type: "sync", + Format: fn.Format, + // Payload: TODO, + Priority: new(int32), // TODO this is crucial, apparently + Timeout: fn.Timeout, + IdleTimeout: fn.IdleTimeout, + TmpFsSize: 0, // TODO clean up this + Memory: fn.Memory, + CPUs: 0, // TODO clean up this + Config: buildTriggerConfig(app, fn, nil), + // TODO - this wasn't really the intention here (that annotations would naturally cascade + // but seems to be necessary for some runner behaviour + Annotations: app.Annotations.MergeChange(fn.Annotations), + Headers: req.Header, + CreatedAt: common.DateTime(time.Now()), + URL: reqURL(req), + Method: req.Method, + AppID: app.ID, + FnID: fn.ID, + SyslogURL: syslogURL, + } + + c.req = req + return nil + } +} + func buildConfig(app *models.App, route *models.Route) models.Config { conf := make(models.Config, 8+len(app.Config)+len(route.Config)) for k, v := range app.Config { @@ -239,7 +308,9 @@ func buildTriggerConfig(app *models.App, fn *models.Fn, trigger *models.Trigger) conf["FN_FORMAT"] = fn.Format conf["FN_APP_NAME"] = app.Name - conf["FN_PATH"] = trigger.Source + if trigger != nil { + conf["FN_PATH"] = trigger.Source + } // TODO: might be a good idea to pass in: "FN_BASE_PATH" = fmt.Sprintf("/r/%s", appName) || "/" if using DNS entries per app conf["FN_MEMORY"] = fmt.Sprintf("%d", fn.Memory) conf["FN_TYPE"] = "sync" diff --git a/api/models/error.go b/api/models/error.go index 8036c6607..5402b3cd4 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -27,6 +27,10 @@ var ( error: errors.New("Timed out - server too busy"), } + ErrUnsupportedMediaType = err{ + code: http.StatusUnsupportedMediaType, + error: errors.New("Content Type not supported")} + ErrMissingID = err{ code: http.StatusBadRequest, error: errors.New("Missing ID")} diff --git a/api/server/runner_fninvoke.go b/api/server/runner_fninvoke.go new file mode 100644 index 000000000..758840adc --- /dev/null +++ b/api/server/runner_fninvoke.go @@ -0,0 +1,104 @@ +package server + +import ( + "bytes" + "io" + "net/http" + "strconv" + "time" + + "github.com/fnproject/fn/api" + "github.com/fnproject/fn/api/agent" + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" + "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" +) + +// handleFnInvokeCall executes the function, for router handlers +func (s *Server) handleFnInvokeCall(c *gin.Context) { + fnID := c.Param(api.ParamFnID) + ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"fnID": fnID}) + c.Request = c.Request.WithContext(ctx) + err := s.handleFnInvokeCall2(c) + if err != nil { + handleErrorResponse(c, err) + } +} + +// handleTriggerHTTPFunctionCall2 executes the function and returns an error +// Requires the following in the context: +func (s *Server) handleFnInvokeCall2(c *gin.Context) error { + // log := common.Logger(c.Request.Context()) + + fn, err := s.lbReadAccess.GetFnByID(c, c.Param(api.ParamFnID)) + if err != nil { + return err + } + + app, err := s.lbReadAccess.GetAppByID(c, fn.AppID) + if err != nil { + return err + } + + return s.ServeFnInvoke(c, app, fn) +} + +func (s *Server) ServeFnInvoke(c *gin.Context, app *models.App, fn *models.Fn) error { + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + writer := syncResponseWriter{ + Buffer: buf, + headers: c.Writer.Header(), + } + defer bufPool.Put(buf) // TODO need to ensure this is safe with Dispatch? + + call, err := s.agent.GetCall( + agent.WithWriter(&writer), // XXX (reed): order matters [for now] + agent.FromHTTPFnRequest(app, fn, c.Request), + ) + if err != nil { + return err + } + + model := call.Model() + { // scope this, to disallow ctx use outside of this scope. add id for handleV1ErrorResponse logger + ctx, _ := common.LoggerWithFields(c.Request.Context(), logrus.Fields{"id": model.ID}) + c.Request = c.Request.WithContext(ctx) + } + + err = s.agent.Submit(call) + if err != nil { + // NOTE if they cancel the request then it will stop the call (kind of cool), + // we could filter that error out here too as right now it yells a little + if err == models.ErrCallTimeoutServerBusy || err == models.ErrCallTimeout { + // TODO maneuver + // add this, since it means that start may not have been called [and it's relevant] + c.Writer.Header().Add("XXX-FXLB-WAIT", time.Now().Sub(time.Time(model.CreatedAt)).String()) + } + return err + } + + // if they don't set a content-type - detect it + if writer.Header().Get("Content-Type") == "" { + // see http.DetectContentType, the go server is supposed to do this for us but doesn't appear to? + var contentType string + jsonPrefix := [1]byte{'{'} // stack allocated + if bytes.HasPrefix(buf.Bytes(), jsonPrefix[:]) { + // try to detect json, since DetectContentType isn't a hipster. + contentType = "application/json; charset=utf-8" + } else { + contentType = http.DetectContentType(buf.Bytes()) + } + writer.Header().Set("Content-Type", contentType) + } + + writer.Header().Set("Content-Length", strconv.Itoa(int(buf.Len()))) + + if writer.status > 0 { + c.Writer.WriteHeader(writer.status) + } + io.Copy(c.Writer, &writer) + + return nil +} diff --git a/api/server/runner_fninvoke_test.go b/api/server/runner_fninvoke_test.go new file mode 100644 index 000000000..f827d667d --- /dev/null +++ b/api/server/runner_fninvoke_test.go @@ -0,0 +1,433 @@ +package server + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/fnproject/fn/api/datastore" + "github.com/fnproject/fn/api/logs" + "github.com/fnproject/fn/api/models" + "github.com/fnproject/fn/api/mqs" +) + +func TestBadRequests(t *testing.T) { + buf := setLogBuffer() + app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}} + fn := &models.Fn{ID: "fn_id", AppID: "app_id"} + fn2 := &models.Fn{ID: "fn_id2", AppID: "app_id", Format: "cloudevent"} + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Fn{fn, fn2}, + ) + rnr, cancel := testRunner(t, ds) + defer cancel() + logDB := logs.NewMock() + srv := testServer(ds, &mqs.Mock{}, logDB, rnr, ServerTypeFull) + + for i, test := range []struct { + path string + contentType string + body string + expectedCode int + expectedError error + }{ + {"/invoke/notfn", "", "", http.StatusNotFound, models.ErrFnsNotFound}, + } { + request := createRequest(t, "POST", test.path, strings.NewReader(test.body)) + request.Header = map[string][]string{"Content-Type": []string{test.contentType}} + _, rec := routerRequest2(t, srv.Router, request) + + if rec.Code != test.expectedCode { + t.Log(buf.String()) + t.Fatalf("Test %d: Expected status code for path %s to be %d but was %d", + i, test.path, test.expectedCode, rec.Code) + } + + if test.expectedError != nil { + resp := getErrorResponse(t, rec) + + if !strings.Contains(resp.Message, test.expectedError.Error()) { + t.Log(buf.String()) + t.Errorf("Test %d: Expected error message to have `%s`, but got `%s`", + i, test.expectedError.Error(), resp.Message) + } + } + } +} + +func TestFnInvokeRunnerExecEmptyBody(t *testing.T) { + buf := setLogBuffer() + isFailure := false + + defer func() { + if isFailure { + t.Log(buf.String()) + } + }() + + rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer + rImg := "fnproject/fn-test-utils" + + app := &models.App{ID: "app_id", Name: "soup"} + + f1 := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Image: rImg, ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg} + f2 := &models.Fn{ID: "hothttp", Name: "hothttp", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg} + f3 := &models.Fn{ID: "hotjson", Name: "hotjson", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 10, IdleTimeout: 20}, Config: rCfg} + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Fn{f1, f2, f3}, + ) + ls := logs.NewMock() + + rnr, cancelrnr := testRunner(t, ds, ls) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull) + + emptyBody := `{"echoContent": "_TRX_ID_", "isDebug": true, "isEmptyBody": true}` + + // Test hot cases twice to rule out hot-containers corrupting next request. + testCases := []struct { + path string + }{ + {"/invoke/cold"}, + {"/invoke/hothttp"}, + {"/invoke/hothttp"}, + {"/invoke/hotjson"}, + {"/invoke/hotjson"}, + } + + for i, test := range testCases { + t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { + trx := fmt.Sprintf("_trx_%d_", i) + body := strings.NewReader(strings.Replace(emptyBody, "_TRX_ID_", trx, 1)) + _, rec := routerRequest(t, srv.Router, "POST", test.path, body) + respBytes, _ := ioutil.ReadAll(rec.Body) + respBody := string(respBytes) + maxBody := len(respBody) + if maxBody > 1024 { + maxBody = 1024 + } + + if rec.Code != http.StatusOK { + isFailure = true + t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s", + i, http.StatusOK, rec.Code, respBody[:maxBody]) + } else if len(respBytes) != 0 { + isFailure = true + t.Errorf("Test %d: Expected empty body but got %d. body: %s", + i, len(respBytes), respBody[:maxBody]) + } + }) + } +} + +func TestFnInvokeRunnerExecution(t *testing.T) { + buf := setLogBuffer() + isFailure := false + tweaker := envTweaker("FN_MAX_RESPONSE_SIZE", "2048") + defer tweaker() + + // Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.) + // for figuring out why things failed. + defer func() { + if isFailure { + t.Log(buf.String()) + } + }() + + rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer + rImg := "fnproject/fn-test-utils" + rImgBs1 := "fnproject/imagethatdoesnotexist" + rImgBs2 := "localhost:5050/fnproject/imagethatdoesnotexist" + + app := &models.App{ID: "app_id", Name: "myapp"} + + defaultDneFn := &models.Fn{ID: "default_dne_fn_id", Name: "default_dne_fn", AppID: app.ID, Image: rImgBs1, Format: "", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + defaultFn := &models.Fn{ID: "default_fn_id", Name: "default_fn", AppID: app.ID, Image: rImg, Format: "", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + httpFn := &models.Fn{ID: "http_fn_id", Name: "http_fn", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + httpDneFn := &models.Fn{ID: "http_dne_fn_id", Name: "http_dne_fn", AppID: app.ID, Image: rImgBs1, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + httpDneRegistryFn := &models.Fn{ID: "http_dnereg_fn_id", Name: "http_dnereg_fn", AppID: app.ID, Image: rImgBs2, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + jsonFn := &models.Fn{ID: "json_fn_id", Name: "json_fn", AppID: app.ID, Image: rImg, Format: "json", ResourceConfig: models.ResourceConfig{Memory: 64, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + oomFn := &models.Fn{ID: "http_fn_id", Name: "http_fn", AppID: app.ID, Image: rImg, Format: "http", ResourceConfig: models.ResourceConfig{Memory: 8, Timeout: 30, IdleTimeout: 30}, Config: rCfg} + + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Fn{defaultFn, defaultDneFn, httpDneRegistryFn, oomFn, httpFn, jsonFn, httpDneFn}, + ) + ls := logs.NewMock() + + rnr, cancelrnr := testRunner(t, ds, ls) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, ls, rnr, ServerTypeFull) + + expHeaders := map[string][]string{"Content-Type": {"application/json; charset=utf-8"}} + expCTHeaders := map[string][]string{"Content-Type": {"foo/bar"}} + + // Checking for EndOfLogs currently depends on scheduling of go-routines (in docker/containerd) that process stderr & stdout. + // Therefore, not testing for EndOfLogs for hot containers (which has complex I/O processing) anymore. + multiLogExpectCold := []string{"BeginOfLogs", "EndOfLogs"} + multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */} + + crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container + oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB + badHot := `{"echoContent": "_TRX_ID_", "invalidResponse": true, "isDebug": true}` // write a not json/http as output + ok := `{"echoContent": "_TRX_ID_", "isDebug": true}` // good response / ok + respTypeLie := `{"echoContent": "_TRX_ID_", "responseContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar + respTypeJason := `{"echoContent": "_TRX_ID_", "jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar + + // sleep between logs and with debug enabled, fn-test-utils will log header/footer below: + multiLog := `{"echoContent": "_TRX_ID_", "sleepTime": 1000, "isDebug": true}` + bigoutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1000}` // 1000 trailers to exceed 2K + smalloutput := `{"echoContent": "_TRX_ID_", "isDebug": true, "trailerRepeat": 1}` // 1 trailer < 2K + + testCases := []struct { + path string + body string + method string + expectedCode int + expectedHeaders map[string][]string + expectedErrSubStr string + expectedLogsSubStr []string + }{ + {"/invoke/default_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil}, + + {"/invoke/http_fn_id", badHot, "POST", http.StatusBadGateway, expHeaders, "invalid http response", nil}, + // hot container now back to normal: + {"/invoke/http_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil}, + + {"/invoke/json_fn_id", badHot, "POST", http.StatusBadGateway, expHeaders, "invalid json response", nil}, + // hot container now back to normal: + {"/invoke/json_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil}, + + {"/invoke/http_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil}, + {"/invoke/json_fn_id", respTypeLie, "POST", http.StatusOK, expCTHeaders, "", nil}, + {"/invoke/json_fn_id", respTypeJason, "POST", http.StatusOK, expCTHeaders, "", nil}, + + {"/invoke/default_fn_id", ok, "POST", http.StatusOK, expHeaders, "", nil}, + {"/invoke/default_fn_id", crasher, "POST", http.StatusBadGateway, expHeaders, "container exit code 2", nil}, + {"/invoke/default_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil}, + {"/invoke/http_dne_fn_id", ``, "POST", http.StatusNotFound, nil, "pull access denied", nil}, + {"/invoke/http_dnereg_fn_id", ``, "POST", http.StatusInternalServerError, nil, "connection refused", nil}, + {"/invoke/http_fn_id", oomer, "POST", http.StatusBadGateway, nil, "container out of memory", nil}, + {"/invoke/http_fn_id", multiLog, "POST", http.StatusOK, nil, "", multiLogExpectHot}, + {"/invoke/default_fn_id", multiLog, "POST", http.StatusOK, nil, "", multiLogExpectCold}, + {"/invoke/json_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "function response too large", nil}, + {"/invoke/json_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil}, + {"/invoke/http_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "", nil}, + {"/invoke/http_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil}, + {"/invoke/default_fn_id", bigoutput, "POST", http.StatusBadGateway, nil, "", nil}, + {"/invoke/default_fn_id", smalloutput, "POST", http.StatusOK, nil, "", nil}, + } + + callIds := make([]string, len(testCases)) + + for i, test := range testCases { + t.Run(fmt.Sprintf("Test_%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { + trx := fmt.Sprintf("_trx_%d_", i) + body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1)) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + respBytes, _ := ioutil.ReadAll(rec.Body) + respBody := string(respBytes) + maxBody := len(respBody) + if maxBody > 1024 { + maxBody = 1024 + } + + callIds[i] = rec.Header().Get("Fn_call_id") + + if rec.Code != test.expectedCode { + isFailure = true + t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s", + i, test.expectedCode, rec.Code, respBody[:maxBody]) + } + + if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) { + isFailure = true + t.Errorf("Test %d: Expected response to include %s but got body: %s", + i, trx, respBody[:maxBody]) + + } + + if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) { + isFailure = true + t.Errorf("Test %d: Expected response to include %s but got body: %s", + i, test.expectedErrSubStr, respBody[:maxBody]) + + } + + if test.expectedHeaders != nil { + for name, header := range test.expectedHeaders { + if header[0] != rec.Header().Get(name) { + isFailure = true + t.Errorf("Test %d: Expected header `%s` to be %s but was %s. body: %s", + i, name, header[0], rec.Header().Get(name), respBody) + } + } + } + }) + + } + + for i, test := range testCases { + if test.expectedLogsSubStr != nil { + if !checkLogs(t, i, ls, callIds[i], test.expectedLogsSubStr) { + isFailure = true + } + } + } +} + +func TestInvokeRunnerTimeout(t *testing.T) { + buf := setLogBuffer() + isFailure := false + + // Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.) + // for figuring out why things failed. + defer func() { + if isFailure { + t.Log(buf.String()) + } + }() + + models.RouteMaxMemory = uint64(1024 * 1024 * 1024) // 1024 TB + hugeMem := uint64(models.RouteMaxMemory - 1) + + app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}} + coldFn := &models.Fn{ID: "cold", Name: "cold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}} + httpFn := &models.Fn{ID: "hot", Name: "http", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}} + jsonFn := &models.Fn{ID: "hot-json", Name: "json", AppID: app.ID, Format: "json", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 4, IdleTimeout: 30}} + bigMemColdFn := &models.Fn{ID: "bigmem-cold", Name: "bigmemcold", AppID: app.ID, Format: "", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}} + bigMemHotFn := &models.Fn{ID: "bigmem-hot", Name: "bigmemhot", AppID: app.ID, Format: "http", Image: "fnproject/fn-test-utils", ResourceConfig: models.ResourceConfig{Memory: hugeMem, Timeout: 4, IdleTimeout: 30}} + + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Fn{coldFn, httpFn, jsonFn, bigMemColdFn, bigMemHotFn}, + ) + + fnl := logs.NewMock() + rnr, cancelrnr := testRunner(t, ds, fnl) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) + + for i, test := range []struct { + path string + body string + method string + expectedCode int + expectedHeaders map[string][]string + }{ + {"/invoke/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, + {"/invoke/cold", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, + {"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, + {"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, + {"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, + {"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, + {"/invoke/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, + {"/invoke/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, + } { + t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { + trx := fmt.Sprintf("_trx_%d_", i) + body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1)) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + respBytes, _ := ioutil.ReadAll(rec.Body) + respBody := string(respBytes) + maxBody := len(respBody) + if maxBody > 1024 { + maxBody = 1024 + } + + if rec.Code != test.expectedCode { + isFailure = true + t.Errorf("Test %d: Expected status code to be %d but was %d body: %#v", + i, test.expectedCode, rec.Code, respBody[:maxBody]) + } + + if rec.Code == http.StatusOK && !strings.Contains(respBody, trx) { + isFailure = true + t.Errorf("Test %d: Expected response to include %s but got body: %s", + i, trx, respBody[:maxBody]) + + } + + if test.expectedHeaders != nil { + for name, header := range test.expectedHeaders { + if header[0] != rec.Header().Get(name) { + isFailure = true + t.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v", + i, name, header[0], rec.Header().Get(name), respBody[:maxBody]) + } + } + } + }) + } +} + +// Minimal test that checks the possibility of invoking concurrent hot sync functions. +func TestInvokeRunnerMinimalConcurrentHotSync(t *testing.T) { + buf := setLogBuffer() + + app := &models.App{ID: "app_id", Name: "myapp", Config: models.Config{}} + fn := &models.Fn{ID: "fn_id", AppID: app.ID, Name: "myfn", Image: "fnproject/fn-test-utils", Format: "http", ResourceConfig: models.ResourceConfig{Memory: 128, Timeout: 30, IdleTimeout: 5}} + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Fn{fn}, + ) + + fnl := logs.NewMock() + rnr, cancelrnr := testRunner(t, ds, fnl) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) + + for i, test := range []struct { + path string + body string + method string + expectedCode int + expectedHeaders map[string][]string + }{ + {"/invoke/fn_id", `{"sleepTime": 100, "isDebug": true}`, "POST", http.StatusOK, nil}, + } { + errs := make(chan error) + numCalls := 4 + for k := 0; k < numCalls; k++ { + go func() { + body := strings.NewReader(test.body) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + + if rec.Code != test.expectedCode { + t.Log(buf.String()) + errs <- fmt.Errorf("Test %d: Expected status code to be %d but was %d body: %#v", + i, test.expectedCode, rec.Code, rec.Body.String()) + return + } + + if test.expectedHeaders == nil { + errs <- nil + return + } + for name, header := range test.expectedHeaders { + if header[0] != rec.Header().Get(name) { + t.Log(buf.String()) + errs <- fmt.Errorf("Test %d: Expected header `%s` to be %s but was %s body: %#v", + i, name, header[0], rec.Header().Get(name), rec.Body.String()) + return + } + } + errs <- nil + }() + } + for k := 0; k < numCalls; k++ { + err := <-errs + if err != nil { + t.Errorf("%v", err) + } + } + } +} diff --git a/api/server/runner_httptrigger_test.go b/api/server/runner_httptrigger_test.go index 63cf12617..74cfdfaac 100644 --- a/api/server/runner_httptrigger_test.go +++ b/api/server/runner_httptrigger_test.go @@ -10,12 +10,13 @@ import ( "context" "errors" + "os" + "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/datastore" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/mqs" - "os" ) func envTweaker(name, value string) func() { @@ -552,7 +553,6 @@ func TestTriggerRunnerTimeout(t *testing.T) { } } }) - } } diff --git a/api/server/server.go b/api/server/server.go index c75727754..c7fd6fdcb 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -196,6 +196,7 @@ type Server struct { lbReadAccess agent.ReadDataAccess noHTTTPTriggerEndpoint bool noHybridAPI bool + noFnInvokeEndpoint bool appListeners *appListeners routeListeners *routeListeners fnListeners *fnListeners @@ -719,6 +720,14 @@ func WithoutHTTPTriggerEndpoints() Option { } } +// WithoutFnInvokeEndpoints optionally disables the fn direct invoke endpoints from a LB -supporting server, allowing extensions to replace them with their own versions +func WithoutFnInvokeEndpoints() Option { + return func(ctx context.Context, s *Server) error { + s.noFnInvokeEndpoint = true + return nil + } +} + // WithoutHybridAPI unconditionally disables the Hybrid API on a server func WithoutHybridAPI() Option { return func(ctx context.Context, s *Server) error { @@ -1125,7 +1134,11 @@ func (s *Server) bindHandlers(ctx context.Context) { lbRouteGroup.Use(s.checkAppPresenceByNameAtLB()) lbRouteGroup.Any("/:appName", s.handleV1FunctionCall) lbRouteGroup.Any("/:appName/*route", s.handleV1FunctionCall) + } + if !s.noFnInvokeEndpoint { + lbFnInvokeGroup := engine.Group("/invoke") + lbFnInvokeGroup.POST("/:fnID", s.handleFnInvokeCall) } }