mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge branch 'shut-redis-up' into 'master'
use redis pool more better See merge request !69
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
@@ -8,23 +9,21 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
|
||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||
)
|
||||
|
||||
type RedisDataStore struct {
|
||||
conn redis.Conn
|
||||
pool *redis.Pool
|
||||
}
|
||||
|
||||
func New(url *url.URL) (models.Datastore, error) {
|
||||
pool := &redis.Pool{
|
||||
MaxIdle: 4,
|
||||
MaxIdle: 512,
|
||||
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
||||
MaxActive: 16,
|
||||
MaxActive: 512,
|
||||
Wait: true,
|
||||
IdleTimeout: 300 * time.Second,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
@@ -42,7 +41,7 @@ func New(url *url.URL) (models.Datastore, error) {
|
||||
logrus.WithError(err).Fatal("Error connecting to redis")
|
||||
}
|
||||
ds := &RedisDataStore{
|
||||
conn: conn,
|
||||
pool: pool,
|
||||
}
|
||||
return datastoreutil.NewValidator(ds), nil
|
||||
}
|
||||
@@ -53,14 +52,18 @@ func (ds *RedisDataStore) setApp(app *models.App) (*models.App, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err := ds.conn.Do("HSET", "apps", app.Name, appBytes); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if _, err := conn.Do("HSET", "apps", app.Name, appBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
||||
reply, err := ds.conn.Do("HEXISTS", "apps", app.Name)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HEXISTS", "apps", app.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -85,7 +88,9 @@ func (ds *RedisDataStore) UpdateApp(ctx context.Context, newapp *models.App) (*m
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error {
|
||||
if _, err := ds.conn.Do("HDEL", "apps", appName); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if _, err := conn.Do("HDEL", "apps", appName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -93,7 +98,9 @@ func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error {
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, error) {
|
||||
reply, err := ds.conn.Do("HGET", "apps", name)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGET", "apps", name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -111,7 +118,9 @@ func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App,
|
||||
func (ds *RedisDataStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
||||
res := []*models.App{}
|
||||
|
||||
reply, err := ds.conn.Do("HGETALL", "apps")
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGETALL", "apps")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -139,7 +148,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err := ds.conn.Do("HSET", set, route.Path, buf); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if _, err := conn.Do("HSET", set, route.Path, buf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -147,7 +158,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
||||
reply, err := ds.conn.Do("HEXISTS", "apps", route.AppName)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HEXISTS", "apps", route.AppName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -159,7 +172,7 @@ func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route)
|
||||
|
||||
hset := fmt.Sprintf("routes:%s", route.AppName)
|
||||
|
||||
reply, err = ds.conn.Do("HEXISTS", hset, route.Path)
|
||||
reply, err = conn.Do("HEXISTS", hset, route.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -181,7 +194,6 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
||||
|
||||
route.Update(newroute)
|
||||
|
||||
|
||||
hset := fmt.Sprintf("routes:%s", route.AppName)
|
||||
|
||||
return ds.setRoute(hset, route)
|
||||
@@ -189,7 +201,9 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
||||
|
||||
func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
|
||||
hset := fmt.Sprintf("routes:%s", appName)
|
||||
if n, err := ds.conn.Do("HDEL", hset, routePath); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if n, err := conn.Do("HDEL", hset, routePath); err != nil {
|
||||
return err
|
||||
} else if n == 0 {
|
||||
return models.ErrRoutesRemoving
|
||||
@@ -200,7 +214,9 @@ func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath st
|
||||
|
||||
func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
|
||||
hset := fmt.Sprintf("routes:%s", appName)
|
||||
reply, err := ds.conn.Do("HGET", hset, routePath)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGET", hset, routePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -218,7 +234,9 @@ func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath strin
|
||||
func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) {
|
||||
res := []*models.Route{}
|
||||
|
||||
reply, err := ds.conn.Do("HKEYS", "apps")
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HKEYS", "apps")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -228,7 +246,7 @@ func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFil
|
||||
|
||||
for _, path := range paths {
|
||||
hset := fmt.Sprintf("routes:%s", path)
|
||||
reply, err := ds.conn.Do("HGETALL", hset)
|
||||
reply, err := conn.Do("HGETALL", hset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -258,7 +276,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi
|
||||
res := []*models.Route{}
|
||||
|
||||
hset := fmt.Sprintf("routes:%s", appName)
|
||||
reply, err := ds.conn.Do("HGETALL", hset)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGETALL", hset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -280,7 +300,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error {
|
||||
if _, err := ds.conn.Do("HSET", "extras", key, value); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if _, err := conn.Do("HSET", "extras", key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -288,7 +310,9 @@ func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error {
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||
value, err := ds.conn.Do("HGET", "extras", key)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
value, err := conn.Do("HGET", "extras", key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -322,14 +346,18 @@ func (ds *RedisDataStore) InsertTask(ctx context.Context, task *models.Task) err
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := ds.conn.Do("HSET", "calls", task.ID, taskBytes); err != nil {
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
if _, err := conn.Do("HSET", "calls", task.ID, taskBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) {
|
||||
reply, err := ds.conn.Do("HGET", "calls", callID)
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGET", "calls", callID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if reply == nil {
|
||||
@@ -345,7 +373,9 @@ func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.F
|
||||
func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) {
|
||||
res := models.FnCalls{}
|
||||
|
||||
reply, err := ds.conn.Do("HGETALL", "calls")
|
||||
conn := ds.pool.Get()
|
||||
defer conn.Close()
|
||||
reply, err := conn.Do("HGETALL", "calls")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -23,11 +23,10 @@ type RedisMQ struct {
|
||||
}
|
||||
|
||||
func NewRedisMQ(url *url.URL) (*RedisMQ, error) {
|
||||
|
||||
pool := &redis.Pool{
|
||||
MaxIdle: 4,
|
||||
MaxIdle: 512,
|
||||
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
||||
MaxActive: 16,
|
||||
MaxActive: 512,
|
||||
Wait: true,
|
||||
IdleTimeout: 300 * time.Second,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
@@ -70,7 +69,10 @@ func getFirstKeyValue(resp map[string]string) (string, string, error) {
|
||||
return "", "", errors.New("Blank map")
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) processPendingReservations(conn redis.Conn) {
|
||||
func (mq *RedisMQ) processPendingReservations() {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES"))
|
||||
if mq.checkNilResponse(err) || len(resp) == 0 {
|
||||
return
|
||||
@@ -105,13 +107,17 @@ func (mq *RedisMQ) processPendingReservations(conn redis.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
// :( because fuck atomicity right?
|
||||
conn.Do("ZREM", mq.k("timeouts"), reservationId)
|
||||
conn.Do("HDEL", mq.k("timeout_jobs"), reservationId)
|
||||
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
||||
redisPush(conn, mq.queueName, &job)
|
||||
}
|
||||
|
||||
func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) {
|
||||
func (mq *RedisMQ) processDelayedTasks() {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
// List of reservation ids between -inf time and the current time will get us
|
||||
// everything that is now ready to be queued.
|
||||
now := time.Now().UTC().Unix()
|
||||
@@ -156,15 +162,9 @@ func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) {
|
||||
|
||||
func (mq *RedisMQ) start() {
|
||||
go func() {
|
||||
conn := mq.pool.Get()
|
||||
defer conn.Close()
|
||||
if err := conn.Err(); err != nil {
|
||||
logrus.WithError(err).Fatal("Could not start redis MQ reservation system")
|
||||
}
|
||||
|
||||
for range mq.ticker.C {
|
||||
mq.processPendingReservations(conn)
|
||||
mq.processDelayedTasks(conn)
|
||||
mq.processPendingReservations()
|
||||
mq.processDelayedTasks()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user