diff --git a/api/config/config.go b/api/config/config.go deleted file mode 100644 index 7a789bb25..000000000 --- a/api/config/config.go +++ /dev/null @@ -1,27 +0,0 @@ -package config - -import ( - "fmt" - "os" - "strings" - - log "github.com/Sirupsen/logrus" - "github.com/spf13/viper" -) - -func InitConfig() { - cwd, _ := os.Getwd() - viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) - viper.SetDefault("log_level", "info") - viper.SetDefault("mq", fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) - viper.SetDefault("db", fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) - viper.SetConfigName("config") - viper.AddConfigPath(".") - viper.AutomaticEnv() // picks up env vars automatically - viper.ReadInConfig() - logLevel, err := log.ParseLevel(viper.GetString("log_level")) - if err != nil { - log.WithError(err).Fatalln("Invalid log level.") - } - log.SetLevel(logLevel) -} diff --git a/api/datastore/bolt/bolt.go b/api/datastore/bolt/bolt.go index 68ebcb9d6..0b6b32d80 100644 --- a/api/datastore/bolt/bolt.go +++ b/api/datastore/bolt/bolt.go @@ -5,6 +5,7 @@ import ( "net/url" "os" "path/filepath" + "time" "github.com/Sirupsen/logrus" "github.com/boltdb/bolt" @@ -29,7 +30,7 @@ func New(url *url.URL) (models.Datastore, error) { return nil, err } log.Infoln("Creating bolt db at ", url.Path) - db, err := bolt.Open(url.Path, 0600, nil) + db, err := bolt.Open(url.Path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { log.WithError(err).Errorln("Error on bolt.Open") return nil, err diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 7568f6496..71eaf28e6 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -58,8 +58,9 @@ func NewBoltMQ(url *url.URL) (*BoltDbMQ, error) { log.WithError(err).Errorln("Could not create data directory for mq") return nil, err } - db, err := bolt.Open(url.Path, 0600, nil) + db, err := bolt.Open(url.Path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { + log.WithError(err).Errorln("Could not open BoltDB file for MQ") return nil, err } diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index a3f3d33e7..a7fb1c9b2 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -4,16 +4,15 @@ import ( "bytes" "encoding/json" "errors" - "fmt" "io/ioutil" + "net" "net/http" + "net/url" "time" - "golang.org/x/net/context" - log "github.com/Sirupsen/logrus" - "github.com/iron-io/functions/api/models" + "golang.org/x/net/context" ) func getTask(url string) (*models.Task, error) { @@ -96,10 +95,10 @@ func runTask(task *models.Task) error { } // RunAsyncRunner pulls tasks off a queue and processes them -func RunAsyncRunner(mqAdr string) { - url := fmt.Sprintf("http://%s/tasks", mqAdr) +func RunAsyncRunner(tasksrv, port string) { + u := tasksrvURL(tasksrv, port) for { - task, err := getTask(url) + task, err := getTask(u) if err != nil { log.WithError(err) time.Sleep(1 * time.Second) @@ -115,10 +114,31 @@ func RunAsyncRunner(mqAdr string) { log.Info("Processed task:", task.ID) // Delete task from queue - if err := deleteTask(url, task); err != nil { + if err := deleteTask(u, task); err != nil { log.WithError(err) } else { log.Info("Deleted task:", task.ID) } } } + +func tasksrvURL(tasksrv, port string) string { + parsed, err := url.Parse(tasksrv) + if err != nil { + log.Fatalf("cannot parse TASKSRV endpoint: %v", err) + } + + if parsed.Scheme == "" { + parsed.Scheme = "http" + } + + if parsed.Path == "" || parsed.Path == "/" { + parsed.Path = "/tasks" + } + + if _, _, err := net.SplitHostPort(parsed.Host); err != nil { + parsed.Host = net.JoinHostPort(parsed.Host, port) + } + + return parsed.String() +} diff --git a/api/runner/async_runner_test.go b/api/runner/async_runner_test.go index 8c4f2fdf3..14dce67e9 100644 --- a/api/runner/async_runner_test.go +++ b/api/runner/async_runner_test.go @@ -142,3 +142,25 @@ func TestDeleteTask(t *testing.T) { t.Error("expected no error, got", err) } } + +func TestTasksrvURL(t *testing.T) { + tests := []struct { + port, in, out string + }{ + {"8080", "//localhost", "http://localhost:8080/tasks"}, + {"8080", "//localhost/", "http://localhost:8080/tasks"}, + {"8080", "//localhost:8081", "http://localhost:8081/tasks"}, + {"8080", "//localhost:8081/", "http://localhost:8081/tasks"}, + {"8080", "http://localhost", "http://localhost:8080/tasks"}, + {"8080", "http://localhost/", "http://localhost:8080/tasks"}, + {"8080", "http://localhost:8081", "http://localhost:8081/tasks"}, + {"8080", "http://localhost:8081/", "http://localhost:8081/tasks"}, + {"8080", "http://localhost:8081/endpoint", "http://localhost:8081/endpoint"}, + } + + for _, tt := range tests { + if got := tasksrvURL(tt.in, tt.port); got != tt.out { + t.Errorf("port: %s\ttasksrv: %s\texpected: %s\tgot: %s", tt.port, tt.in, tt.out, got) + } + } +} diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 45b9858e6..ae7097dd4 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -107,3 +107,5 @@ func TestRunnerError(t *testing.T) { } } } + + diff --git a/main.go b/main.go index ec94f8398..d8b8547e0 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,10 @@ package main import ( "fmt" - "strconv" + "os" "strings" log "github.com/Sirupsen/logrus" - "github.com/iron-io/functions/api/config" "github.com/iron-io/functions/api/datastore" "github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/runner" @@ -15,30 +14,40 @@ import ( "golang.org/x/net/context" ) +func init() { + cwd, err := os.Getwd() + if err != nil { + log.WithError(err) + } + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + viper.SetDefault("log_level", "info") + viper.SetDefault("mq", fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) + viper.SetDefault("db", fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) + viper.SetDefault("port", 8080) + viper.SetDefault("tasksrv", fmt.Sprintf("http://localhost:%d", viper.GetInt("port"))) + viper.SetDefault("NASYNC", 1) + viper.SetConfigName("config") + viper.AddConfigPath(".") + viper.AutomaticEnv() // picks up env vars automatically + viper.ReadInConfig() + logLevel, err := log.ParseLevel(viper.GetString("log_level")) + if err != nil { + log.WithError(err).Fatalln("Invalid log level.") + } + log.SetLevel(logLevel) +} + func main() { ctx := context.Background() - config.InitConfig() - ds, err := datastore.New(viper.GetString("DB")) if err != nil { log.WithError(err).Fatalln("Invalid DB url.") } - mqType, err := mqs.New(viper.GetString("MQ")) if err != nil { log.WithError(err).Fatal("Error on init MQ") } - - mqAdr := strings.TrimSpace(viper.GetString("MQADR")) - port := viper.GetInt("PORT") - if port == 0 { - port = 8080 - } - if mqAdr == "" { - mqAdr = fmt.Sprintf("localhost:%d", port) - } - metricLogger := runner.NewMetricLogger() rnr, err := runner.New(metricLogger) @@ -46,17 +55,9 @@ func main() { log.WithError(err).Fatalln("Failed to create a runner") } - nasync := 1 - if nasyncStr := strings.TrimSpace(viper.GetString("NASYNC")); len(nasyncStr) > 0 { - var err error - nasync, err = strconv.Atoi(nasyncStr) - if err != nil { - log.WithError(err).Fatalln("Failed to parse number of async runners") - } - } - - for i := 0; i < nasync; i++ { - go runner.RunAsyncRunner(mqAdr) + tasksrv, port := viper.GetString("PORT"), viper.GetString("TASKSVR") + for nasync, i := viper.GetInt("NASYNC"), 0; i < nasync; i++ { + go runner.RunAsyncRunner(tasksrv, port) } srv := server.New(ds, mqType, rnr)