diff --git a/api/models/group.go b/api/models/group.go deleted file mode 100644 index 2db37f7ff..000000000 --- a/api/models/group.go +++ /dev/null @@ -1,71 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - "github.com/go-openapi/swag" - - "github.com/go-openapi/errors" - "github.com/go-openapi/validate" -) - -/*Group group - -swagger:model Group -*/ -type Group struct { - - /* Time when image first used/created. - - Read Only: true - */ - CreatedAt strfmt.DateTime `json:"created_at,omitempty"` - - /* User defined environment variables that will be passed in to each task in this group. - */ - EnvVars map[string]string `json:"env_vars,omitempty"` - - /* Name of Docker image to use in this group. You should include the image tag, which should be a version number, to be more accurate. Can be overridden on a per task basis with task.image. - */ - Image string `json:"image,omitempty"` - - /* The maximum number of tasks that will run at the exact same time in this group. - */ - MaxConcurrency int32 `json:"max_concurrency,omitempty"` - - /* Name of this group. Must be different than the image name. Can ony contain alphanumeric, -, and _. - - Read Only: true - */ - Name string `json:"name,omitempty"` -} - -// Validate validates this group -func (m *Group) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateEnvVars(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *Group) validateEnvVars(formats strfmt.Registry) error { - - if swag.IsZero(m.EnvVars) { // not required - return nil - } - - if err := validate.Required("env_vars", "body", m.EnvVars); err != nil { - return err - } - - return nil -} diff --git a/api/models/group_wrapper.go b/api/models/group_wrapper.go deleted file mode 100644 index 04cd6736e..000000000 --- a/api/models/group_wrapper.go +++ /dev/null @@ -1,50 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - - "github.com/go-openapi/errors" -) - -/*GroupWrapper group wrapper - -swagger:model GroupWrapper -*/ -type GroupWrapper struct { - - /* group - - Required: true - */ - Group *Group `json:"group"` -} - -// Validate validates this group wrapper -func (m *GroupWrapper) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateGroup(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *GroupWrapper) validateGroup(formats strfmt.Registry) error { - - if m.Group != nil { - - if err := m.Group.Validate(formats); err != nil { - return err - } - } - - return nil -} diff --git a/api/models/groups_wrapper.go b/api/models/groups_wrapper.go deleted file mode 100644 index 0145f46f0..000000000 --- a/api/models/groups_wrapper.go +++ /dev/null @@ -1,48 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - - "github.com/go-openapi/errors" - "github.com/go-openapi/validate" -) - -/*GroupsWrapper groups wrapper - -swagger:model GroupsWrapper -*/ -type GroupsWrapper struct { - - /* groups - - Required: true - */ - Groups []*Group `json:"groups"` -} - -// Validate validates this groups wrapper -func (m *GroupsWrapper) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateGroups(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *GroupsWrapper) validateGroups(formats strfmt.Registry) error { - - if err := validate.Required("groups", "body", m.Groups); err != nil { - return err - } - - return nil -} diff --git a/api/models/new_job.go b/api/models/new_job.go deleted file mode 100644 index e55581da3..000000000 --- a/api/models/new_job.go +++ /dev/null @@ -1,95 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - - "github.com/go-openapi/errors" - "github.com/go-openapi/validate" -) - -/*NewJob new job - -swagger:model NewJob -*/ -type NewJob struct { - - /* Number of seconds to wait before queueing the job for consumption for the first time. Must be a positive integer. Jobs with a delay start in state "delayed" and transition to "running" after delay seconds. - */ - Delay int32 `json:"delay,omitempty"` - - /* Name of Docker image to use. This is optional and can be used to override the image defined at the group level. - - Required: true - */ - Image *string `json:"image"` - - /* "Number of automatic retries this job is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new job with the same image id and payload. The new job's max_retries is one less than the original. The new job's `retry_of` field is set to the original Job ID. Titan will delay the new job for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried." - - */ - MaxRetries int32 `json:"max_retries,omitempty"` - - /* Payload for the job. This is what you pass into each job to make it do something. - */ - Payload string `json:"payload,omitempty"` - - /* Priority of the job. Higher has more priority. 3 levels from 0-2. Jobs at same priority are processed in FIFO order. - - Required: true - */ - Priority *int32 `json:"priority"` - - /* Time in seconds to wait before retrying the job. Must be a non-negative integer. - */ - RetriesDelay *int32 `json:"retries_delay,omitempty"` - - /* Maximum runtime in seconds. If a consumer retrieves the - job, but does not change it's status within timeout seconds, the job - is considered failed, with reason timeout (Titan may allow a small - grace period). The consumer should also kill the job after timeout - seconds. If a consumer tries to change status after Titan has already - timed out the job, the consumer will be ignored. - - */ - Timeout *int32 `json:"timeout,omitempty"` -} - -// Validate validates this new job -func (m *NewJob) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateImage(formats); err != nil { - // prop - res = append(res, err) - } - - if err := m.validatePriority(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *NewJob) validateImage(formats strfmt.Registry) error { - - if err := validate.Required("image", "body", m.Image); err != nil { - return err - } - - return nil -} - -func (m *NewJob) validatePriority(formats strfmt.Registry) error { - - if err := validate.Required("priority", "body", m.Priority); err != nil { - return err - } - - return nil -} diff --git a/api/models/new_jobs_wrapper.go b/api/models/new_jobs_wrapper.go deleted file mode 100644 index 16bf0422d..000000000 --- a/api/models/new_jobs_wrapper.go +++ /dev/null @@ -1,48 +0,0 @@ -package models - -// This file was generated by the swagger tool. -// Editing this file might prove futile when you re-run the swagger generate command - -import ( - strfmt "github.com/go-openapi/strfmt" - - "github.com/go-openapi/errors" - "github.com/go-openapi/validate" -) - -/*NewJobsWrapper new jobs wrapper - -swagger:model NewJobsWrapper -*/ -type NewJobsWrapper struct { - - /* jobs - - Required: true - */ - Jobs []*NewJob `json:"jobs"` -} - -// Validate validates this new jobs wrapper -func (m *NewJobsWrapper) Validate(formats strfmt.Registry) error { - var res []error - - if err := m.validateJobs(formats); err != nil { - // prop - res = append(res, err) - } - - if len(res) > 0 { - return errors.CompositeValidationError(res...) - } - return nil -} - -func (m *NewJobsWrapper) validateJobs(formats strfmt.Registry) error { - - if err := validate.Required("jobs", "body", m.Jobs); err != nil { - return err - } - - return nil -} diff --git a/api/models/new_task.go b/api/models/new_task.go index 75816eaea..414929161 100644 --- a/api/models/new_task.go +++ b/api/models/new_task.go @@ -20,7 +20,7 @@ type NewTask struct { */ Delay int32 `json:"delay,omitempty"` - /* Name of Docker image to use. This is optional and can be used to override the image defined at the group level. + /* Name of Docker image to use. This is optional and can be used to override the image defined at the route level. Required: true */ diff --git a/api/models/task.go b/api/models/task.go index ec6f115d4..acad210f8 100644 --- a/api/models/task.go +++ b/api/models/task.go @@ -31,7 +31,7 @@ type Task struct { */ CreatedAt strfmt.DateTime `json:"created_at,omitempty"` - /* Env vars for the task. Comes from the ones set on the Group. + /* Env vars for the task. Comes from the ones set on the Route. */ EnvVars map[string]string `json:"env_vars,omitempty"` @@ -39,11 +39,11 @@ type Task struct { */ Error string `json:"error,omitempty"` - /* Group this task belongs to. + /* Route this task belongs to. Read Only: true */ - GroupName string `json:"group_name,omitempty"` + RouteName string `json:"route_name,omitempty"` /* Machine usable reason for task being in this state. Valid values for error status are `timeout | killed | bad_exit`. diff --git a/api/runner/task.go b/api/runner/task.go index a9a28b7ee..0d77b9b02 100644 --- a/api/runner/task.go +++ b/api/runner/task.go @@ -30,7 +30,7 @@ func (t *containerTask) Labels() map[string]string { } func (t *containerTask) Id() string { return t.cfg.ID } -func (t *containerTask) Group() string { return "" } +func (t *containerTask) Route() string { return "" } func (t *containerTask) Image() string { return t.cfg.Image } func (t *containerTask) Timeout() uint { return uint(t.cfg.Timeout.Seconds()) } func (t *containerTask) Logger() (stdout, stderr io.Writer) { return t.cfg.Stdout, t.cfg.Stderr } diff --git a/api/server/helpers.go b/api/server/helpers.go index 886f18abb..20b664f91 100644 --- a/api/server/helpers.go +++ b/api/server/helpers.go @@ -36,6 +36,11 @@ type routesResponse struct { Routes models.Routes `json:"routes"` } +type tasksResponse struct { + Message string `json:"message"` + Task models.Task `json:"tasksResponse"` +} + func testRouter() *gin.Engine { r := gin.Default() ctx := context.Background() @@ -44,9 +49,11 @@ func testRouter() *gin.Engine { c.Set("ctx", ctx) c.Next() }) - bindHandlers(r, func(ctx *gin.Context) { - handleRequest(ctx, nil) - }) + bindHandlers(r, + func(ctx *gin.Context) { + handleRequest(ctx, nil) + }, + func(ctx *gin.Context, del bool) {}) return r } diff --git a/api/server/runner.go b/api/server/runner.go index 5a2053ece..958b7a40b 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -152,12 +152,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { Memory: el.Memory, } - // Request count metric - metricBaseName := "server.handleRequest." + appName + "." - runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1) - - metricStart := time.Now() - var err error var result drivers.RunResult switch el.Type { @@ -167,7 +161,7 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { task := &models.Task{} task.Image = &cfg.Image task.ID = cfg.ID - task.GroupName = cfg.AppName + task.RouteName = cfg.AppName task.Priority = &priority // TODO: Push to queue enqueue(task) @@ -191,11 +185,6 @@ func handleRequest(c *gin.Context, enqueue models.Enqueue) { log.WithError(err).Error(models.ErrRunnerRunRoute) c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) } - - // Execution time metric - metricElapsed := time.Since(metricStart) - runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed) - runner.LogMetricTime(ctx, "server.handleRunner.exec_time", metricElapsed) return } } diff --git a/api/server/server.go b/api/server/server.go index 371647955..1c5bd6e88 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -1,7 +1,11 @@ package server import ( + "encoding/json" + "io/ioutil" + "net/http" "path" + "strings" "golang.org/x/net/context" @@ -81,11 +85,50 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { return nil } -func (s *Server) handleRequest(ginC *gin.Context) { +func (s *Server) handleRunnerRequest(c *gin.Context) { enqueue := func(task *models.Task) (*models.Task, error) { return s.MQ.Push(task) } - handleRequest(ginC, enqueue) + handleRequest(c, enqueue) +} + +func (s *Server) handleTaskRequest(c *gin.Context, del bool) { + if !del { + task, err := s.MQ.Reserve() + if err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList)) + return + } + c.JSON(http.StatusAccepted, task) + } else { + body, err := ioutil.ReadAll(c.Request.Body) + if err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err) + return + } + + bodyStr := strings.TrimSpace(string(body)) + if bodyStr == "null" { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err) + return + } + + var task models.Task + if err = json.Unmarshal(body, &task); err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err) + } + + if err := s.MQ.Delete(&task); err != nil { + logrus.WithError(err) + c.JSON(http.StatusInternalServerError, err) + return + } + c.JSON(http.StatusAccepted, task) + } } func extractFields(c *gin.Context) logrus.Fields { @@ -93,6 +136,7 @@ func extractFields(c *gin.Context) logrus.Fields { for _, param := range c.Params { fields[param.Key] = param.Value } + return fields } @@ -104,14 +148,14 @@ func (s *Server) Run(ctx context.Context) { c.Next() }) - bindHandlers(s.Router, s.handleRequest) + bindHandlers(s.Router, s.handleRunnerRequest, s.handleTaskRequest) // By default it serves on :8080 unless a // PORT environment variable was defined. s.Router.Run() } -func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context)) { +func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context), taskHandler func(ginC *gin.Context, del bool)) { engine.GET("/", handlePing) engine.GET("/version", handleVersion) @@ -136,6 +180,15 @@ func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context)) { } } + taskHandlerDelete := func(ginC *gin.Context) { + taskHandler(ginC, true) + } + taskHandlerReserve := func(ginC *gin.Context) { + taskHandler(ginC, false) + } + + engine.GET("/tasks", taskHandlerReserve) + engine.DELETE("/tasks", taskHandlerDelete) engine.Any("/r/:app/*route", reqHandler) // This final route is used for extensions, see Server.Add diff --git a/api/swagger.yml b/api/swagger.yml index fc31460e7..aace1bcbd 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -234,6 +234,30 @@ paths: 200: description: Route successfully deleted. Deletion succeeds even on routes that do not exist. + /tasks: + get: + summary: Get next task. + description: Gets the next task in the queue, ready for processing. Titan may return <=n tasks. Consumers should start processing tasks in order. Each returned task is set to `status` "running" and `started_at` is set to the current time. No other consumer can retrieve this task. + tags: + - Tasks + parameters: + - name: "n" + in: query + default: 1 + description: Number of tasks to return. + type: integer + format: int32 + responses: + 200: + description: Task information + schema: + $ref: '#/definitions/TasksWrapper' + default: + description: Unexpected error + schema: + $ref: '#/definitions/Error' + + definitions: Route: allOf: @@ -305,7 +329,95 @@ definitions: properties: app: $ref: '#/definitions/App' - + + Task: + allOf: + - $ref: "#/definitions/NewTask" + - $ref: "#/definitions/IdStatus" + - type: object + properties: + group_name: + type: string + description: "Group this task belongs to." + readOnly: true + error: + type: string + description: "The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log." + reason: + type: string + description: | + Machine usable reason for task being in this state. + Valid values for error status are `timeout | killed | bad_exit`. + Valid values for cancelled status are `client_request`. + For everything else, this is undefined. + enum: + - timeout + - killed + - bad_exit + - client_request + created_at: + type: string + format: date-time + description: Time when task was submitted. Always in UTC. + readOnly: true + started_at: + type: string + format: date-time + description: Time when task started execution. Always in UTC. + completed_at: + type: string + format: date-time + description: Time when task completed, whether it was successul or failed. Always in UTC. + # We maintain a doubly linked list of the retried task to the + # original task. + retry_of: + type: string + description: If this field is set, then this task is a retry of the ID in this field. + readOnly: true + retry_at: + type: string + description: If this field is set, then this task was retried by the task referenced in this field. + readOnly: true + env_vars: + # this is a map: https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#model-with-mapdictionary-properties + type: object + description: Env vars for the task. Comes from the ones set on the Group. + additionalProperties: + type: string + + NewTasksWrapper: + type: object + required: + - tasks + properties: + tasks: + type: array + items: + $ref: '#/definitions/NewTask' + + TasksWrapper: + type: object + required: + - tasks + properties: + tasks: + type: array + items: + $ref: '#/definitions/Task' + cursor: + type: string + description: Used to paginate results. If this is returned, pass it into the same query again to get more results. + error: + $ref: '#/definitions/ErrorBody' + + TaskWrapper: + type: object + required: + - task + properties: + task: + $ref: '#/definitions/Task' + ErrorBody: type: object properties: diff --git a/main.go b/main.go index 735bda466..18c52b7ba 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,18 @@ package main import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + log "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/config" "github.com/iron-io/functions/api/datastore" + "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/runner" "github.com/iron-io/functions/api/server" @@ -21,18 +30,103 @@ func main() { log.WithError(err).Fatalln("Invalid DB url.") } - mq, err := mqs.New(viper.GetString("mq")) + mqType, err := mqs.New(viper.GetString("MQTYPE")) if err != nil { - log.WithError(err).Fatal("Error on init MQ") + log.WithError(err).Fatal("Error on init MQTYPE") + } + + nasync := viper.GetInt("MQADR") + mqAdr := strings.TrimSpace(viper.GetString("MQADR")) + port := viper.GetInt("PORT") + if port == 0 { + port = 8080 + } + if mqAdr == "" { + mqAdr = fmt.Sprintf("localhost:%d", port) } metricLogger := runner.NewMetricLogger() - runner, err := runner.New(metricLogger) + runner, err := runner.New(metricLogger) if err != nil { log.WithError(err).Fatalln("Failed to create a runner") } - srv := server.New(ds, mq, runner) - srv.Run(ctx) + srv := server.New(ds, mqType, runner) + go srv.Run(ctx) + for i := 0; i < nasync; i++ { + fmt.Println(i) + go runAsyncRunners(mqAdr) + } + + quit := make(chan bool) + for _ = range quit { + } +} + +func runAsyncRunners(mqAdr string) { + + url := fmt.Sprintf("http://%s/tasks", mqAdr) + + logAndWait := func(err error) { + log.WithError(err) + time.Sleep(1 * time.Second) + } + + for { + resp, err := http.Get(url) + if err != nil { + logAndWait(err) + continue + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logAndWait(err) + continue + } + + bodyStr := strings.TrimSpace(string(body)) + if bodyStr == "null" { + logAndWait(err) + continue + } + + var task models.Task + if err := json.Unmarshal(body, &task); err != nil { + logAndWait(err) + continue + } + + var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader + stderr := runner.NewFuncLogger(task.RouteName, "", *task.Image, task.ID) // TODO: missing path here, how do i get that? + + cfg := &runner.Config{ + Image: *task.Image, + Timeout: time.Duration(*task.Timeout), + ID: task.ID, + AppName: task.RouteName, + Stdout: &stdout, + Stderr: stderr, + Env: task.EnvVars, + } + + metricLogger := runner.NewMetricLogger() + + rnr, err := runner.New(metricLogger) + if err != nil { + log.WithError(err) + continue + } + + ctx := context.Background() + if _, err = rnr.Run(ctx, cfg); err != nil { + log.WithError(err) + continue + } + + if _, err = http.NewRequest(http.MethodDelete, url, nil); err != nil { + log.WithError(err) + } + } }