Fix input async tasks + tests (#137)

This commit is contained in:
Pedro Nasser
2016-10-12 17:23:34 -03:00
committed by C Cirello
parent b59a56ee51
commit 2e12e2c700
10 changed files with 227 additions and 108 deletions

View File

@@ -30,6 +30,11 @@ func handleSpecial(c *gin.Context) {
}
}
func toEnvName(envtype, name string) string {
name = strings.ToUpper(strings.Replace(name, "-", "_", -1))
return fmt.Sprintf("%s_%s", envtype, name)
}
func handleRequest(c *gin.Context, enqueue models.Enqueue) {
if strings.HasPrefix(c.Request.URL.Path, "/v1") {
c.Status(http.StatusNotFound)
@@ -56,9 +61,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
}()
} else if c.Request.Method == "GET" {
reqPayload := c.Request.URL.Query().Get("payload")
if len(reqPayload) > 0 {
payload = strings.NewReader(reqPayload)
}
payload = strings.NewReader(reqPayload)
}
appName := c.Param("app")
@@ -114,22 +117,22 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
// app config
for k, v := range app.Config {
envVars["CONFIG_"+strings.ToUpper(k)] = v
envVars[toEnvName("CONFIG", k)] = v
}
// route config
for k, v := range el.Config {
envVars["CONFIG_"+strings.ToUpper(k)] = v
envVars[toEnvName("CONFIG", k)] = v
}
// params
for _, param := range params {
envVars["PARAM_"+strings.ToUpper(param.Key)] = param.Value
envVars[toEnvName("PARAM", param.Key)] = param.Value
}
// headers
for header, value := range c.Request.Header {
envVars["HEADER_"+strings.ToUpper(header)] = strings.Join(value, " ")
envVars[toEnvName("HEADER", header)] = strings.Join(value, " ")
}
cfg := &runner.Config{
@@ -148,6 +151,14 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
var result drivers.RunResult
switch el.Type {
case "async":
// Read payload
pl, err := ioutil.ReadAll(cfg.Stdin)
if err != nil {
log.WithError(err).Error(models.ErrInvalidPayload)
c.JSON(http.StatusBadRequest, simpleError(models.ErrInvalidPayload))
return
}
// Create Task
priority := int32(0)
task := &models.Task{}
@@ -155,6 +166,8 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
task.ID = cfg.ID
task.RouteName = cfg.AppName
task.Priority = &priority
task.EnvVars = cfg.Env
task.Payload = string(pl)
// Push to queue
enqueue(task)
log.Info("Added new task to queue")
@@ -175,10 +188,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
}
}
if err != nil {
log.WithError(err).Error(models.ErrRunnerRunRoute)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
}
return
}
}