mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
use redis pool more better
event loop was reusing a connection even if it was broken forever... db was only using one connection ever even if it was broken... no mas. seems to pass tests, hopefully chad can push it
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package redis
|
package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
@@ -8,23 +9,21 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/garyburd/redigo/redis"
|
"github.com/garyburd/redigo/redis"
|
||||||
"gitlab-odx.oracle.com/odx/functions/api/models"
|
|
||||||
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
|
"gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil"
|
||||||
|
"gitlab-odx.oracle.com/odx/functions/api/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RedisDataStore struct {
|
type RedisDataStore struct {
|
||||||
conn redis.Conn
|
pool *redis.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(url *url.URL) (models.Datastore, error) {
|
func New(url *url.URL) (models.Datastore, error) {
|
||||||
pool := &redis.Pool{
|
pool := &redis.Pool{
|
||||||
MaxIdle: 4,
|
MaxIdle: 512,
|
||||||
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
||||||
MaxActive: 16,
|
MaxActive: 512,
|
||||||
Wait: true,
|
Wait: true,
|
||||||
IdleTimeout: 300 * time.Second,
|
IdleTimeout: 300 * time.Second,
|
||||||
Dial: func() (redis.Conn, error) {
|
Dial: func() (redis.Conn, error) {
|
||||||
@@ -42,7 +41,7 @@ func New(url *url.URL) (models.Datastore, error) {
|
|||||||
logrus.WithError(err).Fatal("Error connecting to redis")
|
logrus.WithError(err).Fatal("Error connecting to redis")
|
||||||
}
|
}
|
||||||
ds := &RedisDataStore{
|
ds := &RedisDataStore{
|
||||||
conn: conn,
|
pool: pool,
|
||||||
}
|
}
|
||||||
return datastoreutil.NewValidator(ds), nil
|
return datastoreutil.NewValidator(ds), nil
|
||||||
}
|
}
|
||||||
@@ -53,14 +52,18 @@ func (ds *RedisDataStore) setApp(app *models.App) (*models.App, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := ds.conn.Do("HSET", "apps", app.Name, appBytes); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.Do("HSET", "apps", app.Name, appBytes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return app, nil
|
return app, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
func (ds *RedisDataStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) {
|
||||||
reply, err := ds.conn.Do("HEXISTS", "apps", app.Name)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HEXISTS", "apps", app.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -85,7 +88,9 @@ func (ds *RedisDataStore) UpdateApp(ctx context.Context, newapp *models.App) (*m
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error {
|
func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error {
|
||||||
if _, err := ds.conn.Do("HDEL", "apps", appName); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.Do("HDEL", "apps", appName); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,7 +98,9 @@ func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, error) {
|
func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, error) {
|
||||||
reply, err := ds.conn.Do("HGET", "apps", name)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGET", "apps", name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -111,7 +118,9 @@ func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App,
|
|||||||
func (ds *RedisDataStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
func (ds *RedisDataStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) {
|
||||||
res := []*models.App{}
|
res := []*models.App{}
|
||||||
|
|
||||||
reply, err := ds.conn.Do("HGETALL", "apps")
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGETALL", "apps")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -139,7 +148,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := ds.conn.Do("HSET", set, route.Path, buf); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.Do("HSET", set, route.Path, buf); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,7 +158,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) {
|
||||||
reply, err := ds.conn.Do("HEXISTS", "apps", route.AppName)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HEXISTS", "apps", route.AppName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -159,7 +172,7 @@ func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route)
|
|||||||
|
|
||||||
hset := fmt.Sprintf("routes:%s", route.AppName)
|
hset := fmt.Sprintf("routes:%s", route.AppName)
|
||||||
|
|
||||||
reply, err = ds.conn.Do("HEXISTS", hset, route.Path)
|
reply, err = conn.Do("HEXISTS", hset, route.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -181,7 +194,6 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
|||||||
|
|
||||||
route.Update(newroute)
|
route.Update(newroute)
|
||||||
|
|
||||||
|
|
||||||
hset := fmt.Sprintf("routes:%s", route.AppName)
|
hset := fmt.Sprintf("routes:%s", route.AppName)
|
||||||
|
|
||||||
return ds.setRoute(hset, route)
|
return ds.setRoute(hset, route)
|
||||||
@@ -189,7 +201,9 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout
|
|||||||
|
|
||||||
func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
|
func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath string) error {
|
||||||
hset := fmt.Sprintf("routes:%s", appName)
|
hset := fmt.Sprintf("routes:%s", appName)
|
||||||
if n, err := ds.conn.Do("HDEL", hset, routePath); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if n, err := conn.Do("HDEL", hset, routePath); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if n == 0 {
|
} else if n == 0 {
|
||||||
return models.ErrRoutesRemoving
|
return models.ErrRoutesRemoving
|
||||||
@@ -200,7 +214,9 @@ func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath st
|
|||||||
|
|
||||||
func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
|
func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) {
|
||||||
hset := fmt.Sprintf("routes:%s", appName)
|
hset := fmt.Sprintf("routes:%s", appName)
|
||||||
reply, err := ds.conn.Do("HGET", hset, routePath)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGET", hset, routePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -218,7 +234,9 @@ func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath strin
|
|||||||
func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) {
|
func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) {
|
||||||
res := []*models.Route{}
|
res := []*models.Route{}
|
||||||
|
|
||||||
reply, err := ds.conn.Do("HKEYS", "apps")
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HKEYS", "apps")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -228,7 +246,7 @@ func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFil
|
|||||||
|
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
hset := fmt.Sprintf("routes:%s", path)
|
hset := fmt.Sprintf("routes:%s", path)
|
||||||
reply, err := ds.conn.Do("HGETALL", hset)
|
reply, err := conn.Do("HGETALL", hset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -258,7 +276,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi
|
|||||||
res := []*models.Route{}
|
res := []*models.Route{}
|
||||||
|
|
||||||
hset := fmt.Sprintf("routes:%s", appName)
|
hset := fmt.Sprintf("routes:%s", appName)
|
||||||
reply, err := ds.conn.Do("HGETALL", hset)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGETALL", hset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -280,7 +300,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error {
|
func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error {
|
||||||
if _, err := ds.conn.Do("HSET", "extras", key, value); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.Do("HSET", "extras", key, value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,7 +310,9 @@ func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) Get(ctx context.Context, key []byte) ([]byte, error) {
|
func (ds *RedisDataStore) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||||
value, err := ds.conn.Do("HGET", "extras", key)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
value, err := conn.Do("HGET", "extras", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -322,14 +346,18 @@ func (ds *RedisDataStore) InsertTask(ctx context.Context, task *models.Task) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := ds.conn.Do("HSET", "calls", task.ID, taskBytes); err != nil {
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
if _, err := conn.Do("HSET", "calls", task.ID, taskBytes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) {
|
func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) {
|
||||||
reply, err := ds.conn.Do("HGET", "calls", callID)
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGET", "calls", callID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if reply == nil {
|
} else if reply == nil {
|
||||||
@@ -345,7 +373,9 @@ func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.F
|
|||||||
func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) {
|
func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) {
|
||||||
res := models.FnCalls{}
|
res := models.FnCalls{}
|
||||||
|
|
||||||
reply, err := ds.conn.Do("HGETALL", "calls")
|
conn := ds.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
reply, err := conn.Do("HGETALL", "calls")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,11 +23,10 @@ type RedisMQ struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewRedisMQ(url *url.URL) (*RedisMQ, error) {
|
func NewRedisMQ(url *url.URL) (*RedisMQ, error) {
|
||||||
|
|
||||||
pool := &redis.Pool{
|
pool := &redis.Pool{
|
||||||
MaxIdle: 4,
|
MaxIdle: 512,
|
||||||
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
// I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea.
|
||||||
MaxActive: 16,
|
MaxActive: 512,
|
||||||
Wait: true,
|
Wait: true,
|
||||||
IdleTimeout: 300 * time.Second,
|
IdleTimeout: 300 * time.Second,
|
||||||
Dial: func() (redis.Conn, error) {
|
Dial: func() (redis.Conn, error) {
|
||||||
@@ -70,7 +69,10 @@ func getFirstKeyValue(resp map[string]string) (string, string, error) {
|
|||||||
return "", "", errors.New("Blank map")
|
return "", "", errors.New("Blank map")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mq *RedisMQ) processPendingReservations(conn redis.Conn) {
|
func (mq *RedisMQ) processPendingReservations() {
|
||||||
|
conn := mq.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES"))
|
resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES"))
|
||||||
if mq.checkNilResponse(err) || len(resp) == 0 {
|
if mq.checkNilResponse(err) || len(resp) == 0 {
|
||||||
return
|
return
|
||||||
@@ -105,13 +107,17 @@ func (mq *RedisMQ) processPendingReservations(conn redis.Conn) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// :( because fuck atomicity right?
|
||||||
conn.Do("ZREM", mq.k("timeouts"), reservationId)
|
conn.Do("ZREM", mq.k("timeouts"), reservationId)
|
||||||
conn.Do("HDEL", mq.k("timeout_jobs"), reservationId)
|
conn.Do("HDEL", mq.k("timeout_jobs"), reservationId)
|
||||||
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
||||||
redisPush(conn, mq.queueName, &job)
|
redisPush(conn, mq.queueName, &job)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) {
|
func (mq *RedisMQ) processDelayedTasks() {
|
||||||
|
conn := mq.pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
// List of reservation ids between -inf time and the current time will get us
|
// List of reservation ids between -inf time and the current time will get us
|
||||||
// everything that is now ready to be queued.
|
// everything that is now ready to be queued.
|
||||||
now := time.Now().UTC().Unix()
|
now := time.Now().UTC().Unix()
|
||||||
@@ -156,15 +162,9 @@ func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) {
|
|||||||
|
|
||||||
func (mq *RedisMQ) start() {
|
func (mq *RedisMQ) start() {
|
||||||
go func() {
|
go func() {
|
||||||
conn := mq.pool.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
if err := conn.Err(); err != nil {
|
|
||||||
logrus.WithError(err).Fatal("Could not start redis MQ reservation system")
|
|
||||||
}
|
|
||||||
|
|
||||||
for range mq.ticker.C {
|
for range mq.ticker.C {
|
||||||
mq.processPendingReservations(conn)
|
mq.processPendingReservations()
|
||||||
mq.processDelayedTasks(conn)
|
mq.processDelayedTasks()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user