Files
fn-serverless/api/agent/static_pool.go
Gerardo Viedma 348bbaf36b support runner TLS certificates with specified certificate Common Names (#900)
* support runner TLS certificates with specified certificate Common Names

* removes duplicate constant

* run in insecure mode by default but expose ability to create tls-secured runner pools programmatically

* fixes runner tests to use new tls interfaces
2018-03-28 13:57:15 +01:00

134 lines
3.2 KiB
Go

package agent
import (
"context"
"sync"
"time"
pool "github.com/fnproject/fn/api/runnerpool"
"github.com/sirupsen/logrus"
)
const (
staticPoolShutdownTimeout = 5 * time.Second
)
// manages a single set of runners ignoring lb groups
type staticRunnerPool struct {
generator pool.MTLSRunnerFactory
pki *pool.PKIData // can be nil when running in insecure mode
runnerCN string
rMtx *sync.RWMutex
runners []pool.Runner
}
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool {
return NewStaticRunnerPool(runnerAddresses, nil, "", SecureGRPCRunnerFactory)
}
func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN string, runnerFactory pool.MTLSRunnerFactory) pool.RunnerPool {
logrus.WithField("runners", runnerAddresses).Info("Starting static runner pool")
var runners []pool.Runner
for _, addr := range runnerAddresses {
r, err := runnerFactory(addr, runnerCN, pki)
if err != nil {
logrus.WithField("runner_addr", addr).Warn("Invalid runner")
continue
}
logrus.WithField("runner_addr", addr).Debug("Adding runner to pool")
runners = append(runners, r)
}
return &staticRunnerPool{
rMtx: &sync.RWMutex{},
runners: runners,
pki: pki,
runnerCN: runnerCN,
generator: runnerFactory,
}
}
func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
rp.rMtx.RLock()
defer rp.rMtx.RUnlock()
r := make([]pool.Runner, len(rp.runners))
copy(r, rp.runners)
return r, nil
}
func (rp *staticRunnerPool) AddRunner(address string) error {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
r, err := rp.generator(address, rp.runnerCN, rp.pki)
if err != nil {
logrus.WithField("runner_addr", address).Warn("Failed to add runner")
return err
}
rp.runners = append(rp.runners, r)
return nil
}
func (rp *staticRunnerPool) RemoveRunner(address string) {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
defer cancel()
for i, r := range rp.runners {
if r.Address() == address {
err := r.Close(ctx)
if err != nil {
logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Failed to close runner")
}
// delete runner from list
rp.runners = append(rp.runners[:i], rp.runners[i+1:]...)
return
}
}
}
// Shutdown blocks waiting for all runners to close, or until ctx is done
func (rp *staticRunnerPool) Shutdown(ctx context.Context) (e error) {
rp.rMtx.Lock()
defer rp.rMtx.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
defer cancel()
errors := make(chan error, len(rp.runners))
var wg sync.WaitGroup
for _, r := range rp.runners {
wg.Add(1)
go func(runner pool.Runner) {
defer wg.Done()
err := runner.Close(ctx)
if err != nil {
logrus.WithError(err).WithField("runner_addr", runner.Address()).Error("Failed to close runner")
errors <- err
}
}(r)
}
done := make(chan interface{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
close(errors)
for e := range errors {
// return the first error
if e != nil {
return e
}
}
return nil
case <-ctx.Done():
return ctx.Err() // context timed out while waiting
}
}