Add support for Async worker

This commit is contained in:
Seif Lotfy
2016-09-19 14:22:48 -07:00
parent b623fc27e4
commit 92df53b144
13 changed files with 285 additions and 342 deletions

View File

@@ -1,71 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/go-openapi/errors"
"github.com/go-openapi/validate"
)
/*Group group
swagger:model Group
*/
type Group struct {
/* Time when image first used/created.
Read Only: true
*/
CreatedAt strfmt.DateTime `json:"created_at,omitempty"`
/* User defined environment variables that will be passed in to each task in this group.
*/
EnvVars map[string]string `json:"env_vars,omitempty"`
/* Name of Docker image to use in this group. You should include the image tag, which should be a version number, to be more accurate. Can be overridden on a per task basis with task.image.
*/
Image string `json:"image,omitempty"`
/* The maximum number of tasks that will run at the exact same time in this group.
*/
MaxConcurrency int32 `json:"max_concurrency,omitempty"`
/* Name of this group. Must be different than the image name. Can ony contain alphanumeric, -, and _.
Read Only: true
*/
Name string `json:"name,omitempty"`
}
// Validate validates this group
func (m *Group) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateEnvVars(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *Group) validateEnvVars(formats strfmt.Registry) error {
if swag.IsZero(m.EnvVars) { // not required
return nil
}
if err := validate.Required("env_vars", "body", m.EnvVars); err != nil {
return err
}
return nil
}

View File

@@ -1,50 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/errors"
)
/*GroupWrapper group wrapper
swagger:model GroupWrapper
*/
type GroupWrapper struct {
/* group
Required: true
*/
Group *Group `json:"group"`
}
// Validate validates this group wrapper
func (m *GroupWrapper) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateGroup(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *GroupWrapper) validateGroup(formats strfmt.Registry) error {
if m.Group != nil {
if err := m.Group.Validate(formats); err != nil {
return err
}
}
return nil
}

View File

@@ -1,48 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/errors"
"github.com/go-openapi/validate"
)
/*GroupsWrapper groups wrapper
swagger:model GroupsWrapper
*/
type GroupsWrapper struct {
/* groups
Required: true
*/
Groups []*Group `json:"groups"`
}
// Validate validates this groups wrapper
func (m *GroupsWrapper) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateGroups(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *GroupsWrapper) validateGroups(formats strfmt.Registry) error {
if err := validate.Required("groups", "body", m.Groups); err != nil {
return err
}
return nil
}

View File

@@ -1,95 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/errors"
"github.com/go-openapi/validate"
)
/*NewJob new job
swagger:model NewJob
*/
type NewJob struct {
/* Number of seconds to wait before queueing the job for consumption for the first time. Must be a positive integer. Jobs with a delay start in state "delayed" and transition to "running" after delay seconds.
*/
Delay int32 `json:"delay,omitempty"`
/* Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
Required: true
*/
Image *string `json:"image"`
/* "Number of automatic retries this job is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new job with the same image id and payload. The new job's max_retries is one less than the original. The new job's `retry_of` field is set to the original Job ID. Titan will delay the new job for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried."
*/
MaxRetries int32 `json:"max_retries,omitempty"`
/* Payload for the job. This is what you pass into each job to make it do something.
*/
Payload string `json:"payload,omitempty"`
/* Priority of the job. Higher has more priority. 3 levels from 0-2. Jobs at same priority are processed in FIFO order.
Required: true
*/
Priority *int32 `json:"priority"`
/* Time in seconds to wait before retrying the job. Must be a non-negative integer.
*/
RetriesDelay *int32 `json:"retries_delay,omitempty"`
/* Maximum runtime in seconds. If a consumer retrieves the
job, but does not change it's status within timeout seconds, the job
is considered failed, with reason timeout (Titan may allow a small
grace period). The consumer should also kill the job after timeout
seconds. If a consumer tries to change status after Titan has already
timed out the job, the consumer will be ignored.
*/
Timeout *int32 `json:"timeout,omitempty"`
}
// Validate validates this new job
func (m *NewJob) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateImage(formats); err != nil {
// prop
res = append(res, err)
}
if err := m.validatePriority(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *NewJob) validateImage(formats strfmt.Registry) error {
if err := validate.Required("image", "body", m.Image); err != nil {
return err
}
return nil
}
func (m *NewJob) validatePriority(formats strfmt.Registry) error {
if err := validate.Required("priority", "body", m.Priority); err != nil {
return err
}
return nil
}

