mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Response size clamp (#786)
*) Limit response http body or json response size to FN_MAX_RESPONSE_SIZE (default unlimited) *) If limits are exceeded 502 is returned with 'body too large' in the error message
This commit is contained in:
@@ -124,7 +124,7 @@ func New(da DataAccess) Agent {
|
||||
|
||||
// TODO: Create drivers.New(runnerConfig)
|
||||
driver := docker.NewDocker(drivers.Config{
|
||||
ServerVersion: "17.06.0-ce",
|
||||
ServerVersion: cfg.MinDockerVersion,
|
||||
})
|
||||
|
||||
a := &agent{
|
||||
@@ -518,10 +518,11 @@ func (s *coldSlot) Close(ctx context.Context) error {
|
||||
|
||||
// implements Slot
|
||||
type hotSlot struct {
|
||||
done chan struct{} // signal we are done with slot
|
||||
errC <-chan error // container error
|
||||
container *container // TODO mask this
|
||||
err error
|
||||
done chan struct{} // signal we are done with slot
|
||||
errC <-chan error // container error
|
||||
container *container // TODO mask this
|
||||
maxRespSize uint64 // TODO boo.
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *hotSlot) Close(ctx context.Context) error {
|
||||
@@ -544,12 +545,16 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
||||
|
||||
// swap in fresh pipes & stat accumulator to not interlace with other calls that used this slot [and timed out]
|
||||
stdinRead, stdinWrite := io.Pipe()
|
||||
stdoutRead, stdoutWrite := io.Pipe()
|
||||
stdoutRead, stdoutWritePipe := io.Pipe()
|
||||
defer stdinRead.Close()
|
||||
defer stdoutWrite.Close()
|
||||
defer stdoutWritePipe.Close()
|
||||
|
||||
// NOTE: stderr is limited separately (though line writer is vulnerable to attack?)
|
||||
// limit the bytes allowed to be written to the stdout pipe, which handles any
|
||||
// buffering overflows (json to a string, http to a buffer, etc)
|
||||
stdoutWrite := common.NewClampWriter(stdoutWritePipe, s.maxRespSize, models.ErrFunctionResponseTooBig)
|
||||
|
||||
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
|
||||
|
||||
swapBack := s.container.swap(stdinRead, stdoutWrite, call.stderr, &call.Stats)
|
||||
defer swapBack() // NOTE: it's important this runs before the pipes are closed.
|
||||
|
||||
@@ -595,7 +600,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
||||
cpus: uint64(call.CPUs),
|
||||
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
|
||||
stdin: call.req.Body,
|
||||
stdout: call.w,
|
||||
stdout: common.NewClampWriter(call.w, a.cfg.MaxResponseSize, models.ErrFunctionResponseTooBig),
|
||||
stderr: call.stderr,
|
||||
stats: &call.Stats,
|
||||
}
|
||||
@@ -684,7 +689,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
default: // ok
|
||||
}
|
||||
|
||||
slot := &hotSlot{make(chan struct{}), errC, container, nil}
|
||||
slot := &hotSlot{done: make(chan struct{}), errC: errC, container: container, maxRespSize: a.cfg.MaxResponseSize}
|
||||
if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ type AgentConfig struct {
|
||||
MinDockerVersion string `json:"min_docker_version"`
|
||||
FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"`
|
||||
EjectIdleMsecs time.Duration `json:"eject_idle_msecs"`
|
||||
MaxResponseSize uint64 `json:"max_response_size"`
|
||||
}
|
||||
|
||||
func NewAgentConfig() (*AgentConfig, error) {
|
||||
@@ -36,6 +37,16 @@ func NewAgentConfig() (*AgentConfig, error) {
|
||||
return cfg, errors.New("error eject idle delay cannot be zero")
|
||||
}
|
||||
|
||||
if size := os.Getenv("FN_MAX_RESPONSE_SIZE"); size != "" {
|
||||
cfg.MaxResponseSize, err = strconv.ParseUint(size, 10, 64)
|
||||
if err != nil {
|
||||
return cfg, errors.New("error initializing response buffer size")
|
||||
}
|
||||
if cfg.MaxResponseSize < 0 {
|
||||
return cfg, errors.New("error invalid response buffer size")
|
||||
}
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -169,6 +169,7 @@ func (li *lineWriter) Close() error {
|
||||
}
|
||||
|
||||
// io.Writer that allows limiting bytes written to w
|
||||
// TODO change to use clamp writer, this is dupe code
|
||||
type limitWriter struct {
|
||||
n, max int
|
||||
io.Writer
|
||||
|
||||
34
api/common/io_utils.go
Normal file
34
api/common/io_utils.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
type clampWriter struct {
|
||||
w io.Writer
|
||||
remaining int64
|
||||
overflowErr error
|
||||
}
|
||||
|
||||
func NewClampWriter(buf io.Writer, maxResponseSize uint64, overflowErr error) io.Writer {
|
||||
if maxResponseSize != 0 {
|
||||
return &clampWriter{w: buf, remaining: int64(maxResponseSize), overflowErr: overflowErr}
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func (g *clampWriter) Write(p []byte) (int, error) {
|
||||
if g.remaining <= 0 {
|
||||
return 0, g.overflowErr
|
||||
}
|
||||
if int64(len(p)) > g.remaining {
|
||||
p = p[0:g.remaining]
|
||||
}
|
||||
|
||||
n, err := g.w.Write(p)
|
||||
g.remaining -= int64(n)
|
||||
if g.remaining <= 0 {
|
||||
err = g.overflowErr
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
@@ -181,6 +181,10 @@ var (
|
||||
error: fmt.Errorf("Cpus is invalid. Value should be either between [%.3f and %.3f] or [%dm and %dm] milliCPU units",
|
||||
float64(MinMilliCPUs)/1000.0, float64(MaxMilliCPUs)/1000.0, MinMilliCPUs, MaxMilliCPUs),
|
||||
}
|
||||
ErrFunctionResponseTooBig = err{
|
||||
code: http.StatusBadGateway,
|
||||
error: fmt.Errorf("function response too large"),
|
||||
}
|
||||
)
|
||||
|
||||
// APIError any error that implements this interface will return an API response
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -139,20 +140,29 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
rImgBs1 := "fnproject/imagethatdoesnotexist"
|
||||
rImgBs2 := "localhost:5000/fnproject/imagethatdoesnotexist"
|
||||
|
||||
err := os.Setenv("FN_MAX_RESPONSE_SIZE", "2048")
|
||||
if err != nil {
|
||||
t.Errorf("Cannot set response size %v", err)
|
||||
}
|
||||
defer os.Setenv("FN_MAX_RESPONSE_SIZE", "")
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
},
|
||||
[]*models.Route{
|
||||
{Path: "/", AppName: "myapp", Image: rImg, Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myhot", AppName: "myapp", Image: rImg, Type: "sync", Format: "http", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myhotjason", AppName: "myapp", Image: rImg, Type: "sync", Format: "json", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myroute", AppName: "myapp", Image: rImg, Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myerror", AppName: "myapp", Image: rImg, Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydne", AppName: "myapp", Image: rImgBs1, Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydnehot", AppName: "myapp", Image: rImgBs1, Type: "sync", Format: "http", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydneregistry", AppName: "myapp", Image: rImgBs2, Type: "sync", Format: "http", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/", AppName: "myapp", Image: rImg, Type: "sync", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myhot", AppName: "myapp", Image: rImg, Type: "sync", Format: "http", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myhotjason", AppName: "myapp", Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myroute", AppName: "myapp", Image: rImg, Type: "sync", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myerror", AppName: "myapp", Image: rImg, Type: "sync", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydne", AppName: "myapp", Image: rImgBs1, Type: "sync", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydnehot", AppName: "myapp", Image: rImgBs1, Type: "sync", Format: "http", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mydneregistry", AppName: "myapp", Image: rImgBs2, Type: "sync", Format: "http", Memory: 64, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/myoom", AppName: "myapp", Image: rImg, Type: "sync", Memory: 8, Timeout: 30, IdleTimeout: 30, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mybigoutputcold", AppName: "myapp", Image: rImg, Type: "sync", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mybigoutputhttp", AppName: "myapp", Image: rImg, Type: "sync", Format: "http", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
||||
{Path: "/mybigoutputjson", AppName: "myapp", Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 10, IdleTimeout: 20, Headers: rHdr, Config: rCfg},
|
||||
}, nil,
|
||||
)
|
||||
|
||||
@@ -174,6 +184,8 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
|
||||
multiLog := `{"sleepTime": 1, "isDebug": true}`
|
||||
multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
|
||||
bigoutput := `{"sleepTime": 0, "isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1000}` // 1000 trailers to exceed 2K
|
||||
smalloutput := `{"sleepTime": 0, "isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1}` // 1 trailer < 2K
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -208,6 +220,12 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
{"/r/myapp/myoom", oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil},
|
||||
{"/r/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
|
||||
{"/r/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
|
||||
{"/r/myapp/mybigoutputjson", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||
{"/r/myapp/mybigoutputjson", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
||||
{"/r/myapp/mybigoutputhttp", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||
{"/r/myapp/mybigoutputhttp", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
||||
{"/r/myapp/mybigoutputcold", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||
{"/r/myapp/mybigoutputcold", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
||||
} {
|
||||
body := strings.NewReader(test.body)
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
|
||||
Reference in New Issue
Block a user