From c0ee3ce7364cd8466e4337fc34b3bf37d88d3b05 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Fri, 13 Apr 2018 11:23:29 -0700
Subject: [PATCH] fn: locked mutex while blocked on I/O considered harmful (#935)

* fn: mutex while waiting on I/O considered harmful

*) Removed the cases where a mutex was held while waiting on I/O;
these included possible disk I/O and network I/O.

*) Changed the error/context semantics of Close/Shutdown, since the
context timeout and the comments around it were misleading. Close now
always waits for pending gRPC sessions to complete. The context here
was used merely as "wait up to x seconds to report an error", and that
error is only logged anyway; the runner can log it instead. The context
is still passed around, perhaps for future opencensus instrumentation.
---
 api/agent/lb_agent.go               |   7 +-
 api/agent/lb_agent_test.go          |   2 +-
 api/agent/runner_client.go          |  19 +---
 api/agent/static_pool.go            | 159 ++++++++++++++++------------
 api/agent/static_pool_test.go       | 110 ++++++++++++++++++-
 api/runnerpool/runner_pool.go       |   2 +-
 test/fn-api-tests/utils.go          |  22 ++--
 test/fn-system-tests/system_test.go |   6 +-
 8 files changed, 224 insertions(+), 103 deletions(-)

diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go
index c66754355..578c720ff 100644
--- a/api/agent/lb_agent.go
+++ b/api/agent/lb_agent.go
@@ -22,8 +22,7 @@ const (
 	// sleep time when scaling from 0 to 1 runners
 	noCapacityWaitInterval = 1 * time.Second
 	// amount of time to wait to place a request on a runner
-	placementTimeout          = 15 * time.Second
-	runnerPoolShutdownTimeout = 5 * time.Second
+	placementTimeout = 15 * time.Second
 )

 type lbAgent struct {
@@ -118,9 +117,7 @@ func (a *lbAgent) Close() error {
 	ch := a.shutWg.CloseGroupNB()

 	// finally shutdown the runner pool
-	ctx, cancel := context.WithTimeout(context.Background(), runnerPoolShutdownTimeout)
-	defer cancel()
-	err := a.rp.Shutdown(ctx)
+	err := a.rp.Shutdown(context.Background())
 	if err != nil {
 		logrus.WithError(err).Warn("Runner pool shutdown error")
 	}
diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go
index 1702a54a3..c34382c5f 100644
--- a/api/agent/lb_agent_test.go
+++ b/api/agent/lb_agent_test.go
@@ -105,7 +105,7 @@ func (r *mockRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e
 	return true, nil
 }

-func (r *mockRunner) Close(ctx context.Context) error {
+func (r *mockRunner) Close(context.Context) error {
 	go func() {
 		r.wg.Wait()
 	}()
diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go
index c05430dae..5a0ac8a25 100644
--- a/api/agent/runner_client.go
+++ b/api/agent/runner_client.go
@@ -42,22 +42,9 @@ func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool
 	}, nil
 }

-// Close waits until the context is closed for all inflight requests
-// to complete prior to terminating the underlying grpc connection
-func (r *gRPCRunner) Close(ctx context.Context) error {
-	err := make(chan error, 1)
-	go func() {
-		defer close(err)
-		r.shutWg.CloseGroup()
-		err <- r.conn.Close()
-	}()
-
-	select {
-	case e := <-err:
-		return e
-	case <-ctx.Done():
-		return ctx.Err() // context timed out while waiting
-	}
+func (r *gRPCRunner) Close(context.Context) error {
+	r.shutWg.CloseGroup()
+	return r.conn.Close()
 }

 func runnerConnection(address, runnerCertCN string, pki *pool.PKIData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
diff --git a/api/agent/static_pool.go b/api/agent/static_pool.go
index 0c9c12724..6f6a70661 100644
--- a/api/agent/static_pool.go
+++ b/api/agent/static_pool.go
@@ -2,15 +2,16 @@ package agent

 import (
 	"context"
+	"errors"
 	"sync"
-	"time"

 	pool "github.com/fnproject/fn/api/runnerpool"
 	"github.com/sirupsen/logrus"
 )

-const (
-	staticPoolShutdownTimeout = 5 * time.Second
+var (
+	ErrorPoolClosed       = errors.New("Runner pool closed")
+	ErrorPoolRunnerExists = errors.New("Runner already exists")
 )

 // manages a single set of runners ignoring lb groups
@@ -20,6 +21,7 @@ type staticRunnerPool struct {
 	runnerCN string
 	rMtx     *sync.RWMutex
 	runners  []pool.Runner
+	isClosed bool
 }

 func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool {
@@ -32,7 +34,7 @@ func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN s
 	for _, addr := range runnerAddresses {
 		r, err := runnerFactory(addr, runnerCN, pki)
 		if err != nil {
-			logrus.WithField("runner_addr", addr).Warn("Invalid runner")
+			logrus.WithError(err).WithField("runner_addr", addr).Warn("Invalid runner")
 			continue
 		}
 		logrus.WithField("runner_addr", addr).Debug("Adding runner to pool")
@@ -47,87 +49,112 @@ func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN s
 	}
 }

-func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
-	rp.rMtx.RLock()
-	defer rp.rMtx.RUnlock()
-
-	r := make([]pool.Runner, len(rp.runners))
-	copy(r, rp.runners)
-	return r, nil
-}
-
-func (rp *staticRunnerPool) AddRunner(address string) error {
+func (rp *staticRunnerPool) shutdown() []pool.Runner {
 	rp.rMtx.Lock()
 	defer rp.rMtx.Unlock()

-	r, err := rp.generator(address, rp.runnerCN, rp.pki)
-	if err != nil {
-		logrus.WithField("runner_addr", address).Warn("Failed to add runner")
-		return err
+	if rp.isClosed {
+		return nil
 	}
-	rp.runners = append(rp.runners, r)
+
+	rp.isClosed = true
+	toRemove := rp.runners[:]
+	rp.runners = nil
+
+	return toRemove
+}
+
+func (rp *staticRunnerPool) addRunner(runner pool.Runner) error {
+	rp.rMtx.Lock()
+	defer rp.rMtx.Unlock()
+
+	if rp.isClosed {
+		return ErrorPoolClosed
+	}
+
+	for _, r := range rp.runners {
+		if r.Address() == runner.Address() {
+			return ErrorPoolRunnerExists
+		}
+	}
+
+	rp.runners = append(rp.runners, runner)
 	return nil
 }

-func (rp *staticRunnerPool) RemoveRunner(address string) {
+func (rp *staticRunnerPool) removeRunner(address string) pool.Runner {
 	rp.rMtx.Lock()
 	defer rp.rMtx.Unlock()

-	ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
-	defer cancel()
-
 	for i, r := range rp.runners {
 		if r.Address() == address {
-			err := r.Close(ctx)
-			if err != nil {
-				logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Failed to close runner")
-			}
-			// delete runner from list
 			rp.runners = append(rp.runners[:i], rp.runners[i+1:]...)
-			return
+			return r
 		}
 	}
+	return nil
+}
+
+func (rp *staticRunnerPool) getRunners() ([]pool.Runner, error) {
+	rp.rMtx.RLock()
+	defer rp.rMtx.RUnlock()
+
+	if rp.isClosed {
+		return nil, ErrorPoolClosed
+	}
+
+	r := make([]pool.Runner, len(rp.runners))
+	copy(r, rp.runners)
+
+	return r, nil
+}
+
+func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
+	return rp.getRunners()
+}
+
+func (rp *staticRunnerPool) AddRunner(address string) error {
+	r, err := rp.generator(address, rp.runnerCN, rp.pki)
+	if err != nil {
+		logrus.WithError(err).WithField("runner_addr", address).Warn("Failed to add runner")
+		return err
+	}
+
+	err = rp.addRunner(r)
+	if err != nil {
+		err2 := r.Close(context.Background())
+		if err2 != nil {
+			logrus.WithError(err2).WithField("runner_addr", address).Warn("Error closing runner on AddRunner failure")
+		}
+	}
+	return err
+}
+
+func (rp *staticRunnerPool) RemoveRunner(address string) {
+	toRemove := rp.removeRunner(address)
+	if toRemove == nil {
+		return
+	}
+
+	err := toRemove.Close(context.Background())
+	if err != nil {
+		logrus.WithError(err).WithField("runner_addr", toRemove.Address()).Error("Error closing runner")
+	}
 }

-// Shutdown blocks waiting for all runners to close, or until ctx is done
-func (rp *staticRunnerPool) Shutdown(ctx context.Context) (e error) {
-	rp.rMtx.Lock()
-	defer rp.rMtx.Unlock()
+func (rp *staticRunnerPool) Shutdown(ctx context.Context) error {
+	toRemove := rp.shutdown()

-	ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
-	defer cancel()
-
-	errors := make(chan error, len(rp.runners))
-	var wg sync.WaitGroup
-	for _, r := range rp.runners {
-		wg.Add(1)
-		go func(runner pool.Runner) {
-			defer wg.Done()
-			err := runner.Close(ctx)
-			if err != nil {
-				logrus.WithError(err).WithField("runner_addr", runner.Address()).Error("Failed to close runner")
-				errors <- err
-			}
-		}(r)
-	}
-
-	done := make(chan interface{})
-	go func() {
-		defer close(done)
-		wg.Wait()
-	}()
-
-	select {
-	case <-done:
-		close(errors)
-		for e := range errors {
-			// return the first error
-			if e != nil {
-				return e
+	var retErr error
+	for _, r := range toRemove {
+		err := r.Close(ctx)
+		if err != nil {
+			logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Error closing runner")
+			// grab the first error only for now.
+			if retErr == nil {
+				retErr = err
 			}
 		}
-		return nil
-	case <-ctx.Done():
-		return ctx.Err() // context timed out while waiting
 	}
+	return retErr
 }
diff --git a/api/agent/static_pool_test.go b/api/agent/static_pool_test.go
index d09a18814..c990a868a 100644
--- a/api/agent/static_pool_test.go
+++ b/api/agent/static_pool_test.go
@@ -2,6 +2,7 @@ package agent

 import (
 	"context"
+	"errors"
 	"testing"

 	pool "github.com/fnproject/fn/api/runnerpool"
@@ -11,6 +12,10 @@ func setupStaticPool(runners []string) pool.RunnerPool {
 	return NewStaticRunnerPool(runners, nil, "", mockRunnerFactory)
 }

+var (
+	ErrorGarbanzoBeans = errors.New("yes, that's right. Garbanzo beans...")
+)
+
 type mockStaticRunner struct {
 	address string
 }
@@ -19,8 +24,8 @@ func (r *mockStaticRunner) TryExec(ctx context.Context, call pool.RunnerCall) (b
 	return true, nil
 }

-func (r *mockStaticRunner) Close(ctx context.Context) error {
-	return nil
+func (r *mockStaticRunner) Close(context.Context) error {
+	return ErrorGarbanzoBeans
 }

 func (r *mockStaticRunner) Address() string {
@@ -44,10 +49,57 @@ func TestNewStaticPool(t *testing.T) {
 	}
 }

+func TestEmptyPool(t *testing.T) {
+	np := setupStaticPool(nil).(*staticRunnerPool)
+
+	runners, err := np.Runners(nil)
+	if err != nil {
+		t.Fatalf("Failed to list runners %v", err)
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
+
+	err = np.AddRunner("127.0.0.1:8082")
+	if err != nil {
+		t.Fatalf("Failed to add runner %v", err)
+	}
+
+	runners, err = np.Runners(nil)
+	if err != nil {
+		t.Fatalf("Failed to list runners %v", err)
+	}
+	if len(runners) != 1 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
+
+	err = np.Shutdown(context.Background())
+	if err != ErrorGarbanzoBeans {
+		t.Fatalf("Expected garbanzo beans error from shutdown %v", err)
+	}
+
+	runners, err = np.Runners(nil)
+	if err == nil {
+		t.Fatalf("Should fail to list runners (shutdown)")
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
+}
+
 func TestAddNodeToPool(t *testing.T) {
 	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
 	np := setupStaticPool(addrs).(*staticRunnerPool)
-	np.AddRunner("127.0.0.1:8082")
+
+	err := np.AddRunner("127.0.0.1:8082")
+	if err != nil {
+		t.Fatalf("Add Should not fail %v", err)
+	}
+
+	err = np.AddRunner("127.0.0.1:8082")
+	if err != ErrorPoolRunnerExists {
+		t.Fatalf("Add Should fail since duplicate %v", err)
+	}

 	runners, err := np.Runners(nil)
 	if err != nil {
@@ -56,11 +108,25 @@ func TestAddNodeToPool(t *testing.T) {
 	if len(runners) != 3 {
 		t.Fatalf("Invalid number of runners %v", len(runners))
 	}
+
+	err = np.Shutdown(context.Background())
+	if err != ErrorGarbanzoBeans {
+		t.Fatalf("Expected garbanzo beans error from shutdown %v", err)
+	}
+
+	runners, err = np.Runners(nil)
+	if err == nil {
+		t.Fatalf("Should fail to list runners (shutdown)")
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
 }

 func TestRemoveNodeFromPool(t *testing.T) {
 	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
 	np := setupStaticPool(addrs).(*staticRunnerPool)
+
 	np.RemoveRunner("127.0.0.1:8081")

 	runners, err := np.Runners(nil)
@@ -81,4 +147,42 @@ func TestRemoveNodeFromPool(t *testing.T) {
 	if len(runners) != 1 {
 		t.Fatalf("Invalid number of runners %v", len(runners))
 	}
+
+	np.RemoveRunner("127.0.0.1:8080")
+
+	runners, err = np.Runners(nil)
+	if err != nil {
+		t.Fatalf("Failed to list runners %v", err)
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
+
+	np.RemoveRunner("127.0.0.1:8080")
+
+	runners, err = np.Runners(nil)
+	if err != nil {
+		t.Fatalf("Failed to list runners %v", err)
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
+
+	// Let's try a double shutdown
+	err = np.Shutdown(context.Background())
+	if err != nil {
+		t.Fatalf("Not expected error from shutdown I (empty pool) %v", err)
+	}
+	err = np.Shutdown(context.Background())
+	if err != nil {
+		t.Fatalf("Not expected error from shutdown II (empty pool) %v", err)
+	}
+
+	runners, err = np.Runners(nil)
+	if err == nil {
+		t.Fatalf("Should fail to list runners (shutdown)")
+	}
+	if len(runners) != 0 {
+		t.Fatalf("Invalid number of runners %v", len(runners))
+	}
 }
diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go
index cc740f5de..2f913b974 100644
--- a/api/runnerpool/runner_pool.go
+++ b/api/runnerpool/runner_pool.go
@@ -18,7 +18,7 @@ type Placer interface {
 // RunnerPool is the abstraction for getting an ordered list of runners to try for a call
 type RunnerPool interface {
 	Runners(call RunnerCall) ([]Runner, error)
-	Shutdown(context.Context) error
+	Shutdown(ctx context.Context) error
 }

 // PKIData encapsulates TLS certificate data
diff --git a/test/fn-api-tests/utils.go b/test/fn-api-tests/utils.go
index f42092603..0f1b6f58d 100644
--- a/test/fn-api-tests/utils.go
+++ b/test/fn-api-tests/utils.go
@@ -14,7 +14,6 @@ import (
 	"testing"
 	"time"

-	"github.com/fnproject/fn/api/common"
 	"github.com/fnproject/fn/api/server"
 	"github.com/fnproject/fn_go/client"
 	httptransport "github.com/go-openapi/runtime/client"
@@ -24,11 +23,7 @@ import (
 const lBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

 func GetAPIURL() (string, *url.URL) {
-	apiURL := os.Getenv("FN_API_URL")
-	if apiURL == "" {
-		apiURL = "http://localhost:8080"
-	}
-
+	apiURL := getEnv("FN_API_URL", "http://localhost:8080")
 	u, err := url.Parse(apiURL)
 	if err != nil {
 		log.Fatalf("Couldn't parse API URL: %s error: %s", apiURL, err)
@@ -79,12 +74,18 @@ func checkServer(ctx context.Context) error {
 	return ctx.Err()
 }

+func getEnv(key, fallback string) string {
+	if value, ok := os.LookupEnv(key); ok {
+		return value
+	}
+	return fallback
+}
+
 func startServer() {
 	getServer.Do(func() {
 		ctx := context.Background()

-		common.SetLogLevel("fatal")

 		timeString := time.Now().Format("2006_01_02_15_04_05")
 		dbURL := os.Getenv(server.EnvDBURL)
 		tmpDir := os.TempDir()
@@ -95,7 +96,12 @@ func startServer() {
 			dbURL = fmt.Sprintf("sqlite3://%s", tmpDb)
 		}

-		s = server.New(ctx, server.WithDBURL(dbURL), server.WithMQURL(mqURL), server.WithFullAgent())
+		s = server.New(ctx,
+			server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)),
+			server.WithDBURL(dbURL),
+			server.WithMQURL(mqURL),
+			server.WithFullAgent(),
+		)

 		go func() {
 			s.Start(ctx)
diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go
index f46d9d59c..b041ae3db 100644
--- a/test/fn-system-tests/system_test.go
+++ b/test/fn-system-tests/system_test.go
@@ -114,7 +114,7 @@ func SetUpAPINode(ctx context.Context) (*server.Server, error) {
 	opts := make([]server.ServerOption, 0)
 	opts = append(opts, server.WithWebPort(8085))
 	opts = append(opts, server.WithType(nodeType))
-	opts = append(opts, server.WithLogLevel(server.DefaultLogLevel))
+	opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
 	opts = append(opts, server.WithLogDest(server.DefaultLogDest, "API"))
 	opts = append(opts, server.WithDBURL(getEnv(server.EnvDBURL, defaultDB)))
 	opts = append(opts, server.WithMQURL(getEnv(server.EnvMQURL, defaultMQ)))
@@ -129,7 +129,7 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
 	opts := make([]server.ServerOption, 0)
 	opts = append(opts, server.WithWebPort(8081))
 	opts = append(opts, server.WithType(nodeType))
-	opts = append(opts, server.WithLogLevel(server.DefaultLogLevel))
+	opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
 	opts = append(opts, server.WithLogDest(server.DefaultLogDest, "LB"))
 	opts = append(opts, server.WithDBURL(""))
 	opts = append(opts, server.WithMQURL(""))
@@ -194,7 +194,7 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro
 	opts = append(opts, server.WithWebPort(8082+nodeNum))
 	opts = append(opts, server.WithGRPCPort(9190+nodeNum))
 	opts = append(opts, server.WithType(nodeType))
-	opts = append(opts, server.WithLogLevel("debug"))
+	opts = append(opts, server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)))
 	opts = append(opts, server.WithLogDest(server.DefaultLogDest, "PURE-RUNNER"))
 	opts = append(opts, server.WithDBURL(""))
 	opts = append(opts, server.WithMQURL(""))
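
A minimal sketch of the locking pattern this patch adopts in staticRunnerPool
(illustrative only; the pool and runner types below are invented for the
example, not taken from the fn codebase): mutate or snapshot shared state
while holding the mutex, then perform the blocking Close I/O with no lock
held, and report only the first error.

package example

import (
	"context"
	"sync"
)

// runner stands in for pool.Runner; Close may block on network I/O.
type runner interface {
	Close(ctx context.Context) error
}

type runnerPool struct {
	mtx     sync.Mutex
	closed  bool
	runners []runner
}

// drain marks the pool closed and hands back the current runner list.
// The mutex is held only for this brief bookkeeping, never across I/O.
func (p *runnerPool) drain() []runner {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	p.closed = true
	snapshot := p.runners
	p.runners = nil
	return snapshot
}

// Shutdown closes every runner outside the lock and returns the first
// Close error encountered, mirroring the patched staticRunnerPool.Shutdown.
func (p *runnerPool) Shutdown(ctx context.Context) error {
	var firstErr error
	for _, r := range p.drain() {
		if err := r.Close(ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

Because the lock is released before any Close call, a slow or wedged runner
can no longer stall AddRunner, RemoveRunner, or Runners, which is the point
of the change.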