diff --git a/api/agent/agent.go b/api/agent/agent.go index cd0132578..e57829c1c 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -84,6 +84,7 @@ type Agent interface { Submit(Call) error // Close will wait for any outstanding calls to complete and then exit. + // Closing the agent will invoke Close on the underlying DataAccess. // Close is not safe to be called from multiple threads. io.Closer @@ -209,6 +210,12 @@ func (a *agent) Close() error { } }) + // shutdown any db/queue resources + // associated with DataAccess + daErr := a.da.Close() + if daErr != nil { + return daErr + } return err } diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 79088c931..f3f172d75 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "math" @@ -61,6 +62,12 @@ func checkExpectedHeaders(t *testing.T, expectedHeaders http.Header, receivedHea } } +func checkClose(t *testing.T, a Agent) { + if err := a.Close(); err != nil { + t.Fatalf("Failed to close agent: %v", err) + } +} + func TestCallConfigurationRequest(t *testing.T) { appName := "myapp" path := "/" @@ -94,7 +101,7 @@ func TestCallConfigurationRequest(t *testing.T) { ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) w := httptest.NewRecorder() @@ -237,7 +244,7 @@ func TestCallConfigurationModel(t *testing.T) { ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) callI, err := a.GetCall(FromModel(cm)) if err != nil { @@ -308,7 +315,7 @@ func TestAsyncCallHeaders(t *testing.T) { ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) callI, err := a.GetCall(FromModel(cm)) if err != nil { @@ -434,7 +441,7 @@ func TestReqTooLarge(t *testing.T) { cfg.MaxRequestSize = 5 a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) - defer a.Close() + defer checkClose(t, a) _, err = a.GetCall(FromModel(cm)) if err != models.ErrRequestContentTooBig { @@ -487,7 +494,7 @@ func TestSubmitError(t *testing.T) { ds := datastore.NewMockInit() a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) var wg sync.WaitGroup wg.Add(1) @@ -554,7 +561,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) { ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) bodOne := `{"echoContent":"yodawg"}` @@ -617,7 +624,7 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { ds := datastore.NewMockInit() a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock)))) - defer a.Close() + defer checkClose(t, a) _, err := a.GetCall(FromModel(call)) if err != models.ErrCallTimeoutServerBusy { @@ -723,7 +730,7 @@ func TestPipesAreClear(t *testing.T) { ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) // test read this body after 5s (after call times out) and make sure we don't get yodawg // TODO could read after 10 seconds, to make sure the 2nd task's input stream isn't blocked @@ -873,7 +880,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) { ) a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) - defer a.Close() + defer checkClose(t, a) bodOne := `{"echoContent":"yodawg"}` req, err := http.NewRequest("GET", call.URL, strings.NewReader(bodOne)) @@ -979,7 +986,7 @@ func TestNBIOResourceTracker(t *testing.T) { cfg.HotPoll = 20 * time.Millisecond a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg)) - defer a.Close() + defer checkClose(t, a) reqCount := 20 errors := make(chan error, reqCount) @@ -1029,3 +1036,43 @@ func TestNBIOResourceTracker(t *testing.T) { t.Fatalf("Expected successes, but got %d", ok) } } + +type closingDataAccess struct { + DataAccess + closeReturn error + closed chan struct{} +} + +func newClosingDataAccess(closeReturn error) *closingDataAccess { + ds := datastore.NewMockInit() + return &closingDataAccess{ + DataAccess: NewDirectDataAccess(ds, ds, new(mqs.Mock)), + closed: make(chan struct{}), + closeReturn: closeReturn, + } + +} + +func (da *closingDataAccess) Close() error { + close(da.closed) + return da.closeReturn +} + +func TestClosesDataAccess(t *testing.T) { + da := newClosingDataAccess(nil) + + a := New(da) + checkClose(t, a) + <-da.closed +} + +func TestCloseReturnsDataAccessError(t *testing.T) { + err := errors.New("foo") + da := newClosingDataAccess(err) + a := New(da) + + if cerr := a.Close(); cerr != err { + t.Fatalf("Wrong error returned, expected %v but got %v", err, cerr) + } + <-da.closed +} diff --git a/api/agent/data_access.go b/api/agent/data_access.go index d4821746d..7cf15eccf 100644 --- a/api/agent/data_access.go +++ b/api/agent/data_access.go @@ -38,6 +38,11 @@ type DataAccess interface { // Finish will notify the system that the Call has been processed, and // fulfill the reservation in the queue if the call came from a queue. Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error + + // Close will wait for any pending operations to complete and + // shuts down connections to the underlying datastore/queue resources. + // Close is not safe to be called from multiple threads. + io.Closer } // CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute. @@ -108,6 +113,11 @@ func (da *CachedDataAccess) GetRoute(ctx context.Context, appID string, routePat return r.(*models.Route), nil } +// Close invokes close on the underlying DataAccess +func (da *CachedDataAccess) Close() error { + return da.DataAccess.Close() +} + type directDataAccess struct { mq models.MessageQueue ds models.Datastore @@ -177,3 +187,18 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde } return nil } + +// Close calls close on the underlying Datastore and MessageQueue. If the Logstore +// and Datastore are different, it will call Close on the Logstore as well. +func (da *directDataAccess) Close() error { + err := da.ds.Close() + if da.ds != da.ls { + if daErr := da.ls.Close(); daErr != nil { + err = daErr + } + } + if mqErr := da.mq.Close(); mqErr != nil { + err = mqErr + } + return err +} diff --git a/api/agent/hybrid/client.go b/api/agent/hybrid/client.go index 46eda8a28..72ea2493f 100644 --- a/api/agent/hybrid/client.go +++ b/api/agent/hybrid/client.go @@ -248,3 +248,7 @@ func (cl *client) once(ctx context.Context, request, result interface{}, method func (cl *client) url(args ...string) string { return cl.base + strings.Join(args, "/") } + +func (cl *client) Close() error { + return nil +} diff --git a/api/agent/hybrid/nop.go b/api/agent/hybrid/nop.go index d5992b2fc..c1e757c14 100644 --- a/api/agent/hybrid/nop.go +++ b/api/agent/hybrid/nop.go @@ -58,3 +58,7 @@ func (cl *nopDataStore) GetRoute(ctx context.Context, appName, route string) (*m defer span.End() return nil, errors.New("Should not call GetRoute on a NOP data store") } + +func (cl *nopDataStore) Close() error { + return nil +} diff --git a/api/datastore/internal/datastoreutil/metrics.go b/api/datastore/internal/datastoreutil/metrics.go index 9b26aade4..da168dd27 100644 --- a/api/datastore/internal/datastoreutil/metrics.go +++ b/api/datastore/internal/datastoreutil/metrics.go @@ -116,3 +116,8 @@ func (m *metricds) GetLog(ctx context.Context, appName, callID string) (io.Reade // instant & no context ;) func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() } + +// Close calls Close on the underlying Datastore +func (m *metricds) Close() error { + return m.ds.Close() +} diff --git a/api/datastore/mock.go b/api/datastore/mock.go index e2c67c2c2..66dfd9381 100644 --- a/api/datastore/mock.go +++ b/api/datastore/mock.go @@ -198,3 +198,7 @@ func (m *mock) batchDeleteRoutes(ctx context.Context, appID string) error { func (m *mock) GetDatabase() *sqlx.DB { return nil } + +func (m *mock) Close() error { + return nil +} diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 39709e47c..9c17f2f0d 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -910,3 +910,8 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) { func (ds *sqlStore) GetDatabase() *sqlx.DB { return ds.db } + +// Close closes the database, releasing any open resources. +func (ds *sqlStore) Close() error { + return ds.db.Close() +} diff --git a/api/datastore/sql/sql_test.go b/api/datastore/sql/sql_test.go index d78356d5c..ed6e147da 100644 --- a/api/datastore/sql/sql_test.go +++ b/api/datastore/sql/sql_test.go @@ -127,4 +127,23 @@ func TestDatastore(t *testing.T) { both(u) } + +} + +func TestClose(t *testing.T) { + ctx := context.Background() + defer os.RemoveAll("sqlite_test_dir") + u, err := url.Parse("sqlite3://sqlite_test_dir") + if err != nil { + t.Fatal(err) + } + os.RemoveAll("sqlite_test_dir") + ds, err := newDS(ctx, u) + if err != nil { + t.Fatal(err) + } + + if err := ds.Close(); err != nil { + t.Fatalf("Failed to close datastore: %v", err) + } } diff --git a/api/logs/mock.go b/api/logs/mock.go index 73557e76a..04abb55af 100644 --- a/api/logs/mock.go +++ b/api/logs/mock.go @@ -89,3 +89,7 @@ func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mode return calls, nil } + +func (m *mock) Close() error { + return nil +} diff --git a/api/logs/s3/s3.go b/api/logs/s3/s3.go index 4d55e4d7d..a590c9b3d 100644 --- a/api/logs/s3/s3.go +++ b/api/logs/s3/s3.go @@ -460,3 +460,7 @@ func init() { } } } + +func (s *store) Close() error { + return nil +} diff --git a/api/models/logs.go b/api/models/logs.go index f035d5e46..f81517ff6 100644 --- a/api/models/logs.go +++ b/api/models/logs.go @@ -30,4 +30,8 @@ type LogStore interface { // GetCalls returns a list of calls that satisfy the given CallFilter. If no // calls exist, an empty list and a nil error are returned. GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error) + + // Close will close any underlying connections as needed. + // Close is not safe to be called from multiple threads. + io.Closer } diff --git a/api/models/mq.go b/api/models/mq.go index 72802c5a2..884fa34a7 100644 --- a/api/models/mq.go +++ b/api/models/mq.go @@ -1,6 +1,9 @@ package models -import "context" +import ( + "context" + "io" +) // Message Queue is used to impose a total ordering on jobs that it will // execute in order. calls are added to the queue via the Push() interface. The @@ -49,4 +52,8 @@ type MessageQueue interface { // the job does not have an outstanding reservation, error. If a job did not // exist, succeed. Delete(context.Context, *Call) error + + // Close will close any underlying connections as needed. + // Close is not safe to be called from multiple threads. + io.Closer } diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 2f51b01cd..9a6328bd2 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -347,3 +347,10 @@ func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Call) error { return nil }) } + +// Close shuts down the bolt db connection and +// stops the goroutine associated with the ticker +func (mq *BoltDbMQ) Close() error { + mq.ticker.Stop() + return mq.db.Close() +} diff --git a/api/mqs/memory.go b/api/mqs/memory.go index ccaf1053c..d5be51c74 100644 --- a/api/mqs/memory.go +++ b/api/mqs/memory.go @@ -192,3 +192,9 @@ func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Call) error { log.Debugln("Deleted") return nil } + +// Close stops the associated goroutines by stopping the ticker +func (mq *MemoryMQ) Close() error { + mq.Ticker.Stop() + return nil +} diff --git a/api/mqs/mock.go b/api/mqs/mock.go index 49e545d57..5106e425a 100644 --- a/api/mqs/mock.go +++ b/api/mqs/mock.go @@ -24,3 +24,7 @@ func (mock *Mock) Reserve(context.Context) (*models.Call, error) { func (mock *Mock) Delete(context.Context, *models.Call) error { return nil } + +func (mock *Mock) Close() error { + return nil +} diff --git a/api/mqs/new.go b/api/mqs/new.go index 2e0549214..53af01dc3 100644 --- a/api/mqs/new.go +++ b/api/mqs/new.go @@ -60,3 +60,8 @@ func (m *metricMQ) Delete(ctx context.Context, t *models.Call) error { defer span.End() return m.mq.Delete(ctx, t) } + +// Close closes the underlying message queue +func (m *metricMQ) Close() error { + return m.mq.Close() +} diff --git a/api/mqs/redis.go b/api/mqs/redis.go index 80a371ef2..39bb29b3c 100644 --- a/api/mqs/redis.go +++ b/api/mqs/redis.go @@ -307,3 +307,10 @@ func (mq *RedisMQ) Delete(ctx context.Context, job *models.Call) error { _, err = conn.Do("HDEL", "timeout", resID) return err } + +// Close shuts down the redis connection pool and +// stops the goroutine associated with the ticker +func (mq *RedisMQ) Close() error { + mq.ticker.Stop() + return mq.pool.Close() +} diff --git a/api/server/runner_test.go b/api/server/runner_test.go index ad462c862..8aab40afa 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -446,7 +446,7 @@ func (mock *errorMQ) Push(context.Context, *models.Call) (*models.Call, error) { func (mock *errorMQ) Reserve(context.Context) (*models.Call, error) { return nil, mock } func (mock *errorMQ) Delete(context.Context, *models.Call) error { return mock } func (mock *errorMQ) Code() int { return mock.code } - +func (mock *errorMQ) Close() error { return nil } func TestFailedEnqueue(t *testing.T) { buf := setLogBuffer() app := &models.App{Name: "myapp", Config: models.Config{}}