fn: remove dead code in static pool (#1052)

Static pool is oriented for testing/basic usage and
as it's name implies it is a static pool. Therefore,
removing unnecessary/dead code.
This commit is contained in:
Tolga Ceylan
2018-06-08 15:57:06 -07:00
committed by GitHub
parent 8f969918bd
commit fce1e54746
2 changed files with 9 additions and 224 deletions

View File

@@ -2,26 +2,17 @@ package agent
import ( import (
"context" "context"
"errors"
"sync"
pool "github.com/fnproject/fn/api/runnerpool" pool "github.com/fnproject/fn/api/runnerpool"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var (
ErrorPoolClosed = errors.New("Runner pool closed")
ErrorPoolRunnerExists = errors.New("Runner already exists")
)
// manages a single set of runners ignoring lb groups // manages a single set of runners ignoring lb groups
type staticRunnerPool struct { type staticRunnerPool struct {
generator pool.MTLSRunnerFactory generator pool.MTLSRunnerFactory
pki *pool.PKIData // can be nil when running in insecure mode pki *pool.PKIData // can be nil when running in insecure mode
runnerCN string runnerCN string
rMtx *sync.RWMutex
runners []pool.Runner runners []pool.Runner
isClosed bool
} }
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool { func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool {
@@ -41,7 +32,6 @@ func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN s
runners = append(runners, r) runners = append(runners, r)
} }
return &staticRunnerPool{ return &staticRunnerPool{
rMtx: &sync.RWMutex{},
runners: runners, runners: runners,
pki: pki, pki: pki,
runnerCN: runnerCN, runnerCN: runnerCN,
@@ -49,104 +39,15 @@ func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN s
} }
} }
func (rp *staticRunnerPool) shutdown() []pool.Runner { func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
if rp.isClosed {
return nil
}
rp.isClosed = true
toRemove := rp.runners[:]
rp.runners = nil
return toRemove
}
func (rp *staticRunnerPool) addRunner(runner pool.Runner) error {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
if rp.isClosed {
return ErrorPoolClosed
}
for _, r := range rp.runners {
if r.Address() == runner.Address() {
return ErrorPoolRunnerExists
}
}
rp.runners = append(rp.runners, runner)
return nil
}
func (rp *staticRunnerPool) removeRunner(address string) pool.Runner {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
for i, r := range rp.runners {
if r.Address() == address {
rp.runners = append(rp.runners[:i], rp.runners[i+1:]...)
return r
}
}
return nil
}
func (rp *staticRunnerPool) getRunners() ([]pool.Runner, error) {
rp.rMtx.RLock()
defer rp.rMtx.RUnlock()
if rp.isClosed {
return nil, ErrorPoolClosed
}
r := make([]pool.Runner, len(rp.runners)) r := make([]pool.Runner, len(rp.runners))
copy(r, rp.runners) copy(r, rp.runners)
return r, nil return r, nil
} }
func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
return rp.getRunners()
}
func (rp *staticRunnerPool) AddRunner(address string) error {
r, err := rp.generator(address, rp.runnerCN, rp.pki)
if err != nil {
logrus.WithError(err).WithField("runner_addr", address).Warn("Failed to add runner")
return err
}
err = rp.addRunner(r)
if err != nil {
err2 := r.Close(context.Background())
if err2 != nil {
logrus.WithError(err2).WithField("runner_addr", address).Warn("Error closing runner on AddRunner failure")
}
}
return err
}
func (rp *staticRunnerPool) RemoveRunner(address string) {
toRemove := rp.removeRunner(address)
if toRemove == nil {
return
}
err := toRemove.Close(context.Background())
if err != nil {
logrus.WithError(err).WithField("runner_addr", toRemove.Address()).Error("Error closing runner")
}
}
func (rp *staticRunnerPool) Shutdown(ctx context.Context) error { func (rp *staticRunnerPool) Shutdown(ctx context.Context) error {
toRemove := rp.shutdown()
var retErr error var retErr error
for _, r := range toRemove { for _, r := range rp.runners {
err := r.Close(ctx) err := r.Close(ctx)
if err != nil { if err != nil {
logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Error closing runner") logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Error closing runner")

View File

@@ -47,6 +47,11 @@ func TestNewStaticPool(t *testing.T) {
if len(runners) != len(addrs) { if len(runners) != len(addrs) {
t.Fatalf("Invalid number of runners %v", len(runners)) t.Fatalf("Invalid number of runners %v", len(runners))
} }
err = np.Shutdown(context.Background())
if err != ErrorGarbanzoBeans {
t.Fatalf("Expected garbanzo beans error from shutdown %v", err)
}
} }
func TestEmptyPool(t *testing.T) { func TestEmptyPool(t *testing.T) {
@@ -60,129 +65,8 @@ func TestEmptyPool(t *testing.T) {
t.Fatalf("Invalid number of runners %v", len(runners)) t.Fatalf("Invalid number of runners %v", len(runners))
} }
err = np.AddRunner("127.0.0.1:8082")
if err != nil {
t.Fatalf("Failed to add runner %v", err)
}
runners, err = np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 1 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
err = np.Shutdown(context.Background())
if err != ErrorGarbanzoBeans {
t.Fatalf("Expected garbanzo beans error from shutdown %v", err)
}
runners, err = np.Runners(nil)
if err == nil {
t.Fatalf("Should fail to list runners (shutdown)")
}
if len(runners) != 0 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
}
func TestAddNodeToPool(t *testing.T) {
addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
np := setupStaticPool(addrs).(*staticRunnerPool)
err := np.AddRunner("127.0.0.1:8082")
if err != nil {
t.Fatalf("Add Should not fail %v", err)
}
err = np.AddRunner("127.0.0.1:8082")
if err != ErrorPoolRunnerExists {
t.Fatalf("Add Should fail since duplicate %v", err)
}
runners, err := np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 3 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
err = np.Shutdown(context.Background())
if err != ErrorGarbanzoBeans {
t.Fatalf("Expected garbanzo beans error from shutdown %v", err)
}
runners, err = np.Runners(nil)
if err == nil {
t.Fatalf("Should fail to list runners (shutdown)")
}
if len(runners) != 0 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
}
func TestRemoveNodeFromPool(t *testing.T) {
addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
np := setupStaticPool(addrs).(*staticRunnerPool)
np.RemoveRunner("127.0.0.1:8081")
runners, err := np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 1 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
np.RemoveRunner("127.0.0.1:8081")
runners, err = np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 1 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
np.RemoveRunner("127.0.0.1:8080")
runners, err = np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 0 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
np.RemoveRunner("127.0.0.1:8080")
runners, err = np.Runners(nil)
if err != nil {
t.Fatalf("Failed to list runners %v", err)
}
if len(runners) != 0 {
t.Fatalf("Invalid number of runners %v", len(runners))
}
// Let's try a double shutdown
err = np.Shutdown(context.Background()) err = np.Shutdown(context.Background())
if err != nil { if err != nil {
t.Fatalf("Not expected error from shutdown I (empty pool) %v", err) t.Fatalf("Unexpected error from shutdown %v", err)
}
err = np.Shutdown(context.Background())
if err != nil {
t.Fatalf("Not expected error from shutdown II (empty pool) %v", err)
}
runners, err = np.Runners(nil)
if err == nil {
t.Fatalf("Should fail to list runners (shutdown)")
}
if len(runners) != 0 {
t.Fatalf("Invalid number of runners %v", len(runners))
} }
} }