Mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
Moves out node pool manager behind an extension using runner pool abstraction (Part 2) (#862)
* Move out node-pool manager and replace it with RunnerPool extension
* adds extension points for runner pools in load-balanced mode
* adds error to return values in RunnerPool and Runner interfaces
* Implements runner pool contract with context-aware shutdown
* fixes issue with range
* fixes tests to use runner abstraction
* adds empty test file as a workaround for build requiring go source files in top-level package
* removes flappy timeout test
* update docs to reflect runner pool setup
* refactors system tests to use runner abstraction
* removes poolmanager
* moves runner interfaces from models to api/runnerpool package
* Adds a second runner to pool docs example
* explicitly check for request spillover to second runner in test
* moves runner pool package name for system tests
* renames runner pool pointer variable for consistency
* pass model json to runner
* automatically cast to http.ResponseWriter in load-balanced call case
* allow overriding of server RunnerPool via a programmatic ServerOption
* fixes return type of ResponseWriter in test
* move Placer interface to runnerpool package
* moves hash-based placer out of open source project
* removes siphash from Gopkg.lock
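For orientation, the runner pool contract that this change introduces can be reconstructed from the call sites in the diff below. The following is a sketch, not the verbatim api/runnerpool source (that package is not shown in this diff); the interface names follow the pool.* identifiers referenced by the code.

// Sketch of the api/runnerpool contract as used by this change
// (reconstructed from the call sites below; the actual package
// source is not part of this diff).
package runnerpool

import (
	"context"
	"io"
	"net/http"
	"time"

	"github.com/fnproject/fn/api/models"
)

// RunnerPool supplies candidate runners for a call and supports
// context-aware shutdown.
type RunnerPool interface {
	Runners(call RunnerCall) ([]Runner, error)
	Shutdown(ctx context.Context) error
}

// Runner attempts to execute a call; TryExec reports whether the
// call was actually placed on this runner.
type Runner interface {
	TryExec(ctx context.Context, call RunnerCall) (bool, error)
	Close(ctx context.Context) error
	Address() string
}

// Placer decides which runner in a pool receives a call.
type Placer interface {
	PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error
}

// RunnerCall is the read-only view of a call that placement needs.
type RunnerCall interface {
	SlotDeadline() time.Time
	Request() *http.Request
	ResponseWriter() http.ResponseWriter
	StdErr() io.ReadWriteCloser
	Model() *models.Call
}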
@@ -274,6 +274,22 @@ type call struct {
	disablePreemptiveCapacityCheck bool
}

func (c *call) SlotDeadline() time.Time {
	return c.slotDeadline
}

func (c *call) Request() *http.Request {
	return c.req
}

func (c *call) ResponseWriter() http.ResponseWriter {
	return c.w.(http.ResponseWriter)
}

func (c *call) StdErr() io.ReadWriteCloser {
	return c.stderr
}

func (c *call) Model() *models.Call { return c.Call }

func (c *call) Start(ctx context.Context) error {

@@ -2,9 +2,6 @@ package agent

import (
	"context"
	"errors"
	"io"
	"net/http"
	"sync"
	"time"

@@ -13,55 +10,18 @@ import (

	"github.com/fnproject/fn/api/common"
	"github.com/fnproject/fn/api/models"
	pool "github.com/fnproject/fn/api/runnerpool"
	"github.com/fnproject/fn/fnext"
	"github.com/fnproject/fn/poolmanager"
)

// RequestReader takes an agent.Call and returns a ReadCloser for the request body inside it
func RequestReader(c *Call) (io.ReadCloser, error) {
	// Get the call :(((((
	cc, ok := (*c).(*call)

	if !ok {
		return nil, errors.New("Can't cast agent.Call to agent.call")
	}

	if cc.req == nil {
		return nil, errors.New("Call doesn't contain a request")
	}

	return cc.req.Body, nil
}

func ResponseWriter(c *Call) (*http.ResponseWriter, error) {
	cc, ok := (*c).(*call)

	if !ok {
		return nil, errors.New("Can't cast agent.Call to agent.call")
	}

	if rw, ok := cc.w.(http.ResponseWriter); ok {
		return &rw, nil
	}

	return nil, errors.New("Unable to get HTTP response writer from the call")
}

type remoteSlot struct {
	lbAgent *lbAgent
}

func (s *remoteSlot) exec(ctx context.Context, call *call) error {
func (s *remoteSlot) exec(ctx context.Context, call pool.RunnerCall) error {
	a := s.lbAgent

	memMb := call.Model().Memory
	lbGroupID := GetGroupID(call.Model())

	capacityRequest := &poolmanager.CapacityRequest{TotalMemoryMb: memMb, LBGroupID: lbGroupID}
	a.np.AssignCapacity(capacityRequest)
	defer a.np.ReleaseCapacity(capacityRequest)

	err := a.placer.PlaceCall(a.np, ctx, call, lbGroupID)
	err := a.placer.PlaceCall(a.rp, ctx, call)
	if err != nil {
		logrus.WithError(err).Error("Failed to place call")
	}
@@ -76,14 +36,10 @@ func (s *remoteSlot) Error() error {
	return nil
}

type Placer interface {
	PlaceCall(np NodePool, ctx context.Context, call *call, lbGroupID string) error
}

type naivePlacer struct {
}

func NewNaivePlacer() Placer {
func NewNaivePlacer() pool.Placer {
	return &naivePlacer{}
}

@@ -94,8 +50,8 @@ func minDuration(f, s time.Duration) time.Duration {
	return s
}

func (sp *naivePlacer) PlaceCall(np NodePool, ctx context.Context, call *call, lbGroupID string) error {
	timeout := time.After(call.slotDeadline.Sub(time.Now()))
func (sp *naivePlacer) PlaceCall(rp pool.RunnerPool, ctx context.Context, call pool.RunnerCall) error {
	timeout := time.After(call.SlotDeadline().Sub(time.Now()))

	for {
		select {
@@ -104,19 +60,23 @@ func (sp *naivePlacer) PlaceCall(np NodePool, ctx context.Context, call *call, l
		case <-timeout:
			return models.ErrCallTimeoutServerBusy
		default:
			for _, r := range np.Runners(lbGroupID) {
				placed, err := r.TryExec(ctx, call)
				if err != nil {
					logrus.WithError(err).Error("Failed during call placement")
				}
				if placed {
					return err
			runners, err := rp.Runners(call)
			if err != nil {
				logrus.WithError(err).Error("Failed to find runners for call")
			} else {
				for _, r := range runners {
					placed, err := r.TryExec(ctx, call)
					if placed {
						return err
					}
				}
			}
			remaining := call.slotDeadline.Sub(time.Now())

			remaining := call.SlotDeadline().Sub(time.Now())
			if remaining <= 0 {
				return models.ErrCallTimeoutServerBusy
			}
			// backoff
			time.Sleep(minDuration(retryWaitInterval, remaining))
		}
	}
@@ -129,22 +89,23 @@ const (
	// sleep time when scaling from 0 to 1 runners
	noCapacityWaitInterval = 1 * time.Second
	// amount of time to wait to place a request on a runner
	placementTimeout = 15 * time.Second
	placementTimeout          = 15 * time.Second
	runnerPoolShutdownTimeout = 5 * time.Second
)

type lbAgent struct {
	delegatedAgent Agent
	np             NodePool
	placer         Placer
	rp             pool.RunnerPool
	placer         pool.Placer

	wg       sync.WaitGroup // Needs a good name
	shutdown chan struct{}
}

func NewLBAgent(agent Agent, np NodePool, p Placer) (Agent, error) {
func NewLBAgent(agent Agent, rp pool.RunnerPool, p pool.Placer) (Agent, error) {
	a := &lbAgent{
		delegatedAgent: agent,
		np:             np,
		rp:             rp,
		placer:         p,
	}
	return a, nil
@@ -158,7 +119,11 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) {
}

func (a *lbAgent) Close() error {
	a.np.Shutdown()
	// we should really be passing the server's context here
	ctx, cancel := context.WithTimeout(context.Background(), runnerPoolShutdownTimeout)
	defer cancel()

	a.rp.Shutdown(ctx)
	err := a.delegatedAgent.Close()
	if err != nil {
		return err

api/agent/lb_agent_test.go (new file, 225 lines)
@@ -0,0 +1,225 @@
package agent

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sync"
	"testing"
	"time"

	"github.com/fnproject/fn/api/models"
	pool "github.com/fnproject/fn/api/runnerpool"
)

type mockRunner struct {
	wg        sync.WaitGroup
	sleep     time.Duration
	mtx       sync.Mutex
	maxCalls  int32 // Max concurrent calls
	curCalls  int32 // Current calls
	procCalls int32 // Processed calls
	addr      string
}

type mockRunnerPool struct {
	runners   []pool.Runner
	generator insecureRunnerFactory
	pki       *pkiData
}

func newMockRunnerPool(rf insecureRunnerFactory, runnerAddrs []string) *mockRunnerPool {
	var runners []pool.Runner
	for _, addr := range runnerAddrs {
		r, err := rf(addr)
		if err != nil {
			continue
		}
		runners = append(runners, r)
	}

	return &mockRunnerPool{
		runners:   runners,
		generator: rf,
		pki:       &pkiData{},
	}
}

func (npm *mockRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
	return npm.runners, nil
}

func (npm *mockRunnerPool) Shutdown(context.Context) error {
	return nil
}

func NewMockRunnerFactory(sleep time.Duration, maxCalls int32) insecureRunnerFactory {
	return func(addr string) (pool.Runner, error) {
		return &mockRunner{
			sleep:    sleep,
			maxCalls: maxCalls,
			addr:     addr,
		}, nil
	}
}

func FaultyRunnerFactory() insecureRunnerFactory {
	return func(addr string) (pool.Runner, error) {
		return &mockRunner{
			addr: addr,
		}, errors.New("Creation of new runner failed")
	}
}

func (r *mockRunner) checkAndIncrCalls() error {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if r.curCalls >= r.maxCalls {
		return models.ErrCallTimeoutServerBusy //TODO is that the correct error?
	}
	r.curCalls++
	return nil
}

func (r *mockRunner) decrCalls() {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.curCalls--
}

func (r *mockRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
	err := r.checkAndIncrCalls()
	if err != nil {
		return false, err
	}
	defer r.decrCalls()

	r.wg.Add(1)
	defer r.wg.Done()

	time.Sleep(r.sleep)

	r.procCalls++
	return true, nil
}

func (r *mockRunner) Close(ctx context.Context) error {
	go func() {
		r.wg.Wait()
	}()
	return nil
}

func (r *mockRunner) Address() string {
	return r.addr
}

type mockRunnerCall struct {
	slotDeadline time.Time
	r            *http.Request
	rw           http.ResponseWriter
	stdErr       io.ReadWriteCloser
	model        *models.Call
}

func (c *mockRunnerCall) SlotDeadline() time.Time {
	return c.slotDeadline
}

func (c *mockRunnerCall) Request() *http.Request {
	return c.r
}
func (c *mockRunnerCall) ResponseWriter() http.ResponseWriter {
	return c.rw
}
func (c *mockRunnerCall) StdErr() io.ReadWriteCloser {
	return c.stdErr
}
func (c *mockRunnerCall) Model() *models.Call {
	return c.model
}

func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxCalls int32) *mockRunnerPool {
	rf := NewMockRunnerFactory(execSleep, maxCalls)
	return newMockRunnerPool(rf, expectedRunners)
}

func TestOneRunner(t *testing.T) {
	placer := NewNaivePlacer()
	rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
	call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)}
	err := placer.PlaceCall(rp, context.Background(), call)
	if err != nil {
		t.Fatalf("Failed to place call on runner %v", err)
	}
}

func TestEnforceTimeoutFromContext(t *testing.T) {
	placer := NewNaivePlacer()
	rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
	call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)}
	ctx, cancel := context.WithDeadline(context.Background(), time.Now())
	defer cancel()
	err := placer.PlaceCall(rp, ctx, call)
	if err == nil {
		t.Fatal("Call should have timed out")
	}
}

func TestSpilloverToSecondRunner(t *testing.T) {
	placer := NewNaivePlacer()
	rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)

	parallelCalls := 3
	var wg sync.WaitGroup
	failures := make(chan error, parallelCalls)
	for i := 0; i < parallelCalls; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)}
			err := placer.PlaceCall(rp, context.Background(), call)
			if err != nil {
				failures <- fmt.Errorf("Timed out call %d", i)
			}
		}(i)
	}

	wg.Wait()
	close(failures)

	err := <-failures
	if err != nil || rp.runners[1].(*mockRunner).procCalls != 1 {
		t.Fatal("Expected spillover to second runner")
	}
}

func TestEnforceSlotTimeout(t *testing.T) {
	placer := NewNaivePlacer()
	rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)

	parallelCalls := 5
	var wg sync.WaitGroup
	failures := make(chan error, parallelCalls)
	for i := 0; i < parallelCalls; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			call := &mockRunnerCall{slotDeadline: time.Now().Add(10 * time.Millisecond)}
			err := placer.PlaceCall(rp, context.Background(), call)
			if err != nil {
				failures <- fmt.Errorf("Timed out call %d", i)
			}
		}(i)
	}

	wg.Wait()
	close(failures)

	err := <-failures
	if err == nil {
		t.Fatal("Expected a call failure")
	}
}

@@ -1,74 +0,0 @@
/* The consistent hash ring from the original fnlb.
   The behaviour of this depends on changes to the runner list leaving it relatively stable.
*/

package agent

import (
	"context"
	"time"

	"github.com/dchest/siphash"
	"github.com/fnproject/fn/api/models"
	"github.com/sirupsen/logrus"
)

type chPlacer struct {
}

func NewCHPlacer() Placer {
	return &chPlacer{}
}

// This borrows the CH placement algorithm from the original FNLB.
// Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use
// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
func (p *chPlacer) PlaceCall(np NodePool, ctx context.Context, call *call, lbGroupID string) error {
	// The key is just the path in this case
	key := call.Path
	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
	timeout := time.After(call.slotDeadline.Sub(time.Now()))
	for {

		select {
		case <-ctx.Done():
			return models.ErrCallTimeoutServerBusy
		case <-timeout:
			return models.ErrCallTimeoutServerBusy
		default:
			runners := np.Runners(lbGroupID)
			i := int(jumpConsistentHash(sum64, int32(len(runners))))
			for j := 0; j < len(runners); j++ {
				r := runners[i]

				placed, err := r.TryExec(ctx, call)
				if err != nil {
					logrus.WithError(err).Error("Failed during call placement")
				}
				if placed {
					return err
				}

				i = (i + 1) % len(runners)
			}

			remaining := call.slotDeadline.Sub(time.Now())
			if remaining <= 0 {
				return models.ErrCallTimeoutServerBusy
			}
			time.Sleep(minDuration(retryWaitInterval, remaining))
		}
	}
}

// A Fast, Minimal Memory, Consistent Hash Algorithm:
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(num_buckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = (b + 1) * int64((1<<31)/(key>>33)+1)
	}
	return int32(b)
}

@@ -1,22 +0,0 @@
package agent

import (
	"context"

	"github.com/fnproject/fn/poolmanager"
)

// NodePool provides information about pools of runners and receives capacity demands
type NodePool interface {
	Runners(lbgID string) []Runner
	AssignCapacity(r *poolmanager.CapacityRequest)
	ReleaseCapacity(r *poolmanager.CapacityRequest)
	Shutdown()
}

// Runner is the interface to invoke the execution of a function call on a specific runner
type Runner interface {
	TryExec(ctx context.Context, call Call) (bool, error)
	Close()
	Address() string
}

@@ -1,415 +0,0 @@
package grpc

import (
	"context"
	"encoding/json"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	"github.com/fnproject/fn/api/agent"
	pb "github.com/fnproject/fn/api/agent/grpc"
	"github.com/fnproject/fn/api/id"
	"github.com/fnproject/fn/grpcutil"
	"github.com/fnproject/fn/poolmanager"
	"github.com/sirupsen/logrus"
)

const (
	// CapacityUpdatePeriod defines how often the capacity updates are sent
	CapacityUpdatePeriod = 1 * time.Second
)

type gRPCNodePool struct {
	npm        poolmanager.NodePoolManager
	advertiser poolmanager.CapacityAdvertiser
	mx         sync.RWMutex
	lbg        map[string]*lbg // {lbgid -> *lbg}
	generator  secureRunnerFactory
	//TODO find a better place for this
	pki *pkiData

	shutdown chan struct{}
}

// TODO need to go in a better place
type pkiData struct {
	ca   string
	key  string
	cert string
}

type lbg struct {
	mx      sync.RWMutex
	id      string
	runners map[string]agent.Runner
	r_list  atomic.Value // We attempt to maintain the same order of runners as advertised by the NPM.
	// This is to preserve as reasonable behaviour as possible for the CH algorithm
	generator secureRunnerFactory
}

type nullRunner struct{}

func (n *nullRunner) TryExec(ctx context.Context, call agent.Call) (bool, error) {
	return false, nil
}

func (n *nullRunner) Close() {}

func (n *nullRunner) Address() string {
	return ""
}

var nullRunnerSingleton = new(nullRunner)

type gRPCRunner struct {
	// Need a WaitGroup of TryExec in flight
	wg      sync.WaitGroup
	address string
	conn    *grpc.ClientConn
	client  pb.RunnerProtocolClient
}

// allow factory to be overridden in tests
type secureRunnerFactory func(addr string, cert string, key string, ca string) (agent.Runner, error)

func secureGRPCRunnerFactory(addr string, cert string, key string, ca string) (agent.Runner, error) {
	p := &pkiData{
		cert: cert,
		key:  key,
		ca:   ca,
	}
	conn, client, err := runnerConnection(addr, p)
	if err != nil {
		return nil, err
	}

	return &gRPCRunner{
		address: addr,
		conn:    conn,
		client:  client,
	}, nil
}

func DefaultgRPCNodePool(npmAddress string, cert string, key string, ca string) agent.NodePool {
	npm := poolmanager.NewNodePoolManager(npmAddress, cert, key, ca)
	// TODO do we need to persist this ID in order to survive restart?
	lbID := id.New().String()
	advertiser := poolmanager.NewCapacityAdvertiser(npm, lbID, CapacityUpdatePeriod)
	return newgRPCNodePool(cert, key, ca, npm, advertiser, secureGRPCRunnerFactory)
}

func newgRPCNodePool(cert string, key string, ca string, npm poolmanager.NodePoolManager, advertiser poolmanager.CapacityAdvertiser, rf secureRunnerFactory) agent.NodePool {

	logrus.Info("Starting dynamic runner pool")
	p := &pkiData{
		ca:   ca,
		cert: cert,
		key:  key,
	}

	np := &gRPCNodePool{
		npm:        npm,
		advertiser: advertiser,
		lbg:        make(map[string]*lbg),
		generator:  rf,
		shutdown:   make(chan struct{}),
		pki:        p,
	}
	go np.maintenance()
	return np
}

func (np *gRPCNodePool) Runners(lbgID string) []agent.Runner {
	np.mx.RLock()
	lbg, ok := np.lbg[lbgID]
	np.mx.RUnlock()

	if !ok {
		np.mx.Lock()
		lbg, ok = np.lbg[lbgID]
		if !ok {
			lbg = newLBG(lbgID, np.generator)
			np.lbg[lbgID] = lbg
		}
		np.mx.Unlock()
	}

	return lbg.runnerList()
}

func (np *gRPCNodePool) Shutdown() {
	np.advertiser.Shutdown()
	np.npm.Shutdown()
}

func (np *gRPCNodePool) AssignCapacity(r *poolmanager.CapacityRequest) {
	np.advertiser.AssignCapacity(r)

}
func (np *gRPCNodePool) ReleaseCapacity(r *poolmanager.CapacityRequest) {
	np.advertiser.ReleaseCapacity(r)
}

func (np *gRPCNodePool) maintenance() {
	ticker := time.NewTicker(500 * time.Millisecond)
	for {
		select {
		case <-np.shutdown:
			return
		case <-ticker.C:
			// Reload any LBGroup information from NPM (pull for the moment, shift to listening to a stream later)
			np.reloadLBGmembership()
		}
	}

}

func newLBG(lbgID string, generator secureRunnerFactory) *lbg {
	lbg := &lbg{
		id:        lbgID,
		runners:   make(map[string]agent.Runner),
		r_list:    atomic.Value{},
		generator: generator,
	}
	lbg.r_list.Store([]agent.Runner{})
	return lbg
}

func (np *gRPCNodePool) reloadLBGmembership() {
	np.mx.RLock()
	lbgroups := np.lbg
	np.mx.RUnlock()
	for lbgID, lbg := range lbgroups {
		lbg.reloadMembers(lbgID, np.npm, np.pki)
	}
}

func (lbg *lbg) runnerList() []agent.Runner {
	orig_runners := lbg.r_list.Load().([]agent.Runner)
	// XXX: Return a copy. If we required this to be immutably read by the caller, we could return the structure directly
	runners := make([]agent.Runner, len(orig_runners))
	copy(runners, orig_runners)
	return runners
}

func (lbg *lbg) reloadMembers(lbgID string, npm poolmanager.NodePoolManager, p *pkiData) {

	runners, err := npm.GetRunners(lbgID)
	if err != nil {
		logrus.Debug("Failed to get the list of runners from node pool manager")
	}
	lbg.mx.Lock()
	defer lbg.mx.Unlock()
	r_list := make([]agent.Runner, len(runners))
	seen := map[string]bool{} // If we've seen a particular runner or not
	var errGenerator error
	for i, addr := range runners {
		r, ok := lbg.runners[addr]
		if !ok {
			logrus.WithField("runner_addr", addr).Debug("New Runner to be added")
			r, errGenerator = lbg.generator(addr, p.cert, p.key, p.ca)
			if errGenerator != nil {
				logrus.WithField("runner_addr", addr).Debug("Creation of the new runner failed")
			} else {
				lbg.runners[addr] = r
			}
		}
		if errGenerator == nil {
			r_list[i] = r // Maintain the delivered order
		} else {
			// some algorithms (like consistent hash) work better if the i'th element
			// of r_list points to the same node on all LBs, so insert a placeholder
			// if we can't create the runner for some reason
			r_list[i] = nullRunnerSingleton
		}

		seen[addr] = true
	}
	lbg.r_list.Store(r_list)

	// Remove any runners that we have not encountered
	for addr, r := range lbg.runners {
		if _, ok := seen[addr]; !ok {
			logrus.WithField("runner_addr", addr).Debug("Removing drained runner")
			delete(lbg.runners, addr)
			r.Close()
		}
	}
}

func (r *gRPCRunner) Close() {
	go func() {
		r.wg.Wait()
		r.conn.Close()
	}()
}

func runnerConnection(address string, pki *pkiData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
	ctx := context.Background()

	var creds credentials.TransportCredentials
	if pki != nil {
		var err error
		creds, err = grpcutil.CreateCredentials(pki.cert, pki.key, pki.ca)
		if err != nil {
			logrus.WithError(err).Error("Unable to create credentials to connect to runner node")
			return nil, nil, err
		}
	}

	// we want to set a very short timeout to fail-fast if something goes wrong
	conn, err := grpcutil.DialWithBackoff(ctx, address, creds, 100*time.Millisecond, grpc.DefaultBackoffConfig)
	if err != nil {
		logrus.WithError(err).Error("Unable to connect to runner node")
	}

	protocolClient := pb.NewRunnerProtocolClient(conn)
	logrus.WithField("runner_addr", address).Info("Connected to runner")

	return conn, protocolClient, nil
}

func (r *gRPCRunner) Address() string {
	return r.address
}

func (r *gRPCRunner) TryExec(ctx context.Context, call agent.Call) (bool, error) {
	logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
	r.wg.Add(1)
	defer r.wg.Done()

	// Get app and route information
	// Construct model.Call with CONFIG in it already
	modelJSON, err := json.Marshal(call.Model())
	if err != nil {
		logrus.WithError(err).Error("Failed to encode model as JSON")
		// If we can't encode the model, no runner will ever be able to run this. Give up.
		return true, err
	}
	runnerConnection, err := r.client.Engage(ctx)
	if err != nil {
		logrus.WithError(err).Error("Unable to create client to runner node")
		// Try on next runner
		return false, err
	}

	err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ModelsCallJson: string(modelJSON)}}})
	if err != nil {
		logrus.WithError(err).Error("Failed to send message to runner node")
		return false, err
	}
	msg, err := runnerConnection.Recv()
	if err != nil {
		logrus.WithError(err).Error("Failed to receive first message from runner node")
		return false, err
	}

	switch body := msg.Body.(type) {
	case *pb.RunnerMsg_Acknowledged:
		if !body.Acknowledged.Committed {
			logrus.Debugf("Runner didn't commit invocation request: %v", body.Acknowledged.Details)
			return false, nil
			// Try the next runner
		}
		logrus.Debug("Runner committed invocation request, sending data frames")
		done := make(chan error)
		go receiveFromRunner(runnerConnection, call, done)
		sendToRunner(call, runnerConnection)
		return true, <-done

	default:
		logrus.Errorf("Unhandled message type received from runner: %v\n", msg)
		return true, nil
	}

}

func sendToRunner(call agent.Call, protocolClient pb.RunnerProtocol_EngageClient) error {
	bodyReader, err := agent.RequestReader(&call)
	if err != nil {
		logrus.WithError(err).Error("Unable to get reader for request body")
		return err
	}
	writeBufferSize := 10 * 1024 // 10KB
	writeBuffer := make([]byte, writeBufferSize)
	for {
		n, err := bodyReader.Read(writeBuffer)
		logrus.Debugf("Wrote %v bytes to the runner", n)

		if err == io.EOF {
			err = protocolClient.Send(&pb.ClientMsg{
				Body: &pb.ClientMsg_Data{
					Data: &pb.DataFrame{
						Data: writeBuffer,
						Eof:  true,
					},
				},
			})
			if err != nil {
				logrus.WithError(err).Error("Failed to send data frame with EOF to runner")
			}
			break
		}
		err = protocolClient.Send(&pb.ClientMsg{
			Body: &pb.ClientMsg_Data{
				Data: &pb.DataFrame{
					Data: writeBuffer,
					Eof:  false,
				},
			},
		})
		if err != nil {
			logrus.WithError(err).Error("Failed to send data frame")
			return err
		}
	}
	return nil
}

func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, call agent.Call, done chan error) {
	w, err := agent.ResponseWriter(&call)

	if err != nil {
		logrus.WithError(err).Error("Unable to get response writer from call")
		done <- err
		return
	}

	for {
		msg, err := protocolClient.Recv()
		if err != nil {
			logrus.WithError(err).Error("Failed to receive message from runner")
			done <- err
			return
		}

		switch body := msg.Body.(type) {
		case *pb.RunnerMsg_ResultStart:
			switch meta := body.ResultStart.Meta.(type) {
			case *pb.CallResultStart_Http:
				for _, header := range meta.Http.Headers {
					(*w).Header().Set(header.Key, header.Value)
				}
			default:
				logrus.Errorf("Unhandled meta type in start message: %v", meta)
			}
		case *pb.RunnerMsg_Data:
			(*w).Write(body.Data.Data)
		case *pb.RunnerMsg_Finished:
			if body.Finished.Success {
				logrus.Infof("Call finished successfully: %v", body.Finished.Details)
			} else {
				logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details)
			}
			close(done)
			return
		default:
			logrus.Errorf("Unhandled message type from runner: %v", body)
		}
	}
}

@@ -1,214 +0,0 @@
package grpc

import (
	"context"
	"errors"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/poolmanager"
	model "github.com/fnproject/fn/poolmanager/grpc"
)

type mockRunner struct {
	wg       sync.WaitGroup
	sleep    time.Duration
	mtx      sync.Mutex
	maxCalls int32 // Max concurrent calls
	curCalls int32 // Current calls
	addr     string
}

type mockNodePoolManager struct {
	runners []string
}

type mockgRPCNodePool struct {
	npm       poolmanager.NodePoolManager
	lbg       map[string]*lbg
	generator secureRunnerFactory
	pki       *pkiData
}

func newMockgRPCNodePool(rf secureRunnerFactory, runners []string) *mockgRPCNodePool {
	npm := &mockNodePoolManager{runners: runners}

	return &mockgRPCNodePool{
		npm:       npm,
		lbg:       make(map[string]*lbg),
		generator: rf,
		pki:       &pkiData{},
	}
}

func (npm *mockNodePoolManager) AdvertiseCapacity(snapshots *model.CapacitySnapshotList) error {
	return nil
}

func (npm *mockNodePoolManager) GetRunners(lbgID string) ([]string, error) {
	return npm.runners, nil
}

func (npm *mockNodePoolManager) Shutdown() error {

	return nil
}

func NewMockRunnerFactory(sleep time.Duration, maxCalls int32) secureRunnerFactory {
	return func(addr string, cert string, key string, ca string) (agent.Runner, error) {
		return &mockRunner{
			sleep:    sleep,
			maxCalls: maxCalls,
			addr:     addr,
		}, nil
	}
}

func FaultyRunnerFactory() secureRunnerFactory {
	return func(addr string, cert string, key string, ca string) (agent.Runner, error) {
		return &mockRunner{
			addr: addr,
		}, errors.New("Creation of new runner failed")
	}
}

func (r *mockRunner) checkAndIncrCalls() error {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if r.curCalls >= r.maxCalls {
		return models.ErrCallTimeoutServerBusy //TODO is that the correct error?
	}
	r.curCalls++
	return nil
}

func (r *mockRunner) decrCalls() {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.curCalls--
}

func (r *mockRunner) TryExec(ctx context.Context, call agent.Call) (bool, error) {
	err := r.checkAndIncrCalls()
	if err != nil {
		return false, err
	}
	defer r.decrCalls()

	r.wg.Add(1)
	defer r.wg.Done()

	time.Sleep(r.sleep)

	w, err := agent.ResponseWriter(&call)
	if err != nil {
		return true, err
	}
	buf := []byte("OK")
	(*w).Header().Set("Content-Type", "text/plain")
	(*w).Header().Set("Content-Length", strconv.Itoa(len(buf)))
	(*w).Write(buf)

	return true, nil
}

func (r *mockRunner) Close() {
	go func() {
		r.wg.Wait()
	}()
}

func (r *mockRunner) Address() string {
	return ""
}

func setupMockNodePool(lbgID string, expectedRunners []string) (*mockgRPCNodePool, *lbg) {
	rf := NewMockRunnerFactory(1*time.Millisecond, 1)
	lb := newLBG(lbgID, rf)

	np := newMockgRPCNodePool(rf, expectedRunners)
	np.lbg[lbgID] = lb
	return np, lb
}

func checkRunners(t *testing.T, expectedRunners []string, actualRunners map[string]agent.Runner, ordList []agent.Runner) {
	if len(expectedRunners) != len(actualRunners) {
		t.Errorf("List of runners is wrong, expected: %d got: %d", len(expectedRunners), len(actualRunners))
	}
	for i, r := range expectedRunners {
		_, ok := actualRunners[r]
		if !ok {
			t.Errorf("Advertised runner %s not found in the list of runners", r)
		}
		ordR := ordList[i].(*mockRunner)
		if ordR.addr != r {
			t.Error("Ordered list is not in sync with the advertised list")
		}
	}
}

func TestReloadMembersNoRunners(t *testing.T) {
	lbgID := "lb-test"
	// Empty list, no runners available
	np, lb := setupMockNodePool(lbgID, make([]string, 0))

	np.lbg[lbgID].reloadMembers(lbgID, np.npm, np.pki)
	expectedRunners := []string{}
	ordList := lb.r_list.Load().([]agent.Runner)
	checkRunners(t, expectedRunners, lb.runners, ordList)
}

func TestReloadMembersNewRunners(t *testing.T) {
	lbgID := "lb-test"
	expectedRunners := []string{"171.16.0.1", "171.16.0.2"}
	np, lb := setupMockNodePool(lbgID, expectedRunners)

	np.lbg[lbgID].reloadMembers(lbgID, np.npm, np.pki)
	ordList := lb.r_list.Load().([]agent.Runner)
	checkRunners(t, expectedRunners, lb.runners, ordList)
}

func TestReloadMembersRemoveRunners(t *testing.T) {
	lbgID := "lb-test"
	expectedRunners := []string{"171.16.0.1", "171.16.0.3"}
	np, lb := setupMockNodePool(lbgID, expectedRunners)

	// actual runners before the update
	actualRunners := []string{"171.16.0.1", "171.16.0.2", "171.16.0.19"}
	for _, v := range actualRunners {
		r, err := lb.generator(v, np.pki.cert, np.pki.key, np.pki.ca)
		if err != nil {
			t.Error("Failed to create new runner")
		}
		lb.runners[v] = r
	}

	if len(lb.runners) != len(actualRunners) {
		t.Errorf("Failed to load list of runners")
	}

	np.lbg[lbgID].reloadMembers(lbgID, np.npm, np.pki)
	ordList := lb.r_list.Load().([]agent.Runner)
	checkRunners(t, expectedRunners, lb.runners, ordList)
}

func TestReloadMembersFailToCreateNewRunners(t *testing.T) {
	lbgID := "lb-test"
	rf := FaultyRunnerFactory()
	lb := newLBG(lbgID, rf)
	np := newMockgRPCNodePool(rf, []string{"171.19.0.1"})
	np.lbg[lbgID] = lb
	np.lbg[lbgID].reloadMembers(lbgID, np.npm, np.pki)
	actualRunners := lb.runners
	if len(actualRunners) != 0 {
		t.Errorf("List of runners should be empty")
	}
	ordList := lb.r_list.Load().([]agent.Runner)
	if ordList[0] != nullRunnerSingleton {
		t.Errorf("Ordered list should have a nullRunner")
	}
}

@@ -1,105 +0,0 @@
package grpc

import (
	"sync"

	"github.com/sirupsen/logrus"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/poolmanager"
)

// allow factory to be overridden in tests
type insecureRunnerFactory func(addr string) (agent.Runner, error)

func insecureGRPCRunnerFactory(addr string) (agent.Runner, error) {
	conn, client, err := runnerConnection(addr, nil)
	if err != nil {
		return nil, err
	}

	return &gRPCRunner{
		address: addr,
		conn:    conn,
		client:  client,
	}, nil
}

// manages a single set of runners ignoring lb groups
type staticNodePool struct {
	generator insecureRunnerFactory
	rMtx      *sync.RWMutex
	runners   []agent.Runner
}

// DefaultStaticNodePool returns a NodePool consisting of a static set of runners
func DefaultStaticNodePool(runnerAddresses []string) agent.NodePool {
	return newStaticNodePool(runnerAddresses, insecureGRPCRunnerFactory)
}

// newStaticNodePool returns a NodePool consisting of a static set of runners
func newStaticNodePool(runnerAddresses []string, runnerFactory insecureRunnerFactory) agent.NodePool {
	logrus.WithField("runners", runnerAddresses).Info("Starting static runner pool")
	var runners []agent.Runner
	for _, addr := range runnerAddresses {
		r, err := runnerFactory(addr)
		if err != nil {
			logrus.WithField("runner_addr", addr).Warn("Invalid runner")
			continue
		}
		logrus.WithField("runner_addr", addr).Debug("Adding runner to pool")
		runners = append(runners, r)
	}
	return &staticNodePool{
		rMtx:      &sync.RWMutex{},
		runners:   runners,
		generator: runnerFactory,
	}
}

func (np *staticNodePool) Runners(lbgID string) []agent.Runner {
	np.rMtx.RLock()
	defer np.rMtx.RUnlock()

	r := make([]agent.Runner, len(np.runners))
	copy(r, np.runners)
	return r
}

func (np *staticNodePool) AddRunner(address string) error {
	np.rMtx.Lock()
	defer np.rMtx.Unlock()

	r, err := np.generator(address)
	if err != nil {
		logrus.WithField("runner_addr", address).Warn("Failed to add runner")
		return err
	}
	np.runners = append(np.runners, r)
	return nil
}

func (np *staticNodePool) RemoveRunner(address string) {
	np.rMtx.Lock()
	defer np.rMtx.Unlock()

	for i, r := range np.runners {
		if r.Address() == address {
			// delete runner from list
			np.runners = append(np.runners[:i], np.runners[i+1:]...)
			return
		}
	}
}

func (np *staticNodePool) AssignCapacity(r *poolmanager.CapacityRequest) {
	// NO-OP
}

func (np *staticNodePool) ReleaseCapacity(r *poolmanager.CapacityRequest) {
	// NO-OP
}

func (np *staticNodePool) Shutdown() {
	// NO-OP
}

@@ -1,79 +0,0 @@
package grpc

import (
	"context"
	"testing"

	"github.com/fnproject/fn/api/agent"
	"github.com/fnproject/fn/poolmanager"
)

func setupStaticPool(runners []string) agent.NodePool {
	return newStaticNodePool(runners, mockRunnerFactory)
}

type mockStaticRunner struct {
	address string
}

func (r *mockStaticRunner) TryExec(ctx context.Context, call agent.Call) (bool, error) {
	return true, nil
}

func (r *mockStaticRunner) Close() {

}
func (r *mockStaticRunner) Address() string {
	return r.address
}

func mockRunnerFactory(addr string) (agent.Runner, error) {
	return &mockStaticRunner{address: addr}, nil
}

func TestNewStaticPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs)

	if len(np.Runners("foo")) != len(addrs) {
		t.Fatalf("Invalid number of runners %v", len(np.Runners("foo")))
	}
}

func TestCapacityForStaticPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs)

	cr := &poolmanager.CapacityRequest{TotalMemoryMb: 100, LBGroupID: "foo"}
	np.AssignCapacity(cr)
	np.ReleaseCapacity(cr)

	if len(np.Runners("foo")) != len(addrs) {
		t.Fatalf("Invalid number of runners %v", len(np.Runners("foo")))
	}
}

func TestAddNodeToPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs).(*staticNodePool)
	np.AddRunner("127.0.0.1:8082")

	if len(np.Runners("foo")) != len(addrs)+1 {
		t.Fatalf("Invalid number of runners %v", len(np.Runners("foo")))
	}
}

func TestRemoveNodeFromPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs).(*staticNodePool)
	np.RemoveRunner("127.0.0.1:8081")

	if len(np.Runners("foo")) != len(addrs)-1 {
		t.Fatalf("Invalid number of runners %v", len(np.Runners("foo")))
	}

	np.RemoveRunner("127.0.0.1:8081")
	if len(np.Runners("foo")) != len(addrs)-1 {
		t.Fatalf("Invalid number of runners %v", len(np.Runners("foo")))
	}
}

api/agent/runner_client.go (new file, 240 lines)
@@ -0,0 +1,240 @@
package agent

import (
	"context"
	"encoding/json"
	"io"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"

	pb "github.com/fnproject/fn/api/agent/grpc"
	pool "github.com/fnproject/fn/api/runnerpool"
	"github.com/fnproject/fn/grpcutil"
	"github.com/sirupsen/logrus"
)

type pkiData struct {
	ca   string
	key  string
	cert string
}

type nullRunner struct{}

func (n *nullRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
	return false, nil
}

func (n *nullRunner) Close() error {
	return nil
}

func (n *nullRunner) Address() string {
	return ""
}

var nullRunnerSingleton = new(nullRunner)

type gRPCRunner struct {
	// Need a WaitGroup of TryExec in flight
	wg      sync.WaitGroup
	address string
	conn    *grpc.ClientConn
	client  pb.RunnerProtocolClient
}

// allow factory to be overridden in tests
type secureRunnerFactory func(addr string, cert string, key string, ca string) (pool.Runner, error)

func secureGRPCRunnerFactory(addr string, cert string, key string, ca string) (pool.Runner, error) {
	p := &pkiData{
		cert: cert,
		key:  key,
		ca:   ca,
	}
	conn, client, err := runnerConnection(addr, p)
	if err != nil {
		return nil, err
	}

	return &gRPCRunner{
		address: addr,
		conn:    conn,
		client:  client,
	}, nil
}

// Close waits for all in-flight requests to complete prior to
// terminating the underlying gRPC connection, or returns early if the context is done
func (r *gRPCRunner) Close(ctx context.Context) error {
	err := make(chan error)
	go func() {
		defer close(err)
		r.wg.Wait()
		err <- r.conn.Close()
	}()

	select {
	case e := <-err:
		return e
	case <-ctx.Done():
		return ctx.Err() // context timed out while waiting
	}
}

func runnerConnection(address string, pki *pkiData) (*grpc.ClientConn, pb.RunnerProtocolClient, error) {
	ctx := context.Background()

	var creds credentials.TransportCredentials
	if pki != nil {
		var err error
		creds, err = grpcutil.CreateCredentials(pki.cert, pki.key, pki.ca)
		if err != nil {
			logrus.WithError(err).Error("Unable to create credentials to connect to runner node")
			return nil, nil, err
		}
	}

	// we want to set a very short timeout to fail-fast if something goes wrong
	conn, err := grpcutil.DialWithBackoff(ctx, address, creds, 100*time.Millisecond, grpc.DefaultBackoffConfig)
	if err != nil {
		logrus.WithError(err).Error("Unable to connect to runner node")
	}

	protocolClient := pb.NewRunnerProtocolClient(conn)
	logrus.WithField("runner_addr", address).Info("Connected to runner")

	return conn, protocolClient, nil
}

func (r *gRPCRunner) Address() string {
	return r.address
}

func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
	logrus.WithField("runner_addr", r.address).Debug("Attempting to place call")
	r.wg.Add(1)
	defer r.wg.Done()

	// extract the call's model data to pass on to the pure runner
	modelJSON, err := json.Marshal(call.Model())
	if err != nil {
		logrus.WithError(err).Error("Failed to encode model as JSON")
		// If we can't encode the model, no runner will ever be able to run this. Give up.
		return true, err
	}
	runnerConnection, err := r.client.Engage(ctx)
	if err != nil {
		logrus.WithError(err).Error("Unable to create client to runner node")
		// Try on next runner
		return false, err
	}

	err = runnerConnection.Send(&pb.ClientMsg{Body: &pb.ClientMsg_Try{Try: &pb.TryCall{ModelsCallJson: string(modelJSON)}}})
	if err != nil {
		logrus.WithError(err).Error("Failed to send message to runner node")
		return false, err
	}
	msg, err := runnerConnection.Recv()
	if err != nil {
		logrus.WithError(err).Error("Failed to receive first message from runner node")
		return false, err
	}

	switch body := msg.Body.(type) {
	case *pb.RunnerMsg_Acknowledged:
		if !body.Acknowledged.Committed {
			logrus.Debugf("Runner didn't commit invocation request: %v", body.Acknowledged.Details)
			return false, nil
			// Try the next runner
		}
		logrus.Debug("Runner committed invocation request, sending data frames")
		done := make(chan error)
		go receiveFromRunner(runnerConnection, call, done)
		sendToRunner(call, runnerConnection)
		return true, <-done

	default:
		logrus.Errorf("Unhandled message type received from runner: %v\n", msg)
		return true, nil
	}

}

func sendToRunner(call pool.RunnerCall, protocolClient pb.RunnerProtocol_EngageClient) error {
	bodyReader := call.Request().Body
	writeBufferSize := 10 * 1024 // 10KB
	writeBuffer := make([]byte, writeBufferSize)
	for {
		n, err := bodyReader.Read(writeBuffer)
		logrus.Debugf("Wrote %v bytes to the runner", n)

		if err == io.EOF {
			err = protocolClient.Send(&pb.ClientMsg{
				Body: &pb.ClientMsg_Data{
					Data: &pb.DataFrame{
						Data: writeBuffer,
						Eof:  true,
					},
				},
			})
			if err != nil {
				logrus.WithError(err).Error("Failed to send data frame with EOF to runner")
			}
			break
		}
		err = protocolClient.Send(&pb.ClientMsg{
			Body: &pb.ClientMsg_Data{
				Data: &pb.DataFrame{
					Data: writeBuffer,
					Eof:  false,
				},
			},
		})
		if err != nil {
			logrus.WithError(err).Error("Failed to send data frame")
			return err
		}
	}
	return nil
}

func receiveFromRunner(protocolClient pb.RunnerProtocol_EngageClient, c pool.RunnerCall, done chan error) {
	w := c.ResponseWriter()

	for {
		msg, err := protocolClient.Recv()
		if err != nil {
			logrus.WithError(err).Error("Failed to receive message from runner")
			done <- err
			return
		}

		switch body := msg.Body.(type) {
		case *pb.RunnerMsg_ResultStart:
			switch meta := body.ResultStart.Meta.(type) {
			case *pb.CallResultStart_Http:
				for _, header := range meta.Http.Headers {
					w.Header().Set(header.Key, header.Value)
				}
			default:
				logrus.Errorf("Unhandled meta type in start message: %v", meta)
			}
		case *pb.RunnerMsg_Data:
			w.Write(body.Data.Data)
		case *pb.RunnerMsg_Finished:
			if body.Finished.Success {
				logrus.Infof("Call finished successfully: %v", body.Finished.Details)
			} else {
				logrus.Infof("Call finished unsuccessfully: %v", body.Finished.Details)
			}
			close(done)
			return
		default:
			logrus.Errorf("Unhandled message type from runner: %v", body)
		}
	}
}

api/agent/static_pool.go (new file, 146 lines)
@@ -0,0 +1,146 @@
package agent

import (
	"context"
	"sync"
	"time"

	pool "github.com/fnproject/fn/api/runnerpool"
	"github.com/sirupsen/logrus"
)

const (
	staticPoolShutdownTimeout = 5 * time.Second
)

// allow factory to be overridden in tests
type insecureRunnerFactory func(addr string) (pool.Runner, error)

func insecureGRPCRunnerFactory(addr string) (pool.Runner, error) {
	conn, client, err := runnerConnection(addr, nil)
	if err != nil {
		return nil, err
	}

	return &gRPCRunner{
		address: addr,
		conn:    conn,
		client:  client,
	}, nil
}

// manages a single set of runners ignoring lb groups
type staticRunnerPool struct {
	generator insecureRunnerFactory
	rMtx      *sync.RWMutex
	runners   []pool.Runner
}

// DefaultStaticRunnerPool returns a RunnerPool consisting of a static set of runners
func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool {
	return newStaticRunnerPool(runnerAddresses, insecureGRPCRunnerFactory)
}

func newStaticRunnerPool(runnerAddresses []string, runnerFactory insecureRunnerFactory) pool.RunnerPool {
	logrus.WithField("runners", runnerAddresses).Info("Starting static runner pool")
	var runners []pool.Runner
	for _, addr := range runnerAddresses {
		r, err := runnerFactory(addr)
		if err != nil {
			logrus.WithField("runner_addr", addr).Warn("Invalid runner")
			continue
		}
		logrus.WithField("runner_addr", addr).Debug("Adding runner to pool")
		runners = append(runners, r)
	}
	return &staticRunnerPool{
		rMtx:      &sync.RWMutex{},
		runners:   runners,
		generator: runnerFactory,
	}
}

func (rp *staticRunnerPool) Runners(call pool.RunnerCall) ([]pool.Runner, error) {
	rp.rMtx.RLock()
	defer rp.rMtx.RUnlock()

	r := make([]pool.Runner, len(rp.runners))
	copy(r, rp.runners)
	return r, nil
}

func (rp *staticRunnerPool) AddRunner(address string) error {
	rp.rMtx.Lock()
	defer rp.rMtx.Unlock()

	r, err := rp.generator(address)
	if err != nil {
		logrus.WithField("runner_addr", address).Warn("Failed to add runner")
		return err
	}
	rp.runners = append(rp.runners, r)
	return nil
}

func (rp *staticRunnerPool) RemoveRunner(address string) {
	rp.rMtx.Lock()
	defer rp.rMtx.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
	defer cancel()

	for i, r := range rp.runners {
		if r.Address() == address {
			err := r.Close(ctx)
			if err != nil {
				logrus.WithError(err).WithField("runner_addr", r.Address()).Error("Failed to close runner")
			}
			// delete runner from list
			rp.runners = append(rp.runners[:i], rp.runners[i+1:]...)
			return
		}
	}
}

// Shutdown blocks waiting for all runners to close, or until ctx is done
func (rp *staticRunnerPool) Shutdown(ctx context.Context) (e error) {
	rp.rMtx.Lock()
	defer rp.rMtx.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), staticPoolShutdownTimeout)
	defer cancel()

	errors := make(chan error, len(rp.runners))
	var wg sync.WaitGroup
	for _, r := range rp.runners {
		wg.Add(1)
		go func(runner pool.Runner) {
			defer wg.Done()
			err := runner.Close(ctx)
			if err != nil {
				logrus.WithError(err).WithField("runner_addr", runner.Address()).Error("Failed to close runner")
				errors <- err
			}
		}(r)
	}

	done := make(chan interface{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	select {
	case <-done:
		close(errors)
		for e := range errors {
			// return the first error
			if e != nil {
				return e
			}
		}
		return nil
	case <-ctx.Done():
		return ctx.Err() // context timed out while waiting
	}
}

api/agent/static_pool_test.go (new file, 84 lines)
@@ -0,0 +1,84 @@
package agent

import (
	"context"
	"testing"

	pool "github.com/fnproject/fn/api/runnerpool"
)

func setupStaticPool(runners []string) pool.RunnerPool {
	return newStaticRunnerPool(runners, mockRunnerFactory)
}

type mockStaticRunner struct {
	address string
}

func (r *mockStaticRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
	return true, nil
}

func (r *mockStaticRunner) Close(ctx context.Context) error {
	return nil
}

func (r *mockStaticRunner) Address() string {
	return r.address
}

func mockRunnerFactory(addr string) (pool.Runner, error) {
	return &mockStaticRunner{address: addr}, nil
}

func TestNewStaticPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs)

	runners, err := np.Runners(nil)
	if err != nil {
		t.Fatalf("Failed to list runners %v", err)
	}
	if len(runners) != len(addrs) {
		t.Fatalf("Invalid number of runners %v", len(runners))
	}
}

func TestAddNodeToPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs).(*staticRunnerPool)
	np.AddRunner("127.0.0.1:8082")

	runners, err := np.Runners(nil)
	if err != nil {
		t.Fatalf("Failed to list runners %v", err)
	}
	if len(runners) != 3 {
		t.Fatalf("Invalid number of runners %v", len(runners))
	}
}

func TestRemoveNodeFromPool(t *testing.T) {
	addrs := []string{"127.0.0.1:8080", "127.0.0.1:8081"}
	np := setupStaticPool(addrs).(*staticRunnerPool)
	np.RemoveRunner("127.0.0.1:8081")

	runners, err := np.Runners(nil)
	if err != nil {
		t.Fatalf("Failed to list runners %v", err)
	}

	if len(runners) != 1 {
		t.Fatalf("Invalid number of runners %v", len(runners))
	}

	np.RemoveRunner("127.0.0.1:8081")

	runners, err = np.Runners(nil)
	if err != nil {
		t.Fatalf("Failed to list runners %v", err)
	}
	if len(runners) != 1 {
		t.Fatalf("Invalid number of runners %v", len(runners))
	}
}
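
Taken together, the new pieces compose as follows. This is a sketch based only on the constructors visible in this diff; the construction of the delegated base Agent (here `delegate`) is an assumption and is not part of this change.

// Sketch: wiring a static runner pool into a load-balancer agent,
// using only constructors that appear in this diff. The base agent
// `delegate` is assumed; its creation is not shown in this change.
package main

import (
	"github.com/fnproject/fn/api/agent"
)

func newLoadBalancerAgent(delegate agent.Agent, runnerAddrs []string) (agent.Agent, error) {
	// Static pool over a fixed set of runner addresses (insecure gRPC).
	rp := agent.DefaultStaticRunnerPool(runnerAddrs)

	// Naive placer: walk the runner list, retrying with backoff until
	// the call's slot deadline expires.
	placer := agent.NewNaivePlacer()

	// The LB agent delegates GetCall etc. to the base agent and places
	// execution on the pool via the placer.
	return agent.NewLBAgent(delegate, rp, placer)
}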