diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 285c1e401..4b7a48825 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -96,8 +96,11 @@ func runTask(task *models.Task) (drivers.RunResult, error) { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port string, n int) { - u := tasksrvURL(tasksrv, port) +func RunAsyncRunner(ctx context.Context, tasksrv, port string, n int) { + u, h := tasksrvURL(tasksrv, port) + if isHostOpen(h) { + return + } var wg sync.WaitGroup for i := 0; i < n; i++ { @@ -107,7 +110,15 @@ func RunAsyncRunner(ctx context.Context, wgAsync *sync.WaitGroup, tasksrv, port wg.Wait() <-ctx.Done() - wgAsync.Done() +} + +func isHostOpen(host string) bool { + conn, err := net.Dial("tcp", host) + available := err == nil + if available { + conn.Close() + } + return available } func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url string, runTask func(task *models.Task) (drivers.RunResult, error)) { @@ -150,7 +161,7 @@ func startAsyncRunners(ctx context.Context, wg *sync.WaitGroup, i int, url strin } } -func tasksrvURL(tasksrv, port string) string { +func tasksrvURL(tasksrv, port string) (parsedURL, host string) { parsed, err := url.Parse(tasksrv) if err != nil { log.Fatalf("cannot parse TASKSRV endpoint: %v", err) @@ -168,5 +179,5 @@ func tasksrvURL(tasksrv, port string) string { parsed.Host = net.JoinHostPort(parsed.Host, port) } - return parsed.String() + return parsed.String(), parsed.Host } diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 59a31ef90..dfb9da8f5 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -204,7 +204,7 @@ func TestTasksrvURL(t *testing.T) { } for _, tt := range tests { - if got := tasksrvURL(tt.in, tt.port); got != tt.out { + if got, _ := tasksrvURL(tt.in, tt.port); got != tt.out { t.Errorf("port: %s\ttasksrv: %s\texpected: %s\tgot: %s", tt.port, tt.in, tt.out, got) } } diff --git a/api/server/server.go b/api/server/server.go index e29fef9bf..1e00d3016 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -146,7 +146,8 @@ func (s *Server) Run(ctx context.Context) { // By default it serves on :8080 unless a // PORT environment variable was defined. - s.Router.Run() + go s.Router.Run() + <-ctx.Done() } func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) { diff --git a/glide.lock b/glide.lock index f6a52362f..4127a796e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,6 +1,8 @@ -hash: 5ebe9893c89f9d0b31b71bd27fb37bf4fae1662e6fbc429d566b0d75ca7ffcfc -updated: 2016-10-11T18:10:30.697966926-07:00 +hash: 25de4b7591e695585ed10d0a2b6c6419576f5aab60e28b84f07fb77908eca9e1 +updated: 2016-10-13T19:37:24.59686575+02:00 imports: +- name: cirello.io/supervisor + version: dd9fbc61e43585f9b73af69d598816902b643fb0 - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 subpackages: @@ -13,6 +15,16 @@ imports: version: 91c326c3f7bd20f0226d3d1c289dd9f8ce28d33d subpackages: - statsd +- name: github.com/cenkalti/backoff + version: 8edc80b07f38c27352fb186d971c628a6c32552b +- name: github.com/dghubble/go-twitter + version: 12e5387804e84bf20dc60ba964f59437553e16a6 + subpackages: + - twitter +- name: github.com/dghubble/oauth1 + version: 4385816142116aade2d97d0f320f9d3666e74cd9 +- name: github.com/dghubble/sling + version: c961a4334054e64299d16f8a31bd686ee2565ae4 - name: github.com/dgrijalva/jwt-go version: 268038b363c7a8d7306b8e35bf77a1fde4b0c402 - name: github.com/docker/distribution @@ -86,6 +98,10 @@ imports: - proto - name: github.com/google/btree version: 925471ac9e2131377a91e1595defec898166fe49 +- name: github.com/google/go-querystring + version: 9235644dd9e52eeae6fa48efd539fdc351a0af53 + subpackages: + - query - name: github.com/gorilla/context version: 14f550f51af52180c2eefed15e5fd18d63c0a64a - name: github.com/gorilla/mux @@ -215,4 +231,4 @@ imports: - internal/scram - name: gopkg.in/yaml.v2 version: e4d366fc3c7938e2958e662b4258c7a89e1f0e3e -testImports: [] \ No newline at end of file +testImports: [] diff --git a/glide.yaml b/glide.yaml index cd0a9ebb1..52fbac8a4 100644 --- a/glide.yaml +++ b/glide.yaml @@ -36,4 +36,6 @@ import: - package: github.com/cactus/go-statsd-client version: ^3.1.0 subpackages: - - statsd \ No newline at end of file + - statsd +- package: cirello.io/supervisor + version: v0.5.0 diff --git a/main.go b/main.go index c073c8339..856a8688c 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( "os" "os/signal" "strings" - "sync" + "cirello.io/supervisor" log "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/iron-io/functions/api/datastore" @@ -79,16 +79,24 @@ func main() { log.WithError(err).Fatalln("Failed to create a runner") } - apiURL, port, numAsync := viper.GetString(envAPIURL), viper.GetString(envPort), viper.GetInt(envNumAsync) - log.Debug("async workers:", numAsync) - var wgAsync sync.WaitGroup - if numAsync > 0 { - wgAsync.Add(1) - go runner.RunAsyncRunner(ctx, &wgAsync, apiURL, port, numAsync) + svr := &supervisor.Supervisor{ + Log: func(msg interface{}) { + log.Debug("supervisor: ", msg) + }, } - srv := server.New(ds, mqType, rnr) - go srv.Run(ctx) - <-ctx.Done() - wgAsync.Wait() + svr.AddFunc(func(ctx context.Context) { + srv := server.New(ds, mqType, rnr) + srv.Run(ctx) + }) + + apiURL, port, numAsync := viper.GetString(envAPIURL), viper.GetString(envPort), viper.GetInt(envNumAsync) + log.Debug("async workers:", numAsync) + if numAsync > 0 { + svr.AddFunc(func(ctx context.Context) { + runner.RunAsyncRunner(ctx, apiURL, port, numAsync) + }) + } + + svr.Serve(ctx) }