Merge pull request #118 from iron-io/issue-114

Add context do models.MessageQueue interface
This commit is contained in:
Seif Lotfy سيف لطفي
2016-10-05 01:05:34 +02:00
committed by GitHub
7 changed files with 68 additions and 23 deletions

View File

@@ -13,6 +13,8 @@ import (
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/worker/common"
"golang.org/x/net/context"
)
type BoltDbMQ struct {
@@ -202,7 +204,10 @@ func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) {
return job, err
}
func (mq *BoltDbMQ) Push(job *models.Task) (*models.Task, error) {
func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Println("Pushed to MQ")
if job.Delay > 0 {
return mq.delayTask(job)
}
@@ -259,7 +264,7 @@ func resKeyToProperties(key []byte) (uint64, []byte) {
return reservedUntil, key[len(resKeyPrefix)+8:]
}
func (mq *BoltDbMQ) Reserve() (*models.Task, error) {
func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) {
// Start a writable transaction.
tx, err := mq.db.Begin(true)
if err != nil {
@@ -309,13 +314,20 @@ func (mq *BoltDbMQ) Reserve() (*models.Task, error) {
if err := tx.Commit(); err != nil {
return nil, err
}
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Println("Reserved")
return &job, nil
}
return nil, nil
}
func (mq *BoltDbMQ) Delete(job *models.Task) error {
func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
defer log.Println("Deleted")
return mq.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(timeoutName(int(*job.Priority)))
k := jobKey(job.ID)

View File

@@ -9,6 +9,8 @@ import (
"github.com/Sirupsen/logrus"
"github.com/google/btree"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/worker/common"
"golang.org/x/net/context"
)
type MemoryMQ struct {
@@ -109,7 +111,9 @@ func (ji *TaskItem) Less(than btree.Item) bool {
return ji.StartAt.Before(ji2.StartAt)
}
func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) {
func (mq *MemoryMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Println("Pushed to MQ")
// It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation.
// job.MsgId = randSeq(20)
@@ -123,7 +127,7 @@ func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) {
replaced := mq.BTree.ReplaceOrInsert(ji)
mq.Mutex.Unlock()
if replaced != nil {
logrus.Warn("Ooops! an item was replaced and therefore lost, not good.")
log.Warn("Ooops! an item was replaced and therefore lost, not good.")
}
return job, nil
}
@@ -163,16 +167,20 @@ func pickEarliestNonblocking(channels ...chan *models.Task) *models.Task {
}
}
func (mq *MemoryMQ) Reserve() (*models.Task, error) {
func (mq *MemoryMQ) Reserve(ctx context.Context) (*models.Task, error) {
job := pickEarliestNonblocking(mq.PriorityQueues[2], mq.PriorityQueues[1], mq.PriorityQueues[0])
if job == nil {
return nil, nil
}
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Println("Reserved")
return job, mq.pushTimeout(job)
}
func (mq *MemoryMQ) Delete(job *models.Task) error {
func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
mq.Mutex.Lock()
defer mq.Mutex.Unlock()
_, exists := mq.Timeouts[job.ID]
@@ -181,5 +189,6 @@ func (mq *MemoryMQ) Delete(job *models.Task) error {
}
delete(mq.Timeouts, job.ID)
log.Println("Deleted")
return nil
}

View File

@@ -1,6 +1,9 @@
package mqs
import "github.com/iron-io/functions/api/models"
import (
"github.com/iron-io/functions/api/models"
"golang.org/x/net/context"
)
type Mock struct {
FakeApp *models.App
@@ -9,14 +12,14 @@ type Mock struct {
FakeRoutes []*models.Route
}
func (mock *Mock) Push(*models.Task) (*models.Task, error) {
func (mock *Mock) Push(context.Context, *models.Task) (*models.Task, error) {
return nil, nil
}
func (mock *Mock) Reserve() (*models.Task, error) {
func (mock *Mock) Reserve(context.Context) (*models.Task, error) {
return nil, nil
}
func (mock *Mock) Delete(*models.Task) error {
func (mock *Mock) Delete(context.Context, *models.Task) error {
return nil
}

View File

@@ -11,6 +11,8 @@ import (
"github.com/Sirupsen/logrus"
"github.com/garyburd/redigo/redis"
"github.com/iron-io/functions/api/models"
"github.com/iron-io/worker/common"
"golang.org/x/net/context"
)
type RedisMQ struct {
@@ -207,7 +209,10 @@ func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, e
return job, nil
}
func (mq *RedisMQ) Push(job *models.Task) (*models.Task, error) {
func (mq *RedisMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
defer log.Println("Pushed to MQ")
conn := mq.pool.Get()
defer conn.Close()
@@ -221,7 +226,7 @@ func (mq *RedisMQ) checkNilResponse(err error) bool {
}
// Would be nice to switch to this model http://redis.io/commands/rpoplpush#pattern-reliable-queue
func (mq *RedisMQ) Reserve() (*models.Task, error) {
func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Task, error) {
conn := mq.pool.Get()
defer conn.Close()
@@ -275,10 +280,16 @@ func (mq *RedisMQ) Reserve() (*models.Task, error) {
return nil, err
}
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Println("Reserved")
return &job, nil
}
func (mq *RedisMQ) Delete(job *models.Task) error {
func (mq *RedisMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
defer log.Println("Deleted")
conn := mq.pool.Get()
defer conn.Close()
resId, err := conn.Do("HGET", "reservations", job.ID)