mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Implement graceful shutdown of agent.DataAccess (#1008)
* Implements graceful shutdown of agent.DataAccess and underlying Datastore/Logstore/MessageQueue * adds tests for closing agent.DataAccess and Datastore
This commit is contained in:
@@ -84,6 +84,7 @@ type Agent interface {
|
|||||||
Submit(Call) error
|
Submit(Call) error
|
||||||
|
|
||||||
// Close will wait for any outstanding calls to complete and then exit.
|
// Close will wait for any outstanding calls to complete and then exit.
|
||||||
|
// Closing the agent will invoke Close on the underlying DataAccess.
|
||||||
// Close is not safe to be called from multiple threads.
|
// Close is not safe to be called from multiple threads.
|
||||||
io.Closer
|
io.Closer
|
||||||
|
|
||||||
@@ -209,6 +210,12 @@ func (a *agent) Close() error {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// shutdown any db/queue resources
|
||||||
|
// associated with DataAccess
|
||||||
|
daErr := a.da.Close()
|
||||||
|
if daErr != nil {
|
||||||
|
return daErr
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
@@ -61,6 +62,12 @@ func checkExpectedHeaders(t *testing.T, expectedHeaders http.Header, receivedHea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkClose(t *testing.T, a Agent) {
|
||||||
|
if err := a.Close(); err != nil {
|
||||||
|
t.Fatalf("Failed to close agent: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCallConfigurationRequest(t *testing.T) {
|
func TestCallConfigurationRequest(t *testing.T) {
|
||||||
appName := "myapp"
|
appName := "myapp"
|
||||||
path := "/"
|
path := "/"
|
||||||
@@ -94,7 +101,7 @@ func TestCallConfigurationRequest(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
@@ -237,7 +244,7 @@ func TestCallConfigurationModel(t *testing.T) {
|
|||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
callI, err := a.GetCall(FromModel(cm))
|
callI, err := a.GetCall(FromModel(cm))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -308,7 +315,7 @@ func TestAsyncCallHeaders(t *testing.T) {
|
|||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
callI, err := a.GetCall(FromModel(cm))
|
callI, err := a.GetCall(FromModel(cm))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -434,7 +441,7 @@ func TestReqTooLarge(t *testing.T) {
|
|||||||
cfg.MaxRequestSize = 5
|
cfg.MaxRequestSize = 5
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
_, err = a.GetCall(FromModel(cm))
|
_, err = a.GetCall(FromModel(cm))
|
||||||
if err != models.ErrRequestContentTooBig {
|
if err != models.ErrRequestContentTooBig {
|
||||||
@@ -487,7 +494,7 @@ func TestSubmitError(t *testing.T) {
|
|||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -554,7 +561,7 @@ func TestHTTPWithoutContentLengthWorks(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
bodOne := `{"echoContent":"yodawg"}`
|
bodOne := `{"echoContent":"yodawg"}`
|
||||||
|
|
||||||
@@ -617,7 +624,7 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) {
|
|||||||
ds := datastore.NewMockInit()
|
ds := datastore.NewMockInit()
|
||||||
|
|
||||||
a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock))))
|
a := New(NewCachedDataAccess(NewDirectDataAccess(ds, ds, new(mqs.Mock))))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
_, err := a.GetCall(FromModel(call))
|
_, err := a.GetCall(FromModel(call))
|
||||||
if err != models.ErrCallTimeoutServerBusy {
|
if err != models.ErrCallTimeoutServerBusy {
|
||||||
@@ -723,7 +730,7 @@ func TestPipesAreClear(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
// test read this body after 5s (after call times out) and make sure we don't get yodawg
|
// test read this body after 5s (after call times out) and make sure we don't get yodawg
|
||||||
// TODO could read after 10 seconds, to make sure the 2nd task's input stream isn't blocked
|
// TODO could read after 10 seconds, to make sure the 2nd task's input stream isn't blocked
|
||||||
@@ -873,7 +880,7 @@ func TestPipesDontMakeSpuriousCalls(t *testing.T) {
|
|||||||
)
|
)
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
bodOne := `{"echoContent":"yodawg"}`
|
bodOne := `{"echoContent":"yodawg"}`
|
||||||
req, err := http.NewRequest("GET", call.URL, strings.NewReader(bodOne))
|
req, err := http.NewRequest("GET", call.URL, strings.NewReader(bodOne))
|
||||||
@@ -979,7 +986,7 @@ func TestNBIOResourceTracker(t *testing.T) {
|
|||||||
cfg.HotPoll = 20 * time.Millisecond
|
cfg.HotPoll = 20 * time.Millisecond
|
||||||
|
|
||||||
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)), WithConfig(cfg))
|
||||||
defer a.Close()
|
defer checkClose(t, a)
|
||||||
|
|
||||||
reqCount := 20
|
reqCount := 20
|
||||||
errors := make(chan error, reqCount)
|
errors := make(chan error, reqCount)
|
||||||
@@ -1029,3 +1036,43 @@ func TestNBIOResourceTracker(t *testing.T) {
|
|||||||
t.Fatalf("Expected successes, but got %d", ok)
|
t.Fatalf("Expected successes, but got %d", ok)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type closingDataAccess struct {
|
||||||
|
DataAccess
|
||||||
|
closeReturn error
|
||||||
|
closed chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClosingDataAccess(closeReturn error) *closingDataAccess {
|
||||||
|
ds := datastore.NewMockInit()
|
||||||
|
return &closingDataAccess{
|
||||||
|
DataAccess: NewDirectDataAccess(ds, ds, new(mqs.Mock)),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
closeReturn: closeReturn,
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (da *closingDataAccess) Close() error {
|
||||||
|
close(da.closed)
|
||||||
|
return da.closeReturn
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClosesDataAccess(t *testing.T) {
|
||||||
|
da := newClosingDataAccess(nil)
|
||||||
|
|
||||||
|
a := New(da)
|
||||||
|
checkClose(t, a)
|
||||||
|
<-da.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCloseReturnsDataAccessError(t *testing.T) {
|
||||||
|
err := errors.New("foo")
|
||||||
|
da := newClosingDataAccess(err)
|
||||||
|
a := New(da)
|
||||||
|
|
||||||
|
if cerr := a.Close(); cerr != err {
|
||||||
|
t.Fatalf("Wrong error returned, expected %v but got %v", err, cerr)
|
||||||
|
}
|
||||||
|
<-da.closed
|
||||||
|
}
|
||||||
|
|||||||
@@ -38,6 +38,11 @@ type DataAccess interface {
|
|||||||
// Finish will notify the system that the Call has been processed, and
|
// Finish will notify the system that the Call has been processed, and
|
||||||
// fulfill the reservation in the queue if the call came from a queue.
|
// fulfill the reservation in the queue if the call came from a queue.
|
||||||
Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
|
Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
|
||||||
|
|
||||||
|
// Close will wait for any pending operations to complete and
|
||||||
|
// shuts down connections to the underlying datastore/queue resources.
|
||||||
|
// Close is not safe to be called from multiple threads.
|
||||||
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
// CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
|
// CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.
|
||||||
@@ -108,6 +113,11 @@ func (da *CachedDataAccess) GetRoute(ctx context.Context, appID string, routePat
|
|||||||
return r.(*models.Route), nil
|
return r.(*models.Route), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close invokes close on the underlying DataAccess
|
||||||
|
func (da *CachedDataAccess) Close() error {
|
||||||
|
return da.DataAccess.Close()
|
||||||
|
}
|
||||||
|
|
||||||
type directDataAccess struct {
|
type directDataAccess struct {
|
||||||
mq models.MessageQueue
|
mq models.MessageQueue
|
||||||
ds models.Datastore
|
ds models.Datastore
|
||||||
@@ -177,3 +187,18 @@ func (da *directDataAccess) Finish(ctx context.Context, mCall *models.Call, stde
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close calls close on the underlying Datastore and MessageQueue. If the Logstore
|
||||||
|
// and Datastore are different, it will call Close on the Logstore as well.
|
||||||
|
func (da *directDataAccess) Close() error {
|
||||||
|
err := da.ds.Close()
|
||||||
|
if da.ds != da.ls {
|
||||||
|
if daErr := da.ls.Close(); daErr != nil {
|
||||||
|
err = daErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if mqErr := da.mq.Close(); mqErr != nil {
|
||||||
|
err = mqErr
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
@@ -248,3 +248,7 @@ func (cl *client) once(ctx context.Context, request, result interface{}, method
|
|||||||
func (cl *client) url(args ...string) string {
|
func (cl *client) url(args ...string) string {
|
||||||
return cl.base + strings.Join(args, "/")
|
return cl.base + strings.Join(args, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cl *client) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -58,3 +58,7 @@ func (cl *nopDataStore) GetRoute(ctx context.Context, appName, route string) (*m
|
|||||||
defer span.End()
|
defer span.End()
|
||||||
return nil, errors.New("Should not call GetRoute on a NOP data store")
|
return nil, errors.New("Should not call GetRoute on a NOP data store")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cl *nopDataStore) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -116,3 +116,8 @@ func (m *metricds) GetLog(ctx context.Context, appName, callID string) (io.Reade
|
|||||||
|
|
||||||
// instant & no context ;)
|
// instant & no context ;)
|
||||||
func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() }
|
func (m *metricds) GetDatabase() *sqlx.DB { return m.ds.GetDatabase() }
|
||||||
|
|
||||||
|
// Close calls Close on the underlying Datastore
|
||||||
|
func (m *metricds) Close() error {
|
||||||
|
return m.ds.Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -198,3 +198,7 @@ func (m *mock) batchDeleteRoutes(ctx context.Context, appID string) error {
|
|||||||
func (m *mock) GetDatabase() *sqlx.DB {
|
func (m *mock) GetDatabase() *sqlx.DB {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mock) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -910,3 +910,8 @@ func buildFilterCallQuery(filter *models.CallFilter) (string, []interface{}) {
|
|||||||
func (ds *sqlStore) GetDatabase() *sqlx.DB {
|
func (ds *sqlStore) GetDatabase() *sqlx.DB {
|
||||||
return ds.db
|
return ds.db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes the database, releasing any open resources.
|
||||||
|
func (ds *sqlStore) Close() error {
|
||||||
|
return ds.db.Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -127,4 +127,23 @@ func TestDatastore(t *testing.T) {
|
|||||||
|
|
||||||
both(u)
|
both(u)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClose(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
defer os.RemoveAll("sqlite_test_dir")
|
||||||
|
u, err := url.Parse("sqlite3://sqlite_test_dir")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
os.RemoveAll("sqlite_test_dir")
|
||||||
|
ds, err := newDS(ctx, u)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ds.Close(); err != nil {
|
||||||
|
t.Fatalf("Failed to close datastore: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -89,3 +89,7 @@ func (m *mock) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*mode
|
|||||||
|
|
||||||
return calls, nil
|
return calls, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mock) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -460,3 +460,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *store) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -30,4 +30,8 @@ type LogStore interface {
|
|||||||
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
|
// GetCalls returns a list of calls that satisfy the given CallFilter. If no
|
||||||
// calls exist, an empty list and a nil error are returned.
|
// calls exist, an empty list and a nil error are returned.
|
||||||
GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error)
|
GetCalls(ctx context.Context, filter *CallFilter) ([]*Call, error)
|
||||||
|
|
||||||
|
// Close will close any underlying connections as needed.
|
||||||
|
// Close is not safe to be called from multiple threads.
|
||||||
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
package models
|
package models
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
// Message Queue is used to impose a total ordering on jobs that it will
|
// Message Queue is used to impose a total ordering on jobs that it will
|
||||||
// execute in order. calls are added to the queue via the Push() interface. The
|
// execute in order. calls are added to the queue via the Push() interface. The
|
||||||
@@ -49,4 +52,8 @@ type MessageQueue interface {
|
|||||||
// the job does not have an outstanding reservation, error. If a job did not
|
// the job does not have an outstanding reservation, error. If a job did not
|
||||||
// exist, succeed.
|
// exist, succeed.
|
||||||
Delete(context.Context, *Call) error
|
Delete(context.Context, *Call) error
|
||||||
|
|
||||||
|
// Close will close any underlying connections as needed.
|
||||||
|
// Close is not safe to be called from multiple threads.
|
||||||
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -347,3 +347,10 @@ func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Call) error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close shuts down the bolt db connection and
|
||||||
|
// stops the goroutine associated with the ticker
|
||||||
|
func (mq *BoltDbMQ) Close() error {
|
||||||
|
mq.ticker.Stop()
|
||||||
|
return mq.db.Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -192,3 +192,9 @@ func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Call) error {
|
|||||||
log.Debugln("Deleted")
|
log.Debugln("Deleted")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close stops the associated goroutines by stopping the ticker
|
||||||
|
func (mq *MemoryMQ) Close() error {
|
||||||
|
mq.Ticker.Stop()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -24,3 +24,7 @@ func (mock *Mock) Reserve(context.Context) (*models.Call, error) {
|
|||||||
func (mock *Mock) Delete(context.Context, *models.Call) error {
|
func (mock *Mock) Delete(context.Context, *models.Call) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mock *Mock) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -60,3 +60,8 @@ func (m *metricMQ) Delete(ctx context.Context, t *models.Call) error {
|
|||||||
defer span.End()
|
defer span.End()
|
||||||
return m.mq.Delete(ctx, t)
|
return m.mq.Delete(ctx, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying message queue
|
||||||
|
func (m *metricMQ) Close() error {
|
||||||
|
return m.mq.Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -307,3 +307,10 @@ func (mq *RedisMQ) Delete(ctx context.Context, job *models.Call) error {
|
|||||||
_, err = conn.Do("HDEL", "timeout", resID)
|
_, err = conn.Do("HDEL", "timeout", resID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close shuts down the redis connection pool and
|
||||||
|
// stops the goroutine associated with the ticker
|
||||||
|
func (mq *RedisMQ) Close() error {
|
||||||
|
mq.ticker.Stop()
|
||||||
|
return mq.pool.Close()
|
||||||
|
}
|
||||||
|
|||||||
@@ -446,7 +446,7 @@ func (mock *errorMQ) Push(context.Context, *models.Call) (*models.Call, error) {
|
|||||||
func (mock *errorMQ) Reserve(context.Context) (*models.Call, error) { return nil, mock }
|
func (mock *errorMQ) Reserve(context.Context) (*models.Call, error) { return nil, mock }
|
||||||
func (mock *errorMQ) Delete(context.Context, *models.Call) error { return mock }
|
func (mock *errorMQ) Delete(context.Context, *models.Call) error { return mock }
|
||||||
func (mock *errorMQ) Code() int { return mock.code }
|
func (mock *errorMQ) Code() int { return mock.code }
|
||||||
|
func (mock *errorMQ) Close() error { return nil }
|
||||||
func TestFailedEnqueue(t *testing.T) {
|
func TestFailedEnqueue(t *testing.T) {
|
||||||
buf := setLogBuffer()
|
buf := setLogBuffer()
|
||||||
app := &models.App{Name: "myapp", Config: models.Config{}}
|
app := &models.App{Name: "myapp", Config: models.Config{}}
|
||||||
|
|||||||
Reference in New Issue
Block a user