From 05ac5b6d93849737f64dc953192f1623f6b63903 Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Sun, 11 Jun 2017 13:55:50 -0700 Subject: [PATCH] use redis pool more better event loop was reusing a connection even if it was broken forever... db was only using one connection ever even if it was broken... no mas. seems to pass tests, hopefully chad can push it --- api/datastore/redis/redis.go | 82 ++++++++++++++++++++++++------------ api/mqs/redis.go | 26 ++++++------ 2 files changed, 69 insertions(+), 39 deletions(-) diff --git a/api/datastore/redis/redis.go b/api/datastore/redis/redis.go index 2c6c437b7..558ce96f0 100644 --- a/api/datastore/redis/redis.go +++ b/api/datastore/redis/redis.go @@ -1,6 +1,7 @@ package redis import ( + "context" "encoding/json" "fmt" "net/url" @@ -8,23 +9,21 @@ import ( "strings" "time" - "context" - "github.com/Sirupsen/logrus" "github.com/garyburd/redigo/redis" - "gitlab-odx.oracle.com/odx/functions/api/models" "gitlab-odx.oracle.com/odx/functions/api/datastore/internal/datastoreutil" + "gitlab-odx.oracle.com/odx/functions/api/models" ) type RedisDataStore struct { - conn redis.Conn + pool *redis.Pool } func New(url *url.URL) (models.Datastore, error) { pool := &redis.Pool{ - MaxIdle: 4, + MaxIdle: 512, // I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea. - MaxActive: 16, + MaxActive: 512, Wait: true, IdleTimeout: 300 * time.Second, Dial: func() (redis.Conn, error) { @@ -42,7 +41,7 @@ func New(url *url.URL) (models.Datastore, error) { logrus.WithError(err).Fatal("Error connecting to redis") } ds := &RedisDataStore{ - conn: conn, + pool: pool, } return datastoreutil.NewValidator(ds), nil } @@ -53,14 +52,18 @@ func (ds *RedisDataStore) setApp(app *models.App) (*models.App, error) { return nil, err } - if _, err := ds.conn.Do("HSET", "apps", app.Name, appBytes); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if _, err := conn.Do("HSET", "apps", app.Name, appBytes); err != nil { return nil, err } return app, nil } func (ds *RedisDataStore) InsertApp(ctx context.Context, app *models.App) (*models.App, error) { - reply, err := ds.conn.Do("HEXISTS", "apps", app.Name) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HEXISTS", "apps", app.Name) if err != nil { return nil, err } @@ -85,7 +88,9 @@ func (ds *RedisDataStore) UpdateApp(ctx context.Context, newapp *models.App) (*m } func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error { - if _, err := ds.conn.Do("HDEL", "apps", appName); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if _, err := conn.Do("HDEL", "apps", appName); err != nil { return err } @@ -93,7 +98,9 @@ func (ds *RedisDataStore) RemoveApp(ctx context.Context, appName string) error { } func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, error) { - reply, err := ds.conn.Do("HGET", "apps", name) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGET", "apps", name) if err != nil { return nil, err } else if reply == nil { @@ -111,7 +118,9 @@ func (ds *RedisDataStore) GetApp(ctx context.Context, name string) (*models.App, func (ds *RedisDataStore) GetApps(ctx context.Context, filter *models.AppFilter) ([]*models.App, error) { res := []*models.App{} - reply, err := ds.conn.Do("HGETALL", "apps") + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGETALL", "apps") if err != nil { return nil, err } @@ -139,7 +148,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou return nil, err } - if _, err := ds.conn.Do("HSET", set, route.Path, buf); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if _, err := conn.Do("HSET", set, route.Path, buf); err != nil { return nil, err } @@ -147,7 +158,9 @@ func (ds *RedisDataStore) setRoute(set string, route *models.Route) (*models.Rou } func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) (*models.Route, error) { - reply, err := ds.conn.Do("HEXISTS", "apps", route.AppName) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HEXISTS", "apps", route.AppName) if err != nil { return nil, err } @@ -159,7 +172,7 @@ func (ds *RedisDataStore) InsertRoute(ctx context.Context, route *models.Route) hset := fmt.Sprintf("routes:%s", route.AppName) - reply, err = ds.conn.Do("HEXISTS", hset, route.Path) + reply, err = conn.Do("HEXISTS", hset, route.Path) if err != nil { return nil, err } @@ -181,7 +194,6 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout route.Update(newroute) - hset := fmt.Sprintf("routes:%s", route.AppName) return ds.setRoute(hset, route) @@ -189,7 +201,9 @@ func (ds *RedisDataStore) UpdateRoute(ctx context.Context, newroute *models.Rout func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath string) error { hset := fmt.Sprintf("routes:%s", appName) - if n, err := ds.conn.Do("HDEL", hset, routePath); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if n, err := conn.Do("HDEL", hset, routePath); err != nil { return err } else if n == 0 { return models.ErrRoutesRemoving @@ -200,7 +214,9 @@ func (ds *RedisDataStore) RemoveRoute(ctx context.Context, appName, routePath st func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath string) (*models.Route, error) { hset := fmt.Sprintf("routes:%s", appName) - reply, err := ds.conn.Do("HGET", hset, routePath) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGET", hset, routePath) if err != nil { return nil, err } else if reply == nil { @@ -218,7 +234,9 @@ func (ds *RedisDataStore) GetRoute(ctx context.Context, appName, routePath strin func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFilter) ([]*models.Route, error) { res := []*models.Route{} - reply, err := ds.conn.Do("HKEYS", "apps") + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HKEYS", "apps") if err != nil { return nil, err } else if reply == nil { @@ -228,7 +246,7 @@ func (ds *RedisDataStore) GetRoutes(ctx context.Context, filter *models.RouteFil for _, path := range paths { hset := fmt.Sprintf("routes:%s", path) - reply, err := ds.conn.Do("HGETALL", hset) + reply, err := conn.Do("HGETALL", hset) if err != nil { return nil, err } else if reply == nil { @@ -258,7 +276,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi res := []*models.Route{} hset := fmt.Sprintf("routes:%s", appName) - reply, err := ds.conn.Do("HGETALL", hset) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGETALL", hset) if err != nil { return nil, err } else if reply == nil { @@ -280,7 +300,9 @@ func (ds *RedisDataStore) GetRoutesByApp(ctx context.Context, appName string, fi } func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error { - if _, err := ds.conn.Do("HSET", "extras", key, value); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if _, err := conn.Do("HSET", "extras", key, value); err != nil { return err } @@ -288,7 +310,9 @@ func (ds *RedisDataStore) Put(ctx context.Context, key, value []byte) error { } func (ds *RedisDataStore) Get(ctx context.Context, key []byte) ([]byte, error) { - value, err := ds.conn.Do("HGET", "extras", key) + conn := ds.pool.Get() + defer conn.Close() + value, err := conn.Do("HGET", "extras", key) if err != nil { return nil, err } @@ -322,14 +346,18 @@ func (ds *RedisDataStore) InsertTask(ctx context.Context, task *models.Task) err return err } - if _, err := ds.conn.Do("HSET", "calls", task.ID, taskBytes); err != nil { + conn := ds.pool.Get() + defer conn.Close() + if _, err := conn.Do("HSET", "calls", task.ID, taskBytes); err != nil { return err } return nil } func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.FnCall, error) { - reply, err := ds.conn.Do("HGET", "calls", callID) + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGET", "calls", callID) if err != nil { return nil, err } else if reply == nil { @@ -345,7 +373,9 @@ func (ds *RedisDataStore) GetTask(ctx context.Context, callID string) (*models.F func (ds *RedisDataStore) GetTasks(ctx context.Context, filter *models.CallFilter) (models.FnCalls, error) { res := models.FnCalls{} - reply, err := ds.conn.Do("HGETALL", "calls") + conn := ds.pool.Get() + defer conn.Close() + reply, err := conn.Do("HGETALL", "calls") if err != nil { return nil, err } diff --git a/api/mqs/redis.go b/api/mqs/redis.go index 288f0bb9e..d48bc1cc0 100644 --- a/api/mqs/redis.go +++ b/api/mqs/redis.go @@ -23,11 +23,10 @@ type RedisMQ struct { } func NewRedisMQ(url *url.URL) (*RedisMQ, error) { - pool := &redis.Pool{ - MaxIdle: 4, + MaxIdle: 512, // I'm not sure if allowing the pool to block if more than 16 connections are required is a good idea. - MaxActive: 16, + MaxActive: 512, Wait: true, IdleTimeout: 300 * time.Second, Dial: func() (redis.Conn, error) { @@ -70,7 +69,10 @@ func getFirstKeyValue(resp map[string]string) (string, string, error) { return "", "", errors.New("Blank map") } -func (mq *RedisMQ) processPendingReservations(conn redis.Conn) { +func (mq *RedisMQ) processPendingReservations() { + conn := mq.pool.Get() + defer conn.Close() + resp, err := redis.StringMap(conn.Do("ZRANGE", mq.k("timeouts"), 0, 0, "WITHSCORES")) if mq.checkNilResponse(err) || len(resp) == 0 { return @@ -105,13 +107,17 @@ func (mq *RedisMQ) processPendingReservations(conn redis.Conn) { return } + // :( because fuck atomicity right? conn.Do("ZREM", mq.k("timeouts"), reservationId) conn.Do("HDEL", mq.k("timeout_jobs"), reservationId) conn.Do("HDEL", mq.k("reservations"), job.ID) redisPush(conn, mq.queueName, &job) } -func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) { +func (mq *RedisMQ) processDelayedTasks() { + conn := mq.pool.Get() + defer conn.Close() + // List of reservation ids between -inf time and the current time will get us // everything that is now ready to be queued. now := time.Now().UTC().Unix() @@ -156,15 +162,9 @@ func (mq *RedisMQ) processDelayedTasks(conn redis.Conn) { func (mq *RedisMQ) start() { go func() { - conn := mq.pool.Get() - defer conn.Close() - if err := conn.Err(); err != nil { - logrus.WithError(err).Fatal("Could not start redis MQ reservation system") - } - for range mq.ticker.C { - mq.processPendingReservations(conn) - mq.processDelayedTasks(conn) + mq.processPendingReservations() + mq.processDelayedTasks() } }() }