diff --git a/api/models/route.go b/api/models/route.go index aec62b703..b6b776062 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -2,7 +2,6 @@ package models import ( "errors" - "fmt" "net/http" "path" @@ -64,16 +63,14 @@ func (r *Route) Validate() error { res = append(res, ErrRoutesValidationInvalidPath) } - if r.Type == "" { - r.Type = "sync" + if r.Type == TypeNone { + r.Type = TypeSync } - if r.Type != "async" && r.Type != "sync" { + if r.Type != TypeAsync && r.Type != TypeSync { res = append(res, ErrRoutesValidationInvalidType) } - fmt.Println(">>>", r.Type) - if len(res) > 0 { return apiErrors.CompositeValidationError(res...) } diff --git a/api/models/task.go b/api/models/task.go index acad210f8..1a9a17bcc 100644 --- a/api/models/task.go +++ b/api/models/task.go @@ -12,6 +12,15 @@ import ( "github.com/go-openapi/validate" ) +const ( + // TypeNone ... + TypeNone = "" + // TypeSync ... + TypeSync = "sync" + // TypeAsync ... + TypeAsync = "async" +) + /*Task task swagger:model Task diff --git a/api/server/helpers.go b/api/server/helpers.go index 20b664f91..07e031ace 100644 --- a/api/server/helpers.go +++ b/api/server/helpers.go @@ -53,7 +53,7 @@ func testRouter() *gin.Engine { func(ctx *gin.Context) { handleRequest(ctx, nil) }, - func(ctx *gin.Context, del bool) {}) + func(ctx *gin.Context) {}) return r } diff --git a/api/server/server.go b/api/server/server.go index 1c5bd6e88..c2d529fb8 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "net/http" "path" - "strings" "golang.org/x/net/context" @@ -92,8 +91,9 @@ func (s *Server) handleRunnerRequest(c *gin.Context) { handleRequest(c, enqueue) } -func (s *Server) handleTaskRequest(c *gin.Context, del bool) { - if !del { +func (s *Server) handleTaskRequest(c *gin.Context) { + switch c.Request.Method { + case "GET": task, err := s.MQ.Reserve() if err != nil { logrus.WithError(err) @@ -101,25 +101,18 @@ func (s *Server) handleTaskRequest(c *gin.Context, del bool) { return } c.JSON(http.StatusAccepted, task) - } else { + case "DELETE": body, err := ioutil.ReadAll(c.Request.Body) if err != nil { logrus.WithError(err) c.JSON(http.StatusInternalServerError, err) return } - - bodyStr := strings.TrimSpace(string(body)) - if bodyStr == "null" { - logrus.WithError(err) - c.JSON(http.StatusInternalServerError, err) - return - } - var task models.Task if err = json.Unmarshal(body, &task); err != nil { logrus.WithError(err) c.JSON(http.StatusInternalServerError, err) + return } if err := s.MQ.Delete(&task); err != nil { @@ -141,7 +134,6 @@ func extractFields(c *gin.Context) logrus.Fields { } func (s *Server) Run(ctx context.Context) { - s.Router.Use(func(c *gin.Context) { ctx, _ := titancommon.LoggerWithFields(ctx, extractFields(c)) c.Set("ctx", ctx) @@ -155,7 +147,7 @@ func (s *Server) Run(ctx context.Context) { s.Router.Run() } -func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context, del bool)) { +func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) { engine.GET("/", handlePing) engine.GET("/version", handleVersion) @@ -180,15 +172,8 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHa } } - taskHandlerDelete := func(ginC *gin.Context) { - taskHandler(ginC, true) - } - taskHandlerReserve := func(ginC *gin.Context) { - taskHandler(ginC, false) - } - - engine.GET("/tasks", taskHandlerReserve) - engine.DELETE("/tasks", taskHandlerDelete) + engine.DELETE("/tasks", taskHandler) + engine.GET("/tasks", taskHandler) engine.Any("/r/:app/*route", reqHandler) // This final route is used for extensions, see Server.Add diff --git a/glide.lock b/glide.lock index 712ac5b16..bfdadda59 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 168e1ab65d5e2e781369df5dc87655f682df5c5d9448dee547c6acce78720ecc -updated: 2016-09-23T20:45:34.389837453+02:00 +updated: 2016-09-25T22:23:43.214193929+02:00 imports: - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -107,7 +107,7 @@ imports: subpackages: - registry - name: github.com/iron-io/worker - version: 1487ce7896770b3d052e0909f564441fe59ee1df + version: 15dcee7470fed6dcd956b5565e3dc1404cb1b96b repo: git@github.com:iron-io/worker.git vcs: git subpackages: @@ -120,6 +120,7 @@ imports: - runner/tasker - runner/tasker/client/models - runner/tasker/client/titan + - runner/tasker/client/titan/jobs - runner/tasker/client/titan/groups - runner/tasker/client/titan/runner - runner/tasker/client/titan/tasks diff --git a/glide.yaml b/glide.yaml index e106cabc0..f82ce443e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,8 +7,8 @@ import: - package: github.com/go-openapi/strfmt - package: github.com/go-openapi/validate - package: github.com/iron-io/worker - version: master repo: git@github.com:iron-io/worker.git vcs: git + version: master - package: github.com/lib/pq - package: github.com/google/btree diff --git a/main.go b/main.go index 18c52b7ba..aefff88cf 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "strings" "time" @@ -30,12 +31,21 @@ func main() { log.WithError(err).Fatalln("Invalid DB url.") } - mqType, err := mqs.New(viper.GetString("MQTYPE")) + mqType, err := mqs.New(viper.GetString("MQ")) if err != nil { - log.WithError(err).Fatal("Error on init MQTYPE") + log.WithError(err).Fatal("Error on init MQ") + } + + nasync := 1 + + if nasyncStr := strings.TrimSpace(viper.GetString("MQADR")); len(nasyncStr) > 0 { + var err error + nasync, err = strconv.Atoi(nasyncStr) + if err != nil { + log.WithError(err).Fatalln("Failed to parse number of async runners") + } } - nasync := viper.GetInt("MQADR") mqAdr := strings.TrimSpace(viper.GetString("MQADR")) port := viper.GetInt("PORT") if port == 0 { @@ -86,24 +96,29 @@ func runAsyncRunners(mqAdr string) { continue } - bodyStr := strings.TrimSpace(string(body)) - if bodyStr == "null" { - logAndWait(err) - continue - } - var task models.Task + if err := json.Unmarshal(body, &task); err != nil { logAndWait(err) continue } + if task.ID == "" { + time.Sleep(1 * time.Second) + continue + } + + log.Info("Picked up task:", task.ID) var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? + if task.Timeout == nil { + timeout := int32(30) + task.Timeout = &timeout + } cfg := &runner.Config{ Image: *task.Image, - Timeout: time.Duration(*task.Timeout), + Timeout: time.Duration(*task.Timeout) * time.Second, ID: task.ID, AppName: task.RouteName, Stdout: &stdout, @@ -125,8 +140,18 @@ func runAsyncRunners(mqAdr string) { continue } - if _, err = http.NewRequest(http.MethodDelete, url, nil); err != nil { + log.Info("Processed task:", task.ID) + req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body)) + if err != nil { log.WithError(err) } + + c := &http.Client{} + if _, err := c.Do(req); err != nil { + log.WithError(err) + continue + } + + log.Info("Deleted task:", task.ID) } }