functions: performance improvements - LRU & singleflight DB calls (#322)

* functions: add cache and singleflight to ease database load

* runner: upgrade

* deps: upgrade glide files

* license: add third party notifications

* functions: fix handling of implicitly created apps

* functions: code deduplication

* functions: fix missing variable
C Cirello authored on 2016-11-21 19:48:11 +01:00, committed by GitHub
parent a4d360fb2e
commit f6d19c3cc9
14 changed files with 289 additions and 69 deletions

View File

@@ -9,7 +9,7 @@ import (
 	"github.com/iron-io/runner/common"
 )
 
-func handleAppCreate(c *gin.Context) {
+func (s *Server) handleAppCreate(c *gin.Context) {
 	ctx := c.MustGet("ctx").(context.Context)
 	log := common.Logger(ctx)
@@ -55,5 +55,7 @@ func handleAppCreate(c *gin.Context) {
 		return
 	}
 
+	s.resetcache(wapp.App.Name, 1)
+
 	c.JSON(http.StatusCreated, appResponse{"App successfully created", wapp.App})
 }

View File

@@ -0,0 +1,77 @@
// Package routecache is meant to assist in resolving an application's most
// used routes. Implemented as an LRU, it always returns its full context for
// iteration at the router handler.
package routecache

// based on groupcache's LRU

import (
	"container/list"

	"github.com/iron-io/functions/api/models"
)

// Cache holds an internal linked list for hotness management. It is not safe
// for concurrent use and must be guarded externally.
type Cache struct {
	MaxEntries int

	ll    *list.List
	cache map[string]*list.Element
}

// New returns a route cache.
func New(maxentries int) *Cache {
	return &Cache{
		MaxEntries: maxentries,
		ll:         list.New(),
		cache:      make(map[string]*list.Element),
	}
}

// Refresh updates the internal linked list, either adding a new route to the
// front or moving an existing one to the front when used. It discards
// seldom-used routes.
func (c *Cache) Refresh(route *models.Route) {
	if c.cache == nil {
		return
	}

	if ee, ok := c.cache[route.Path]; ok {
		c.ll.MoveToFront(ee)
		ee.Value = route
		return
	}

	ele := c.ll.PushFront(route)
	c.cache[route.Path] = ele
	if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
		c.removeOldest()
	}
}

// Get looks up a path's route from the cache.
func (c *Cache) Get(path string) (route *models.Route, ok bool) {
	if c.cache == nil {
		return
	}

	if ele, hit := c.cache[path]; hit {
		c.ll.MoveToFront(ele)
		return ele.Value.(*models.Route), true
	}

	return
}

func (c *Cache) removeOldest() {
	if c.cache == nil {
		return
	}

	if ele := c.ll.Back(); ele != nil {
		c.removeElement(ele)
	}
}

func (c *Cache) removeElement(e *list.Element) {
	c.ll.Remove(e)
	kv := e.Value.(*models.Route)
	delete(c.cache, kv.Path)
}
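
A quick usage sketch of the cache above (hypothetical values, not part of the
commit): a three-entry cache evicts its least-recently-used route once a
fourth one is refreshed in.

	c := routecache.New(3)
	for _, p := range []string{"/a", "/b", "/c"} {
		c.Refresh(&models.Route{Path: p})
	}
	c.Get("/a")                          // "/a" becomes most recently used
	c.Refresh(&models.Route{Path: "/d"}) // exceeds MaxEntries, evicts "/b"
	if _, ok := c.Get("/b"); !ok {
		// "/b" was discarded; "/a", "/c" and "/d" remain
	}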

View File

@@ -10,7 +10,7 @@ import (
 	"github.com/iron-io/runner/common"
 )
 
-func handleRouteCreate(c *gin.Context) {
+func (s *Server) handleRouteCreate(c *gin.Context) {
 	ctx := c.MustGet("ctx").(context.Context)
 	log := common.Logger(ctx)
@@ -80,5 +80,7 @@ func handleRouteCreate(c *gin.Context) {
 		return
 	}
 
+	s.resetcache(wroute.Route.AppName, 1)
+
 	c.JSON(http.StatusCreated, routeResponse{"Route successfully created", wroute.Route})
 }

View File

@@ -9,7 +9,7 @@ import (
 	"github.com/iron-io/runner/common"
 )
 
-func handleRouteDelete(c *gin.Context) {
+func (s *Server) handleRouteDelete(c *gin.Context) {
 	ctx := c.MustGet("ctx").(context.Context)
 	log := common.Logger(ctx)
@@ -23,5 +23,7 @@ func handleRouteDelete(c *gin.Context) {
 		return
 	}
 
+	s.resetcache(appName, 0)
+
 	c.JSON(http.StatusOK, gin.H{"message": "Route deleted"})
 }

View File

@@ -76,13 +76,11 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
 		c.JSON(http.StatusBadRequest, simpleError(models.ErrAppsNotFound))
 		return
 	}
 
-	route := c.Param("route")
-	if route == "" {
-		route = c.Request.URL.Path
+	path := c.Param("route")
+	if path == "" {
+		path = c.Request.URL.Path
 	}
 
-	log.WithFields(logrus.Fields{"app": appName, "path": route}).Debug("Finding route on datastore")
-
 	app, err := Api.Datastore.GetApp(appName)
 	if err != nil || app == nil {
 		log.WithError(err).Error(models.ErrAppsNotFound)
@@ -90,30 +88,56 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
 		return
 	}
 
-	routes, err := Api.Datastore.GetRoutesByApp(appName, &models.RouteFilter{AppName: appName, Path: route})
+	log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on LRU cache")
+	route, ok := s.cacheget(appName, path)
+	if ok && s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) {
+		s.refreshcache(appName, route)
+		return
+	}
+
+	log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on datastore")
+	routes, err := s.loadroutes(models.RouteFilter{AppName: appName, Path: path})
 	if err != nil {
 		log.WithError(err).Error(models.ErrRoutesList)
 		c.JSON(http.StatusInternalServerError, simpleError(models.ErrRoutesList))
 		return
 	}
 
-	log.WithField("routes", routes).Debug("Got routes from datastore")
 	if len(routes) == 0 {
 		log.WithError(err).Error(models.ErrRunnerRouteNotFound)
 		c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
 		return
 	}
 
-	found := routes[0]
-	log = log.WithFields(logrus.Fields{
-		"app": appName, "route": found.Path, "image": found.Image})
+	log.WithField("routes", len(routes)).Debug("Got routes from datastore")
+	route = routes[0]
+	log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image})
+
+	if s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) {
+		s.refreshcache(appName, route)
+		return
+	}
+
+	log.Error(models.ErrRunnerRouteNotFound)
+	c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
+}
+
+func (s *Server) loadroutes(filter models.RouteFilter) ([]*models.Route, error) {
+	resp, err := s.singleflight.do(
+		filter,
+		func() (interface{}, error) {
+			return Api.Datastore.GetRoutesByApp(filter.AppName, &filter)
+		},
+	)
+	return resp.([]*models.Route), err
+}
+
+func (s *Server) serve(c *gin.Context, log logrus.FieldLogger, appName string, found *models.Route, app *models.App, route, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
+	log = log.WithFields(logrus.Fields{"app": appName, "route": found.Path, "image": found.Image})
+
 	params, match := matchRoute(found.Path, route)
 	if !match {
-		log.WithError(err).Error(models.ErrRunnerRouteNotFound)
-		c.JSON(http.StatusNotFound, simpleError(models.ErrRunnerRouteNotFound))
-		return
+		return false
 	}
 
 	var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
@@ -162,7 +186,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
 	if err != nil {
 		log.WithError(err).Error(models.ErrInvalidPayload)
 		c.JSON(http.StatusBadRequest, simpleError(models.ErrInvalidPayload))
-		return
+		return true
 	}
 
 	// Create Task
@@ -176,13 +200,12 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
 		task.EnvVars = cfg.Env
 		task.Payload = string(pl)
 
 		// Push to queue
-		enqueue(ctx, s.MQ, task)
+		enqueue(c, s.MQ, task)
 		log.Info("Added new task to queue")
 		c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID})
 
 	default:
-		result, err := runner.RunTask(s.tasks, ctx, cfg)
+		result, err := runner.RunTask(s.tasks, c, cfg)
 		if err != nil {
 			break
 		}
@@ -197,6 +220,8 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
 		}
 	}
+
+	return true
 }
 
 var fakeHandler = func(http.ResponseWriter, *http.Request, Params) {}
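
Condensed, the new lookup order in handleRequest reads roughly as below (a
sketch of the diff above, with unrelated details elided):

	// 1. hot path: check the per-app LRU, no datastore hit
	route, ok := s.cacheget(appName, path)
	if ok && s.serve(c, log, appName, route, app, path, reqID, payload, enqueue) {
		s.refreshcache(appName, route) // keep it hot
		return
	}

	// 2. cache miss: load from the datastore, coalescing identical
	//    concurrent queries through singleflight
	routes, err := s.loadroutes(models.RouteFilter{AppName: appName, Path: path})
	// ... serve routes[0] and refresh the cache on success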

View File

@@ -5,14 +5,17 @@ import (
 	"encoding/json"
 	"errors"
 	"io/ioutil"
+	"math"
 	"net/http"
 	"path"
+	"sync"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/gin-gonic/gin"
 	"github.com/iron-io/functions/api/ifaces"
 	"github.com/iron-io/functions/api/models"
 	"github.com/iron-io/functions/api/runner"
+	"github.com/iron-io/functions/api/server/internal/routecache"
 	"github.com/iron-io/runner/common"
 )
@@ -23,13 +26,18 @@ var Api *Server
 
 type Server struct {
 	Runner          *runner.Runner
 	Router          *gin.Engine
-	Datastore       models.Datastore
 	MQ              models.MessageQueue
 	AppListeners    []ifaces.AppListener
 	SpecialHandlers []ifaces.SpecialHandler
 	Enqueue         models.Enqueue
 
 	tasks chan runner.TaskRequest
+
+	mu           sync.Mutex // protects hotroutes
+	hotroutes    map[string]*routecache.Cache
+	singleflight singleflight // singleflight assists Datastore
+
+	Datastore models.Datastore
 }
 
 func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *runner.Runner, tasks chan runner.TaskRequest, enqueue models.Enqueue) *Server {
@@ -38,6 +46,7 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *ru
 		Router:    gin.New(),
 		Datastore: ds,
 		MQ:        mq,
+		hotroutes: make(map[string]*routecache.Cache),
 		tasks:     tasks,
 		Enqueue:   enqueue,
 	}
@@ -47,10 +56,43 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *ru
 		c.Set("ctx", ctx)
 		c.Next()
 	})
+	Api.primeCache()
 	return Api
 }
+
+func (s *Server) primeCache() {
+	logrus.Info("priming cache with known routes")
+	apps, err := s.Datastore.GetApps(nil)
+	if err != nil {
+		logrus.WithError(err).Error("cannot prime cache - could not load application list")
+		return
+	}
+	for _, app := range apps {
+		routes, err := s.Datastore.GetRoutesByApp(app.Name, &models.RouteFilter{AppName: app.Name})
+		if err != nil {
+			logrus.WithError(err).WithField("appName", app.Name).Error("cannot prime cache - could not load routes")
+			continue
+		}
+		entries := len(routes)
+		// The idea here is to prevent both extremes: a cache so small that it is
+		// ineffective, or so large that it takes too much memory. Up to 1k routes,
+		// the cache will try to hold all routes in memory, taking up to 48K per
+		// application. Past this threshold it keeps 1024 routes plus 20% of the
+		// remaining entries - a hybrid incarnation of the Pareto rule: those
+		// routes will likely be responsible for 80% of the workload.
+		if entries > cacheParetoThreshold {
+			entries = int(math.Ceil(float64(entries-1024)*0.2)) + 1024
+		}
+		s.hotroutes[app.Name] = routecache.New(entries)
+		for i := 0; i < entries; i++ {
+			s.refreshcache(app.Name, routes[i])
+		}
+	}
+	logrus.Info("cache primed")
+}
 
 // AddAppListener adds a listener that will be notified on App changes.
 func (s *Server) AddAppListener(listener ifaces.AppListener) {
 	s.AppListeners = append(s.AppListeners, listener)
@@ -105,6 +147,41 @@ func (s *Server) handleRunnerRequest(c *gin.Context) {
 	s.handleRequest(c, s.Enqueue)
 }
 
+// cacheParetoThreshold is both the mark from which the LRU caches only the
+// most likely hot routes and the stopping mark for cache priming during
+// startup.
+const cacheParetoThreshold = 1024
+
+func (s *Server) cacheget(appname, path string) (*models.Route, bool) {
+	s.mu.Lock()
+	cache, ok := s.hotroutes[appname]
+	if !ok {
+		s.mu.Unlock()
+		return nil, false
+	}
+	route, ok := cache.Get(path)
+	s.mu.Unlock()
+	return route, ok
+}
+
+func (s *Server) refreshcache(appname string, route *models.Route) {
+	s.mu.Lock()
+	cache := s.hotroutes[appname]
+	cache.Refresh(route)
+	s.mu.Unlock()
+}
+
+func (s *Server) resetcache(appname string, delta int) {
+	s.mu.Lock()
+	hr, ok := s.hotroutes[appname]
+	if !ok {
+		s.hotroutes[appname] = routecache.New(0)
+		hr = s.hotroutes[appname]
+	}
+	s.hotroutes[appname] = routecache.New(hr.MaxEntries + delta)
+	s.mu.Unlock()
+}
+
 func (s *Server) handleTaskRequest(c *gin.Context) {
 	ctx, _ := common.LoggerWithFields(c, nil)
 	switch c.Request.Method {
@@ -164,7 +241,7 @@ func (s *Server) bindHandlers() {
 	v1 := engine.Group("/v1")
 	{
 		v1.GET("/apps", handleAppList)
-		v1.POST("/apps", handleAppCreate)
+		v1.POST("/apps", s.handleAppCreate)
 		v1.GET("/apps/:app", handleAppGet)
 		v1.PUT("/apps/:app", handleAppUpdate)
@@ -175,10 +252,10 @@ func (s *Server) bindHandlers() {
 		apps := v1.Group("/apps/:app")
 		{
 			apps.GET("/routes", handleRouteList)
-			apps.POST("/routes", handleRouteCreate)
+			apps.POST("/routes", s.handleRouteCreate)
 			apps.GET("/routes/*route", handleRouteGet)
 			apps.PUT("/routes/*route", handleRouteUpdate)
-			apps.DELETE("/routes/*route", handleRouteDelete)
+			apps.DELETE("/routes/*route", s.handleRouteDelete)
 		}
 	}
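
To make the sizing rule in primeCache concrete, here is the same arithmetic
pulled into a standalone helper (cacheSize is hypothetical, not part of the
commit; it only restates the branch above):

	// cacheSize caps the per-app LRU: all routes up to the 1024-route
	// threshold, then 1024 plus 20% of the remainder.
	func cacheSize(entries int) int {
		if entries > cacheParetoThreshold { // cacheParetoThreshold == 1024
			return int(math.Ceil(float64(entries-1024)*0.2)) + 1024
		}
		return entries
	}

	// cacheSize(500)  == 500
	// cacheSize(5000) == 1024 + ceil((5000-1024)*0.2) == 1024 + 796 == 1820

Note also how resetcache works above: creating or deleting a route rebuilds
the app's cache with MaxEntries adjusted by delta, which doubles as an
invalidation of any stale cached routes for that app.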

View File

@@ -0,0 +1,50 @@
package server

// Imported from https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go

import (
	"sync"

	"github.com/iron-io/functions/api/models"
)

// call is an in-flight or completed do call
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

type singleflight struct {
	mu sync.Mutex                   // protects m
	m  map[models.RouteFilter]*call // lazily initialized
}

// do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *singleflight) do(key models.RouteFilter, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[models.RouteFilter]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}
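
A usage sketch (hypothetical, not part of the commit): ten goroutines asking
for the same RouteFilter share a single underlying call, so a burst of
identical lookups produces one datastore query instead of ten.

	var g singleflight
	filter := models.RouteFilter{AppName: "myapp", Path: "/hello"}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// fetchRoutes stands in for the real datastore query
			v, err := g.do(filter, func() (interface{}, error) {
				return fetchRoutes(filter)
			})
			_, _ = v, err
		}()
	}
	wg.Wait()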