View File

@@ -1,48 +0,0 @@
package models
// This file was generated by the swagger tool.
// Editing this file might prove futile when you re-run the swagger generate command
import (
strfmt "github.com/go-openapi/strfmt"
"github.com/go-openapi/errors"
"github.com/go-openapi/validate"
)
/*NewJobsWrapper new jobs wrapper
swagger:model NewJobsWrapper
*/
type NewJobsWrapper struct {
/* jobs
Required: true
*/
Jobs []*NewJob `json:"jobs"`
}
// Validate validates this new jobs wrapper
func (m *NewJobsWrapper) Validate(formats strfmt.Registry) error {
var res []error
if err := m.validateJobs(formats); err != nil {
// prop
res = append(res, err)
}
if len(res) > 0 {
return errors.CompositeValidationError(res...)
}
return nil
}
func (m *NewJobsWrapper) validateJobs(formats strfmt.Registry) error {
if err := validate.Required("jobs", "body", m.Jobs); err != nil {
return err
}
return nil
}

View File

@@ -20,7 +20,7 @@ type NewTask struct {
*/
Delay int32 `json:"delay,omitempty"`
/* Name of Docker image to use. This is optional and can be used to override the image defined at the group level.
/* Name of Docker image to use. This is optional and can be used to override the image defined at the route level.
Required: true
*/

View File

@@ -31,7 +31,7 @@ type Task struct {
*/
CreatedAt strfmt.DateTime `json:"created_at,omitempty"`
/* Env vars for the task. Comes from the ones set on the Group.
/* Env vars for the task. Comes from the ones set on the Route.
*/
EnvVars map[string]string `json:"env_vars,omitempty"`
@@ -39,11 +39,11 @@ type Task struct {
*/
Error string `json:"error,omitempty"`
/* Group this task belongs to.
/* Route this task belongs to.
Read Only: true
*/
GroupName string `json:"group_name,omitempty"`
RouteName string `json:"route_name,omitempty"`
/* Machine usable reason for task being in this state.
Valid values for error status are `timeout | killed | bad_exit`.

View File

@@ -30,7 +30,7 @@ func (t *containerTask) Labels() map[string]string {
}
func (t *containerTask) Id() string { return t.cfg.ID }
func (t *containerTask) Group() string { return "" }
func (t *containerTask) Route() string { return "" }
func (t *containerTask) Image() string { return t.cfg.Image }
func (t *containerTask) Timeout() uint { return uint(t.cfg.Timeout.Seconds()) }
func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }

View File

@@ -36,6 +36,11 @@ type routesResponse struct {
Routes models.Routes `json:"routes"`
}
type tasksResponse struct {
Message string `json:"message"`
Task models.Task `json:"tasksResponse"`
}
func testRouter() *gin.Engine {
r := gin.Default()
ctx := context.Background()
@@ -44,9 +49,11 @@ func testRouter() *gin.Engine {
c.Set("ctx", ctx)
c.Next()
})
bindHandlers(r, func(ctx *gin.Context) {
handleRequest(ctx, nil)
})
bindHandlers(r,
func(ctx *gin.Context) {
handleRequest(ctx, nil)
},
func(ctx *gin.Context, del bool) {})
return r
}

View File

@@ -152,12 +152,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
Memory: el.Memory,
}
// Request count metric
metricBaseName := "server.handleRequest." + appName + "."
runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1)
metricStart := time.Now()
var err error
var result drivers.RunResult
switch el.Type {
@@ -167,7 +161,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
task := &models.Task{}
task.Image = &cfg.Image
task.ID = cfg.ID
task.GroupName = cfg.AppName
task.RouteName = cfg.AppName
task.Priority = &priority
// TODO: Push to queue
enqueue(task)
@@ -191,11 +185,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) {
log.WithError(err).Error(models.ErrRunnerRunRoute)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
}
// Execution time metric
metricElapsed := time.Since(metricStart)
runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed)
runner.LogMetricTime(ctx, "server.handleRunner.exec_time", metricElapsed)
return
}
}

View File

@@ -1,7 +1,11 @@
package server
import (
"encoding/json"
"io/ioutil"
"net/http"
"path"
"strings"
"golang.org/x/net/context"
@@ -81,11 +85,50 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error {
return nil
}
func (s *Server) handleRequest(ginC *gin.Context) {
func (s *Server) handleRunnerRequest(c *gin.Context) {
enqueue := func(task *models.Task) (*models.Task, error) {
return s.MQ.Push(task)
}
handleRequest(ginC, enqueue)
handleRequest(c, enqueue)
}
func (s *Server) handleTaskRequest(c *gin.Context, del bool) {
if !del {
task, err := s.MQ.Reserve()
if err != nil {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList))
return
}
c.JSON(http.StatusAccepted, task)
} else {
body, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err)
return
}
bodyStr := strings.TrimSpace(string(body))
if bodyStr == "null" {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err)
return
}
var task models.Task
if err = json.Unmarshal(body, &task); err != nil {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err)
}
if err := s.MQ.Delete(&task); err != nil {
logrus.WithError(err)
c.JSON(http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusAccepted, task)
}
}
func extractFields(c *gin.Context) logrus.Fields {
@@ -93,6 +136,7 @@ func extractFields(c *gin.Context) logrus.Fields {
for _, param := range c.Params {
fields[param.Key] = param.Value
}
return fields
}
@@ -104,14 +148,14 @@ func (s *Server) Run(ctx context.Context) {
c.Next()
})
bindHandlers(s.Router, s.handleRequest)
bindHandlers(s.Router, s.handleRunnerRequest, s.handleTaskRequest)
// By default it serves on :8080 unless a
// PORT environment variable was defined.
s.Router.Run()
}
func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context)) {
func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context, del bool)) {
engine.GET("/", handlePing)
engine.GET("/version", handleVersion)
@@ -136,6 +180,15 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context)) {
}
}
taskHandlerDelete := func(ginC *gin.Context) {
taskHandler(ginC, true)
}
taskHandlerReserve := func(ginC *gin.Context) {
taskHandler(ginC, false)
}
engine.GET("/tasks", taskHandlerReserve)
engine.DELETE("/tasks", taskHandlerDelete)
engine.Any("/r/:app/*route", reqHandler)
// This final route is used for extensions, see Server.Add

View File

@@ -234,6 +234,30 @@ paths:
200:
description: Route successfully deleted. Deletion succeeds even on routes that do not exist.
/tasks:
get:
summary: Get next task.
description: Gets the next task in the queue, ready for processing. Titan may return <=n tasks. Consumers should start processing tasks in order. Each returned task is set to `status` "running" and `started_at` is set to the current time. No other consumer can retrieve this task.
tags:
- Tasks
parameters:
- name: "n"
in: query
default: 1
description: Number of tasks to return.
type: integer
format: int32
responses:
200:
description: Task information
schema:
$ref: '#/definitions/TasksWrapper'
default:
description: Unexpected error
schema:
$ref: '#/definitions/Error'
definitions:
Route:
allOf:
@@ -305,7 +329,95 @@ definitions:
properties:
app:
$ref: '#/definitions/App'
Task:
allOf:
- $ref: "#/definitions/NewTask"
- $ref: "#/definitions/IdStatus"
- type: object
properties:
group_name:
type: string
description: "Group this task belongs to."
readOnly: true
error:
type: string
description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log."
reason:
type: string
description: |
Machine usable reason for task being in this state.
Valid values for error status are `timeout | killed | bad_exit`.
Valid values for cancelled status are `client_request`.
For everything else, this is undefined.
enum:
- timeout
- killed
- bad_exit
- client_request
created_at:
type: string
format: date-time
description: Time when task was submitted. Always in UTC.
readOnly: true
started_at:
type: string
format: date-time
description: Time when task started execution. Always in UTC.
completed_at:
type: string
format: date-time
description: Time when task completed, whether it was successul or failed. Always in UTC.
# We maintain a doubly linked list of the retried task to the
# original task.
retry_of:
type: string
description: If this field is set, then this task is a retry of the ID in this field.
readOnly: true
retry_at:
type: string
description: If this field is set, then this task was retried by the task referenced in this field.
readOnly: true
env_vars:
# this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties
type: object
description: Env vars for the task. Comes from the ones set on the Group.
additionalProperties:
type: string
NewTasksWrapper:
type: object
required:
- tasks
properties:
tasks:
type: array
items:
$ref: '#/definitions/NewTask'
TasksWrapper:
type: object
required:
- tasks
properties:
tasks:
type: array
items:
$ref: '#/definitions/Task'
cursor:
type: string
description: Used to paginate results. If this is returned, pass it into the same query again to get more results.
error:
$ref: '#/definitions/ErrorBody'
TaskWrapper:
type: object
required:
- task
properties:
task:
$ref: '#/definitions/Task'
ErrorBody:
type: object
properties:

104
main.go
View File

@@ -1,9 +1,18 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/iron-io/functions/api/config"
"github.com/iron-io/functions/api/datastore"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/functions/api/mqs"
"github.com/iron-io/functions/api/runner"
"github.com/iron-io/functions/api/server"
@@ -21,18 +30,103 @@ func main() {
log.WithError(err).Fatalln("Invalid DB url.")
}
mq, err := mqs.New(viper.GetString("mq"))
mqType, err := mqs.New(viper.GetString("MQTYPE"))
if err != nil {
log.WithError(err).Fatal("Error on init MQ")
log.WithError(err).Fatal("Error on init MQTYPE")
}
nasync := viper.GetInt("MQADR")
mqAdr := strings.TrimSpace(viper.GetString("MQADR"))
port := viper.GetInt("PORT")
if port == 0 {
port = 8080
}
if mqAdr == "" {
mqAdr = fmt.Sprintf("localhost:%d", port)
}
metricLogger := runner.NewMetricLogger()
runner, err := runner.New(metricLogger)
runner, err := runner.New(metricLogger)
if err != nil {
log.WithError(err).Fatalln("Failed to create a runner")
}
srv := server.New(ds, mq, runner)
srv.Run(ctx)
srv := server.New(ds, mqType, runner)
go srv.Run(ctx)
for i := 0; i < nasync; i++ {
fmt.Println(i)
go runAsyncRunners(mqAdr)
}
quit := make(chan bool)
for _ = range quit {
}
}
func runAsyncRunners(mqAdr string) {
url := fmt.Sprintf("http://%s/tasks", mqAdr)
logAndWait := func(err error) {
log.WithError(err)
time.Sleep(1 * time.Second)
}
for {
resp, err := http.Get(url)
if err != nil {
logAndWait(err)
continue
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logAndWait(err)
continue
}
bodyStr := strings.TrimSpace(string(body))
if bodyStr == "null" {
logAndWait(err)
continue
}
var task models.Task
if err := json.Unmarshal(body, &task); err != nil {
logAndWait(err)
continue
}
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that?
cfg := &runner.Config{
Image: *task.Image,
Timeout: time.Duration(*task.Timeout),
ID: task.ID,
AppName: task.RouteName,
Stdout: &stdout,
Stderr: stderr,
Env: task.EnvVars,
}
metricLogger := runner.NewMetricLogger()
rnr, err := runner.New(metricLogger)
if err != nil {
log.WithError(err)
continue
}
ctx := context.Background()
if _, err = rnr.Run(ctx, cfg); err != nil {
log.WithError(err)
continue
}
if _, err = http.NewRequest(http.MethodDelete, url, nil); err != nil {
log.WithError(err)
}
}
}