mirror of https://github.com/fnproject/fn.git
functions: remove lru (#399)
@@ -55,7 +55,5 @@ func (s *Server) handleAppCreate(c *gin.Context) {
         return
     }

-    s.resetcache(wapp.App.Name, 1)
-
     c.JSON(http.StatusCreated, appResponse{"App successfully created", wapp.App})
 }
@@ -1,77 +0,0 @@
-// Package routecache is meant to assist in resolving the most used routes at
-// an application. Implemented as a LRU, it returns always its full context for
-// iteration at the router handler.
-package routecache
-
-// based on groupcache's LRU
-
-import (
-    "container/list"
-
-    "github.com/iron-io/functions/api/models"
-)
-
-// Cache holds an internal linkedlist for hotness management. It is not safe
-// for concurrent use, must be guarded externally.
-type Cache struct {
-    MaxEntries int
-
-    ll    *list.List
-    cache map[string]*list.Element
-}
-
-// New returns a route cache.
-func New(maxentries int) *Cache {
-    return &Cache{
-        MaxEntries: maxentries,
-        ll:         list.New(),
-        cache:      make(map[string]*list.Element),
-    }
-}
-
-// Refresh updates internal linkedlist either adding a new route to the front,
-// or moving it to the front when used. It will discard seldom used routes.
-func (c *Cache) Refresh(route *models.Route) {
-    if c.cache == nil {
-        return
-    }
-
-    if ee, ok := c.cache[route.Path]; ok {
-        c.ll.MoveToFront(ee)
-        ee.Value = route
-        return
-    }
-
-    ele := c.ll.PushFront(route)
-    c.cache[route.Path] = ele
-    if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
-        c.removeOldest()
-    }
-}
-
-// Get looks up a path's route from the cache.
-func (c *Cache) Get(path string) (route *models.Route, ok bool) {
-    if c.cache == nil {
-        return
-    }
-    if ele, hit := c.cache[path]; hit {
-        c.ll.MoveToFront(ele)
-        return ele.Value.(*models.Route), true
-    }
-    return
-}
-
-func (c *Cache) removeOldest() {
-    if c.cache == nil {
-        return
-    }
-    if ele := c.ll.Back(); ele != nil {
-        c.removeElement(ele)
-    }
-}
-
-func (c *Cache) removeElement(e *list.Element) {
-    c.ll.Remove(e)
-    kv := e.Value.(*models.Route)
-    delete(c.cache, kv.Path)
-}
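Note: for readers skimming the removal above, the deleted routecache.Cache was a tiny LRU: New sets the capacity, Refresh inserts or promotes a route and evicts the least recently used entry once MaxEntries is exceeded, and Get promotes on hit. A minimal usage sketch, illustrative only and not part of this commit; the import below only resolves on the pre-change tree, since this commit deletes the package:

package main

import (
    "fmt"

    "github.com/iron-io/functions/api/models"
    "github.com/iron-io/functions/api/server/internal/routecache" // removed by this commit
)

func main() {
    // Hold at most two routes; the least recently used entry is evicted.
    hot := routecache.New(2)

    hot.Refresh(&models.Route{Path: "/a"})
    hot.Refresh(&models.Route{Path: "/b"})
    hot.Refresh(&models.Route{Path: "/a"}) // promotes /a to the front
    hot.Refresh(&models.Route{Path: "/c"}) // exceeds MaxEntries, evicts /b

    if _, ok := hot.Get("/b"); !ok {
        fmt.Println("/b was evicted")
    }
    if r, ok := hot.Get("/a"); ok {
        fmt.Println("still cached:", r.Path)
    }
}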
@@ -93,7 +93,5 @@ func (s *Server) handleRouteCreate(c *gin.Context) {
         return
     }

-    s.resetcache(wroute.Route.AppName, 1)
-
     c.JSON(http.StatusCreated, routeResponse{"Route successfully created", wroute.Route})
 }
@@ -30,7 +30,5 @@ func (s *Server) handleRouteDelete(c *gin.Context) {
         return
     }

-    s.resetcache(appName, 0)
-
     c.JSON(http.StatusOK, gin.H{"message": "Route deleted"})
 }
@@ -89,13 +89,6 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
         return
     }

-    log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on LRU cache")
-    route, ok := s.cacheget(appName, path)
-    if ok && s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) {
-        s.refreshcache(appName, route)
-        return
-    }
-
     log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on datastore")
     routes, err := s.loadroutes(ctx, models.RouteFilter{AppName: appName, Path: path})
     if err != nil {
@@ -111,11 +104,10 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
     }

     log.WithField("routes", len(routes)).Debug("Got routes from datastore")
-    route = routes[0]
+    route := routes[0]
     log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image})

     if s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) {
-        s.refreshcache(appName, route)
         return
     }

@@ -5,10 +5,8 @@ import (
     "encoding/json"
     "errors"
     "io/ioutil"
-    "math"
     "net/http"
     "path"
-    "sync"

     "github.com/Sirupsen/logrus"
     "github.com/gin-gonic/gin"
@@ -16,7 +14,6 @@ import (
     "github.com/iron-io/functions/api/models"
     "github.com/iron-io/functions/api/runner"
     "github.com/iron-io/functions/api/runner/task"
-    "github.com/iron-io/functions/api/server/internal/routecache"
     "github.com/iron-io/runner/common"
 )

@@ -36,9 +33,6 @@ type Server struct {

     tasks chan task.Request

-    mu        sync.Mutex // protects hotroutes
-    hotroutes map[string]*routecache.Cache
-
     singleflight singleflight // singleflight assists Datastore
     Datastore    models.Datastore
 }
@@ -49,7 +43,6 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *ru
         Router:    gin.New(),
         Datastore: ds,
         MQ:        mq,
-        hotroutes: make(map[string]*routecache.Cache),
         tasks:     tasks,
         Enqueue:   enqueue,
     }
@@ -59,43 +52,10 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, r *ru
         c.Set("ctx", ctx)
         c.Next()
     })
-    Api.primeCache(ctx)
-
     return Api
 }

-func (s *Server) primeCache(ctx context.Context) {
-    logrus.Info("priming cache with known routes")
-    apps, err := s.Datastore.GetApps(ctx, nil)
-    if err != nil {
-        logrus.WithError(err).Error("cannot prime cache - could not load application list")
-        return
-    }
-    for _, app := range apps {
-        routes, err := s.Datastore.GetRoutesByApp(ctx, app.Name, &models.RouteFilter{AppName: app.Name})
-        if err != nil {
-            logrus.WithError(err).WithField("appName", app.Name).Error("cannot prime cache - could not load routes")
-            continue
-        }
-
-        entries := len(routes)
-        // The idea here is to prevent both extremes: cache being too small that is ineffective,
-        // or too large that it takes too much memory. Up to 1k routes, the cache will try to hold
-        // all routes in the memory, thus taking up to 48K per application. After this threshold,
-        // it will keep 1024 routes + 20% of the total entries - in a hybrid incarnation of Pareto rule
-        // 1024+20% of the remaining routes will likelly be responsible for 80% of the workload.
-        if entries > cacheParetoThreshold {
-            entries = int(math.Ceil(float64(entries-1024)*0.2)) + 1024
-        }
-        s.hotroutes[app.Name] = routecache.New(entries)
-
-        for i := 0; i < entries; i++ {
-            s.refreshcache(app.Name, routes[i])
-        }
-    }
-    logrus.Info("cached prime")
-}
-
 func (s *Server) AddSpecialHandler(handler ifaces.SpecialHandler) {
     s.SpecialHandlers = append(s.SpecialHandlers, handler)
 }
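Note: the sizing rule removed above (see the comment inside primeCache) capped each app's cache at 1024 routes plus 20% of anything beyond that. A small standalone sketch of the same arithmetic; cacheSize is a hypothetical helper written for illustration, not a function from this codebase:

package main

import (
    "fmt"
    "math"
)

// cacheSize mirrors the sizing rule from the removed primeCache: up to 1024
// routes are cached in full; past that threshold, keep 1024 entries plus 20%
// of the remainder.
func cacheSize(routes int) int {
    const cacheParetoThreshold = 1024
    if routes <= cacheParetoThreshold {
        return routes
    }
    return int(math.Ceil(float64(routes-1024)*0.2)) + 1024
}

func main() {
    fmt.Println(cacheSize(300))  // 300  - everything fits
    fmt.Println(cacheSize(5000)) // 1820 - 1024 + ceil(3976*0.2)
}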
@@ -125,41 +85,6 @@ func (s *Server) handleRunnerRequest(c *gin.Context) {
     s.handleRequest(c, s.Enqueue)
 }

-// cacheParetoThreshold is both the mark from which the LRU starts caching only
-// the most likely hot routes, and also as a stopping mark for the cache priming
-// during start.
-const cacheParetoThreshold = 1024
-
-func (s *Server) cacheget(appname, path string) (*models.Route, bool) {
-    s.mu.Lock()
-    cache, ok := s.hotroutes[appname]
-    if !ok {
-        s.mu.Unlock()
-        return nil, false
-    }
-    route, ok := cache.Get(path)
-    s.mu.Unlock()
-    return route, ok
-}
-
-func (s *Server) refreshcache(appname string, route *models.Route) {
-    s.mu.Lock()
-    cache := s.hotroutes[appname]
-    cache.Refresh(route)
-    s.mu.Unlock()
-}
-
-func (s *Server) resetcache(appname string, delta int) {
-    s.mu.Lock()
-    hr, ok := s.hotroutes[appname]
-    if !ok {
-        s.hotroutes[appname] = routecache.New(0)
-        hr = s.hotroutes[appname]
-    }
-    s.hotroutes[appname] = routecache.New(hr.MaxEntries + delta)
-    s.mu.Unlock()
-}
-
 func (s *Server) handleTaskRequest(c *gin.Context) {
     ctx, _ := common.LoggerWithFields(c, nil)
     switch c.Request.Method {
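Note: one subtlety in the removed resetcache is that it rebuilt the per-app cache with routecache.New rather than resizing it in place, so every cached route for that app was dropped as well. A hedged standalone restatement of that behavior, written as a hypothetical free function against the pre-change tree (the real method also held s.mu):

package main

import (
    "fmt"

    "github.com/iron-io/functions/api/server/internal/routecache" // removed by this commit
)

// resetcache, restated outside the Server type. Because routecache.New starts
// from an empty list, the call both adjusts capacity and discards the app's
// cached routes, which is how stale entries were invalidated after a create
// or delete.
func resetcache(hotroutes map[string]*routecache.Cache, appname string, delta int) {
    hr, ok := hotroutes[appname]
    if !ok {
        hr = routecache.New(0)
    }
    hotroutes[appname] = routecache.New(hr.MaxEntries + delta)
}

func main() {
    hotroutes := map[string]*routecache.Cache{"myapp": routecache.New(3)}
    resetcache(hotroutes, "myapp", 1)
    fmt.Println(hotroutes["myapp"].MaxEntries) // 4, with all previous entries gone
}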