diff --git a/api/models/mq.go b/api/models/mq.go index 76d24a2e1..30ad389ca 100644 --- a/api/models/mq.go +++ b/api/models/mq.go @@ -1,5 +1,9 @@ package models +import ( + "golang.org/x/net/context" +) + // Titan uses a Message Queue to impose a total ordering on jobs that it will // execute in order. Tasks are added to the queue via the Push() interface. The // MQ must support a reserve-delete 2 step dequeue to allow Titan to implement @@ -35,18 +39,18 @@ type MessageQueue interface { // delays. That is, if jobs {A, C} are queued at t seconds, both with Delay // = 5 seconds, and the same priority, then they may be available on the // queue as [C, A] or [A, C]. - Push(*Task) (*Task, error) + Push(context.Context, *Task) (*Task, error) // Remove a job from the front of the queue, reserve it for a timeout and // return it. MQ implementations MUST NOT lose jobs in case of errors. That // is, in case of reservation failure, it should be possible to retrieve the // job on a future reservation. - Reserve() (*Task, error) + Reserve(context.Context) (*Task, error) // If a reservation is pending, consider it acknowledged and delete it. If // the job does not have an outstanding reservation, error. If a job did not // exist, succeed. - Delete(*Task) error + Delete(context.Context, *Task) error } type Enqueue func(*Task) (*Task, error) diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 7568f6496..92f9bc830 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -13,6 +13,8 @@ import ( "github.com/Sirupsen/logrus" "github.com/boltdb/bolt" "github.com/iron-io/functions/api/models" + "github.com/iron-io/worker/common" + "golang.org/x/net/context" ) type BoltDbMQ struct { @@ -201,7 +203,10 @@ func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) { return job, err } -func (mq *BoltDbMQ) Push(job *models.Task) (*models.Task, error) { +func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { + ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("push") + if job.Delay > 0 { return mq.delayTask(job) } @@ -258,7 +263,7 @@ func resKeyToProperties(key []byte) (uint64, []byte) { return reservedUntil, key[len(resKeyPrefix)+8:] } -func (mq *BoltDbMQ) Reserve() (*models.Task, error) { +func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) { // Start a writable transaction. tx, err := mq.db.Begin(true) if err != nil { @@ -308,13 +313,20 @@ func (mq *BoltDbMQ) Reserve() (*models.Task, error) { if err := tx.Commit(); err != nil { return nil, err } + + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("reserved") + return &job, nil } return nil, nil } -func (mq *BoltDbMQ) Delete(job *models.Task) error { +func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Task) error { + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("delete") + defer log.Println("deleted") return mq.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(timeoutName(int(*job.Priority))) k := jobKey(job.ID) diff --git a/api/mqs/memory.go b/api/mqs/memory.go index 5f8265322..ff7bb1f55 100644 --- a/api/mqs/memory.go +++ b/api/mqs/memory.go @@ -9,6 +9,8 @@ import ( "github.com/Sirupsen/logrus" "github.com/google/btree" "github.com/iron-io/functions/api/models" + "github.com/iron-io/worker/common" + "golang.org/x/net/context" ) type MemoryMQ struct { @@ -109,7 +111,8 @@ func (ji *TaskItem) Less(than btree.Item) bool { return ji.StartAt.Before(ji2.StartAt) } -func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) { +func (mq *MemoryMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) // It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation. // job.MsgId = randSeq(20) @@ -123,7 +126,7 @@ func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) { replaced := mq.BTree.ReplaceOrInsert(ji) mq.Mutex.Unlock() if replaced != nil { - logrus.Warn("Ooops! an item was replaced and therefore lost, not good.") + log.Warn("Ooops! an item was replaced and therefore lost, not good.") } return job, nil } @@ -163,16 +166,20 @@ func pickEarliestNonblocking(channels ...chan *models.Task) *models.Task { } } -func (mq *MemoryMQ) Reserve() (*models.Task, error) { +func (mq *MemoryMQ) Reserve(ctx context.Context) (*models.Task, error) { job := pickEarliestNonblocking(mq.PriorityQueues[2], mq.PriorityQueues[1], mq.PriorityQueues[0]) if job == nil { return nil, nil } + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("reserved") return job, mq.pushTimeout(job) } -func (mq *MemoryMQ) Delete(job *models.Task) error { +func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Task) error { + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + mq.Mutex.Lock() defer mq.Mutex.Unlock() _, exists := mq.Timeouts[job.ID] @@ -180,6 +187,8 @@ func (mq *MemoryMQ) Delete(job *models.Task) error { return errors.New("Not reserved") } + log.Println("delete") delete(mq.Timeouts, job.ID) + log.Println("deleted") return nil } diff --git a/api/mqs/mock.go b/api/mqs/mock.go index 7bcd56259..810c26aac 100644 --- a/api/mqs/mock.go +++ b/api/mqs/mock.go @@ -1,6 +1,9 @@ package mqs -import "github.com/iron-io/functions/api/models" +import ( + "github.com/iron-io/functions/api/models" + "golang.org/x/net/context" +) type Mock struct { FakeApp *models.App @@ -9,14 +12,14 @@ type Mock struct { FakeRoutes []*models.Route } -func (mock *Mock) Push(*models.Task) (*models.Task, error) { +func (mock *Mock) Push(context.Context, *models.Task) (*models.Task, error) { return nil, nil } -func (mock *Mock) Reserve() (*models.Task, error) { +func (mock *Mock) Reserve(context.Context) (*models.Task, error) { return nil, nil } -func (mock *Mock) Delete(*models.Task) error { +func (mock *Mock) Delete(context.Context, *models.Task) error { return nil } diff --git a/api/mqs/redis.go b/api/mqs/redis.go index cad4472bd..6d358e5d7 100644 --- a/api/mqs/redis.go +++ b/api/mqs/redis.go @@ -11,6 +11,8 @@ import ( "github.com/Sirupsen/logrus" "github.com/garyburd/redigo/redis" "github.com/iron-io/functions/api/models" + "github.com/iron-io/worker/common" + "golang.org/x/net/context" ) type RedisMQ struct { @@ -207,7 +209,9 @@ func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, e return job, nil } -func (mq *RedisMQ) Push(job *models.Task) (*models.Task, error) { +func (mq *RedisMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + defer log.Println("pushed") conn := mq.pool.Get() defer conn.Close() @@ -221,7 +225,7 @@ func (mq *RedisMQ) checkNilResponse(err error) bool { } // Would be nice to switch to this model http://redis.io/commands/rpoplpush#pattern-reliable-queue -func (mq *RedisMQ) Reserve() (*models.Task, error) { +func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Task, error) { conn := mq.pool.Get() defer conn.Close() @@ -275,10 +279,16 @@ func (mq *RedisMQ) Reserve() (*models.Task, error) { return nil, err } + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("reserved") + return &job, nil } -func (mq *RedisMQ) Delete(job *models.Task) error { +func (mq *RedisMQ) Delete(ctx context.Context, job *models.Task) error { + _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + defer log.Println("deleted") + conn := mq.pool.Get() defer conn.Close() resId, err := conn.Do("HGET", "reservations", job.ID) diff --git a/api/server/server.go b/api/server/server.go index 8c3e45742..9b02a146e 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/iron-io/functions/api/ifaces" "github.com/iron-io/functions/api/models" "github.com/iron-io/functions/api/runner" + "github.com/iron-io/worker/common" titancommon "github.com/iron-io/worker/common" ) @@ -87,15 +88,17 @@ func (s *Server) UseSpecialHandlers(ginC *gin.Context) error { func (s *Server) handleRunnerRequest(c *gin.Context) { enqueue := func(task *models.Task) (*models.Task, error) { c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID}) - return s.MQ.Push(task) + ctx, _ := common.LoggerWithFields(c, logrus.Fields{"call_id": task.ID}) + return s.MQ.Push(ctx, task) } handleRequest(c, enqueue) } func (s *Server) handleTaskRequest(c *gin.Context) { + ctx, _ := common.LoggerWithFields(c, nil) switch c.Request.Method { case "GET": - task, err := s.MQ.Reserve() + task, err := s.MQ.Reserve(ctx) if err != nil { logrus.WithError(err) c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList)) @@ -116,7 +119,7 @@ func (s *Server) handleTaskRequest(c *gin.Context) { return } - if err := s.MQ.Delete(&task); err != nil { + if err := s.MQ.Delete(ctx, &task); err != nil { logrus.WithError(err) c.JSON(http.StatusInternalServerError, err) return