fn: shutdown improvements (#1009)

*) singleton style teardown for api tests
*) cancel content for system tests
This commit is contained in:
Tolga Ceylan
2018-05-18 14:40:15 -07:00
committed by GitHub
parent 3198a14410
commit e9c465c781
2 changed files with 107 additions and 53 deletions

View File

@@ -46,11 +46,6 @@ func APIClient() *client.Fn {
return client.New(transport, strfmt.Default)
}
var (
getServer sync.Once
s *server.Server
)
func checkServer(ctx context.Context) error {
if ctx.Err() != nil {
log.Print("Server check failed, timeout")
@@ -81,57 +76,104 @@ func getEnv(key, fallback string) string {
return fallback
}
var (
srvLock sync.Mutex
srvRefCount uint64
srvInstance *server.Server
srvDone chan struct{}
srvCancel func()
)
func stopServer(ctx context.Context) {
srvLock.Lock()
defer srvLock.Unlock()
if srvRefCount == 0 {
log.Printf("Server not running, ref count %v", srvRefCount)
return
}
srvRefCount--
if srvRefCount != 0 {
log.Printf("Server decrement ref count %v", srvRefCount)
return
}
srvCancel()
select {
case <-srvDone:
case <-ctx.Done():
log.Panic("Server Cleanup failed, timeout")
}
}
func startServer() {
getServer.Do(func() {
ctx := context.Background()
srvLock.Lock()
srvRefCount++
timeString := time.Now().Format("2006_01_02_15_04_05")
dbURL := os.Getenv(server.EnvDBURL)
tmpDir := os.TempDir()
tmpMq := fmt.Sprintf("%s/fn_integration_test_%s_worker_mq.db", tmpDir, timeString)
tmpDb := fmt.Sprintf("%s/fn_integration_test_%s_fn.db", tmpDir, timeString)
mqURL := fmt.Sprintf("bolt://%s", tmpMq)
if dbURL == "" {
dbURL = fmt.Sprintf("sqlite3://%s", tmpDb)
if srvRefCount != 1 {
log.Printf("Server already running, ref count %v", srvRefCount)
srvLock.Unlock()
// check once
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(2)*time.Second))
defer cancel()
err := checkServer(ctx)
if err != nil {
log.Panicf("Server check failed: %s", err)
}
s = server.New(ctx,
server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)),
server.WithDBURL(dbURL),
server.WithMQURL(mqURL),
server.WithFullAgent(),
)
return
}
go func() {
s.Start(ctx)
os.Remove(tmpMq)
os.Remove(tmpDb)
}()
log.Printf("Starting server, ref count %v", srvRefCount)
startCtx, startCancel := context.WithDeadline(ctx, time.Now().Add(time.Duration(10)*time.Second))
defer startCancel()
for {
err := checkServer(startCtx)
if err == nil {
break
}
select {
case <-time.After(time.Second * 1):
case <-ctx.Done():
}
if ctx.Err() != nil {
log.Panic("Server check failed, timeout")
}
srvDone = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
srvCancel = cancel
timeString := time.Now().Format("2006_01_02_15_04_05")
dbURL := os.Getenv(server.EnvDBURL)
tmpDir := os.TempDir()
tmpMq := fmt.Sprintf("%s/fn_integration_test_%s_worker_mq.db", tmpDir, timeString)
tmpDb := fmt.Sprintf("%s/fn_integration_test_%s_fn.db", tmpDir, timeString)
mqURL := fmt.Sprintf("bolt://%s", tmpMq)
if dbURL == "" {
dbURL = fmt.Sprintf("sqlite3://%s", tmpDb)
}
srvInstance = server.New(ctx,
server.WithLogLevel(getEnv(server.EnvLogLevel, server.DefaultLogLevel)),
server.WithDBURL(dbURL),
server.WithMQURL(mqURL),
server.WithFullAgent(),
)
go func() {
srvInstance.Start(ctx)
log.Print("Stopped server")
os.Remove(tmpMq)
os.Remove(tmpDb)
close(srvDone)
}()
srvLock.Unlock()
startCtx, startCancel := context.WithDeadline(ctx, time.Now().Add(time.Duration(10)*time.Second))
defer startCancel()
for {
err := checkServer(startCtx)
if err == nil {
break
}
select {
case <-time.After(time.Second * 1):
case <-ctx.Done():
}
if ctx.Err() != nil {
log.Panic("Server check failed, timeout")
}
})
// check once
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(2)*time.Second))
defer cancel()
err := checkServer(ctx)
if err != nil {
log.Panicf("Server check failed: %s", err)
}
}
@@ -166,6 +208,9 @@ func RandStringBytes(n int) string {
// SetupHarness creates a test harness for a test case - this picks up external options and
func SetupHarness() *TestHarness {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
startServer()
ss := &TestHarness{
Context: ctx,
Cancel: cancel,
@@ -182,13 +227,12 @@ func SetupHarness() *TestHarness {
IdleTimeout: int32(30),
createdApps: make(map[string]bool),
}
startServer()
return ss
}
func (s *TestHarness) Cleanup() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
//for _,ar := range s.createdRoutes {
// deleteRoute(ctx, s.Client, ar.appName, ar.routeName)
@@ -197,6 +241,8 @@ func (s *TestHarness) Cleanup() {
for app, _ := range s.createdApps {
safeDeleteApp(ctx, s.Client, app)
}
stopServer(ctx)
}
func EnvAsHeader(req *http.Request, selectedEnv []string) {

View File

@@ -37,11 +37,14 @@ func NewSystemTestNodePool() (pool.RunnerPool, error) {
type state struct {
memory string
cancel func()
}
func SetUpSystem() (*state, error) {
ctx := context.Background()
state := &state{}
ctx, cancel := context.WithCancel(context.Background())
state := &state{
cancel: cancel,
}
api, err := SetUpAPINode(ctx)
if err != nil {
@@ -106,6 +109,11 @@ func CleanUpSystem(st *state) error {
if err != nil {
return err
}
if st.cancel != nil {
st.cancel()
}
// Wait for shutdown - not great
time.Sleep(5 * time.Second)