mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
349 lines
8.4 KiB
Go
349 lines
8.4 KiB
Go
package mqs
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/boltdb/bolt"
|
|
"github.com/iron-io/functions/api/models"
|
|
"github.com/iron-io/worker/common"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
type BoltDbMQ struct {
|
|
db *bolt.DB
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
type BoltDbConfig struct {
|
|
FileName string `mapstructure:"filename"`
|
|
}
|
|
|
|
func jobKey(jobID string) []byte {
|
|
b := make([]byte, len(jobID)+1)
|
|
b[0] = 'j'
|
|
copy(b[1:], []byte(jobID))
|
|
return b
|
|
}
|
|
|
|
const timeoutToIDKeyPrefix = "id:"
|
|
|
|
func timeoutToIDKey(timeout []byte) []byte {
|
|
b := make([]byte, len(timeout)+len(timeoutToIDKeyPrefix))
|
|
copy(b[:], []byte(timeoutToIDKeyPrefix))
|
|
copy(b[len(timeoutToIDKeyPrefix):], []byte(timeout))
|
|
return b
|
|
}
|
|
|
|
var delayQueueName = []byte("titan_delay")
|
|
|
|
func queueName(i int) []byte {
|
|
return []byte(fmt.Sprintf("titan_%d_queue", i))
|
|
}
|
|
|
|
func timeoutName(i int) []byte {
|
|
return []byte(fmt.Sprintf("titan_%d_timeout", i))
|
|
}
|
|
|
|
func NewBoltMQ(url *url.URL) (*BoltDbMQ, error) {
|
|
dir := filepath.Dir(url.Path)
|
|
log := logrus.WithFields(logrus.Fields{"mq": url.Scheme, "dir": dir})
|
|
err := os.MkdirAll(dir, 0777)
|
|
if err != nil {
|
|
log.WithError(err).Errorln("Could not create data directory for mq")
|
|
return nil, err
|
|
}
|
|
db, err := bolt.Open(url.Path, 0600, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = db.Update(func(tx *bolt.Tx) error {
|
|
for i := 0; i < 3; i++ {
|
|
_, err := tx.CreateBucketIfNotExists(queueName(i))
|
|
if err != nil {
|
|
log.WithError(err).Errorln("Error creating bucket")
|
|
return err
|
|
}
|
|
_, err = tx.CreateBucketIfNotExists(timeoutName(i))
|
|
if err != nil {
|
|
log.WithError(err).Errorln("Error creating timeout bucket")
|
|
return err
|
|
}
|
|
}
|
|
_, err = tx.CreateBucketIfNotExists(delayQueueName)
|
|
if err != nil {
|
|
log.WithError(err).Errorln("Error creating delay bucket")
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
log.WithError(err).Errorln("Error creating timeout bucket")
|
|
return nil, err
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
mq := &BoltDbMQ{
|
|
ticker: ticker,
|
|
db: db,
|
|
}
|
|
mq.Start()
|
|
log.WithFields(logrus.Fields{"file": url.Path}).Info("BoltDb initialized")
|
|
return mq, nil
|
|
}
|
|
|
|
func (mq *BoltDbMQ) Start() {
|
|
go func() {
|
|
// It would be nice to switch to a tick-less, next-event Timer based model.
|
|
for _ = range mq.ticker.C {
|
|
err := mq.db.Update(func(tx *bolt.Tx) error {
|
|
now := uint64(time.Now().UnixNano())
|
|
for i := 0; i < 3; i++ {
|
|
// Assume our timeouts bucket exists and has resKey encoded keys.
|
|
jobBucket := tx.Bucket(queueName(i))
|
|
timeoutBucket := tx.Bucket(timeoutName(i))
|
|
c := timeoutBucket.Cursor()
|
|
|
|
var err error
|
|
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
|
|
reserved, id := resKeyToProperties(k)
|
|
if reserved > now {
|
|
break
|
|
}
|
|
err = jobBucket.Put(id, v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
timeoutBucket.Delete(k)
|
|
timeoutBucket.Delete(timeoutToIDKey(k))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
logrus.WithError(err).Error("boltdb reservation check error")
|
|
}
|
|
|
|
err = mq.db.Update(func(tx *bolt.Tx) error {
|
|
now := uint64(time.Now().UnixNano())
|
|
// Assume our timeouts bucket exists and has resKey encoded keys.
|
|
delayBucket := tx.Bucket(delayQueueName)
|
|
c := delayBucket.Cursor()
|
|
|
|
var err error
|
|
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
|
|
reserved, id := resKeyToProperties(k)
|
|
if reserved > now {
|
|
break
|
|
}
|
|
|
|
priority := binary.BigEndian.Uint32(v)
|
|
job := delayBucket.Get(id)
|
|
if job == nil {
|
|
// oops
|
|
logrus.Warnf("Expected delayed job, none found with id %s", id)
|
|
continue
|
|
}
|
|
|
|
jobBucket := tx.Bucket(queueName(int(priority)))
|
|
err = jobBucket.Put(id, job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err := delayBucket.Delete(k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return delayBucket.Delete(id)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
logrus.WithError(err).Error("boltdb delay check error")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// We insert a "reservation" at readyAt, and store the json blob at the msg
|
|
// key. The timer loop plucks this out and puts it in the jobs bucket when the
|
|
// time elapses. The value stored at the reservation key is the priority.
|
|
func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) {
|
|
readyAt := time.Now().Add(time.Duration(job.Delay) * time.Second)
|
|
err := mq.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(delayQueueName)
|
|
id, _ := b.NextSequence()
|
|
buf, err := json.Marshal(job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
key := msgKey(id)
|
|
err = b.Put(key, buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pb := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(pb[:], uint32(*job.Priority))
|
|
reservation := resKey(key, readyAt)
|
|
return b.Put(reservation, pb)
|
|
})
|
|
return job, err
|
|
}
|
|
|
|
func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
|
|
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
|
|
log.Println("Pushed to MQ")
|
|
|
|
if job.Delay > 0 {
|
|
return mq.delayTask(job)
|
|
}
|
|
|
|
err := mq.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(queueName(int(*job.Priority)))
|
|
|
|
id, _ := b.NextSequence()
|
|
|
|
buf, err := json.Marshal(job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return b.Put(msgKey(id), buf)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return job, nil
|
|
|
|
}
|
|
|
|
const msgKeyPrefix = "j:"
|
|
const msgKeyLength = len(msgKeyPrefix) + 8
|
|
const resKeyPrefix = "r:"
|
|
|
|
// r:<timestamp>:msgKey
|
|
// The msgKey is used to introduce uniqueness within the timestamp. It probably isn't required.
|
|
const resKeyLength = len(resKeyPrefix) + msgKeyLength + 8
|
|
|
|
func msgKey(v uint64) []byte {
|
|
b := make([]byte, msgKeyLength)
|
|
copy(b[:], []byte(msgKeyPrefix))
|
|
binary.BigEndian.PutUint64(b[len(msgKeyPrefix):], v)
|
|
return b
|
|
}
|
|
|
|
func resKey(jobKey []byte, reservedUntil time.Time) []byte {
|
|
b := make([]byte, resKeyLength)
|
|
copy(b[:], []byte(resKeyPrefix))
|
|
binary.BigEndian.PutUint64(b[len(resKeyPrefix):], uint64(reservedUntil.UnixNano()))
|
|
copy(b[len(resKeyPrefix)+8:], jobKey)
|
|
return b
|
|
}
|
|
|
|
func resKeyToProperties(key []byte) (uint64, []byte) {
|
|
if len(key) != resKeyLength {
|
|
return 0, nil
|
|
}
|
|
|
|
reservedUntil := binary.BigEndian.Uint64(key[len(resKeyPrefix):])
|
|
return reservedUntil, key[len(resKeyPrefix)+8:]
|
|
}
|
|
|
|
func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) {
|
|
// Start a writable transaction.
|
|
tx, err := mq.db.Begin(true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
for i := 2; i >= 0; i-- {
|
|
// Use the transaction...
|
|
b := tx.Bucket(queueName(i))
|
|
c := b.Cursor()
|
|
key, value := c.Seek([]byte(msgKeyPrefix))
|
|
if key == nil {
|
|
// No jobs, try next bucket
|
|
continue
|
|
}
|
|
|
|
b.Delete(key)
|
|
|
|
var job models.Task
|
|
err = json.Unmarshal([]byte(value), &job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reservationKey := resKey(key, time.Now().Add(time.Minute))
|
|
b = tx.Bucket(timeoutName(i))
|
|
// Reserve introduces 3 keys in timeout bucket:
|
|
// Save reservationKey -> Task to allow release
|
|
// Save job.ID -> reservationKey to allow Deletes
|
|
// Save reservationKey -> job.ID to allow clearing job.ID -> reservationKey in recovery without unmarshaling the job.
|
|
// On Delete:
|
|
// We have job ID, we get the reservationKey
|
|
// Delete job.ID -> reservationKey
|
|
// Delete reservationKey -> job.ID
|
|
// Delete reservationKey -> Task
|
|
// On Release:
|
|
// We have reservationKey, we get the jobID
|
|
// Delete reservationKey -> job.ID
|
|
// Delete job.ID -> reservationKey
|
|
// Move reservationKey -> Task to job bucket.
|
|
b.Put(reservationKey, value)
|
|
b.Put(jobKey(job.ID), reservationKey)
|
|
b.Put(timeoutToIDKey(reservationKey), []byte(job.ID))
|
|
|
|
// Commit the transaction and check for error.
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
|
|
log.Println("Reserved")
|
|
|
|
return &job, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Task) error {
|
|
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
|
|
defer log.Println("Deleted")
|
|
|
|
return mq.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(timeoutName(int(*job.Priority)))
|
|
k := jobKey(job.ID)
|
|
|
|
reservationKey := b.Get(k)
|
|
if reservationKey == nil {
|
|
return errors.New("Not found")
|
|
}
|
|
|
|
for _, k := range [][]byte{k, timeoutToIDKey(reservationKey), reservationKey} {
|
|
err := b.Delete(k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|