From 508d9e18c78ea8fd9ffec0250b7cc86551da18e3 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Wed, 9 May 2018 19:23:10 -0700
Subject: [PATCH] fn: nonblocking resource manager tests (#987)

---
 api/agent/agent.go      | 60 +++++++++++++++++----------
 api/agent/agent_test.go | 89 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 128 insertions(+), 21 deletions(-)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index 4024c6d44..f1449aa4d 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -118,9 +118,11 @@ type agent struct {
 	callEndCount int64
 }
 
+type AgentOption func(*agent) error
+
 // New creates an Agent that executes functions locally as Docker containers.
-func New(da DataAccess) Agent {
-	a := createAgent(da).(*agent)
+func New(da DataAccess, options ...AgentOption) Agent {
+	a := createAgent(da, options...).(*agent)
 	if !a.shutWg.AddSession(1) {
 		logrus.Fatalf("cannot start agent, unable to add session")
 	}
@@ -128,33 +130,49 @@ func New(da DataAccess) Agent {
 	return a
 }
 
-func createAgent(da DataAccess) Agent {
+func WithConfig(cfg *AgentConfig) AgentOption {
+	return func(a *agent) error {
+		a.cfg = *cfg
+		return nil
+	}
+}
+
+func createAgent(da DataAccess, options ...AgentOption) Agent {
 	cfg, err := NewAgentConfig()
 	if err != nil {
 		logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg)
 	}
 
-	logrus.Infof("agent starting cfg=%+v", cfg)
-
-	// TODO: Create drivers.New(runnerConfig)
-	driver := docker.NewDocker(drivers.Config{
-		DockerNetworks:  cfg.DockerNetworks,
-		ServerVersion:   cfg.MinDockerVersion,
-		PreForkPoolSize: cfg.PreForkPoolSize,
-		PreForkImage:    cfg.PreForkImage,
-		PreForkCmd:      cfg.PreForkCmd,
-		PreForkUseOnce:  cfg.PreForkUseOnce,
-		PreForkNetworks: cfg.PreForkNetworks,
-	})
 	a := &agent{
-		cfg:       *cfg,
-		da:        da,
-		driver:    driver,
-		slotMgr:   NewSlotQueueMgr(),
-		resources: NewResourceTracker(cfg),
-		shutWg:    common.NewWaitGroup(),
+		cfg: *cfg,
 	}
 
+	// Allow overriding config
+	for _, option := range options {
+		err = option(a)
+		if err != nil {
+			logrus.WithError(err).Fatalf("error in agent options")
+		}
+	}
+
+	logrus.Infof("agent starting cfg=%+v", a.cfg)
+
+	// TODO: Create drivers.New(runnerConfig)
+	a.driver = docker.NewDocker(drivers.Config{
+		DockerNetworks:  a.cfg.DockerNetworks,
+		ServerVersion:   a.cfg.MinDockerVersion,
+		PreForkPoolSize: a.cfg.PreForkPoolSize,
+		PreForkImage:    a.cfg.PreForkImage,
+		PreForkCmd:      a.cfg.PreForkCmd,
+		PreForkUseOnce:  a.cfg.PreForkUseOnce,
+		PreForkNetworks: a.cfg.PreForkNetworks,
+	})
+
+	a.da = da
+	a.slotMgr = NewSlotQueueMgr()
+	a.resources = NewResourceTracker(&a.cfg)
+	a.shutWg = common.NewWaitGroup()
+
 	// TODO assert that agent doesn't get started for API nodes up above ?
 	return a
 }
diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go
index 8d1a30be5..da86e5a82 100644
--- a/api/agent/agent_test.go
+++ b/api/agent/agent_test.go
@@ -902,3 +902,92 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) {
 		t.Fatalf("body from second call was not what we wanted. boo. got wrong body: %v wanted: %v", resp.R.Body, "NODAWG")
 	}
 }
+
+func TestNBIOResourceTracker(t *testing.T) {
+
+	call := testCall()
+	call.Type = "sync"
+	call.Format = "http"
+	call.IdleTimeout = 60
+	call.Timeout = 30
+	call.Memory = 50
+	app := &models.App{Name: "myapp"}
+	app.SetDefaults()
+	app.ID = call.AppID
+	// we need to load in app & route so that FromRequest works
+	ds := datastore.NewMockInit(
+		[]*models.App{app},
+		[]*models.Route{
+			{
+				Path:        call.Path,
+				AppID:       call.AppID,
+				Image:       call.Image,
+				Type:        call.Type,
+				Format:      call.Format,
+				Timeout:     call.Timeout,
+				IdleTimeout: call.IdleTimeout,
+				Memory:      call.Memory,
+			},
+		},
+	)
+
+	cfg, err := NewAgentConfig()
+	if err != nil {
+		t.Fatalf("bad config %+v: %v", cfg, err)
+	}
+
+	cfg.EnableNBResourceTracker = true
+	cfg.MaxTotalMemory = 280 * 1024 * 1024
+	cfg.HotPoll = 20 * time.Millisecond
+
+	a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
+	defer a.Close()
+
+	reqCount := 20
+	errors := make(chan error, reqCount)
+	for i := 0; i < reqCount; i++ {
+		go func(i int) {
+			body := `{"sleepTime": 10000, "isDebug": true}`
+			req, err := http.NewRequest("GET", call.URL, strings.NewReader(body))
+			if err != nil {
+				t.Fatal("unexpected error building request", err)
+			}
+
+			var outOne bytes.Buffer
+			callI, err := a.GetCall(FromRequest(a, app, call.Path, req), WithWriter(&outOne))
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			err = a.Submit(callI)
+			errors <- err
+		}(i)
+	}
+
+	ok := 0
+	for i := 0; i < reqCount; i++ {
+		err := <-errors
+		t.Logf("Got response %v", err)
+		if err == nil {
+			ok++
+		} else if err == models.ErrCallTimeoutServerBusy {
+		} else {
+			t.Fatalf("Unexpected error %v", err)
+		}
+	}
+
+	// BUG: in theory, we should get 5 successes. But due to hot polling/signalling,
+	// some requests may aggressively get 'too busy' since our request-to-slot
+	// relationship is not 1-to-1.
+	// This occurs in hot function request bursts (such as this test case):
+	// when these requests repetitively poll the hotLauncher, the system is
+	// likely to decide that a new container is needed (since many requests are
+	// waiting), which results in extra 'too busy' responses.
+	//
+	//
+	// 280MB total RAM with 50MB functions... 5 should succeed, the rest should
+	// get 'too busy'.
+	if ok < 4 || ok > 5 {
+		t.Fatalf("Expected 4 or 5 successes, but got %d", ok)
+	}
+}
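
Usage sketch, not part of the diff above: the patch introduces AgentOption and WithConfig so that callers (and tests) can inject a pre-built AgentConfig instead of relying only on environment defaults. The snippet below is a minimal sketch of that wiring, mirroring the helpers the new test uses (NewAgentConfig, NewDirectDataAccess, datastore.NewMockInit, mqs.Mock); the import paths assume the fnproject/fn repository layout and the concrete config values are illustrative only.

package main

import (
	"log"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/api/datastore"
	"github.com/fnproject/fn/api/mqs"
)

func main() {
	// Build the default config from the environment, then override a few knobs
	// before handing it to the agent through the new WithConfig option.
	cfg, err := agent.NewAgentConfig()
	if err != nil {
		log.Fatalf("bad agent config: %v", err)
	}
	cfg.EnableNBResourceTracker = true     // return 'too busy' immediately instead of waiting for RAM
	cfg.MaxTotalMemory = 512 * 1024 * 1024 // illustrative cap; pick a value for your host

	// Same wiring shape as the test above: an in-memory datastore and a mock MQ.
	ds := datastore.NewMockInit(nil, nil)
	a := agent.New(agent.NewDirectDataAccess(ds, ds, new(mqs.Mock)), agent.WithConfig(cfg))
	defer a.Close()
}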