diff --git a/api/agent/agent.go b/api/agent/agent.go index 8802ca02c..2644bb017 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -9,6 +9,7 @@ import ( "github.com/fnproject/fn/api/agent/drivers" "github.com/fnproject/fn/api/agent/drivers/docker" + "github.com/fnproject/fn/api/agent/drivers/mock" "github.com/fnproject/fn/api/agent/protocol" "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" @@ -111,14 +112,13 @@ type agent struct { } func New(da DataAccess) Agent { - a := NewSyncOnly(da).(*agent) + a := createAgent(da, true).(*agent) a.wg.Add(1) go a.asyncDequeue() // safe shutdown can nanny this fine return a } -func NewSyncOnly(da DataAccess) Agent { - +func createAgent(da DataAccess, withDocker bool) Agent { cfg, err := NewAgentConfig() if err != nil { logrus.WithError(err).Fatalf("error in agent config cfg=%+v", cfg) @@ -126,9 +126,14 @@ func NewSyncOnly(da DataAccess) Agent { logrus.Infof("agent starting cfg=%+v", cfg) // TODO: Create drivers.New(runnerConfig) - driver := docker.NewDocker(drivers.Config{ - ServerVersion: cfg.MinDockerVersion, - }) + var driver drivers.Driver + if withDocker { + driver = docker.NewDocker(drivers.Config{ + ServerVersion: cfg.MinDockerVersion, + }) + } else { + driver = mock.New() + } a := &agent{ cfg: *cfg, diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index e5c3be4f1..77cecc55d 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -102,7 +102,8 @@ type lbAgent struct { shutdown chan struct{} } -func NewLBAgent(agent Agent, rp pool.RunnerPool, p pool.Placer) (Agent, error) { +func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error) { + agent := createAgent(da, false) a := &lbAgent{ delegatedAgent: agent, rp: rp, diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 4da91ebb5..6c1c35259 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -546,15 +546,16 @@ func (pr *pureRunner) Start() error { return err } -func UnsecuredPureRunner(cancel context.CancelFunc, addr string, a Agent) (*pureRunner, error) { - return NewPureRunner(cancel, addr, a, "", "", "", nil) +func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (*pureRunner, error) { + return NewPureRunner(cancel, addr, da, "", "", "", nil) } -func DefaultPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string) (*pureRunner, error) { - return NewPureRunner(cancel, addr, a, cert, key, ca, nil) +func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (*pureRunner, error) { + return NewPureRunner(cancel, addr, da, cert, key, ca, nil) } -func NewPureRunner(cancel context.CancelFunc, addr string, a Agent, cert string, key string, ca string, gate CapacityGate) (*pureRunner, error) { +func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (*pureRunner, error) { + a := createAgent(da, true) var pr *pureRunner var err error if cert != "" && key != "" && ca != "" { diff --git a/api/server/server.go b/api/server/server.go index 26774ef0d..28745b7b6 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -416,9 +416,8 @@ func WithAgentFromEnv() ServerOption { return err } grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort) - delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds)) cancelCtx, cancel := context.WithCancel(ctx) - prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, delegatedAgent, s.cert, s.certKey, s.certAuthority) + prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, ds, s.cert, s.certKey, s.certAuthority) if err != nil { return err } @@ -441,7 +440,6 @@ func WithAgentFromEnv() ServerOption { if err != nil { return err } - delegatedAgent := agent.New(agent.NewCachedDataAccess(cl)) runnerPool, err := s.defaultRunnerPool() if err != nil { @@ -449,7 +447,7 @@ func WithAgentFromEnv() ServerOption { } placer := s.defaultPlacer() - s.agent, err = agent.NewLBAgent(delegatedAgent, runnerPool, placer) + s.agent, err = agent.NewLBAgent(agent.NewCachedDataAccess(cl), runnerPool, placer) if err != nil { return errors.New("LBAgent creation failed") } diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index efbdf85a6..efae61be0 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -141,13 +141,12 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { if err != nil { return nil, err } - delegatedAgent := agent.New(agent.NewCachedDataAccess(cl)) nodePool, err := NewSystemTestNodePool() if err != nil { return nil, err } placer := agent.NewNaivePlacer() - agent, err := agent.NewLBAgent(delegatedAgent, nodePool, placer) + agent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer) if err != nil { return nil, err } @@ -207,9 +206,8 @@ func SetUpPureRunnerNode(ctx context.Context, nodeNum int) (*server.Server, erro return nil, err } grpcAddr := fmt.Sprintf(":%d", 9190+nodeNum) - delegatedAgent := agent.NewSyncOnly(agent.NewCachedDataAccess(ds)) cancelCtx, cancel := context.WithCancel(ctx) - prAgent, err := agent.NewPureRunner(cancel, grpcAddr, delegatedAgent, "", "", "", &testCapacityGate{runnerNumber: nodeNum}) + prAgent, err := agent.NewPureRunner(cancel, grpcAddr, ds, "", "", "", &testCapacityGate{runnerNumber: nodeNum}) if err != nil { return nil, err }