diff --git a/api/mqs/bolt.go b/api/mqs/bolt.go index 92f9bc830..c7ed4d715 100644 --- a/api/mqs/bolt.go +++ b/api/mqs/bolt.go @@ -205,7 +205,7 @@ func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) { func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("push") + log.Println("Pushed to MQ") if job.Delay > 0 { return mq.delayTask(job) @@ -315,7 +315,7 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("reserved") + log.Println("Reserved") return &job, nil } @@ -325,8 +325,8 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) { func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Task) error { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("delete") - defer log.Println("deleted") + defer log.Println("Deleted") + return mq.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(timeoutName(int(*job.Priority))) k := jobKey(job.ID) diff --git a/api/mqs/memory.go b/api/mqs/memory.go index ff7bb1f55..fd0df3083 100644 --- a/api/mqs/memory.go +++ b/api/mqs/memory.go @@ -113,6 +113,7 @@ func (ji *TaskItem) Less(than btree.Item) bool { func (mq *MemoryMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) + log.Println("Pushed to MQ") // It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation. // job.MsgId = randSeq(20) @@ -173,7 +174,7 @@ func (mq *MemoryMQ) Reserve(ctx context.Context) (*models.Task, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("reserved") + log.Println("Reserved") return job, mq.pushTimeout(job) } @@ -187,8 +188,7 @@ func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Task) error { return errors.New("Not reserved") } - log.Println("delete") delete(mq.Timeouts, job.ID) - log.Println("deleted") + log.Println("Deleted") return nil } diff --git a/api/mqs/redis.go b/api/mqs/redis.go index 6d358e5d7..949934076 100644 --- a/api/mqs/redis.go +++ b/api/mqs/redis.go @@ -211,7 +211,8 @@ func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, e func (mq *RedisMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - defer log.Println("pushed") + defer log.Println("Pushed to MQ") + conn := mq.pool.Get() defer conn.Close() @@ -280,14 +281,14 @@ func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Task, error) { } _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - log.Println("reserved") + log.Println("Reserved") return &job, nil } func (mq *RedisMQ) Delete(ctx context.Context, job *models.Task) error { _, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID}) - defer log.Println("deleted") + defer log.Println("Deleted") conn := mq.pool.Get() defer conn.Close()