mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: reorg agent config (#853)
* fn: reorg agent config *) Moving constants in agent to agent config, which helps with testing, tuning. *) Added max total cpu & memory for testing & clamping max mem & cpu usage if needed. * fn: adjust PipeIO time * fn: for hot, cannot reliably test EndOfLogs in TestRouteRunnerExecution
This commit is contained in:
@@ -135,7 +135,7 @@ func NewSyncOnly(da DataAccess) Agent {
|
|||||||
da: da,
|
da: da,
|
||||||
driver: driver,
|
driver: driver,
|
||||||
slotMgr: NewSlotQueueMgr(),
|
slotMgr: NewSlotQueueMgr(),
|
||||||
resources: NewResourceTracker(),
|
resources: NewResourceTracker(cfg),
|
||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -305,7 +305,7 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
|
|||||||
// Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
|
// Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
|
||||||
// whichever is longer. If in this time, there's no activity, then
|
// whichever is longer. If in this time, there's no activity, then
|
||||||
// we destroy the hot queue.
|
// we destroy the hot queue.
|
||||||
timeout := time.Duration(60) * time.Minute
|
timeout := a.cfg.HotLauncherTimeout
|
||||||
idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
|
idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
|
||||||
if timeout < idleTimeout {
|
if timeout < idleTimeout {
|
||||||
timeout = idleTimeout
|
timeout = idleTimeout
|
||||||
@@ -380,7 +380,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
|
|||||||
ch := call.slots.startDequeuer(ctx)
|
ch := call.slots.startDequeuer(ctx)
|
||||||
|
|
||||||
// 1) if we can get a slot immediately, grab it.
|
// 1) if we can get a slot immediately, grab it.
|
||||||
// 2) if we don't, send a signaller every 200ms until we do.
|
// 2) if we don't, send a signaller every x msecs until we do.
|
||||||
|
|
||||||
sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
|
sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
|
||||||
for {
|
for {
|
||||||
@@ -402,8 +402,8 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
|
|||||||
// ping dequeuer again
|
// ping dequeuer again
|
||||||
}
|
}
|
||||||
|
|
||||||
// set sleep to 200ms after first iteration
|
// set sleep to x msecs after first iteration
|
||||||
sleep = 200 * time.Millisecond
|
sleep = a.cfg.HotPoll
|
||||||
// send a notification to launchHot()
|
// send a notification to launchHot()
|
||||||
select {
|
select {
|
||||||
case call.slots.signaller <- true:
|
case call.slots.signaller <- true:
|
||||||
@@ -631,7 +631,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
|||||||
|
|
||||||
// if freezer is enabled, be consistent with freezer behavior and
|
// if freezer is enabled, be consistent with freezer behavior and
|
||||||
// block stdout and stderr between calls.
|
// block stdout and stderr between calls.
|
||||||
isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdleMsecs
|
isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdle
|
||||||
container, closer := NewHotContainer(call, isBlockIdleIO)
|
container, closer := NewHotContainer(call, isBlockIdleIO)
|
||||||
defer closer()
|
defer closer()
|
||||||
|
|
||||||
@@ -708,9 +708,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
|||||||
var err error
|
var err error
|
||||||
isFrozen := false
|
isFrozen := false
|
||||||
|
|
||||||
freezeTimer := time.NewTimer(a.cfg.FreezeIdleMsecs)
|
freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
|
||||||
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
|
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
|
||||||
ejectTicker := time.NewTicker(a.cfg.EjectIdleMsecs)
|
ejectTicker := time.NewTicker(a.cfg.EjectIdle)
|
||||||
|
|
||||||
defer freezeTimer.Stop()
|
defer freezeTimer.Stop()
|
||||||
defer idleTimer.Stop()
|
defer idleTimer.Stop()
|
||||||
@@ -724,7 +724,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// if an immediate freeze is requested, freeze first before enqueuing at all.
|
// if an immediate freeze is requested, freeze first before enqueuing at all.
|
||||||
if a.cfg.FreezeIdleMsecs == time.Duration(0) && !isFrozen {
|
if a.cfg.FreezeIdle == time.Duration(0) && !isFrozen {
|
||||||
err = cookie.Freeze(ctx)
|
err = cookie.Freeze(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call {
|
|||||||
ch := make(chan *models.Call, 1)
|
ch := make(chan *models.Call, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
ctx, cancel := context.WithTimeout(ctx, a.cfg.AsyncChewPoll)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
call, err := a.da.Dequeue(ctx)
|
call, err := a.da.Dequeue(ctx)
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
@@ -10,75 +9,99 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type AgentConfig struct {
|
type AgentConfig struct {
|
||||||
MinDockerVersion string `json:"min_docker_version"`
|
MinDockerVersion string `json:"min_docker_version"`
|
||||||
FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"`
|
FreezeIdle time.Duration `json:"freeze_idle_msecs"`
|
||||||
EjectIdleMsecs time.Duration `json:"eject_idle_msecs"`
|
EjectIdle time.Duration `json:"eject_idle_msecs"`
|
||||||
MaxResponseSize uint64 `json:"max_response_size"`
|
HotPoll time.Duration `json:"hot_poll_msecs"`
|
||||||
MaxLogSize uint64 `json:"max_log_size"`
|
HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
|
||||||
|
AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"`
|
||||||
|
MaxResponseSize uint64 `json:"max_response_size_bytes"`
|
||||||
|
MaxLogSize uint64 `json:"max_log_size_bytes"`
|
||||||
|
MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"`
|
||||||
|
MaxTotalMemory uint64 `json:"max_total_memory_bytes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var MaxDisabledMsecs = time.Duration(math.MaxInt64)
|
const (
|
||||||
|
EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS"
|
||||||
|
EnvEjectIdle = "FN_EJECT_IDLE_MSECS"
|
||||||
|
EnvHotPoll = "FN_HOT_POLL_MSECS"
|
||||||
|
EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
|
||||||
|
EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS"
|
||||||
|
EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
|
||||||
|
EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES"
|
||||||
|
EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS"
|
||||||
|
EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES"
|
||||||
|
|
||||||
|
MaxDisabledMsecs = time.Duration(math.MaxInt64)
|
||||||
|
)
|
||||||
|
|
||||||
func NewAgentConfig() (*AgentConfig, error) {
|
func NewAgentConfig() (*AgentConfig, error) {
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
cfg := &AgentConfig{
|
cfg := &AgentConfig{
|
||||||
MinDockerVersion: "17.06.0-ce",
|
MinDockerVersion: "17.06.0-ce",
|
||||||
MaxLogSize: 1 * 1024 * 1024,
|
MaxLogSize: 1 * 1024 * 1024,
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
|
var err error
|
||||||
|
|
||||||
|
err = setEnvMsecs(err, EnvFreezeIdle, &cfg.FreezeIdle, 50*time.Millisecond)
|
||||||
|
err = setEnvMsecs(err, EnvEjectIdle, &cfg.EjectIdle, 1000*time.Millisecond)
|
||||||
|
err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond)
|
||||||
|
err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
|
||||||
|
err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
|
||||||
|
err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
|
||||||
|
err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
|
||||||
|
err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
|
||||||
|
err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cfg, errors.New("error initializing freeze idle delay")
|
return cfg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if tmp := os.Getenv("FN_MAX_LOG_SIZE"); tmp != "" {
|
if cfg.EjectIdle == time.Duration(0) {
|
||||||
cfg.MaxLogSize, err = strconv.ParseUint(tmp, 10, 64)
|
return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
|
||||||
if err != nil {
|
}
|
||||||
return cfg, errors.New("error initializing max log size")
|
if cfg.MaxLogSize > math.MaxInt64 {
|
||||||
}
|
|
||||||
// for safety during uint64 to int conversions in Write()/Read(), etc.
|
// for safety during uint64 to int conversions in Write()/Read(), etc.
|
||||||
if cfg.MaxLogSize > math.MaxInt32 {
|
return cfg, fmt.Errorf("error invalid %s %v > %v", EnvMaxLogSize, cfg.MaxLogSize, math.MaxInt64)
|
||||||
return cfg, fmt.Errorf("error invalid max log size %v > %v", cfg.MaxLogSize, math.MaxInt32)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
|
|
||||||
if err != nil {
|
|
||||||
return cfg, errors.New("error initializing eject idle delay")
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.EjectIdleMsecs == time.Duration(0) {
|
|
||||||
return cfg, errors.New("error eject idle delay cannot be zero")
|
|
||||||
}
|
|
||||||
|
|
||||||
if tmp := os.Getenv("FN_MAX_RESPONSE_SIZE"); tmp != "" {
|
|
||||||
cfg.MaxResponseSize, err = strconv.ParseUint(tmp, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return cfg, errors.New("error initializing response buffer size")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
|
func setEnvUint(err error, name string, dst *uint64) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if tmp := os.Getenv(name); tmp != "" {
|
||||||
|
val, err := strconv.ParseUint(tmp, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error invalid %s=%s", name, tmp)
|
||||||
|
}
|
||||||
|
*dst = val
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
delay := defaultVal
|
func setEnvMsecs(err error, name string, dst *time.Duration, defaultVal time.Duration) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*dst = defaultVal
|
||||||
|
|
||||||
if dur := os.Getenv(name); dur != "" {
|
if dur := os.Getenv(name); dur != "" {
|
||||||
durInt, err := strconv.ParseInt(dur, 10, 64)
|
durInt, err := strconv.ParseInt(dur, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return defaultVal, err
|
return fmt.Errorf("error invalid %s=%s err=%s", name, dur, err)
|
||||||
}
|
}
|
||||||
// disable if negative or set to msecs specified.
|
// disable if negative or set to msecs specified.
|
||||||
if durInt < 0 || time.Duration(durInt) >= MaxDisabledMsecs/time.Millisecond {
|
if durInt < 0 || time.Duration(durInt) >= MaxDisabledMsecs/time.Millisecond {
|
||||||
delay = MaxDisabledMsecs
|
*dst = MaxDisabledMsecs
|
||||||
} else {
|
} else {
|
||||||
delay = time.Duration(durInt) * time.Millisecond
|
*dst = time.Duration(durInt) * time.Millisecond
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return delay, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -615,6 +615,6 @@ const megabyte uint64 = 1024 * 1024
|
|||||||
|
|
||||||
func getAvailableMemoryUnits() uint64 {
|
func getAvailableMemoryUnits() uint64 {
|
||||||
// To reuse code - but it's a bit of a hack. TODO: refactor the OS-specific get memory funcs out of that.
|
// To reuse code - but it's a bit of a hack. TODO: refactor the OS-specific get memory funcs out of that.
|
||||||
throwawayRT := NewResourceTracker().(*resourceTracker)
|
throwawayRT := NewResourceTracker(nil).(*resourceTracker)
|
||||||
return throwawayRT.ramAsyncTotal / megabyte
|
return throwawayRT.ramAsyncTotal / megabyte
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,14 +74,14 @@ type resourceTracker struct {
|
|||||||
tokenWaiterCount uint64
|
tokenWaiterCount uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewResourceTracker() ResourceTracker {
|
func NewResourceTracker(cfg *AgentConfig) ResourceTracker {
|
||||||
|
|
||||||
obj := &resourceTracker{
|
obj := &resourceTracker{
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.initializeMemory()
|
obj.initializeMemory(cfg)
|
||||||
obj.initializeCPU()
|
obj.initializeCPU(cfg)
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -295,7 +295,7 @@ func clampUint64(val, min, max uint64) uint64 {
|
|||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *resourceTracker) initializeCPU() {
|
func (a *resourceTracker) initializeCPU(cfg *AgentConfig) {
|
||||||
|
|
||||||
var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64
|
var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64
|
||||||
var totalCPU, availCPU uint64
|
var totalCPU, availCPU uint64
|
||||||
@@ -320,6 +320,11 @@ func (a *resourceTracker) initializeCPU() {
|
|||||||
availCPU = minUint64(availCPU, cgroupCPU)
|
availCPU = minUint64(availCPU, cgroupCPU)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// now based on cfg, further clamp on calculated values
|
||||||
|
if cfg != nil && cfg.MaxTotalCPU != 0 {
|
||||||
|
availCPU = minUint64(cfg.MaxTotalCPU, availCPU)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: check cgroup cpuset to clamp this further. We might be restricted into
|
// TODO: check cgroup cpuset to clamp this further. We might be restricted into
|
||||||
// a subset of CPUs. (eg. /sys/fs/cgroup/cpuset/cpuset.effective_cpus)
|
// a subset of CPUs. (eg. /sys/fs/cgroup/cpuset/cpuset.effective_cpus)
|
||||||
|
|
||||||
@@ -360,7 +365,7 @@ func (a *resourceTracker) initializeCPU() {
|
|||||||
a.cpuAsyncTotal = maxAsyncCPU
|
a.cpuAsyncTotal = maxAsyncCPU
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *resourceTracker) initializeMemory() {
|
func (a *resourceTracker) initializeMemory(cfg *AgentConfig) {
|
||||||
|
|
||||||
var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
|
var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
|
||||||
|
|
||||||
@@ -389,6 +394,11 @@ func (a *resourceTracker) initializeMemory() {
|
|||||||
}
|
}
|
||||||
availMemory = availMemory - headRoom
|
availMemory = availMemory - headRoom
|
||||||
|
|
||||||
|
// now based on cfg, further clamp on calculated values
|
||||||
|
if cfg != nil && cfg.MaxTotalMemory != 0 {
|
||||||
|
availMemory = minUint64(cfg.MaxTotalMemory, availMemory)
|
||||||
|
}
|
||||||
|
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"totalMemory": totalMemory,
|
"totalMemory": totalMemory,
|
||||||
"availMemory": availMemory,
|
"availMemory": availMemory,
|
||||||
@@ -420,9 +430,9 @@ func (a *resourceTracker) initializeMemory() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if maxSyncMemory+maxAsyncMemory < 256*Mem1MB {
|
if maxSyncMemory+maxAsyncMemory < 256*Mem1MB {
|
||||||
logrus.Warn("Severaly Limited memory: ramSync + ramAsync < 256MB")
|
logrus.Warn("Severely Limited memory: ramSync + ramAsync < 256MB")
|
||||||
} else if maxAsyncMemory < 256*Mem1MB {
|
} else if maxAsyncMemory < 256*Mem1MB {
|
||||||
logrus.Warn("Severaly Limited memory: ramAsync < 256MB")
|
logrus.Warn("Severely Limited memory: ramAsync < 256MB")
|
||||||
}
|
}
|
||||||
|
|
||||||
a.ramAsyncHWMark = ramAsyncHWMark
|
a.ramAsyncHWMark = ramAsyncHWMark
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ func TestResourceAsyncWait(t *testing.T) {
|
|||||||
|
|
||||||
var vals trackerVals
|
var vals trackerVals
|
||||||
|
|
||||||
trI := NewResourceTracker()
|
trI := NewResourceTracker(nil)
|
||||||
|
|
||||||
tr := trI.(*resourceTracker)
|
tr := trI.(*resourceTracker)
|
||||||
|
|
||||||
@@ -166,7 +166,7 @@ func TestResourceAsyncWait(t *testing.T) {
|
|||||||
func TestResourceGetSimple(t *testing.T) {
|
func TestResourceGetSimple(t *testing.T) {
|
||||||
|
|
||||||
var vals trackerVals
|
var vals trackerVals
|
||||||
trI := NewResourceTracker()
|
trI := NewResourceTracker(nil)
|
||||||
tr := trI.(*resourceTracker)
|
tr := trI.(*resourceTracker)
|
||||||
|
|
||||||
vals.setDefaults()
|
vals.setDefaults()
|
||||||
@@ -229,7 +229,7 @@ func TestResourceGetSimple(t *testing.T) {
|
|||||||
func TestResourceGetCombo(t *testing.T) {
|
func TestResourceGetCombo(t *testing.T) {
|
||||||
|
|
||||||
var vals trackerVals
|
var vals trackerVals
|
||||||
trI := NewResourceTracker()
|
trI := NewResourceTracker(nil)
|
||||||
tr := trI.(*resourceTracker)
|
tr := trI.(*resourceTracker)
|
||||||
|
|
||||||
vals.setDefaults()
|
vals.setDefaults()
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
|
|||||||
// more timing related issues below. Slightly gains us a bit more
|
// more timing related issues below. Slightly gains us a bit more
|
||||||
// determinism.
|
// determinism.
|
||||||
tweaker1 := envTweaker("FN_FREEZE_IDLE_MSECS", "0")
|
tweaker1 := envTweaker("FN_FREEZE_IDLE_MSECS", "0")
|
||||||
tweaker2 := envTweaker("FN_MAX_LOG_SIZE", "5")
|
tweaker2 := envTweaker("FN_MAX_LOG_SIZE_BYTES", "5")
|
||||||
defer tweaker1()
|
defer tweaker1()
|
||||||
defer tweaker2()
|
defer tweaker2()
|
||||||
|
|
||||||
@@ -187,7 +187,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
|
|||||||
// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
|
// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
|
||||||
immediateGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 0}`
|
immediateGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 0}`
|
||||||
immediateJsonValidGarbage := `{"isDebug": true, "postOutGarbage": "\r", "postSleepTime": 0}`
|
immediateJsonValidGarbage := `{"isDebug": true, "postOutGarbage": "\r", "postSleepTime": 0}`
|
||||||
delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1000}`
|
delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1500}`
|
||||||
ok := `{"isDebug": true}`
|
ok := `{"isDebug": true}`
|
||||||
|
|
||||||
containerIds := make([]string, 0)
|
containerIds := make([]string, 0)
|
||||||
@@ -211,7 +211,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
|
|||||||
{"/r/zoo/json/", immediateGarbage, "GET", http.StatusOK, "", nil, 0},
|
{"/r/zoo/json/", immediateGarbage, "GET", http.StatusOK, "", nil, 0},
|
||||||
|
|
||||||
// CASE II: delayed garbage: make sure delayed output lands in between request processing, should be blocked until next req
|
// CASE II: delayed garbage: make sure delayed output lands in between request processing, should be blocked until next req
|
||||||
{"/r/zoo/json/", delayedGarbage, "GET", http.StatusOK, "", nil, time.Second * 2},
|
{"/r/zoo/json/", delayedGarbage, "GET", http.StatusOK, "", nil, time.Millisecond * 2500},
|
||||||
|
|
||||||
// CASE III: normal, but should get faulty I/O from previous
|
// CASE III: normal, but should get faulty I/O from previous
|
||||||
{"/r/zoo/json/", ok, "GET", http.StatusBadGateway, "invalid json", nil, 0},
|
{"/r/zoo/json/", ok, "GET", http.StatusBadGateway, "invalid json", nil, 0},
|
||||||
@@ -355,7 +355,11 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
|
|
||||||
expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
|
expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
|
||||||
expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
|
expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
|
||||||
multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
|
|
||||||
|
// Checking for EndOfLogs currently depends on scheduling of go-routines (in docker/containerd) that process stderr & stdout.
|
||||||
|
// Therefore, not testing for EndOfLogs for hot containers (which has complex I/O processing) anymore.
|
||||||
|
multiLogExpectCold := []string{"BeginOfLogs", "EndOfLogs"}
|
||||||
|
multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */}
|
||||||
|
|
||||||
crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
|
crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
|
||||||
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
|
oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
|
||||||
@@ -400,8 +404,8 @@ func TestRouteRunnerExecution(t *testing.T) {
|
|||||||
{"/r/myapp/mydneregistry", ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil},
|
{"/r/myapp/mydneregistry", ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil},
|
||||||
|
|
||||||
{"/r/myapp/myoom", oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil},
|
{"/r/myapp/myoom", oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil},
|
||||||
{"/r/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
|
{"/r/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectHot},
|
||||||
{"/r/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
|
{"/r/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectCold},
|
||||||
{"/r/myapp/mybigoutputjson", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
{"/r/myapp/mybigoutputjson", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||||
{"/r/myapp/mybigoutputjson", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
{"/r/myapp/mybigoutputjson", smalloutput, "GET", http.StatusOK, nil, "", nil},
|
||||||
{"/r/myapp/mybigoutputhttp", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
{"/r/myapp/mybigoutputhttp", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
|
||||||
|
|||||||
Reference in New Issue
Block a user