From b623fc27e467bed6d88c48c0ce4b588319aad149 Mon Sep 17 00:00:00 2001 From: Seif Lotfy Date: Wed, 14 Sep 2016 16:11:37 -0700 Subject: [PATCH] Initial work on async functions --- api/config/config.go | 1 + api/models/complete.go | 30 +++ api/models/group.go | 71 +++++++ api/models/group_wrapper.go | 50 +++++ api/models/groups_wrapper.go | 48 +++++ api/models/id_status.go | 115 +++++++++++ api/models/mq.go | 52 +++++ api/models/new_job.go | 95 +++++++++ api/models/new_jobs_wrapper.go | 48 +++++ api/models/new_task.go | 95 +++++++++ api/models/new_tasks_wrapper.go | 48 +++++ api/models/reason.go | 57 ++++++ api/models/route.go | 14 ++ api/models/start.go | 22 +++ api/models/task.go | 135 +++++++++++++ api/models/task_wrapper.go | 50 +++++ api/models/tasks_wrapper.go | 56 ++++++ api/mqs/bolt.go | 336 ++++++++++++++++++++++++++++++++ api/mqs/memory.go | 184 +++++++++++++++++ api/mqs/new.go | 29 +++ api/mqs/redis.go | 298 ++++++++++++++++++++++++++++ api/server/helpers.go | 4 +- api/server/routes_create.go | 1 + api/server/runner.go | 40 +++- api/server/server.go | 19 +- glide.lock | 103 +++++----- glide.yaml | 3 +- hello-async.sh | 16 ++ hello-sync.sh | 16 ++ main.go | 9 +- 30 files changed, 1986 insertions(+), 59 deletions(-) create mode 100644 api/models/complete.go create mode 100644 api/models/group.go create mode 100644 api/models/group_wrapper.go create mode 100644 api/models/groups_wrapper.go create mode 100644 api/models/id_status.go create mode 100644 api/models/mq.go create mode 100644 api/models/new_job.go create mode 100644 api/models/new_jobs_wrapper.go create mode 100644 api/models/new_task.go create mode 100644 api/models/new_tasks_wrapper.go create mode 100644 api/models/reason.go create mode 100644 api/models/start.go create mode 100644 api/models/task.go create mode 100644 api/models/task_wrapper.go create mode 100644 api/models/tasks_wrapper.go create mode 100644 api/mqs/bolt.go create mode 100644 api/mqs/memory.go create mode 100644 api/mqs/new.go create mode 100644 api/mqs/redis.go create mode 100755 hello-async.sh create mode 100755 hello-sync.sh diff --git a/api/config/config.go b/api/config/config.go index e3b6fa626..7a789bb25 100644 --- a/api/config/config.go +++ b/api/config/config.go @@ -13,6 +13,7 @@ func InitConfig() { cwd, _ := os.Getwd() viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) viper.SetDefault("log_level", "info") + viper.SetDefault("mq", fmt.Sprintf("bolt://%s/data/worker_mq.db", cwd)) viper.SetDefault("db", fmt.Sprintf("bolt://%s/data/bolt.db?bucket=funcs", cwd)) viper.SetConfigName("config") viper.AddConfigPath(".") diff --git a/api/models/complete.go b/api/models/complete.go new file mode 100644 index 000000000..2d3b92ac1 --- /dev/null +++ b/api/models/complete.go @@ -0,0 +1,30 @@ +package models + +import "github.com/go-openapi/strfmt" + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +/*Complete complete + +swagger:model Complete +*/ +type Complete struct { + + /* Time when task was completed. Always in UTC. + */ + CompletedAt strfmt.DateTime `json:"completed_at,omitempty"` + + /* Error message, if status=error. Only used by the /error endpoint. + */ + Error string `json:"error,omitempty"` + + /* Machine readable reason failure, if status=error. Only used by the /error endpoint. + */ + Reason string `json:"reason,omitempty"` +} + +// Validate validates this complete +func (m *Complete) Validate(formats strfmt.Registry) error { + return nil +} diff --git a/api/models/group.go b/api/models/group.go new file mode 100644 index 000000000..2db37f7ff --- /dev/null +++ b/api/models/group.go @@ -0,0 +1,71 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*Group group + +swagger:model Group +*/ +type Group struct { + + /* Time when image first used/created. + + Read Only: true + */ + CreatedAt strfmt.DateTime `json:"created_at,omitempty"` + + /* User defined environment variables that will be passed in to each task in this group. + */ + EnvVars map[string]string `json:"env_vars,omitempty"` + + /* Name of Docker image to use in this group. You should include the image tag, which should be a version number, to be more accurate. Can be overridden on a per task basis with task.image. + */ + Image string `json:"image,omitempty"` + + /* The maximum number of tasks that will run at the exact same time in this group. + */ + MaxConcurrency int32 `json:"max_concurrency,omitempty"` + + /* Name of this group. Must be different than the image name. Can ony contain alphanumeric, -, and _. + + Read Only: true + */ + Name string `json:"name,omitempty"` +} + +// Validate validates this group +func (m *Group) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateEnvVars(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *Group) validateEnvVars(formats strfmt.Registry) error { + + if swag.IsZero(m.EnvVars) { // not required + return nil + } + + if err := validate.Required("env_vars", "body", m.EnvVars); err != nil { + return err + } + + return nil +} diff --git a/api/models/group_wrapper.go b/api/models/group_wrapper.go new file mode 100644 index 000000000..04cd6736e --- /dev/null +++ b/api/models/group_wrapper.go @@ -0,0 +1,50 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" +) + +/*GroupWrapper group wrapper + +swagger:model GroupWrapper +*/ +type GroupWrapper struct { + + /* group + + Required: true + */ + Group *Group `json:"group"` +} + +// Validate validates this group wrapper +func (m *GroupWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateGroup(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *GroupWrapper) validateGroup(formats strfmt.Registry) error { + + if m.Group != nil { + + if err := m.Group.Validate(formats); err != nil { + return err + } + } + + return nil +} diff --git a/api/models/groups_wrapper.go b/api/models/groups_wrapper.go new file mode 100644 index 000000000..0145f46f0 --- /dev/null +++ b/api/models/groups_wrapper.go @@ -0,0 +1,48 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*GroupsWrapper groups wrapper + +swagger:model GroupsWrapper +*/ +type GroupsWrapper struct { + + /* groups + + Required: true + */ + Groups []*Group `json:"groups"` +} + +// Validate validates this groups wrapper +func (m *GroupsWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateGroups(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *GroupsWrapper) validateGroups(formats strfmt.Registry) error { + + if err := validate.Required("groups", "body", m.Groups); err != nil { + return err + } + + return nil +} diff --git a/api/models/id_status.go b/api/models/id_status.go new file mode 100644 index 000000000..70a00bab6 --- /dev/null +++ b/api/models/id_status.go @@ -0,0 +1,115 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + strfmt "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*IDStatus Id status + +swagger:model IdStatus +*/ +type IDStatus struct { + + /* Unique identifier representing a specific task. + + Read Only: true + */ + ID string `json:"id,omitempty"` + + /* States and valid transitions. + + +---------+ + +---------> delayed <----------------+ + +----+----+ | + | | + | | + +----v----+ | + +---------> queued <----------------+ + +----+----+ * + | * + | retry * creates new task + +----v----+ * + | running | * + +--+-+-+--+ | + +---------|-|-|-----+-------------+ + +---|---------+ | +-----|---------+ | + | | | | | | + +-----v---^-+ +--v-------^+ +--v---^-+ + | success | | cancelled | | error | + +-----------+ +-----------+ +--------+ + + * delayed - has a delay. + * queued - Ready to be consumed when it's turn comes. + * running - Currently consumed by a runner which will attempt to process it. + * success - (or complete? success/error is common javascript terminology) + * error - Something went wrong. In this case more information can be obtained + by inspecting the "reason" field. + - timeout + - killed - forcibly killed by worker due to resource restrictions or access + violations. + - bad_exit - exited with non-zero status due to program termination/crash. + * cancelled - cancelled via API. More information in the reason field. + - client_request - Request was cancelled by a client. + + + Read Only: true + */ + Status string `json:"status,omitempty"` +} + +// Validate validates this Id status +func (m *IDStatus) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateStatus(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +var idStatusTypeStatusPropEnum []interface{} + +// prop value enum +func (m *IDStatus) validateStatusEnum(path, location string, value string) error { + if idStatusTypeStatusPropEnum == nil { + var res []string + if err := json.Unmarshal([]byte(`["delayed","queued","running","success","error","cancelled"]`), &res); err != nil { + return err + } + for _, v := range res { + idStatusTypeStatusPropEnum = append(idStatusTypeStatusPropEnum, v) + } + } + if err := validate.Enum(path, location, value, idStatusTypeStatusPropEnum); err != nil { + return err + } + return nil +} + +func (m *IDStatus) validateStatus(formats strfmt.Registry) error { + + if swag.IsZero(m.Status) { // not required + return nil + } + + // value enum + if err := m.validateStatusEnum("status", "body", m.Status); err != nil { + return err + } + + return nil +} diff --git a/api/models/mq.go b/api/models/mq.go new file mode 100644 index 000000000..76d24a2e1 --- /dev/null +++ b/api/models/mq.go @@ -0,0 +1,52 @@ +package models + +// Titan uses a Message Queue to impose a total ordering on jobs that it will +// execute in order. Tasks are added to the queue via the Push() interface. The +// MQ must support a reserve-delete 2 step dequeue to allow Titan to implement +// timeouts and retries. +// +// The Reserve() operation must return a job based on this total ordering +// (described below). At this point, the MQ backend must start a timeout on the +// job. If Delete() is not called on the Task within the timeout, the Task should +// be restored to the queue. +// +// Total ordering: The queue should maintain an ordering based on priority and +// logical time. Priorities are currently 0-2 and available in the Task's +// priority field. Tasks with higher priority always get pulled off the queue +// first. Within the same priority, jobs should be available in FIFO order. + +// When a job is required to be restored to the queue, it should maintain it's +// approximate order in the queue. That is, for jobs [A, B, C], with A being +// the head of the queue: +// Reserve() leads to A being passed to a consumer, and timeout started. +// Next Reserve() leads to B being dequeued. This consumer finishes running the +// task, leading to Delete() being called. B is now permanently erased from the +// queue. +// A's timeout occurs before the job is finished. At this point the ordering +// should be [A, C] and not [C, A]. +type MessageQueue interface { + // Push a Task onto the queue. If any error is returned, the Task SHOULD not be + // queued. Note that this does not completely avoid double queueing, that is + // OK, Titan will perform a check against the datastore after a dequeue. + // + // If the job's Delay value is > 0, the job should NOT be enqueued. The job + // should only be available in the queue after at least Delay seconds have + // elapsed. No ordering is required among multiple jobs queued with similar + // delays. That is, if jobs {A, C} are queued at t seconds, both with Delay + // = 5 seconds, and the same priority, then they may be available on the + // queue as [C, A] or [A, C]. + Push(*Task) (*Task, error) + + // Remove a job from the front of the queue, reserve it for a timeout and + // return it. MQ implementations MUST NOT lose jobs in case of errors. That + // is, in case of reservation failure, it should be possible to retrieve the + // job on a future reservation. + Reserve() (*Task, error) + + // If a reservation is pending, consider it acknowledged and delete it. If + // the job does not have an outstanding reservation, error. If a job did not + // exist, succeed. + Delete(*Task) error +} + +type Enqueue func(*Task) (*Task, error) diff --git a/api/models/new_job.go b/api/models/new_job.go new file mode 100644 index 000000000..e55581da3 --- /dev/null +++ b/api/models/new_job.go @@ -0,0 +1,95 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*NewJob new job + +swagger:model NewJob +*/ +type NewJob struct { + + /* Number of seconds to wait before queueing the job for consumption for the first time. Must be a positive integer. Jobs with a delay start in state "delayed" and transition to "running" after delay seconds. + */ + Delay int32 `json:"delay,omitempty"` + + /* Name of Docker image to use. This is optional and can be used to override the image defined at the group level. + + Required: true + */ + Image *string `json:"image"` + + /* "Number of automatic retries this job is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new job with the same image id and payload. The new job's max_retries is one less than the original. The new job's `retry_of` field is set to the original Job ID. Titan will delay the new job for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried." + + */ + MaxRetries int32 `json:"max_retries,omitempty"` + + /* Payload for the job. This is what you pass into each job to make it do something. + */ + Payload string `json:"payload,omitempty"` + + /* Priority of the job. Higher has more priority. 3 levels from 0-2. Jobs at same priority are processed in FIFO order. + + Required: true + */ + Priority *int32 `json:"priority"` + + /* Time in seconds to wait before retrying the job. Must be a non-negative integer. + */ + RetriesDelay *int32 `json:"retries_delay,omitempty"` + + /* Maximum runtime in seconds. If a consumer retrieves the + job, but does not change it's status within timeout seconds, the job + is considered failed, with reason timeout (Titan may allow a small + grace period). The consumer should also kill the job after timeout + seconds. If a consumer tries to change status after Titan has already + timed out the job, the consumer will be ignored. + + */ + Timeout *int32 `json:"timeout,omitempty"` +} + +// Validate validates this new job +func (m *NewJob) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateImage(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validatePriority(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *NewJob) validateImage(formats strfmt.Registry) error { + + if err := validate.Required("image", "body", m.Image); err != nil { + return err + } + + return nil +} + +func (m *NewJob) validatePriority(formats strfmt.Registry) error { + + if err := validate.Required("priority", "body", m.Priority); err != nil { + return err + } + + return nil +} diff --git a/api/models/new_jobs_wrapper.go b/api/models/new_jobs_wrapper.go new file mode 100644 index 000000000..16bf0422d --- /dev/null +++ b/api/models/new_jobs_wrapper.go @@ -0,0 +1,48 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*NewJobsWrapper new jobs wrapper + +swagger:model NewJobsWrapper +*/ +type NewJobsWrapper struct { + + /* jobs + + Required: true + */ + Jobs []*NewJob `json:"jobs"` +} + +// Validate validates this new jobs wrapper +func (m *NewJobsWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateJobs(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *NewJobsWrapper) validateJobs(formats strfmt.Registry) error { + + if err := validate.Required("jobs", "body", m.Jobs); err != nil { + return err + } + + return nil +} diff --git a/api/models/new_task.go b/api/models/new_task.go new file mode 100644 index 000000000..75816eaea --- /dev/null +++ b/api/models/new_task.go @@ -0,0 +1,95 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*NewTask new task + +swagger:model NewTask +*/ +type NewTask struct { + + /* Number of seconds to wait before queueing the task for consumption for the first time. Must be a positive integer. Tasks with a delay start in state "delayed" and transition to "running" after delay seconds. + */ + Delay int32 `json:"delay,omitempty"` + + /* Name of Docker image to use. This is optional and can be used to override the image defined at the group level. + + Required: true + */ + Image *string `json:"image"` + + /* "Number of automatic retries this task is allowed. A retry will be attempted if a task fails. Max 25. Automatic retries are performed by titan when a task reaches a failed state and has `max_retries` > 0. A retry is performed by queueing a new task with the same image id and payload. The new task's max_retries is one less than the original. The new task's `retry_of` field is set to the original Task ID. The old task's `retry_at` field is set to the new Task's ID. Titan will delay the new task for retries_delay seconds before queueing it. Cancelled or successful tasks are never automatically retried." + + */ + MaxRetries int32 `json:"max_retries,omitempty"` + + /* Payload for the task. This is what you pass into each task to make it do something. + */ + Payload string `json:"payload,omitempty"` + + /* Priority of the task. Higher has more priority. 3 levels from 0-2. Tasks at same priority are processed in FIFO order. + + Required: true + */ + Priority *int32 `json:"priority"` + + /* Time in seconds to wait before retrying the task. Must be a non-negative integer. + */ + RetriesDelay *int32 `json:"retries_delay,omitempty"` + + /* Maximum runtime in seconds. If a consumer retrieves the + task, but does not change it's status within timeout seconds, the task + is considered failed, with reason timeout (Titan may allow a small + grace period). The consumer should also kill the task after timeout + seconds. If a consumer tries to change status after Titan has already + timed out the task, the consumer will be ignored. + + */ + Timeout *int32 `json:"timeout,omitempty"` +} + +// Validate validates this new task +func (m *NewTask) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateImage(formats); err != nil { + // prop + res = append(res, err) + } + + if err := m.validatePriority(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *NewTask) validateImage(formats strfmt.Registry) error { + + if err := validate.Required("image", "body", m.Image); err != nil { + return err + } + + return nil +} + +func (m *NewTask) validatePriority(formats strfmt.Registry) error { + + if err := validate.Required("priority", "body", m.Priority); err != nil { + return err + } + + return nil +} diff --git a/api/models/new_tasks_wrapper.go b/api/models/new_tasks_wrapper.go new file mode 100644 index 000000000..b4f5a77eb --- /dev/null +++ b/api/models/new_tasks_wrapper.go @@ -0,0 +1,48 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*NewTasksWrapper new tasks wrapper + +swagger:model NewTasksWrapper +*/ +type NewTasksWrapper struct { + + /* tasks + + Required: true + */ + Tasks []*NewTask `json:"tasks"` +} + +// Validate validates this new tasks wrapper +func (m *NewTasksWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateTasks(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *NewTasksWrapper) validateTasks(formats strfmt.Registry) error { + + if err := validate.Required("tasks", "body", m.Tasks); err != nil { + return err + } + + return nil +} diff --git a/api/models/reason.go b/api/models/reason.go new file mode 100644 index 000000000..cc647ecb4 --- /dev/null +++ b/api/models/reason.go @@ -0,0 +1,57 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*Reason Machine usable reason for job being in this state. +Valid values for error status are `timeout | killed | bad_exit`. +Valid values for cancelled status are `client_request`. +For everything else, this is undefined. + + +swagger:model Reason +*/ +type Reason string + +// for schema +var reasonEnum []interface{} + +func (m Reason) validateReasonEnum(path, location string, value Reason) error { + if reasonEnum == nil { + var res []Reason + if err := json.Unmarshal([]byte(`["timeout","killed","bad_exit","client_request"]`), &res); err != nil { + return err + } + for _, v := range res { + reasonEnum = append(reasonEnum, v) + } + } + if err := validate.Enum(path, location, value, reasonEnum); err != nil { + return err + } + return nil +} + +// Validate validates this reason +func (m Reason) Validate(formats strfmt.Registry) error { + var res []error + + // value enum + if err := m.validateReasonEnum("", "body", m); err != nil { + return err + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/api/models/route.go b/api/models/route.go index e146311f3..aec62b703 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -2,6 +2,7 @@ package models import ( "errors" + "fmt" "net/http" "path" @@ -26,6 +27,7 @@ type Route struct { Image string `json:"image,omitempty"` Memory uint64 `json:"memory,omitempty"` Headers http.Header `json:"headers,omitempty"` + Type string `json:"type,omitempty"` Config `json:"config"` } @@ -35,6 +37,8 @@ var ( ErrRoutesValidationMissingAppName = errors.New("Missing route AppName") ErrRoutesValidationMissingPath = errors.New("Missing route Path") ErrRoutesValidationInvalidPath = errors.New("Invalid Path format") + ErrRoutesValidationMissingType = errors.New("Missing route Type") + ErrRoutesValidationInvalidType = errors.New("Invalid route Type") ) func (r *Route) Validate() error { @@ -60,6 +64,16 @@ func (r *Route) Validate() error { res = append(res, ErrRoutesValidationInvalidPath) } + if r.Type == "" { + r.Type = "sync" + } + + if r.Type != "async" && r.Type != "sync" { + res = append(res, ErrRoutesValidationInvalidType) + } + + fmt.Println(">>>", r.Type) + if len(res) > 0 { return apiErrors.CompositeValidationError(res...) } diff --git a/api/models/start.go b/api/models/start.go new file mode 100644 index 000000000..ec5784090 --- /dev/null +++ b/api/models/start.go @@ -0,0 +1,22 @@ +package models + +import "github.com/go-openapi/strfmt" + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +/*Start start + +swagger:model Start +*/ +type Start struct { + + /* Time when task started execution. Always in UTC. + */ + StartedAt strfmt.DateTime `json:"started_at,omitempty"` +} + +// Validate validates this start +func (m *Start) Validate(formats strfmt.Registry) error { + return nil +} diff --git a/api/models/task.go b/api/models/task.go new file mode 100644 index 000000000..ec6f115d4 --- /dev/null +++ b/api/models/task.go @@ -0,0 +1,135 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*Task task + +swagger:model Task +*/ +type Task struct { + NewTask + + IDStatus + + /* Time when task completed, whether it was successul or failed. Always in UTC. + */ + CompletedAt strfmt.DateTime `json:"completed_at,omitempty"` + + /* Time when task was submitted. Always in UTC. + + Read Only: true + */ + CreatedAt strfmt.DateTime `json:"created_at,omitempty"` + + /* Env vars for the task. Comes from the ones set on the Group. + */ + EnvVars map[string]string `json:"env_vars,omitempty"` + + /* The error message, if status is 'error'. This is errors due to things outside the task itself. Errors from user code will be found in the log. + */ + Error string `json:"error,omitempty"` + + /* Group this task belongs to. + + Read Only: true + */ + GroupName string `json:"group_name,omitempty"` + + /* Machine usable reason for task being in this state. + Valid values for error status are `timeout | killed | bad_exit`. + Valid values for cancelled status are `client_request`. + For everything else, this is undefined. + + */ + Reason string `json:"reason,omitempty"` + + /* If this field is set, then this task was retried by the task referenced in this field. + + Read Only: true + */ + RetryAt string `json:"retry_at,omitempty"` + + /* If this field is set, then this task is a retry of the ID in this field. + + Read Only: true + */ + RetryOf string `json:"retry_of,omitempty"` + + /* Time when task started execution. Always in UTC. + */ + StartedAt strfmt.DateTime `json:"started_at,omitempty"` +} + +// Validate validates this task +func (m *Task) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.NewTask.Validate(formats); err != nil { + res = append(res, err) + } + + if err := m.IDStatus.Validate(formats); err != nil { + res = append(res, err) + } + + if err := m.validateEnvVars(formats); err != nil { + res = append(res, err) + } + + if err := m.validateReason(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *Task) validateEnvVars(formats strfmt.Registry) error { + + if err := validate.Required("env_vars", "body", m.EnvVars); err != nil { + return err + } + + return nil +} + +var taskTypeReasonPropEnum []interface{} + +// property enum +func (m *Task) validateReasonEnum(path, location string, value string) error { + if taskTypeReasonPropEnum == nil { + var res []string + if err := json.Unmarshal([]byte(`["timeout","killed","bad_exit","client_request"]`), &res); err != nil { + return err + } + for _, v := range res { + taskTypeReasonPropEnum = append(taskTypeReasonPropEnum, v) + } + } + if err := validate.Enum(path, location, value, taskTypeReasonPropEnum); err != nil { + return err + } + return nil +} + +func (m *Task) validateReason(formats strfmt.Registry) error { + + // value enum + if err := m.validateReasonEnum("reason", "body", m.Reason); err != nil { + return err + } + + return nil +} diff --git a/api/models/task_wrapper.go b/api/models/task_wrapper.go new file mode 100644 index 000000000..fb3b4ec89 --- /dev/null +++ b/api/models/task_wrapper.go @@ -0,0 +1,50 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" +) + +/*TaskWrapper task wrapper + +swagger:model TaskWrapper +*/ +type TaskWrapper struct { + + /* task + + Required: true + */ + Task *Task `json:"task"` +} + +// Validate validates this task wrapper +func (m *TaskWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateTask(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *TaskWrapper) validateTask(formats strfmt.Registry) error { + + if m.Task != nil { + + if err := m.Task.Validate(formats); err != nil { + return err + } + } + + return nil +} diff --git a/api/models/tasks_wrapper.go b/api/models/tasks_wrapper.go new file mode 100644 index 000000000..8d3c76cd0 --- /dev/null +++ b/api/models/tasks_wrapper.go @@ -0,0 +1,56 @@ +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/validate" +) + +/*TasksWrapper tasks wrapper + +swagger:model TasksWrapper +*/ +type TasksWrapper struct { + + /* Used to paginate results. If this is returned, pass it into the same query again to get more results. + */ + Cursor string `json:"cursor,omitempty"` + + /* error + */ + Error *ErrorBody `json:"error,omitempty"` + + /* tasks + + Required: true + */ + Tasks []*Task `json:"tasks"` +} + +// Validate validates this tasks wrapper +func (m *TasksWrapper) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateTasks(formats); err != nil { + // prop + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +func (m *TasksWrapper) validateTasks(formats strfmt.Registry) error { + + if err := validate.Required("tasks", "body", m.Tasks); err != nil { + return err + } + + return nil +} diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go new file mode 100644 index 000000000..7568f6496 --- /dev/null +++ b/api/mqs/bolt.go @@ -0,0 +1,336 @@ +package mqs + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" + "github.com/iron-io/functions/api/models" +) + +type BoltDbMQ struct { + db *bolt.DB + ticker *time.Ticker +} + +type BoltDbConfig struct { + FileName string `mapstructure:"filename"` +} + +func jobKey(jobID string) []byte { + b := make([]byte, len(jobID)+1) + b[0] = 'j' + copy(b[1:], []byte(jobID)) + return b +} + +const timeoutToIDKeyPrefix = "id:" + +func timeoutToIDKey(timeout []byte) []byte { + b := make([]byte, len(timeout)+len(timeoutToIDKeyPrefix)) + copy(b[:], []byte(timeoutToIDKeyPrefix)) + copy(b[len(timeoutToIDKeyPrefix):], []byte(timeout)) + return b +} + +var delayQueueName = []byte("titan_delay") + +func queueName(i int) []byte { + return []byte(fmt.Sprintf("titan_%d_queue", i)) +} + +func timeoutName(i int) []byte { + return []byte(fmt.Sprintf("titan_%d_timeout", i)) +} + +func NewBoltMQ(url *url.URL) (*BoltDbMQ, error) { + dir := filepath.Dir(url.Path) + log := logrus.WithFields(logrus.Fields{"mq": url.Scheme, "dir": dir}) + err := os.MkdirAll(dir, 0777) + if err != nil { + log.WithError(err).Errorln("Could not create data directory for mq") + return nil, err + } + db, err := bolt.Open(url.Path, 0600, nil) + if err != nil { + return nil, err + } + + err = db.Update(func(tx *bolt.Tx) error { + for i := 0; i < 3; i++ { + _, err := tx.CreateBucketIfNotExists(queueName(i)) + if err != nil { + log.WithError(err).Errorln("Error creating bucket") + return err + } + _, err = tx.CreateBucketIfNotExists(timeoutName(i)) + if err != nil { + log.WithError(err).Errorln("Error creating timeout bucket") + return err + } + } + _, err = tx.CreateBucketIfNotExists(delayQueueName) + if err != nil { + log.WithError(err).Errorln("Error creating delay bucket") + return err + } + return nil + }) + if err != nil { + log.WithError(err).Errorln("Error creating timeout bucket") + return nil, err + } + + ticker := time.NewTicker(time.Second) + mq := &BoltDbMQ{ + ticker: ticker, + db: db, + } + mq.Start() + log.WithFields(logrus.Fields{"file": url.Path}).Info("BoltDb initialized") + return mq, nil +} + +func (mq *BoltDbMQ) Start() { + go func() { + // It would be nice to switch to a tick-less, next-event Timer based model. + for _ = range mq.ticker.C { + err := mq.db.Update(func(tx *bolt.Tx) error { + now := uint64(time.Now().UnixNano()) + for i := 0; i < 3; i++ { + // Assume our timeouts bucket exists and has resKey encoded keys. + jobBucket := tx.Bucket(queueName(i)) + timeoutBucket := tx.Bucket(timeoutName(i)) + c := timeoutBucket.Cursor() + + var err error + for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() { + reserved, id := resKeyToProperties(k) + if reserved > now { + break + } + err = jobBucket.Put(id, v) + if err != nil { + return err + } + timeoutBucket.Delete(k) + timeoutBucket.Delete(timeoutToIDKey(k)) + } + } + + return nil + }) + if err != nil { + logrus.WithError(err).Error("boltdb reservation check error") + } + + err = mq.db.Update(func(tx *bolt.Tx) error { + now := uint64(time.Now().UnixNano()) + // Assume our timeouts bucket exists and has resKey encoded keys. + delayBucket := tx.Bucket(delayQueueName) + c := delayBucket.Cursor() + + var err error + for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() { + reserved, id := resKeyToProperties(k) + if reserved > now { + break + } + + priority := binary.BigEndian.Uint32(v) + job := delayBucket.Get(id) + if job == nil { + // oops + logrus.Warnf("Expected delayed job, none found with id %s", id) + continue + } + + jobBucket := tx.Bucket(queueName(int(priority))) + err = jobBucket.Put(id, job) + if err != nil { + return err + } + + err := delayBucket.Delete(k) + if err != nil { + return err + } + + return delayBucket.Delete(id) + } + return nil + }) + if err != nil { + logrus.WithError(err).Error("boltdb delay check error") + } + } + }() +} + +// We insert a "reservation" at readyAt, and store the json blob at the msg +// key. The timer loop plucks this out and puts it in the jobs bucket when the +// time elapses. The value stored at the reservation key is the priority. +func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) { + readyAt := time.Now().Add(time.Duration(job.Delay) * time.Second) + err := mq.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(delayQueueName) + id, _ := b.NextSequence() + buf, err := json.Marshal(job) + if err != nil { + return err + } + + key := msgKey(id) + err = b.Put(key, buf) + if err != nil { + return err + } + + pb := make([]byte, 4) + binary.BigEndian.PutUint32(pb[:], uint32(*job.Priority)) + reservation := resKey(key, readyAt) + return b.Put(reservation, pb) + }) + return job, err +} + +func (mq *BoltDbMQ) Push(job *models.Task) (*models.Task, error) { + if job.Delay > 0 { + return mq.delayTask(job) + } + + err := mq.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(queueName(int(*job.Priority))) + + id, _ := b.NextSequence() + + buf, err := json.Marshal(job) + if err != nil { + return err + } + + return b.Put(msgKey(id), buf) + }) + if err != nil { + return nil, err + } + + return job, nil + +} + +const msgKeyPrefix = "j:" +const msgKeyLength = len(msgKeyPrefix) + 8 +const resKeyPrefix = "r:" + +// r::msgKey +// The msgKey is used to introduce uniqueness within the timestamp. It probably isn't required. +const resKeyLength = len(resKeyPrefix) + msgKeyLength + 8 + +func msgKey(v uint64) []byte { + b := make([]byte, msgKeyLength) + copy(b[:], []byte(msgKeyPrefix)) + binary.BigEndian.PutUint64(b[len(msgKeyPrefix):], v) + return b +} + +func resKey(jobKey []byte, reservedUntil time.Time) []byte { + b := make([]byte, resKeyLength) + copy(b[:], []byte(resKeyPrefix)) + binary.BigEndian.PutUint64(b[len(resKeyPrefix):], uint64(reservedUntil.UnixNano())) + copy(b[len(resKeyPrefix)+8:], jobKey) + return b +} + +func resKeyToProperties(key []byte) (uint64, []byte) { + if len(key) != resKeyLength { + return 0, nil + } + + reservedUntil := binary.BigEndian.Uint64(key[len(resKeyPrefix):]) + return reservedUntil, key[len(resKeyPrefix)+8:] +} + +func (mq *BoltDbMQ) Reserve() (*models.Task, error) { + // Start a writable transaction. + tx, err := mq.db.Begin(true) + if err != nil { + return nil, err + } + defer tx.Rollback() + + for i := 2; i >= 0; i-- { + // Use the transaction... + b := tx.Bucket(queueName(i)) + c := b.Cursor() + key, value := c.Seek([]byte(msgKeyPrefix)) + if key == nil { + // No jobs, try next bucket + continue + } + + b.Delete(key) + + var job models.Task + err = json.Unmarshal([]byte(value), &job) + if err != nil { + return nil, err + } + + reservationKey := resKey(key, time.Now().Add(time.Minute)) + b = tx.Bucket(timeoutName(i)) + // Reserve introduces 3 keys in timeout bucket: + // Save reservationKey -> Task to allow release + // Save job.ID -> reservationKey to allow Deletes + // Save reservationKey -> job.ID to allow clearing job.ID -> reservationKey in recovery without unmarshaling the job. + // On Delete: + // We have job ID, we get the reservationKey + // Delete job.ID -> reservationKey + // Delete reservationKey -> job.ID + // Delete reservationKey -> Task + // On Release: + // We have reservationKey, we get the jobID + // Delete reservationKey -> job.ID + // Delete job.ID -> reservationKey + // Move reservationKey -> Task to job bucket. + b.Put(reservationKey, value) + b.Put(jobKey(job.ID), reservationKey) + b.Put(timeoutToIDKey(reservationKey), []byte(job.ID)) + + // Commit the transaction and check for error. + if err := tx.Commit(); err != nil { + return nil, err + } + return &job, nil + } + + return nil, nil +} + +func (mq *BoltDbMQ) Delete(job *models.Task) error { + return mq.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(timeoutName(int(*job.Priority))) + k := jobKey(job.ID) + + reservationKey := b.Get(k) + if reservationKey == nil { + return errors.New("Not found") + } + + for _, k := range [][]byte{k, timeoutToIDKey(reservationKey), reservationKey} { + err := b.Delete(k) + if err != nil { + return err + } + } + + return nil + }) +} diff --git a/api/mqs/memory.go b/api/mqs/memory.go new file mode 100644 index 000000000..57d99fd03 --- /dev/null +++ b/api/mqs/memory.go @@ -0,0 +1,184 @@ +package mqs + +import ( + "errors" + "math/rand" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/google/btree" + "github.com/iron-io/functions/api/models" +) + +type MemoryMQ struct { + // WorkQueue A buffered channel that we can send work requests on. + PriorityQueues []chan *models.Task + Ticker *time.Ticker + BTree *btree.BTree + Timeouts map[string]*TaskItem + // Protects B-tree and Timeouts + // If this becomes a bottleneck, consider separating the two mutexes. The + // goroutine to clear up timed out messages could also become a bottleneck at + // some point. May need to switch to bucketing of some sort. + Mutex sync.Mutex +} + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randSeq(n int) string { + rand.Seed(time.Now().Unix()) + b := make([]rune, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +const NumPriorities = 3 + +func NewMemoryMQ() *MemoryMQ { + var queues []chan *models.Task + for i := 0; i < NumPriorities; i++ { + queues = append(queues, make(chan *models.Task, 5000)) + } + ticker := time.NewTicker(time.Second) + mq := &MemoryMQ{ + PriorityQueues: queues, + Ticker: ticker, + BTree: btree.New(2), + Timeouts: make(map[string]*TaskItem, 0), + } + mq.start() + logrus.Info("MemoryMQ initialized") + return mq +} + +func (mq *MemoryMQ) start() { + // start goroutine to check for delayed jobs and put them onto regular queue when ready + go func() { + for _ = range mq.Ticker.C { + ji := &TaskItem{ + StartAt: time.Now(), + } + mq.Mutex.Lock() + mq.BTree.AscendLessThan(ji, func(a btree.Item) bool { + logrus.WithFields(logrus.Fields{"queue": a}).Debug("delayed job move to queue") + ji2 := mq.BTree.Delete(a).(*TaskItem) + // put it onto the regular queue now + _, err := mq.pushForce(ji2.Task) + if err != nil { + logrus.WithError(err).Error("Couldn't push delayed message onto main queue") + } + return true + }) + mq.Mutex.Unlock() + } + }() + // start goroutine to check for messages that have timed out and put them back onto regular queue + // TODO: this should be like the delayed messages above. Could even be the same thing as delayed messages, but remove them if job is completed. + go func() { + for _ = range mq.Ticker.C { + ji := &TaskItem{ + StartAt: time.Now(), + } + mq.Mutex.Lock() + for _, jobItem := range mq.Timeouts { + if jobItem.Less(ji) { + delete(mq.Timeouts, jobItem.Task.ID) + _, err := mq.pushForce(jobItem.Task) + if err != nil { + logrus.WithError(err).Error("Couldn't push timed out message onto main queue") + } + } + } + mq.Mutex.Unlock() + } + }() +} + +// TaskItem is for the Btree, implements btree.Item +type TaskItem struct { + Task *models.Task + StartAt time.Time +} + +func (ji *TaskItem) Less(than btree.Item) bool { + // TODO: this could lose jobs: https://godoc.org/github.com/google/btree#Item + ji2 := than.(*TaskItem) + return ji.StartAt.Before(ji2.StartAt) +} + +func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) { + + // It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation. + // job.MsgId = randSeq(20) + if job.Delay > 0 { + // then we'll put into short term storage until ready + ji := &TaskItem{ + Task: job, + StartAt: time.Now().Add(time.Second * time.Duration(job.Delay)), + } + mq.Mutex.Lock() + replaced := mq.BTree.ReplaceOrInsert(ji) + mq.Mutex.Unlock() + if replaced != nil { + logrus.Warn("Ooops! an item was replaced and therefore lost, not good.") + } + return job, nil + } + + // Push the work onto the queue. + return mq.pushForce(job) +} +func (mq *MemoryMQ) pushTimeout(job *models.Task) error { + + ji := &TaskItem{ + Task: job, + StartAt: time.Now().Add(time.Minute), + } + mq.Mutex.Lock() + mq.Timeouts[job.ID] = ji + mq.Mutex.Unlock() + return nil +} + +func (mq *MemoryMQ) pushForce(job *models.Task) (*models.Task, error) { + mq.PriorityQueues[*job.Priority] <- job + return job, nil +} + +// This is recursive, so be careful how many channels you pass in. +func pickEarliestNonblocking(channels ...chan *models.Task) *models.Task { + if len(channels) == 0 { + return nil + } + + select { + case job := <-channels[0]: + return job + default: + return pickEarliestNonblocking(channels[1:]...) + } +} + +func (mq *MemoryMQ) Reserve() (*models.Task, error) { + job := pickEarliestNonblocking(mq.PriorityQueues[2], mq.PriorityQueues[1], mq.PriorityQueues[0]) + if job == nil { + return nil, nil + } + + return job, mq.pushTimeout(job) +} + +func (mq *MemoryMQ) Delete(job *models.Task) error { + mq.Mutex.Lock() + defer mq.Mutex.Unlock() + _, exists := mq.Timeouts[job.ID] + if !exists { + return errors.New("Not reserved") + } + + delete(mq.Timeouts, job.ID) + return nil +} diff --git a/api/mqs/new.go b/api/mqs/new.go new file mode 100644 index 000000000..7ef96af92 --- /dev/null +++ b/api/mqs/new.go @@ -0,0 +1,29 @@ +package mqs + +import ( + "fmt" + "net/url" + + "github.com/Sirupsen/logrus" + "github.com/iron-io/functions/api/models" +) + +// New will parse the URL and return the correct MQ implementation. +func New(mqURL string) (models.MessageQueue, error) { + // Play with URL schemes here: https://play.golang.org/p/xWAf9SpCBW + u, err := url.Parse(mqURL) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"url": mqURL}).Fatal("bad MQ URL") + } + logrus.WithFields(logrus.Fields{"mq": u.Scheme}).Info("selecting MQ") + switch u.Scheme { + case "memory": + return NewMemoryMQ(), nil + case "redis": + return NewRedisMQ(u) + case "bolt": + return NewBoltMQ(u) + } + + return nil, fmt.Errorf("mq type not supported %v", u.Scheme) +} diff --git a/api/mqs/redis.go b/api/mqs/redis.go new file mode 100644 index 000000000..cad4472bd --- /dev/null +++ b/api/mqs/redis.go @@ -0,0 +1,298 @@ +package mqs + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strconv" + "time" + + "github.com/Sirupsen/logrus" + "github.com/garyburd/redigo/redis" + "github.com/iron-io/functions/api/models" +) + +type RedisMQ struct { + pool *redis.Pool + queueName string + ticker *time.Ticker + prefix string +} + +func NewRedisMQ(url *url.URL) (*RedisMQ, error) { + + pool := &redis.Pool{ + MaxIdle: 4, + // I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea. + MaxActive: 16, + Wait: true, + IdleTimeout: 300 * time.Second, + Dial: func() (redis.Conn, error) { + return redis.DialURL(url.String()) + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } + + // Force a connection so we can fail in case of error. + conn := pool.Get() + if err := conn.Err(); err != nil { + logrus.WithError(err).Fatal("Error connecting to redis") + } + conn.Close() + + mq := &RedisMQ{ + pool: pool, + ticker: time.NewTicker(time.Second), + prefix: url.Path, + } + mq.queueName = mq.k("queue") + logrus.WithFields(logrus.Fields{"name": mq.queueName}).Info("Redis initialized with queue name") + + mq.start() + return mq, nil +} + +func (mq *RedisMQ) k(s string) string { + return mq.prefix + s +} + +func getFirstKeyValue(resp map[string]string) (string, string, error) { + + for key, value := range resp { + return key, value, nil + } + return "", "", errors.New("Blank map") +} + +func (mq *RedisMQ) processPendingReservations(conn redis.Conn) { + resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES")) + if mq.checkNilResponse(err) || len(resp) == 0 { + return + } + if err != nil { + logrus.WithError(err).Error("Redis command error") + } + + reservationId, timeoutString, err := getFirstKeyValue(resp) + if err != nil { + logrus.WithError(err).Error("error getting kv") + return + } + + timeout, err := strconv.ParseInt(timeoutString, 10, 64) + if err != nil || timeout > time.Now().Unix() { + return + } + response, err := redis.Bytes(conn.Do("HGET", mq.k("timeout_jobs"), reservationId)) + if mq.checkNilResponse(err) { + return + } + if err != nil { + logrus.WithError(err).Error("redis get timeout_jobs error") + return + } + + var job models.Task + err = json.Unmarshal(response, &job) + if err != nil { + logrus.WithError(err).Error("error unmarshaling job json") + return + } + + conn.Do("ZREM", mq.k("timeouts"), reservationId) + conn.Do("HDEL", mq.k("timeout_jobs"), reservationId) + conn.Do("HDEL", mq.k("reservations"), job.ID) + redisPush(conn, mq.queueName, &job) +} + +func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) { + // List of reservation ids between -inf time and the current time will get us + // everything that is now ready to be queued. + now := time.Now().UTC().Unix() + resIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", mq.k("delays"), "-inf", now)) + if err != nil { + logrus.WithError(err).Error("Error getting delayed jobs") + return + } + + for _, resId := range resIds { + // Might be a good idea to do this transactionally so we do not have left over reservationIds if the delete fails. + buf, err := redis.Bytes(conn.Do("HGET", mq.k("delayed_jobs"), resId)) + // If: + // a) A HSET in Push() failed, or + // b) A previous zremrangebyscore failed, + // we can get ids that we never associated with a job, or already placed in the queue, just skip these. + if err == redis.ErrNil { + continue + } else if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Error HGET delayed_jobs") + continue + } + + var job models.Task + err = json.Unmarshal(buf, &job) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"buf": buf, "reservationId": resId}).Error("Error unmarshaling job") + return + } + + _, err = redisPush(conn, mq.queueName, &job) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Pushing delayed job") + return + } + conn.Do("HDEL", mq.k("delayed_jobs"), resId) + } + + // Remove everything we processed. + conn.Do("ZREMRANGEBYSCORE", mq.k("delays"), "-inf", now) +} + +func (mq *RedisMQ) start() { + go func() { + conn := mq.pool.Get() + defer conn.Close() + if err := conn.Err(); err != nil { + logrus.WithError(err).Fatal("Could not start redis MQ reservation system") + } + + for _ = range mq.ticker.C { + mq.processPendingReservations(conn) + mq.processDelayedTasks(conn) + } + }() +} + +func redisPush(conn redis.Conn, queue string, job *models.Task) (*models.Task, error) { + buf, err := json.Marshal(job) + if err != nil { + return nil, err + } + _, err = conn.Do("LPUSH", fmt.Sprintf("%s%d", queue, *job.Priority), buf) + if err != nil { + return nil, err + } + return job, nil +} + +func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, error) { + buf, err := json.Marshal(job) + if err != nil { + return nil, err + } + + resp, err := redis.Int64(conn.Do("INCR", mq.k("delays_counter"))) + if err != nil { + return nil, err + } + + reservationId := strconv.FormatInt(resp, 10) + + // Timestamp -> resID + _, err = conn.Do("ZADD", mq.k("delays"), time.Now().UTC().Add(time.Duration(job.Delay)*time.Second).Unix(), reservationId) + if err != nil { + return nil, err + } + + // resID -> Task + _, err = conn.Do("HSET", mq.k("delayed_jobs"), reservationId, buf) + if err != nil { + return nil, err + } + + return job, nil +} + +func (mq *RedisMQ) Push(job *models.Task) (*models.Task, error) { + conn := mq.pool.Get() + defer conn.Close() + + if job.Delay > 0 { + return mq.delayTask(conn, job) + } + return redisPush(conn, mq.queueName, job) +} +func (mq *RedisMQ) checkNilResponse(err error) bool { + return err != nil && err.Error() == redis.ErrNil.Error() +} + +// Would be nice to switch to this model http://redis.io/commands/rpoplpush#pattern-reliable-queue +func (mq *RedisMQ) Reserve() (*models.Task, error) { + + conn := mq.pool.Get() + defer conn.Close() + var job models.Task + var resp []byte + var err error + for i := 2; i >= 0; i-- { + resp, err = redis.Bytes(conn.Do("RPOP", fmt.Sprintf("%s%d", mq.queueName, i))) + if mq.checkNilResponse(err) { + if i == 0 { + // Out of queues! + return nil, nil + } + + // No valid job on this queue, try lower priority queue. + continue + } else if err != nil { + // Some other error! + return nil, err + } + + // We got a valid high priority job. + break + } + + if err != nil { + return nil, err + } + err = json.Unmarshal(resp, &job) + if err != nil { + return nil, err + } + + response, err := redis.Int64(conn.Do("INCR", mq.queueName+"_incr")) + if err != nil { + return nil, err + } + reservationId := strconv.FormatInt(response, 10) + _, err = conn.Do("ZADD", "timeout:", time.Now().Add(time.Minute).Unix(), reservationId) + if err != nil { + return nil, err + } + _, err = conn.Do("HSET", "timeout", reservationId, resp) + if err != nil { + return nil, err + } + + // Map from job.ID -> reservation ID + _, err = conn.Do("HSET", "reservations", job.ID, reservationId) + if err != nil { + return nil, err + } + + return &job, nil +} + +func (mq *RedisMQ) Delete(job *models.Task) error { + conn := mq.pool.Get() + defer conn.Close() + resId, err := conn.Do("HGET", "reservations", job.ID) + if err != nil { + return err + } + _, err = conn.Do("HDEL", "reservations", job.ID) + if err != nil { + return err + } + _, err = conn.Do("ZREM", "timeout:", resId) + if err != nil { + return err + } + _, err = conn.Do("HDEL", "timeout", resId) + return err +} diff --git a/api/server/helpers.go b/api/server/helpers.go index d1054f8c6..886f18abb 100644 --- a/api/server/helpers.go +++ b/api/server/helpers.go @@ -44,7 +44,9 @@ func testRouter() *gin.Engine { c.Set("ctx", ctx) c.Next() }) - bindHandlers(r) + bindHandlers(r, func(ctx *gin.Context) { + handleRequest(ctx, nil) + }) return r } diff --git a/api/server/routes_create.go b/api/server/routes_create.go index 1fbe36c78..c06d39e61 100644 --- a/api/server/routes_create.go +++ b/api/server/routes_create.go @@ -52,6 +52,7 @@ func handleRouteCreate(c *gin.Context) { c.JSON(http.StatusInternalServerError, simpleError(models.ErrAppsGet)) return } + if app == nil { newapp := &models.App{Name: wroute.Route.AppName} if err := newapp.Validate(); err != nil { diff --git a/api/server/runner.go b/api/server/runner.go index 4afa3c1c4..5a2053ece 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -17,6 +17,7 @@ import ( "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/runner" titancommon "github.com/iron-io/worker/common" + "github.com/iron-io/worker/runner/drivers" "github.com/satori/go.uuid" ) @@ -31,7 +32,7 @@ func handleSpecial(c *gin.Context) { } } -func handleRunner(c *gin.Context) { +func handleRequest(c *gin.Context, enqueue models.Enqueue) { if strings.HasPrefix(c.Request.URL.Path, "/v1") { c.Status(http.StatusNotFound) return @@ -151,10 +152,29 @@ func handleRunner(c *gin.Context) { Memory: el.Memory, } - if result, err := Api.Runner.Run(c, cfg); err != nil { - log.WithError(err).Error(models.ErrRunnerRunRoute) - c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) - } else { + // Request count metric + metricBaseName := "server.handleRequest." + appName + "." + runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1) + + metricStart := time.Now() + + var err error + var result drivers.RunResult + switch el.Type { + case "async": + // TODO: Create Task + priority := int32(0) + task := &models.Task{} + task.Image = &cfg.Image + task.ID = cfg.ID + task.GroupName = cfg.AppName + task.Priority = &priority + // TODO: Push to queue + enqueue(task) + default: + if result, err = Api.Runner.Run(c, cfg); err != nil { + break + } for k, v := range el.Headers { c.Header(k, v[0]) } @@ -166,6 +186,16 @@ func handleRunner(c *gin.Context) { c.AbortWithStatus(http.StatusInternalServerError) } } + + if err != nil { + log.WithError(err).Error(models.ErrRunnerRunRoute) + c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) + } + + // Execution time metric + metricElapsed := time.Since(metricStart) + runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed) + runner.LogMetricTime(ctx, "server.handleRunner.exec_time", metricElapsed) return } } diff --git a/api/server/server.go b/api/server/server.go index 4d84d84f7..371647955 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -21,14 +21,16 @@ type Server struct { Runner *runner.Runner Router *gin.Engine Datastore models.Datastore + MQ models.MessageQueue AppListeners []ifaces.AppListener SpecialHandlers []ifaces.SpecialHandler } -func New(ds models.Datastore, r *runner.Runner) *Server { +func New(ds models.Datastore, mq models.MessageQueue, r *runner.Runner) *Server { Api = &Server{ Router: gin.Default(), Datastore: ds, + MQ: mq, Runner: r, } return Api @@ -75,10 +77,17 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { } } // now call the normal runner call - handleRunner(ginC) + handleRequest(ginC, nil) return nil } +func (s *Server) handleRequest(ginC *gin.Context) { + enqueue := func(task *models.Task) (*models.Task, error) { + return s.MQ.Push(task) + } + handleRequest(ginC, enqueue) +} + func extractFields(c *gin.Context) logrus.Fields { fields := logrus.Fields{"action": path.Base(c.HandlerName())} for _, param := range c.Params { @@ -95,14 +104,14 @@ func (s *Server) Run(ctx context.Context) { c.Next() }) - bindHandlers(s.Router) + bindHandlers(s.Router, s.handleRequest) // By default it serves on :8080 unless a // PORT environment variable was defined. s.Router.Run() } -func bindHandlers(engine *gin.Engine) { +func bindHandlers(engine *gin.Engine, reqHandler func(ginC *gin.Context)) { engine.GET("/", handlePing) engine.GET("/version", handleVersion) @@ -127,7 +136,7 @@ func bindHandlers(engine *gin.Engine) { } } - engine.Any("/r/:app/*route", handleRunner) + engine.Any("/r/:app/*route", reqHandler) // This final route is used for extensions, see Server.Add engine.NoRoute(handleSpecial) diff --git a/glide.lock b/glide.lock index e966b7e07..712ac5b16 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 3681d7248a9e90a7540f709e4844bbac6ae98806f0bdeb2f2945616655f78ad6 -updated: 2016-09-12T12:38:41.400655672-03:00 +hash: 168e1ab65d5e2e781369df5dc87655f682df5c5d9448dee547c6acce78720ecc +updated: 2016-09-23T20:45:34.389837453+02:00 imports: - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -8,11 +8,13 @@ imports: - name: github.com/asaskevich/govalidator version: 593d64559f7600f29581a3ee42177f5dbded27a9 - name: github.com/boltdb/bolt - version: 583e8937c61f1af6513608ccc75c97b6abdf4ff9 + version: fff57c100f4dea1905678da7e90d92429dff2904 - name: github.com/cactus/go-statsd-client version: 1c27c506c7a0584d017ca479f91b88d6a6538332 subpackages: - statsd +- name: github.com/dgrijalva/jwt-go + version: 24c63f56522a87ec5339cc3567883f1039378fdb - name: github.com/docker/distribution version: 9ca7921603852314b18a6ecc19f91806935f34bd subpackages: @@ -25,35 +27,35 @@ imports: - pkg/archive - pkg/fileutils - pkg/homedir - - pkg/stdcopy + - pkg/ioutils - pkg/parsers - - pkg/ulimit - - volume - pkg/pools - pkg/promise - - pkg/system - - pkg/ioutils - - pkg/units - - pkg/tarsum - pkg/random + - pkg/stdcopy + - pkg/system + - pkg/tarsum + - pkg/ulimit + - pkg/units + - volume - name: github.com/docker/engine-api - version: 2f8c367944a28130f3c2fb9f0aad7f1d5db952a9 + version: 4290f40c056686fcaa5c9caf02eac1dde9315adf subpackages: - - types/swarm - types/mount + - types/swarm - name: github.com/docker/go-units - version: eb879ae3e2b84e2a142af415b679ddeda47ec71c + version: f2145db703495b2e525c59662db69a7344b00bb8 - name: github.com/docker/libtrust version: 9cbd2a1374f46905c68a4eb3694a130610adc62a - name: github.com/fsnotify/fsnotify version: f12c6236fe7b5cf6bcf30e5935d08cb079d78334 - name: github.com/fsouza/go-dockerclient - version: 0436d420da98515cfe6370c9c5cdde868415637b + version: 4efaf0ea3c8990e1648f68672d011289f0c0cb0a - name: github.com/garyburd/redigo version: 4ed1111375cbeb698249ffe48dd463e9b0a63a7a subpackages: - - redis - internal + - redis - name: github.com/gin-gonic/gin version: 4a6bc4aac4607e253bcda67c8c5bcda693d2388e subpackages: @@ -62,7 +64,7 @@ imports: - name: github.com/go-openapi/analysis version: b44dc874b601d9e4e2f6e19140e794ba24bead3b - name: github.com/go-openapi/errors - version: d24ebc2075bad502fac3a8ae27aa6dd58e1952dc + version: b6a74a9df33099a5a952e7d1dbfe93dfa6b7a076 - name: github.com/go-openapi/jsonpointer version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 - name: github.com/go-openapi/jsonreference @@ -74,28 +76,30 @@ imports: subpackages: - client - name: github.com/go-openapi/spec - version: 6aced65f8501fe1217321abf0749d354824ba2ff + version: 2433d2f0fc794728337e0c5d65716e79e163f04d - name: github.com/go-openapi/strfmt - version: d65c7fdb29eca313476e529628176fe17e58c488 + version: f827e8cf30538b3983710c9fb4dd76ea04e3c8e0 - name: github.com/go-openapi/swag version: 0e04f5e499b19bf51031c01a00f098f25067d8dc - name: github.com/go-openapi/validate - version: deaf2c9013bc1a7f4c774662259a506ba874d80f + version: e6da236c48ce621803fc5d3883d8739aad0ce318 - name: github.com/golang/protobuf version: 2402d76f3d41f928c7902a765dfc872356dd3aad subpackages: - proto +- name: github.com/google/btree + version: 7d79101e329e5a3adf994758c578dab82b90c017 - name: github.com/hashicorp/go-cleanhttp version: ad28ea4487f05916463e2423a55166280e8254b5 - name: github.com/hashicorp/hcl - version: baeb59c710717b06aac1dbe2270e8192ec593244 + version: 99df0eb941dd8ddbc83d3f3605a34f6a686ac85e subpackages: - hcl/ast - hcl/parser - - hcl/token - - json/parser - hcl/scanner - hcl/strconv + - hcl/token + - json/parser - json/scanner - json/token - name: github.com/heroku/docker-registry-client @@ -103,54 +107,54 @@ imports: subpackages: - registry - name: github.com/iron-io/worker - version: 15dcee7470fed6dcd956b5565e3dc1404cb1b96b + version: 1487ce7896770b3d052e0909f564441fe59ee1df repo: git@github.com:iron-io/worker.git vcs: git subpackages: - common + - common/stats - runner/agent - runner/drivers - runner/drivers/docker - runner/drivers/mock - runner/tasker - - common/stats - runner/tasker/client/models - runner/tasker/client/titan - - runner/tasker/client/titan/jobs - runner/tasker/client/titan/groups - runner/tasker/client/titan/runner + - runner/tasker/client/titan/tasks - name: github.com/kr/fs version: 2788f0dbd16903de03cb8186e5c7d97b69ad387b - name: github.com/lib/pq - version: 80f8150043c80fb52dee6bc863a709cdac7ec8f8 + version: 50761b0867bd1d9d069276790bcd4a3bccf2324a subpackages: - oid - name: github.com/magiconair/properties - version: 61b492c03cf472e0c6419be5899b8e0dc28b1b88 + version: 0723e352fa358f9322c938cc2dadda874e9151a9 - name: github.com/mailru/easyjson - version: 34560e358dc05e2c28f6fda2f5c9e7494a4b9b19 + version: e978125a7e335d8f4db746a9ac5b44643f27416b subpackages: + - buffer - jlexer - jwriter - - buffer - name: github.com/manucorporat/sse version: ee05b128a739a0fb76c7ebd3ae4810c1de808d6d - name: github.com/mitchellh/mapstructure version: ca63d7c062ee3c9f34db231e352b60012b4fd0c1 - name: github.com/opencontainers/runc - version: 46d9535096662d8d6a734e4fdfc1c27ab03bc328 + version: b1e602e8ba592a663a4eb63ed9c18d6b2b475fec subpackages: - libcontainer/user - name: github.com/pelletier/go-buffruneio version: df1e16fde7fc330a0ca68167c23bf7ed6ac31d6d - name: github.com/pelletier/go-toml - version: 5a62685873ef617233ab5f1b825a6e4a758e16cf + version: 31055c2ff0bb0c7f9095aec0d220aed21108121e - name: github.com/pivotal-golang/bytefmt version: 24c06ce13e176cf7a7a440c8cf3b64d91c36c8e2 - name: github.com/pkg/errors version: 17b591df37844cde689f4d5813e5cea0927d8dd2 - name: github.com/pkg/sftp - version: a71e8f580e3b622ebff585309160b1cc549ef4d2 + version: 8197a2e580736b78d704be0fc47b2324c0591a32 - name: github.com/PuerkitoBio/purell version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 - name: github.com/PuerkitoBio/urlesc @@ -171,41 +175,48 @@ imports: - name: github.com/spf13/jwalterweatherman version: 33c24e77fb80341fe7130ee7c594256ff08ccc46 - name: github.com/spf13/pflag - version: 103ce5cd2042f2fe629c1957abb64ab3e7f50235 + version: c7e63cf4530bcd3ba943729cee0efeff2ebea63f - name: github.com/spf13/viper - version: 7fb2782df3d83e0036cc89f461ed0422628776f4 + version: 16990631d4aa7e38f73dbbbf37fa13e67c648531 - name: golang.org/x/crypto version: c10c31b5e94b6f7a0283272dc2bb27163dcea24b subpackages: + - bcrypt + - blowfish - ssh - - curve25519 - - ed25519 - name: golang.org/x/net version: f315505cf3349909cdf013ea56690da34e96a451 subpackages: - context - context/ctxhttp - - proxy - idna + - proxy - name: golang.org/x/sys - version: a646d33e2ee3172a661fc09bca23bb4889a41bc8 + version: 30de6d19a3bd89a5f38ae4028e23aaa5582648af subpackages: - unix - name: golang.org/x/text - version: d69c40b4be55797923cec7457fac7a244d91a9b6 + version: 04b8648d973c126ae60143b3e1473bc1576c7597 subpackages: - - transform - - unicode/norm - - secure/precis - cases + - internal/tag + - language - runes - secure/bidirule - - width - - language + - secure/precis + - transform - unicode/bidi - - internal/tag + - unicode/norm + - width - name: gopkg.in/go-playground/validator.v8 version: c193cecd124b5cc722d7ee5538e945bdb3348435 +- name: gopkg.in/mgo.v2 + version: 3f83fa5005286a7fe593b055f0d7771a7dce4655 + subpackages: + - bson + - internal/json + - internal/sasl + - internal/scram - name: gopkg.in/yaml.v2 - version: e4d366fc3c7938e2958e662b4258c7a89e1f0e3e + version: 31c299268d302dd0aa9a0dcf765a3d58971ac83f testImports: [] diff --git a/glide.yaml b/glide.yaml index 9f83580c9..e106cabc0 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,8 @@ import: - package: github.com/go-openapi/strfmt - package: github.com/go-openapi/validate - package: github.com/iron-io/worker + version: master repo: git@github.com:iron-io/worker.git vcs: git - version: master - package: github.com/lib/pq +- package: github.com/google/btree diff --git a/hello-async.sh b/hello-async.sh new file mode 100755 index 000000000..0c02d868b --- /dev/null +++ b/hello-async.sh @@ -0,0 +1,16 @@ +curl -H "Content-Type: application/json" -X POST -d '{ + "app": { "name":"myapp" } +}' http://localhost:8080/v1/apps + +curl -H "Content-Type: application/json" -X POST -d '{ + "route": { + "type": "async", + "path":"/hello-async", + "image":"iron/hello" + } +}' http://localhost:8080/v1/apps/myapp/routes + +curl -H "Content-Type: application/json" -X POST -d '{ + "name":"Johnny" +}' http://localhost:8080/r/myapp/hello-async + diff --git a/hello-sync.sh b/hello-sync.sh new file mode 100755 index 000000000..fc54de881 --- /dev/null +++ b/hello-sync.sh @@ -0,0 +1,16 @@ +curl -H "Content-Type: application/json" -X POST -d '{ + "app": { "name":"myapp" } +}' http://localhost:8080/v1/apps + +curl -H "Content-Type: application/json" -X POST -d '{ + "route": { + "type": "sync", + "path":"/hello-sync", + "image":"iron/hello" + } +}' http://localhost:8080/v1/apps/myapp/routes + +curl -H "Content-Type: application/json" -X POST -d '{ + "name":"Johnny" +}' http://localhost:8080/r/myapp/hello-sync + diff --git a/main.go b/main.go index f396ff858..735bda466 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/iron-io/functions/api/config" "github.com/iron-io/functions/api/datastore" + "github.com/iron-io/functions/api/mqs" "github.com/iron-io/functions/api/runner" "github.com/iron-io/functions/api/server" "github.com/spf13/viper" @@ -20,12 +21,18 @@ func main() { log.WithError(err).Fatalln("Invalid DB url.") } + mq, err := mqs.New(viper.GetString("mq")) + if err != nil { + log.WithError(err).Fatal("Error on init MQ") + } + metricLogger := runner.NewMetricLogger() runner, err := runner.New(metricLogger) + if err != nil { log.WithError(err).Fatalln("Failed to create a runner") } - srv := server.New(ds, runner) + srv := server.New(ds, mq, runner) srv.Run(ctx) }