From df3d5b48cecffa9782a0dd58c181fd76eda8e6f0 Mon Sep 17 00:00:00 2001 From: C Cirello Date: Thu, 13 Oct 2016 22:56:34 +0200 Subject: [PATCH] Fix race condition during initialization (#163) Currently, async workers are started before HTTP interface is available to get their requests. It fixes by ensuring that async workers are started after HTTP interface is up. Essentially we are getting rid of an error message during bootstrap: ERRO[0000] Could not fetch task error=Get http://127.0.0.1:8080/tasks: dial tcp 127.0.0.1:8080: getsockopt: connection refused --- api/runner/async_runner.go | 21 ++++++++++++++++----- api/runner/async_runner_test.go | 2 +- api/server/server.go | 3 ++- glide.lock | 22 +++++++++++++++++++--- glide.yaml | 4 +++- main.go | 30 +++++++++++++++++++----------- 6 files changed, 60 insertions(+), 22 deletions(-) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 285c1e401..4b7a48825 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -96,8 +96,11 @@ func runTask(task *models.Task) (drivers.RunResult, error) { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port string, n int) { - u := tasksrvURL(tasksrv, port) +func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) { + u, h := tasksrvURL(tasksrv, port) + if isHostOpen(h) { + return + } var wg sync.WaitGroup for i := 0; i < n; i++ { @@ -107,7 +110,15 @@ func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port wg.Wait() <-ctx.Done() - wgAsync.Done() +} + +func isHostOpen(host string) bool { + conn, err := net.Dial("tcp", host) + available := err == nil + if available { + conn.Close() + } + return available } func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) { @@ -150,7 +161,7 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin } } -func tasksrvURL(tasksrv, port string) string { +func tasksrvURL(tasksrv, port string) (parsedURL, host string) { parsed, err := url.Parse(tasksrv) if err != nil { log.Fatalf("cannot parse TASKSRV endpoint: %v", err) @@ -168,5 +179,5 @@ func tasksrvURL(tasksrv, port string) string { parsed.Host = net.JoinHostPort(parsed.Host, port) } - return parsed.String() + return parsed.String(), parsed.Host } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 59a31ef90..dfb9da8f5 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -204,7 +204,7 @@ func TestTasksrvURL(t *testing.T) { } for _, tt := range tests { - if got := tasksrvURL(tt.in, tt.port); got != tt.out { + if got, _ := tasksrvURL(tt.in, tt.port); got != tt.out { t.Errorf("port: %s\ttasksrv: %s\texpected: %s\tgot: %s", tt.port, tt.in, tt.out, got) } } diff --git a/api/server/server.go b/api/server/server.go index e29fef9bf..1e00d3016 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -146,7 +146,8 @@ func (s *Server) Run(ctx context.Context) { // By default it serves on :8080 unless a // PORT environment variable was defined. - s.Router.Run() + go s.Router.Run() + <-ctx.Done() } func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) { diff --git a/glide.lock b/glide.lock index f6a52362f..4127a796e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,6 +1,8 @@ -hash: 5ebe9893c89f9d0b31b71bd27fb37bf4fae1662e6fbc429d566b0d75ca7ffcfc -updated: 2016-10-11T18:10:30.697966926-07:00 +hash: 25de4b7591e695585ed10d0a2b6c6419576f5aab60e28b84f07fb77908eca9e1 +updated: 2016-10-13T19:37:24.59686575+02:00 imports: +- name: cirello.io/supervisor + version: dd9fbc61e43585f9b73af69d598816902b643fb0 - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 subpackages: @@ -13,6 +15,16 @@ imports: version: 91c326c3f7bd20f0226d3d1c289dd9f8ce28d33d subpackages: - statsd +- name: github.com/cenkalti/backoff + version: 8edc80b07f38c27352fb186d971c628a6c32552b +- name: github.com/dghubble/go-twitter + version: 12e5387804e84bf20dc60ba964f59437553e16a6 + subpackages: + - twitter +- name: github.com/dghubble/oauth1 + version: 4385816142116aade2d97d0f320f9d3666e74cd9 +- name: github.com/dghubble/sling + version: c961a4334054e64299d16f8a31bd686ee2565ae4 - name: github.com/dgrijalva/jwt-go version: 268038b363c7a8d7306b8e35bf77a1fde4b0c402 - name: github.com/docker/distribution @@ -86,6 +98,10 @@ imports: - proto - name: github.com/google/btree version: 925471ac9e2131377a91e1595defec898166fe49 +- name: github.com/google/go-querystring + version: 9235644dd9e52eeae6fa48efd539fdc351a0af53 + subpackages: + - query - name: github.com/gorilla/context version: 14f550f51af52180c2eefed15e5fd18d63c0a64a - name: github.com/gorilla/mux @@ -215,4 +231,4 @@ imports: - internal/scram - name: gopkg.in/yaml.v2 version: e4d366fc3c7938e2958e662b4258c7a89e1f0e3e -testImports: [] \ No newline at end of file +testImports: [] diff --git a/glide.yaml b/glide.yaml index cd0a9ebb1..52fbac8a4 100644 --- a/glide.yaml +++ b/glide.yaml @@ -36,4 +36,6 @@ import: - package: github.com/cactus/go-statsd-client version: ^3.1.0 subpackages: - - statsd \ No newline at end of file + - statsd +- package: cirello.io/supervisor + version: v0.5.0 diff --git a/main.go b/main.go index c073c8339..856a8688c 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( "os" "os/signal" "strings" - "sync" + "cirello.io/supervisor" log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/iron-io/functions/api/datastore" @@ -79,16 +79,24 @@ func main() { log.WithError(err).Fatalln("Failed to create a runner") } - apiURL, port, numAsync := viper.GetString(envAPIURL), viper.GetString(envPort), viper.GetInt(envNumAsync) - log.Debug("async workers:", numAsync) - var wgAsync sync.WaitGroup - if numAsync > 0 { - wgAsync.Add(1) - go runner.RunAsyncRunner(ctx, &wgAsync, apiURL, port, numAsync) + svr := &supervisor.Supervisor{ + Log: func(msg interface{}) { + log.Debug("supervisor: ", msg) + }, } - srv := server.New(ds, mqType, rnr) - go srv.Run(ctx) - <-ctx.Done() - wgAsync.Wait() + svr.AddFunc(func(ctx context.Context) { + srv := server.New(ds, mqType, rnr) + srv.Run(ctx) + }) + + apiURL, port, numAsync := viper.GetString(envAPIURL), viper.GetString(envPort), viper.GetInt(envNumAsync) + log.Debug("async workers:", numAsync) + if numAsync > 0 { + svr.AddFunc(func(ctx context.Context) { + runner.RunAsyncRunner(ctx, apiURL, port, numAsync) + }) + } + + svr.Serve(ctx) }