update glide

This commit is contained in:
Seif Lotfy
2016-09-25 22:31:06 +02:00
parent 92df53b144
commit 54f66c7b09
7 changed files with 61 additions and 44 deletions

View File

@@ -2,7 +2,6 @@ package models
import ( import (
"errors" "errors"
"fmt"
"net/http" "net/http"
"path" "path"
@@ -64,16 +63,14 @@ func (r *Route) Validate() error {
res = append(res, ErrRoutesValidationInvalidPath) res = append(res, ErrRoutesValidationInvalidPath)
} }
if r.Type == "" { if r.Type == TypeNone {
r.Type = "sync" r.Type = TypeSync
} }
if r.Type != "async" && r.Type != "sync" { if r.Type != TypeAsync && r.Type != TypeSync {
res = append(res, ErrRoutesValidationInvalidType) res = append(res, ErrRoutesValidationInvalidType)
} }
fmt.Println(">>>", r.Type)
if len(res) > 0 { if len(res) > 0 {
return apiErrors.CompositeValidationError(res...) return apiErrors.CompositeValidationError(res...)
} }

View File

@@ -12,6 +12,15 @@ import (
"github.com/go-openapi/validate" "github.com/go-openapi/validate"
) )
const (
// TypeNone ...
TypeNone = ""
// TypeSync ...
TypeSync = "sync"
// TypeAsync ...
TypeAsync = "async"
)
/*Task task /*Task task
swagger:model Task swagger:model Task

View File

@@ -53,7 +53,7 @@ func testRouter() *gin.Engine {
func(ctx *gin.Context) { func(ctx *gin.Context) {
handleRequest(ctx, nil) handleRequest(ctx, nil)
}, },
func(ctx *gin.Context, del bool) {}) func(ctx *gin.Context) {})
return r return r
} }

View File

@@ -5,7 +5,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "path"
"strings"
"golang.org/x/net/context" "golang.org/x/net/context"
@@ -92,8 +91,9 @@ func (s *Server) handleRunnerRequest(c *gin.Context) {
handleRequest(c, enqueue) handleRequest(c, enqueue)
} }
func (s *Server) handleTaskRequest(c *gin.Context, del bool) { func (s *Server) handleTaskRequest(c *gin.Context) {
if !del { switch c.Request.Method {
case "GET":
task, err := s.MQ.Reserve() task, err := s.MQ.Reserve()
if err != nil { if err != nil {
logrus.WithError(err) logrus.WithError(err)
@@ -101,25 +101,18 @@ func (s *Server) handleTaskRequest(c *gin.Context, del bool) {
return return
} }
c.JSON(http.StatusAccepted, task) c.JSON(http.StatusAccepted, task)
} else { case "DELETE":
body, err := ioutil.ReadAll(c.Request.Body) body, err := ioutil.ReadAll(c.Request.Body)
if err != nil { if err != nil {
logrus.WithError(err) logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err) c.JSON(http.StatusInternalServerError, err)
return return
} }
bodyStr := strings.TrimSpace(string(body))
if bodyStr == "null" {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err)
return
}
var task models.Task var task models.Task
if err = json.Unmarshal(body, &task); err != nil { if err = json.Unmarshal(body, &task); err != nil {
logrus.WithError(err) logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err) c.JSON(http.StatusInternalServerError, err)
return
} }
if err := s.MQ.Delete(&task); err != nil { if err := s.MQ.Delete(&task); err != nil {
@@ -141,7 +134,6 @@ func extractFields(c *gin.Context) logrus.Fields {
} }
func (s *Server) Run(ctx context.Context) { func (s *Server) Run(ctx context.Context) {
s.Router.Use(func(c *gin.Context) { s.Router.Use(func(c *gin.Context) {
ctx, _ := titancommon.LoggerWithFields(ctx, extractFields(c)) ctx, _ := titancommon.LoggerWithFields(ctx, extractFields(c))
c.Set("ctx", ctx) c.Set("ctx", ctx)
@@ -155,7 +147,7 @@ func (s *Server) Run(ctx context.Context) {
s.Router.Run() s.Router.Run()
} }
func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context, del bool)) { func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context)) {
engine.GET("/", handlePing) engine.GET("/", handlePing)
engine.GET("/version", handleVersion) engine.GET("/version", handleVersion)
@@ -180,15 +172,8 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHa
} }
} }
taskHandlerDelete := func(ginC *gin.Context) { engine.DELETE("/tasks", taskHandler)
taskHandler(ginC, true) engine.GET("/tasks", taskHandler)
}
taskHandlerReserve := func(ginC *gin.Context) {
taskHandler(ginC, false)
}
engine.GET("/tasks", taskHandlerReserve)
engine.DELETE("/tasks", taskHandlerDelete)
engine.Any("/r/:app/*route", reqHandler) engine.Any("/r/:app/*route", reqHandler)
// This final route is used for extensions, see Server.Add // This final route is used for extensions, see Server.Add

5
glide.lock generated
View File

@@ -1,5 +1,5 @@
hash: 168e1ab65d5e2e781369df5dc87655f682df5c5d9448dee547c6acce78720ecc hash: 168e1ab65d5e2e781369df5dc87655f682df5c5d9448dee547c6acce78720ecc
updated: 2016-09-23T20:45:34.389837453+02:00 updated: 2016-09-25T22:23:43.214193929+02:00
imports: imports:
- name: github.com/amir/raidman - name: github.com/amir/raidman
version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
@@ -107,7 +107,7 @@ imports:
subpackages: subpackages:
- registry - registry
- name: github.com/iron-io/worker - name: github.com/iron-io/worker
version: 1487ce7896770b3d052e0909f564441fe59ee1df version: 15dcee7470fed6dcd956b5565e3dc1404cb1b96b
repo: git@github.com:iron-io/worker.git repo: git@github.com:iron-io/worker.git
vcs: git vcs: git
subpackages: subpackages:
@@ -120,6 +120,7 @@ imports:
- runner/tasker - runner/tasker
- runner/tasker/client/models - runner/tasker/client/models
- runner/tasker/client/titan - runner/tasker/client/titan
- runner/tasker/client/titan/jobs
- runner/tasker/client/titan/groups - runner/tasker/client/titan/groups
- runner/tasker/client/titan/runner - runner/tasker/client/titan/runner
- runner/tasker/client/titan/tasks - runner/tasker/client/titan/tasks

View File

@@ -7,8 +7,8 @@ import:
- package: github.com/go-openapi/strfmt - package: github.com/go-openapi/strfmt
- package: github.com/go-openapi/validate - package: github.com/go-openapi/validate
- package: github.com/iron-io/worker - package: github.com/iron-io/worker
version: master
repo: git@github.com:iron-io/worker.git repo: git@github.com:iron-io/worker.git
vcs: git vcs: git
version: master
- package: github.com/lib/pq - package: github.com/lib/pq
- package: github.com/google/btree - package: github.com/google/btree

47
main.go
View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"strings" "strings"
"time" "time"
@@ -30,12 +31,21 @@ func main() {
log.WithError(err).Fatalln("Invalid DB url.") log.WithError(err).Fatalln("Invalid DB url.")
} }
mqType, err := mqs.New(viper.GetString("MQTYPE")) mqType, err := mqs.New(viper.GetString("MQ"))
if err != nil { if err != nil {
log.WithError(err).Fatal("Error on init MQTYPE") log.WithError(err).Fatal("Error on init MQ")
}
nasync := 1
if nasyncStr := strings.TrimSpace(viper.GetString("MQADR")); len(nasyncStr) > 0 {
var err error
nasync, err = strconv.Atoi(nasyncStr)
if err != nil {
log.WithError(err).Fatalln("Failed to parse number of async runners")
}
} }
nasync := viper.GetInt("MQADR")
mqAdr := strings.TrimSpace(viper.GetString("MQADR")) mqAdr := strings.TrimSpace(viper.GetString("MQADR"))
port := viper.GetInt("PORT") port := viper.GetInt("PORT")
if port == 0 { if port == 0 {
@@ -86,24 +96,29 @@ func runAsyncRunners(mqAdr string) {
continue continue
} }
bodyStr := strings.TrimSpace(string(body))
if bodyStr == "null" {
logAndWait(err)
continue
}
var task models.Task var task models.Task
if err := json.Unmarshal(body, &task); err != nil { if err := json.Unmarshal(body, &task); err != nil {
logAndWait(err) logAndWait(err)
continue continue
} }
if task.ID == "" {
time.Sleep(1 * time.Second)
continue
}
log.Info("Picked up task:", task.ID)
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that?
if task.Timeout == nil {
timeout := int32(30)
task.Timeout = &timeout
}
cfg := &runner.Config{ cfg := &runner.Config{
Image: *task.Image, Image: *task.Image,
Timeout: time.Duration(*task.Timeout), Timeout: time.Duration(*task.Timeout) * time.Second,
ID: task.ID, ID: task.ID,
AppName: task.RouteName, AppName: task.RouteName,
Stdout: &stdout, Stdout: &stdout,
@@ -125,8 +140,18 @@ func runAsyncRunners(mqAdr string) {
continue continue
} }
if _, err = http.NewRequest(http.MethodDelete, url, nil); err != nil { log.Info("Processed task:", task.ID)
req, err := http.NewRequest(http.MethodDelete, url, bytes.NewBuffer(body))
if err != nil {
log.WithError(err) log.WithError(err)
} }
c := &http.Client{}
if _, err := c.Do(req); err != nil {
log.WithError(err)
continue
}
log.Info("Deleted task:", task.ID)
} }
} }