Files
fn-serverless/api/mqs/bolt/bolt.go
Owen Cliffe 1ad27f4f0d Inverting deps on SQL, Log and MQ plugins to make them optional dependencies of extended servers, Removing some dead code that brought in unused dependencies Filtering out some non-linux transitive deps. (#1057)
* initial Db helper split - make SQL and datastore packages optional

* abstracting log store

* break out DB, MQ and log drivers as extensions

* cleanup

* fewer deps

* fixing docker test

* hmm dbness

* updating db startup

* Consolidate all your extensions into one convenient package

* cleanup

* clean up dep constraints
2018-06-11 18:23:28 +01:00

376 lines
9.0 KiB
Go

package bolt
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/api/mqs"
"github.com/sirupsen/logrus"
)
type boltProvider int
type BoltDbMQ struct {
db *bolt.DB
ticker *time.Ticker
}
type BoltDbConfig struct {
FileName string `mapstructure:"filename"`
}
func jobKey(jobID string) []byte {
b := make([]byte, len(jobID)+1)
b[0] = 'j'
copy(b[1:], []byte(jobID))
return b
}
const timeoutToIDKeyPrefix = "id:"
func timeoutToIDKey(timeout []byte) []byte {
b := make([]byte, len(timeout)+len(timeoutToIDKeyPrefix))
copy(b[:], []byte(timeoutToIDKeyPrefix))
copy(b[len(timeoutToIDKeyPrefix):], []byte(timeout))
return b
}
var delayQueueName = []byte("functions_delay")
func queueName(i int) []byte {
return []byte(fmt.Sprintf("functions_%d_queue", i))
}
func timeoutName(i int) []byte {
return []byte(fmt.Sprintf("functions_%d_timeout", i))
}
func (boltProvider) Supports(url *url.URL) bool {
switch url.Scheme {
case "bolt":
return true
}
return false
}
func (boltProvider) String() string {
return "bolt"
}
func (boltProvider) New(url *url.URL) (models.MessageQueue, error) {
dir := filepath.Dir(url.Path)
log := logrus.WithFields(logrus.Fields{"mq": url.Scheme, "dir": dir})
err := os.MkdirAll(dir, 0755)
if err != nil {
log.WithError(err).Errorln("Could not create data directory for mq")
return nil, err
}
db, err := bolt.Open(url.Path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
log.WithError(err).Errorln("Could not open BoltDB file for MQ")
return nil, err
}
err = db.Update(func(tx *bolt.Tx) error {
for i := 0; i < 3; i++ {
_, err := tx.CreateBucketIfNotExists(queueName(i))
if err != nil {
log.WithError(err).Errorln("Error creating bucket")
return err
}
_, err = tx.CreateBucketIfNotExists(timeoutName(i))
if err != nil {
log.WithError(err).Errorln("Error creating timeout bucket")
return err
}
}
_, err = tx.CreateBucketIfNotExists(delayQueueName)
if err != nil {
log.WithError(err).Errorln("Error creating delay bucket")
return err
}
return nil
})
if err != nil {
log.WithError(err).Errorln("Error creating timeout bucket")
return nil, err
}
ticker := time.NewTicker(time.Second)
mq := &BoltDbMQ{
ticker: ticker,
db: db,
}
mq.Start()
log.WithFields(logrus.Fields{"file": url.Path}).Debug("BoltDb initialized")
return mq, nil
}
func (mq *BoltDbMQ) Start() {
go func() {
// It would be nice to switch to a tick-less, next-event Timer based model.
for range mq.ticker.C {
err := mq.db.Update(func(tx *bolt.Tx) error {
now := uint64(time.Now().UnixNano())
for i := 0; i < 3; i++ {
// Assume our timeouts bucket exists and has resKey encoded keys.
jobBucket := tx.Bucket(queueName(i))
timeoutBucket := tx.Bucket(timeoutName(i))
c := timeoutBucket.Cursor()
var err error
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
reserved, id := resKeyToProperties(k)
if reserved > now {
break
}
err = jobBucket.Put(id, v)
if err != nil {
return err
}
timeoutBucket.Delete(k)
timeoutBucket.Delete(timeoutToIDKey(k))
}
}
return nil
})
if err != nil {
logrus.WithError(err).Error("boltdb reservation check error")
}
err = mq.db.Update(func(tx *bolt.Tx) error {
now := uint64(time.Now().UnixNano())
// Assume our timeouts bucket exists and has resKey encoded keys.
delayBucket := tx.Bucket(delayQueueName)
c := delayBucket.Cursor()
var err error
for k, v := c.Seek([]byte(resKeyPrefix)); k != nil; k, v = c.Next() {
reserved, id := resKeyToProperties(k)
if reserved > now {
break
}
priority := binary.BigEndian.Uint32(v)
job := delayBucket.Get(id)
if job == nil {
// oops
logrus.Warnf("Expected delayed job, none found with id %s", id)
continue
}
jobBucket := tx.Bucket(queueName(int(priority)))
err = jobBucket.Put(id, job)
if err != nil {
return err
}
err := delayBucket.Delete(k)
if err != nil {
return err
}
return delayBucket.Delete(id)
}
return nil
})
if err != nil {
logrus.WithError(err).Error("boltdb delay check error")
}
}
}()
}
// We insert a "reservation" at readyAt, and store the json blob at the msg
// key. The timer loop plucks this out and puts it in the jobs bucket when the
// time elapses. The value stored at the reservation key is the priority.
func (mq *BoltDbMQ) delayCall(job *models.Call) (*models.Call, error) {
readyAt := time.Now().Add(time.Duration(job.Delay) * time.Second)
err := mq.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(delayQueueName)
id, _ := b.NextSequence()
buf, err := json.Marshal(job)
if err != nil {
return err
}
key := msgKey(id)
err = b.Put(key, buf)
if err != nil {
return err
}
pb := make([]byte, 4)
binary.BigEndian.PutUint32(pb[:], uint32(*job.Priority))
reservation := resKey(key, readyAt)
return b.Put(reservation, pb)
})
return job, err
}
func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Call) (*models.Call, error) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Debugln("Pushed to MQ")
if job.Delay > 0 {
return mq.delayCall(job)
}
err := mq.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(queueName(int(*job.Priority)))
id, _ := b.NextSequence()
buf, err := json.Marshal(job)
if err != nil {
return err
}
return b.Put(msgKey(id), buf)
})
if err != nil {
return nil, err
}
return job, nil
}
const msgKeyPrefix = "j:"
const msgKeyLength = len(msgKeyPrefix) + 8
const resKeyPrefix = "r:"
// r:<timestamp>:msgKey
// The msgKey is used to introduce uniqueness within the timestamp. It probably isn't required.
const resKeyLength = len(resKeyPrefix) + msgKeyLength + 8
func msgKey(v uint64) []byte {
b := make([]byte, msgKeyLength)
copy(b[:], []byte(msgKeyPrefix))
binary.BigEndian.PutUint64(b[len(msgKeyPrefix):], v)
return b
}
func resKey(jobKey []byte, reservedUntil time.Time) []byte {
b := make([]byte, resKeyLength)
copy(b[:], []byte(resKeyPrefix))
binary.BigEndian.PutUint64(b[len(resKeyPrefix):], uint64(reservedUntil.UnixNano()))
copy(b[len(resKeyPrefix)+8:], jobKey)
return b
}
func resKeyToProperties(key []byte) (uint64, []byte) {
if len(key) != resKeyLength {
return 0, nil
}
reservedUntil := binary.BigEndian.Uint64(key[len(resKeyPrefix):])
return reservedUntil, key[len(resKeyPrefix)+8:]
}
func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Call, error) {
// Start a writable transaction.
tx, err := mq.db.Begin(true)
if err != nil {
return nil, err
}
defer tx.Rollback()
for i := 2; i >= 0; i-- {
// Use the transaction...
b := tx.Bucket(queueName(i))
c := b.Cursor()
key, value := c.Seek([]byte(msgKeyPrefix))
if key == nil {
// No jobs, try next bucket
continue
}
b.Delete(key)
var job models.Call
err = json.Unmarshal([]byte(value), &job)
if err != nil {
return nil, err
}
reservationKey := resKey(key, time.Now().Add(time.Minute))
b = tx.Bucket(timeoutName(i))
// Reserve introduces 3 keys in timeout bucket:
// Save reservationKey -> Task to allow release
// Save job.ID -> reservationKey to allow Deletes
// Save reservationKey -> job.ID to allow clearing job.ID -> reservationKey in recovery without unmarshaling the job.
// On Delete:
// We have job ID, we get the reservationKey
// Delete job.ID -> reservationKey
// Delete reservationKey -> job.ID
// Delete reservationKey -> Task
// On Release:
// We have reservationKey, we get the jobID
// Delete reservationKey -> job.ID
// Delete job.ID -> reservationKey
// Move reservationKey -> Task to job bucket.
b.Put(reservationKey, value)
b.Put(jobKey(job.ID), reservationKey)
b.Put(timeoutToIDKey(reservationKey), []byte(job.ID))
// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
return nil, err
}
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
log.Debugln("Reserved")
return &job, nil
}
return nil, nil
}
func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Call) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
defer log.Debugln("Deleted")
return mq.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(timeoutName(int(*job.Priority)))
k := jobKey(job.ID)
reservationKey := b.Get(k)
if reservationKey == nil {
return errors.New("Not found")
}
for _, k := range [][]byte{k, timeoutToIDKey(reservationKey), reservationKey} {
err := b.Delete(k)
if err != nil {
return err
}
}
return nil
})
}
// Close shuts down the bolt db connection and
// stops the goroutine associated with the ticker
func (mq *BoltDbMQ) Close() error {
mq.ticker.Stop()
return mq.db.Close()
}
func init() {
mqs.AddProvider(boltProvider(0))
}