mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Initial work on async functions
This commit is contained in:
336
api/mqs/bolt.go
Normal file
336
api/mqs/bolt.go
Normal file
@@ -0,0 +1,336 @@
|
||||
package mqs
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
)
|
||||
|
||||
type BoltDbMQ struct {
|
||||
db *bolt.DB
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
type BoltDbConfig struct {
|
||||
FileName string `mapstructure:"filename"`
|
||||
}
|
||||
|
||||
func jobKey(jobID string) []byte {
|
||||
b := make([]byte, len(jobID)+1)
|
||||
b[0] = 'j'
|
||||
copy(b[1:], []byte(jobID))
|
||||
return b
|
||||
}
|
||||
|
||||
const timeoutToIDKeyPrefix = "id:"
|
||||
|
||||
func timeoutToIDKey(timeout []byte) []byte {
|
||||
b := make([]byte, len(timeout)+len(timeoutToIDKeyPrefix))
|
||||
copy(b[:], []byte(timeoutToIDKeyPrefix))
|
||||
copy(b[len(timeoutToIDKeyPrefix):], []byte(timeout))
|
||||
return b
|
||||
}
|
||||
|
||||
var delayQueueName = []byte("titan_delay")
|
||||
|
||||
func queueName(i int) []byte {
|
||||
return []byte(fmt.Sprintf("titan_%d_queue", i))
|
||||
}
|
||||
|
||||
func timeoutName(i int) []byte {
|
||||
return []byte(fmt.Sprintf("titan_%d_timeout", i))
|
||||
}
|
||||
|
||||
func NewBoltMQ(url *url.URL) (*BoltDbMQ, error) {
|
||||
dir := filepath.Dir(url.Path)
|
||||
log := logrus.WithFields(logrus.Fields{"mq": url.Scheme, "dir": dir})
|
||||
err := os.MkdirAll(dir, 0777)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Could not create data directory for mq")
|
||||
return nil, err
|
||||
}
|
||||
db, err := bolt.Open(url.Path, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
for i := 0; i < 3; i++ {
|
||||
_, err := tx.CreateBucketIfNotExists(queueName(i))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error creating bucket")
|
||||
return err
|
||||
}
|
||||
_, err = tx.CreateBucketIfNotExists(timeoutName(i))
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error creating timeout bucket")
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, err = tx.CreateBucketIfNotExists(delayQueueName)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error creating delay bucket")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error creating timeout bucket")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
mq := &BoltDbMQ{
|
||||
ticker: ticker,
|
||||
db: db,
|
||||
}
|
||||
mq.Start()
|
||||
log.WithFields(logrus.Fields{"file": url.Path}).Info("BoltDb initialized")
|
||||
return mq, nil
|
||||
}
|
||||
|
||||
func (mq *BoltDbMQ) Start() {
|
||||
go func() {
|
||||
// It would be nice to switch to a tick-less, next-event Timer based model.
|
||||
for _ = range mq.ticker.C {
|
||||
err := mq.db.Update(func(tx *bolt.Tx) error {
|
||||
now := uint64(time.Now().UnixNano())
|
||||
for i := 0; i < 3; i++ {
|
||||
// Assume our timeouts bucket exists and has resKey encoded keys.
|
||||
jobBucket := tx.Bucket(queueName(i))
|
||||
timeoutBucket := tx.Bucket(timeoutName(i))
|
||||
c := timeoutBucket.Cursor()
|
||||
|
||||
var err error
|
||||
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
|
||||
reserved, id := resKeyToProperties(k)
|
||||
if reserved > now {
|
||||
break
|
||||
}
|
||||
err = jobBucket.Put(id, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
timeoutBucket.Delete(k)
|
||||
timeoutBucket.Delete(timeoutToIDKey(k))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("boltdb reservation check error")
|
||||
}
|
||||
|
||||
err = mq.db.Update(func(tx *bolt.Tx) error {
|
||||
now := uint64(time.Now().UnixNano())
|
||||
// Assume our timeouts bucket exists and has resKey encoded keys.
|
||||
delayBucket := tx.Bucket(delayQueueName)
|
||||
c := delayBucket.Cursor()
|
||||
|
||||
var err error
|
||||
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
|
||||
reserved, id := resKeyToProperties(k)
|
||||
if reserved > now {
|
||||
break
|
||||
}
|
||||
|
||||
priority := binary.BigEndian.Uint32(v)
|
||||
job := delayBucket.Get(id)
|
||||
if job == nil {
|
||||
// oops
|
||||
logrus.Warnf("Expected delayed job, none found with id %s", id)
|
||||
continue
|
||||
}
|
||||
|
||||
jobBucket := tx.Bucket(queueName(int(priority)))
|
||||
err = jobBucket.Put(id, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := delayBucket.Delete(k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return delayBucket.Delete(id)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("boltdb delay check error")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// We insert a "reservation" at readyAt, and store the json blob at the msg
|
||||
// key. The timer loop plucks this out and puts it in the jobs bucket when the
|
||||
// time elapses. The value stored at the reservation key is the priority.
|
||||
func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) {
|
||||
readyAt := time.Now().Add(time.Duration(job.Delay) * time.Second)
|
||||
err := mq.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(delayQueueName)
|
||||
id, _ := b.NextSequence()
|
||||
buf, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := msgKey(id)
|
||||
err = b.Put(key, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pb := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(pb[:], uint32(*job.Priority))
|
||||
reservation := resKey(key, readyAt)
|
||||
return b.Put(reservation, pb)
|
||||
})
|
||||
return job, err
|
||||
}
|
||||
|
||||
func (mq *BoltDbMQ) Push(job *models.Task) (*models.Task, error) {
|
||||
if job.Delay > 0 {
|
||||
return mq.delayTask(job)
|
||||
}
|
||||
|
||||
err := mq.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(queueName(int(*job.Priority)))
|
||||
|
||||
id, _ := b.NextSequence()
|
||||
|
||||
buf, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.Put(msgKey(id), buf)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return job, nil
|
||||
|
||||
}
|
||||
|
||||
const msgKeyPrefix = "j:"
|
||||
const msgKeyLength = len(msgKeyPrefix) + 8
|
||||
const resKeyPrefix = "r:"
|
||||
|
||||
// r:<timestamp>:msgKey
|
||||
// The msgKey is used to introduce uniqueness within the timestamp. It probably isn't required.
|
||||
const resKeyLength = len(resKeyPrefix) + msgKeyLength + 8
|
||||
|
||||
func msgKey(v uint64) []byte {
|
||||
b := make([]byte, msgKeyLength)
|
||||
copy(b[:], []byte(msgKeyPrefix))
|
||||
binary.BigEndian.PutUint64(b[len(msgKeyPrefix):], v)
|
||||
return b
|
||||
}
|
||||
|
||||
func resKey(jobKey []byte, reservedUntil time.Time) []byte {
|
||||
b := make([]byte, resKeyLength)
|
||||
copy(b[:], []byte(resKeyPrefix))
|
||||
binary.BigEndian.PutUint64(b[len(resKeyPrefix):], uint64(reservedUntil.UnixNano()))
|
||||
copy(b[len(resKeyPrefix)+8:], jobKey)
|
||||
return b
|
||||
}
|
||||
|
||||
func resKeyToProperties(key []byte) (uint64, []byte) {
|
||||
if len(key) != resKeyLength {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
reservedUntil := binary.BigEndian.Uint64(key[len(resKeyPrefix):])
|
||||
return reservedUntil, key[len(resKeyPrefix)+8:]
|
||||
}
|
||||
|
||||
func (mq *BoltDbMQ) Reserve() (*models.Task, error) {
|
||||
// Start a writable transaction.
|
||||
tx, err := mq.db.Begin(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
for i := 2; i >= 0; i-- {
|
||||
// Use the transaction...
|
||||
b := tx.Bucket(queueName(i))
|
||||
c := b.Cursor()
|
||||
key, value := c.Seek([]byte(msgKeyPrefix))
|
||||
if key == nil {
|
||||
// No jobs, try next bucket
|
||||
continue
|
||||
}
|
||||
|
||||
b.Delete(key)
|
||||
|
||||
var job models.Task
|
||||
err = json.Unmarshal([]byte(value), &job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reservationKey := resKey(key, time.Now().Add(time.Minute))
|
||||
b = tx.Bucket(timeoutName(i))
|
||||
// Reserve introduces 3 keys in timeout bucket:
|
||||
// Save reservationKey -> Task to allow release
|
||||
// Save job.ID -> reservationKey to allow Deletes
|
||||
// Save reservationKey -> job.ID to allow clearing job.ID -> reservationKey in recovery without unmarshaling the job.
|
||||
// On Delete:
|
||||
// We have job ID, we get the reservationKey
|
||||
// Delete job.ID -> reservationKey
|
||||
// Delete reservationKey -> job.ID
|
||||
// Delete reservationKey -> Task
|
||||
// On Release:
|
||||
// We have reservationKey, we get the jobID
|
||||
// Delete reservationKey -> job.ID
|
||||
// Delete job.ID -> reservationKey
|
||||
// Move reservationKey -> Task to job bucket.
|
||||
b.Put(reservationKey, value)
|
||||
b.Put(jobKey(job.ID), reservationKey)
|
||||
b.Put(timeoutToIDKey(reservationKey), []byte(job.ID))
|
||||
|
||||
// Commit the transaction and check for error.
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &job, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (mq *BoltDbMQ) Delete(job *models.Task) error {
|
||||
return mq.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(timeoutName(int(*job.Priority)))
|
||||
k := jobKey(job.ID)
|
||||
|
||||
reservationKey := b.Get(k)
|
||||
if reservationKey == nil {
|
||||
return errors.New("Not found")
|
||||
}
|
||||
|
||||
for _, k := range [][]byte{k, timeoutToIDKey(reservationKey), reservationKey} {
|
||||
err := b.Delete(k)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
184
api/mqs/memory.go
Normal file
184
api/mqs/memory.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package mqs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/google/btree"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
)
|
||||
|
||||
type MemoryMQ struct {
|
||||
// WorkQueue A buffered channel that we can send work requests on.
|
||||
PriorityQueues []chan *models.Task
|
||||
Ticker *time.Ticker
|
||||
BTree *btree.BTree
|
||||
Timeouts map[string]*TaskItem
|
||||
// Protects B-tree and Timeouts
|
||||
// If this becomes a bottleneck, consider separating the two mutexes. The
|
||||
// goroutine to clear up timed out messages could also become a bottleneck at
|
||||
// some point. May need to switch to bucketing of some sort.
|
||||
Mutex sync.Mutex
|
||||
}
|
||||
|
||||
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
|
||||
func randSeq(n int) string {
|
||||
rand.Seed(time.Now().Unix())
|
||||
b := make([]rune, n)
|
||||
for i := range b {
|
||||
b[i] = letters[rand.Intn(len(letters))]
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
const NumPriorities = 3
|
||||
|
||||
func NewMemoryMQ() *MemoryMQ {
|
||||
var queues []chan *models.Task
|
||||
for i := 0; i < NumPriorities; i++ {
|
||||
queues = append(queues, make(chan *models.Task, 5000))
|
||||
}
|
||||
ticker := time.NewTicker(time.Second)
|
||||
mq := &MemoryMQ{
|
||||
PriorityQueues: queues,
|
||||
Ticker: ticker,
|
||||
BTree: btree.New(2),
|
||||
Timeouts: make(map[string]*TaskItem, 0),
|
||||
}
|
||||
mq.start()
|
||||
logrus.Info("MemoryMQ initialized")
|
||||
return mq
|
||||
}
|
||||
|
||||
func (mq *MemoryMQ) start() {
|
||||
// start goroutine to check for delayed jobs and put them onto regular queue when ready
|
||||
go func() {
|
||||
for _ = range mq.Ticker.C {
|
||||
ji := &TaskItem{
|
||||
StartAt: time.Now(),
|
||||
}
|
||||
mq.Mutex.Lock()
|
||||
mq.BTree.AscendLessThan(ji, func(a btree.Item) bool {
|
||||
logrus.WithFields(logrus.Fields{"queue": a}).Debug("delayed job move to queue")
|
||||
ji2 := mq.BTree.Delete(a).(*TaskItem)
|
||||
// put it onto the regular queue now
|
||||
_, err := mq.pushForce(ji2.Task)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Couldn't push delayed message onto main queue")
|
||||
}
|
||||
return true
|
||||
})
|
||||
mq.Mutex.Unlock()
|
||||
}
|
||||
}()
|
||||
// start goroutine to check for messages that have timed out and put them back onto regular queue
|
||||
// TODO: this should be like the delayed messages above. Could even be the same thing as delayed messages, but remove them if job is completed.
|
||||
go func() {
|
||||
for _ = range mq.Ticker.C {
|
||||
ji := &TaskItem{
|
||||
StartAt: time.Now(),
|
||||
}
|
||||
mq.Mutex.Lock()
|
||||
for _, jobItem := range mq.Timeouts {
|
||||
if jobItem.Less(ji) {
|
||||
delete(mq.Timeouts, jobItem.Task.ID)
|
||||
_, err := mq.pushForce(jobItem.Task)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Couldn't push timed out message onto main queue")
|
||||
}
|
||||
}
|
||||
}
|
||||
mq.Mutex.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// TaskItem is for the Btree, implements btree.Item
|
||||
type TaskItem struct {
|
||||
Task *models.Task
|
||||
StartAt time.Time
|
||||
}
|
||||
|
||||
func (ji *TaskItem) Less(than btree.Item) bool {
|
||||
// TODO: this could lose jobs: https://godoc.org/github.com/google/btree#Item
|
||||
ji2 := than.(*TaskItem)
|
||||
return ji.StartAt.Before(ji2.StartAt)
|
||||
}
|
||||
|
||||
func (mq *MemoryMQ) Push(job *models.Task) (*models.Task, error) {
|
||||
|
||||
// It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation.
|
||||
// job.MsgId = randSeq(20)
|
||||
if job.Delay > 0 {
|
||||
// then we'll put into short term storage until ready
|
||||
ji := &TaskItem{
|
||||
Task: job,
|
||||
StartAt: time.Now().Add(time.Second * time.Duration(job.Delay)),
|
||||
}
|
||||
mq.Mutex.Lock()
|
||||
replaced := mq.BTree.ReplaceOrInsert(ji)
|
||||
mq.Mutex.Unlock()
|
||||
if replaced != nil {
|
||||
logrus.Warn("Ooops! an item was replaced and therefore lost, not good.")
|
||||
}
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// Push the work onto the queue.
|
||||
return mq.pushForce(job)
|
||||
}
|
||||
func (mq *MemoryMQ) pushTimeout(job *models.Task) error {
|
||||
|
||||
ji := &TaskItem{
|
||||
Task: job,
|
||||
StartAt: time.Now().Add(time.Minute),
|
||||
}
|
||||
mq.Mutex.Lock()
|
||||
mq.Timeouts[job.ID] = ji
|
||||
mq.Mutex.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mq *MemoryMQ) pushForce(job *models.Task) (*models.Task, error) {
|
||||
mq.PriorityQueues[*job.Priority] <- job
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// This is recursive, so be careful how many channels you pass in.
|
||||
func pickEarliestNonblocking(channels ...chan *models.Task) *models.Task {
|
||||
if len(channels) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case job := <-channels[0]:
|
||||
return job
|
||||
default:
|
||||
return pickEarliestNonblocking(channels[1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
func (mq *MemoryMQ) Reserve() (*models.Task, error) {
|
||||
job := pickEarliestNonblocking(mq.PriorityQueues[2], mq.PriorityQueues[1], mq.PriorityQueues[0])
|
||||
if job == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return job, mq.pushTimeout(job)
|
||||
}
|
||||
|
||||
func (mq *MemoryMQ) Delete(job *models.Task) error {
|
||||
mq.Mutex.Lock()
|
||||
defer mq.Mutex.Unlock()
|
||||
_, exists := mq.Timeouts[job.ID]
|
||||
if !exists {
|
||||
return errors.New("Not reserved")
|
||||
}
|
||||
|
||||
delete(mq.Timeouts, job.ID)
|
||||
return nil
|
||||
}
|
||||
29
api/mqs/new.go
Normal file
29
api/mqs/new.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package mqs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
)
|
||||
|
||||
// New will parse the URL and return the correct MQ implementation.
|
||||
func New(mqURL string) (models.MessageQueue, error) {
|
||||
// Play with URL schemes here: https://play.golang.org/p/xWAf9SpCBW
|
||||
u, err := url.Parse(mqURL)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"url": mqURL}).Fatal("bad MQ URL")
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{"mq": u.Scheme}).Info("selecting MQ")
|
||||
switch u.Scheme {
|
||||
case "memory":
|
||||
return NewMemoryMQ(), nil
|
||||
case "redis":
|
||||
return NewRedisMQ(u)
|
||||
case "bolt":
|
||||
return NewBoltMQ(u)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("mq type not supported %v", u.Scheme)
|
||||
}
|
||||
298
api/mqs/redis.go
Normal file
298
api/mqs/redis.go
Normal file
@@ -0,0 +1,298 @@
|
||||
package mqs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
)
|
||||
|
||||
type RedisMQ struct {
|
||||
pool *redis.Pool
|
||||
queueName string
|
||||
ticker *time.Ticker
|
||||
prefix string
|
||||
}
|
||||
|
||||
func NewRedisMQ(url *url.URL) (*RedisMQ, error) {
|
||||
|
||||
pool := &redis.Pool{
|
||||
MaxIdle: 4,
|
||||
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
||||
MaxActive: 16,
|
||||
Wait: true,
|
||||
IdleTimeout: 300 * time.Second,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
return redis.DialURL(url.String())
|
||||
},
|
||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
||||
// Force a connection so we can fail in case of error.
|
||||
conn := pool.Get()
|
||||
if err := conn.Err(); err != nil {
|
||||
logrus.WithError(err).Fatal("Error connecting to redis")
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
mq := &RedisMQ{
|
||||
pool: pool,
|
||||
ticker: time.NewTicker(time.Second),
|
||||
prefix: url.Path,
|
||||
}
|
||||
mq.queueName = mq.k("queue")
|
||||
logrus.WithFields(logrus.Fields{"name": mq.queueName}).Info("Redis initialized with queue name")
|
||||
|
||||
mq.start()
|
||||
return mq, nil
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) k(s string) string {
|
||||
return mq.prefix + s
|
||||
}
|
||||
|
||||
func getFirstKeyValue(resp map[string]string) (string, string, error) {
|
||||
|
||||
for key, value := range resp {
|
||||
return key, value, nil
|
||||
}
|
||||
return "", "", errors.New("Blank map")
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) processPendingReservations(conn redis.Conn) {
|
||||
resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES"))
|
||||
if mq.checkNilResponse(err) || len(resp) == 0 {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Redis command error")
|
||||
}
|
||||
|
||||
reservationId, timeoutString, err := getFirstKeyValue(resp)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("error getting kv")
|
||||
return
|
||||
}
|
||||
|
||||
timeout, err := strconv.ParseInt(timeoutString, 10, 64)
|
||||
if err != nil || timeout > time.Now().Unix() {
|
||||
return
|
||||
}
|
||||
response, err := redis.Bytes(conn.Do("HGET", mq.k("timeout_jobs"), reservationId))
|
||||
if mq.checkNilResponse(err) {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("redis get timeout_jobs error")
|
||||
return
|
||||
}
|
||||
|
||||
var job models.Task
|
||||
err = json.Unmarshal(response, &job)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("error unmarshaling job json")
|
||||
return
|
||||
}
|
||||
|
||||
conn.Do("ZREM", mq.k("timeouts"), reservationId)
|
||||
conn.Do("HDEL", mq.k("timeout_jobs"), reservationId)
|
||||
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
||||
redisPush(conn, mq.queueName, &job)
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) {
|
||||
// List of reservation ids between -inf time and the current time will get us
|
||||
// everything that is now ready to be queued.
|
||||
now := time.Now().UTC().Unix()
|
||||
resIds, err := redis.Strings(conn.Do("ZRANGEBYSCORE", mq.k("delays"), "-inf", now))
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Error getting delayed jobs")
|
||||
return
|
||||
}
|
||||
|
||||
for _, resId := range resIds {
|
||||
// Might be a good idea to do this transactionally so we do not have left over reservationIds if the delete fails.
|
||||
buf, err := redis.Bytes(conn.Do("HGET", mq.k("delayed_jobs"), resId))
|
||||
// If:
|
||||
// a) A HSET in Push() failed, or
|
||||
// b) A previous zremrangebyscore failed,
|
||||
// we can get ids that we never associated with a job, or already placed in the queue, just skip these.
|
||||
if err == redis.ErrNil {
|
||||
continue
|
||||
} else if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Error HGET delayed_jobs")
|
||||
continue
|
||||
}
|
||||
|
||||
var job models.Task
|
||||
err = json.Unmarshal(buf, &job)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"buf": buf, "reservationId": resId}).Error("Error unmarshaling job")
|
||||
return
|
||||
}
|
||||
|
||||
_, err = redisPush(conn, mq.queueName, &job)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Pushing delayed job")
|
||||
return
|
||||
}
|
||||
conn.Do("HDEL", mq.k("delayed_jobs"), resId)
|
||||
}
|
||||
|
||||
// Remove everything we processed.
|
||||
conn.Do("ZREMRANGEBYSCORE", mq.k("delays"), "-inf", now)
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) start() {
|
||||
go func() {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
if err := conn.Err(); err != nil {
|
||||
logrus.WithError(err).Fatal("Could not start redis MQ reservation system")
|
||||
}
|
||||
|
||||
for _ = range mq.ticker.C {
|
||||
mq.processPendingReservations(conn)
|
||||
mq.processDelayedTasks(conn)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func redisPush(conn redis.Conn, queue string, job *models.Task) (*models.Task, error) {
|
||||
buf, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = conn.Do("LPUSH", fmt.Sprintf("%s%d", queue, *job.Priority), buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, error) {
|
||||
buf, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := redis.Int64(conn.Do("INCR", mq.k("delays_counter")))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reservationId := strconv.FormatInt(resp, 10)
|
||||
|
||||
// Timestamp -> resID
|
||||
_, err = conn.Do("ZADD", mq.k("delays"), time.Now().UTC().Add(time.Duration(job.Delay)*time.Second).Unix(), reservationId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// resID -> Task
|
||||
_, err = conn.Do("HSET", mq.k("delayed_jobs"), reservationId, buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) Push(job *models.Task) (*models.Task, error) {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if job.Delay > 0 {
|
||||
return mq.delayTask(conn, job)
|
||||
}
|
||||
return redisPush(conn, mq.queueName, job)
|
||||
}
|
||||
func (mq *RedisMQ) checkNilResponse(err error) bool {
|
||||
return err != nil && err.Error() == redis.ErrNil.Error()
|
||||
}
|
||||
|
||||
// Would be nice to switch to this model http://redis.io/commands/rpoplpush#pattern-reliable-queue
|
||||
func (mq *RedisMQ) Reserve() (*models.Task, error) {
|
||||
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
var job models.Task
|
||||
var resp []byte
|
||||
var err error
|
||||
for i := 2; i >= 0; i-- {
|
||||
resp, err = redis.Bytes(conn.Do("RPOP", fmt.Sprintf("%s%d", mq.queueName, i)))
|
||||
if mq.checkNilResponse(err) {
|
||||
if i == 0 {
|
||||
// Out of queues!
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// No valid job on this queue, try lower priority queue.
|
||||
continue
|
||||
} else if err != nil {
|
||||
// Some other error!
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We got a valid high priority job.
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.Unmarshal(resp, &job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response, err := redis.Int64(conn.Do("INCR", mq.queueName+"_incr"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reservationId := strconv.FormatInt(response, 10)
|
||||
_, err = conn.Do("ZADD", "timeout:", time.Now().Add(time.Minute).Unix(), reservationId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = conn.Do("HSET", "timeout", reservationId, resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Map from job.ID -> reservation ID
|
||||
_, err = conn.Do("HSET", "reservations", job.ID, reservationId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) Delete(job *models.Task) error {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
resId, err := conn.Do("HGET", "reservations", job.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Do("HDEL", "reservations", job.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Do("ZREM", "timeout:", resId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Do("HDEL", "timeout", resId)
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user