mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
the mqs are storing a models.Task, which was not incorporating all the fields that are in a task.Config. I would very much like to merge these two things, but expect to do this in a future restructuring as both are used widely and not cordoned off properly (Config has a channel, stdin, stdout, stderr -- and isn't just a 'config', so to speak, as Task is). Since a task.Config is what is used to actually run a container, the result of the aforementioned deficiency was #193 where tasks are improperly configured and ran (namely, memory wrong). async tasks can still not be hot, they will be reverted to default format. would also like to fix this (also part of restructuring). I actually started doing this, hence the changes to those files (the surface area of the change is small and discourages improper future use, so I've left what I've done). this will: closes #193 closes #195 closes #154 removes many unused fields in models.Task, since we have not implemented retries. priority & delay are left, even though they are not used either, the main goal of this is to resolve #193 and both these fields are strongly plumbed into all the mqs, so punting on those two.
165 lines
3.8 KiB
Go
165 lines
3.8 KiB
Go
package mqs
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/fnproject/fn/api/models"
|
|
mq_config "github.com/iron-io/iron_go3/config"
|
|
ironmq "github.com/iron-io/iron_go3/mq"
|
|
)
|
|
|
|
type assoc struct {
|
|
msgId string
|
|
reservationId string
|
|
}
|
|
|
|
type IronMQ struct {
|
|
queues []ironmq.Queue
|
|
// Protects the map
|
|
sync.Mutex
|
|
// job id to {msgid, reservationid}
|
|
msgAssoc map[string]*assoc
|
|
}
|
|
|
|
type IronMQConfig struct {
|
|
Token string `mapstructure:"token"`
|
|
ProjectId string `mapstructure:"project_id"`
|
|
Host string `mapstructure:"host"`
|
|
Scheme string `mapstructure:"scheme"`
|
|
Port uint16 `mapstructure:"port"`
|
|
QueuePrefix string `mapstructure:"queue_prefix"`
|
|
}
|
|
|
|
func NewIronMQ(url *url.URL) *IronMQ {
|
|
|
|
if url.User == nil || url.User.Username() == "" {
|
|
logrus.Fatal("IronMQ requires PROJECT_ID and TOKEN")
|
|
}
|
|
p, ok := url.User.Password()
|
|
if !ok {
|
|
logrus.Fatal("IronMQ requires PROJECT_ID and TOKEN")
|
|
}
|
|
settings := &mq_config.Settings{
|
|
Token: p,
|
|
ProjectId: url.User.Username(),
|
|
Host: url.Host,
|
|
Scheme: "https",
|
|
}
|
|
|
|
if url.Scheme == "ironmq+http" {
|
|
settings.Scheme = "http"
|
|
}
|
|
|
|
parts := strings.Split(url.Host, ":")
|
|
if len(parts) > 1 {
|
|
settings.Host = parts[0]
|
|
p, err := strconv.Atoi(parts[1])
|
|
if err != nil {
|
|
logrus.WithFields(logrus.Fields{"host_port": url.Host}).Fatal("Invalid host+port combination")
|
|
}
|
|
settings.Port = uint16(p)
|
|
}
|
|
|
|
var queueName string
|
|
if url.Path != "" {
|
|
queueName = url.Path
|
|
} else {
|
|
queueName = "fn"
|
|
}
|
|
mq := &IronMQ{
|
|
queues: make([]ironmq.Queue, 3),
|
|
msgAssoc: make(map[string]*assoc),
|
|
}
|
|
|
|
// Check we can connect by trying to create one of the queues. Create is
|
|
// idempotent, so this is fine.
|
|
_, err := ironmq.ConfigCreateQueue(ironmq.QueueInfo{Name: fmt.Sprintf("%s_%d", queueName, 0)}, settings)
|
|
if err != nil {
|
|
logrus.WithError(err).Fatal("Could not connect to IronMQ")
|
|
}
|
|
|
|
for i := 0; i < 3; i++ {
|
|
mq.queues[i] = ironmq.ConfigNew(fmt.Sprintf("%s_%d", queueName, i), settings)
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{"base_queue": queueName}).Info("IronMQ initialized")
|
|
return mq
|
|
}
|
|
|
|
func (mq *IronMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
|
|
if job.Priority == nil || *job.Priority < 0 || *job.Priority > 2 {
|
|
return nil, fmt.Errorf("IronMQ Push job %s: Bad priority", job.ID)
|
|
}
|
|
|
|
// Push the work onto the queue.
|
|
buf, err := json.Marshal(job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = mq.queues[*job.Priority].PushMessage(ironmq.Message{Body: string(buf), Delay: int64(job.Delay)})
|
|
return job, err
|
|
}
|
|
|
|
func (mq *IronMQ) Reserve(ctx context.Context) (*models.Task, error) {
|
|
var job models.Task
|
|
|
|
var messages []ironmq.Message
|
|
var err error
|
|
for i := 2; i >= 0; i-- {
|
|
messages, err = mq.queues[i].LongPoll(1, 60, 0 /* wait */, false /* delete */)
|
|
if err != nil {
|
|
// It is OK if the queue does not exist, it will be created when a message is queued.
|
|
if !strings.Contains(err.Error(), "404 Not Found") {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if len(messages) == 0 {
|
|
// Try next priority.
|
|
if i == 0 {
|
|
return nil, nil
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Found a message!
|
|
break
|
|
}
|
|
|
|
message := messages[0]
|
|
if message.Body == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(message.Body), &job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mq.Lock()
|
|
mq.msgAssoc[job.ID] = &assoc{message.Id, message.ReservationId}
|
|
mq.Unlock()
|
|
return &job, nil
|
|
}
|
|
|
|
func (mq *IronMQ) Delete(ctx context.Context, job *models.Task) error {
|
|
if job.Priority == nil || *job.Priority < 0 || *job.Priority > 2 {
|
|
return fmt.Errorf("IronMQ Delete job %s: Bad priority", job.ID)
|
|
}
|
|
mq.Lock()
|
|
assoc, exists := mq.msgAssoc[job.ID]
|
|
delete(mq.msgAssoc, job.ID)
|
|
mq.Unlock()
|
|
|
|
if exists {
|
|
return mq.queues[*job.Priority].DeleteMessage(assoc.msgId, assoc.reservationId)
|
|
}
|
|
return nil
|
|
}
|