From bc3fba088f1600d7c35bc77e87ac64b22cd0fdc3 Mon Sep 17 00:00:00 2001 From: Carlos C Date: Tue, 4 Oct 2016 23:45:02 +0200 Subject: [PATCH] Fix start problem with two IronFunction colliding configurations By default, BoltDB will hang while waiting to acquire lock to the datafile, thus the users might find themselves waiting for something but not what. The added timeout aims inform use about what's happening. Also this renames MQADR to TASKSRV, refactor configuration to read environment variables. RunAsyncRunner now fills the gaps when parsing TASKSRV. Fixes #119 --- api/config/config.go | 27 ----------------- api/datastore/bolt/bolt.go | 3 +- api/mqs/bolt.go | 3 +- api/runner/async_runner.go | 36 +++++++++++++++++----- api/runner/async_runner_test.go | 22 ++++++++++++++ api/runner/runner_test.go | 2 ++ main.go | 53 +++++++++++++++++---------------- 7 files changed, 83 insertions(+), 63 deletions(-) delete mode 100644 api/config/config.go diff --git a/api/config/config.go b/api/config/config.go deleted file mode 100644 index 7a789bb25..000000000 --- a/api/config/config.go +++ /dev/null @@ -1,27 +0,0 @@ -package config - -import ( - "fmt" - "os" - "strings" - - log "github.com/Sirupsen/logrus" - "github.com/spf13/viper" -) - -func InitConfig() { - cwd, _ := os.Getwd() - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) - viper.SetDefault("log_level", "info") - viper.SetDefault("mq", fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) - viper.SetDefault("db", fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) - viper.SetConfigName("config") - viper.AddConfigPath(".") - viper.AutomaticEnv() // picks up env vars automatically - viper.ReadInConfig() - logLevel, err := log.ParseLevel(viper.GetString("log_level")) - if err != nil { - log.WithError(err).Fatalln("Invalid log level.") - } - log.SetLevel(logLevel) -} diff --git a/api/datastore/bolt/bolt.go b/api/datastore/bolt/bolt.go index 68ebcb9d6..0b6b32d80 100644 --- a/api/datastore/bolt/bolt.go +++ b/api/datastore/bolt/bolt.go @@ -5,6 +5,7 @@ import ( "net/url" "os" "path/filepath" + "time" "github.com/Sirupsen/logrus" "github.com/boltdb/bolt" @@ -29,7 +30,7 @@ func New(url *url.URL) (models.Datastore, error) { return nil, err } log.Infoln("Creating bolt db at ", url.Path) - db, err := bolt.Open(url.Path, 0600, nil) + db, err := bolt.Open(url.Path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { log.WithError(err).Errorln("Error on bolt.Open") return nil, err diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 7568f6496..71eaf28e6 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -58,8 +58,9 @@ func NewBoltMQ(url *url.URL) (*BoltDbMQ, error) { log.WithError(err).Errorln("Could not create data directory for mq") return nil, err } - db, err := bolt.Open(url.Path, 0600, nil) + db, err := bolt.Open(url.Path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { + log.WithError(err).Errorln("Could not open BoltDB file for MQ") return nil, err } diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index a3f3d33e7..a7fb1c9b2 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -4,16 +4,15 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "io/ioutil" + "net" "net/http" + "net/url" "time" - "golang.org/x/net/context" - log "github.com/Sirupsen/logrus" - "github.com/iron-io/functions/api/models" + "golang.org/x/net/context" ) func getTask(url string) (*models.Task, error) { @@ -96,10 +95,10 @@ func runTask(task *models.Task) error { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(mqAdr string) { - url := fmt.Sprintf("http://%s/tasks", mqAdr) +func RunAsyncRunner(tasksrv, port string) { + u := tasksrvURL(tasksrv, port) for { - task, err := getTask(url) + task, err := getTask(u) if err != nil { log.WithError(err) time.Sleep(1 * time.Second) @@ -115,10 +114,31 @@ func RunAsyncRunner(mqAdr string) { log.Info("Processed task:", task.ID) // Delete task from queue - if err := deleteTask(url, task); err != nil { + if err := deleteTask(u, task); err != nil { log.WithError(err) } else { log.Info("Deleted task:", task.ID) } } } + +func tasksrvURL(tasksrv, port string) string { + parsed, err := url.Parse(tasksrv) + if err != nil { + log.Fatalf("cannot parse TASKSRV endpoint: %v", err) + } + + if parsed.Scheme == "" { + parsed.Scheme = "http" + } + + if parsed.Path == "" || parsed.Path == "/" { + parsed.Path = "/tasks" + } + + if _, _, err := net.SplitHostPort(parsed.Host); err != nil { + parsed.Host = net.JoinHostPort(parsed.Host, port) + } + + return parsed.String() +} diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 8c4f2fdf3..14dce67e9 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -142,3 +142,25 @@ func TestDeleteTask(t *testing.T) { t.Error("expected no error, got", err) } } + +func TestTasksrvURL(t *testing.T) { + tests := []struct { + port, in, out string + }{ + {"8080", "//localhost", "http://localhost:8080/tasks"}, + {"8080", "//localhost/", "http://localhost:8080/tasks"}, + {"8080", "//localhost:8081", "http://localhost:8081/tasks"}, + {"8080", "//localhost:8081/", "http://localhost:8081/tasks"}, + {"8080", "http://localhost", "http://localhost:8080/tasks"}, + {"8080", "http://localhost/", "http://localhost:8080/tasks"}, + {"8080", "http://localhost:8081", "http://localhost:8081/tasks"}, + {"8080", "http://localhost:8081/", "http://localhost:8081/tasks"}, + {"8080", "http://localhost:8081/endpoint", "http://localhost:8081/endpoint"}, + } + + for _, tt := range tests { + if got := tasksrvURL(tt.in, tt.port); got != tt.out { + t.Errorf("port: %s\ttasksrv: %s\texpected: %s\tgot: %s", tt.port, tt.in, tt.out, got) + } + } +} diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 45b9858e6..ae7097dd4 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -107,3 +107,5 @@ func TestRunnerError(t *testing.T) { } } } + + diff --git a/main.go b/main.go index ec94f8398..d8b8547e0 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,10 @@ package main import ( "fmt" - "strconv" + "os" "strings" log "github.com/Sirupsen/logrus" - "github.com/iron-io/functions/api/config" "github.com/iron-io/functions/api/datastore" "github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/runner" @@ -15,30 +14,40 @@ import ( "golang.org/x/net/context" ) +func init() { + cwd, err := os.Getwd() + if err != nil { + log.WithError(err) + } + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.SetDefault("log_level", "info") + viper.SetDefault("mq", fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) + viper.SetDefault("db", fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) + viper.SetDefault("port", 8080) + viper.SetDefault("tasksrv", fmt.Sprintf("http://localhost:%d", viper.GetInt("port"))) + viper.SetDefault("NASYNC", 1) + viper.SetConfigName("config") + viper.AddConfigPath(".") + viper.AutomaticEnv() // picks up env vars automatically + viper.ReadInConfig() + logLevel, err := log.ParseLevel(viper.GetString("log_level")) + if err != nil { + log.WithError(err).Fatalln("Invalid log level.") + } + log.SetLevel(logLevel) +} + func main() { ctx := context.Background() - config.InitConfig() - ds, err := datastore.New(viper.GetString("DB")) if err != nil { log.WithError(err).Fatalln("Invalid DB url.") } - mqType, err := mqs.New(viper.GetString("MQ")) if err != nil { log.WithError(err).Fatal("Error on init MQ") } - - mqAdr := strings.TrimSpace(viper.GetString("MQADR")) - port := viper.GetInt("PORT") - if port == 0 { - port = 8080 - } - if mqAdr == "" { - mqAdr = fmt.Sprintf("localhost:%d", port) - } - metricLogger := runner.NewMetricLogger() rnr, err := runner.New(metricLogger) @@ -46,17 +55,9 @@ func main() { log.WithError(err).Fatalln("Failed to create a runner") } - nasync := 1 - if nasyncStr := strings.TrimSpace(viper.GetString("NASYNC")); len(nasyncStr) > 0 { - var err error - nasync, err = strconv.Atoi(nasyncStr) - if err != nil { - log.WithError(err).Fatalln("Failed to parse number of async runners") - } - } - - for i := 0; i < nasync; i++ { - go runner.RunAsyncRunner(mqAdr) + tasksrv, port := viper.GetString("PORT"), viper.GetString("TASKSVR") + for nasync, i := viper.GetInt("NASYNC"), 0; i < nasync; i++ { + go runner.RunAsyncRunner(tasksrv, port) } srv := server.New(ds, mqType, rnr)