mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
hang the runner, agent=new sheriff (#270)
* fix docker build this is trivially incorrect since glide doesn't actually provide reproducible builds. the idea is to build with the deps that we have checked into git, so that we actually know what code is executing so that we might debug it... all for multi stage build instead of what we had, but adding the glide step is wrong. i added a loud warning so as to discourage this behavior in the future. * hang the runner, agent=new sheriff tl;dr agent is now runner, with a hopefully saner api the general idea is get rid of all the various 'task' structs now, change our terminology to only be 'calls' now, push a lot of the http construction of a call into the agent, allow calls to mutate their state around their execution easily and to simplify the number of code paths, channels and context timeouts in something [hopefully] easy to understand. this introduces the idea of 'slots' which are either hot or cold and are separate from reserving memory (memory is denominated in 'tokens' now). a 'slot' is essentially a container that is ready for execution of a call, be it hot or cold (it just means different things based on hotness). taking a look into Submit should make these relatively easy to grok. sorry, things were pretty broken especially wrt timings. I tried to keep good notes (maybe too good), to highlight stuff so that we don't make the same mistakes again (history repeating itself blah blah quote). even now, there is lots of work to do :) I encourage just reading the agent.go code, Submit is really simple and there's a description of how the whole thing works at the head of the file (after TODOs). call.go contains code for constructing calls, as well as Start / End (small atm). I did some amount of code massaging to try to make things simple / straightforward / fit reasonable mental model, but as always am open to critique (the more negative the better) as I'm just one guy and wth do i know... ----------------------------------------------------------------------------- below enumerates a number of changes as briefly as possible (heh..): models.Call all the things removes models.Task as models.Call is now what it previously was. models.FnCall is now rid of in favor of models.Call, despite the datastore only storing a few fields of it [for now]. we should probably store entire calls in the db, since app & route configurations can change at any given moment, it would be nice to see the parameters of each call (costs db space, obviously). this removes the endpoints for getting & deleting messages, we were just looping back to localhost to call the MQ (wtf? this was for iron integration i think) and just calls the MQ. changes the name of the FnLog to LogStore, confusing cause there's also a `FuncLogger` which uses the Logstore (punting). removes other `Fn` prefixed structs (redundant naming convention). removes some unused and/or weird structs (IDStatus, CompleteTime) updates the swagger makes the db methods consistent to use 'Call' nomenclature. remove runner nuisances: * push down registry stuff to docker driver * remove Environment / Stats stuff of yore * remove unused writers (now in FuncLogger) * remove 2 of the task types, old hot stuff, runner, etc fixes ram available calculation on startup to not always be 300GB (helps a lot on a laptop!) format for DOCKER_AUTH env now is not a list but a map (there are no docs, would prefer to get rid of this altogether anyway). the ~/.docker/cfg expected format is unchanged. removes arbitrary task queue, if a machine is out of ram we can probably just time out without queueing... (can open separate discussion) in any case the old one didn't really account well for hot tasks, it just lined everyone up in the task queue if there wasn't a place to run hot and then timed them out [even if a slot became free]. removes HEADER_ prefixing on any headers in the request to a invoke a call. (this was inconsistent with cli for test anyway) removes TASK_ID header sent in to hot only (this is a dupe of FN_CALL_ID, which has not been removed) now user functions can reply directly to the client. this means that for cold containers if they write to stdout it will send a 200 + headers. for hot containers, the user can reply directly to the client from the container, i.e. with its preferred status code / headers (vs. always getting a 200). the dispatch itself is a little http specific atm, i think we can add an interchange format but the current version is easily extended to add json for now, separate discussion. this eliminates a lot of the request/response rewriting and buffering we were doing (yey). now Dispatch ONLY does input and output, vs. managing the call timeout and having access to a call's fields. cache is pushed down into agent now instead of in the front end, would like to push it down to the datastore actually but it's here for now anyway. cache delete functions removed (b/c fn is distributed anyway?). added app caching, should help with latency. in general, a lot of server/runner.go got pushed down into the agent. i think it will be useful in testing to be able to construct calls without having to invoke http handlers + async also needs to construct calls without a handler. safe shutdown actually works now for everything (leaked / didn't wait on certain things before) now we're waiting for hot slots to open up while we're attempting to get ram to launch a container if we didn't find any hot slots to run the call in immediately. we can change this policy really easily now (no more channel jungle; still some channels). also looking for somewhere else to go while the container is launching now. slots now get sent _out_ of a container, vs. a container receiving calls, which makes this kind of policy easier to implement. this fixes a number of bugs around things like trying to execute calls against containers that have not and may never start and trying to launch a bazillion containers when there are no free containers. the driver api underwent some changes to make this possible (relatively minimal, added Wait). the easiest way to think about this is that allocating ram has moved 'up' instead of just wrapping launching containers, so that we can select on a channel trying to find ram. not dispatching hot calls to containers that died anymore either... the timeout is now started at the beginning of Submit, rather than Dispatch or the container itself having to manage the call timeout, which was an inaccurate way of doing things since finding a slot / allocating ram / pulling image can all take a non-trivial (timeout amount, even!) amount of time. this makes for much more reasonable response times from fn under load, there's still a little TODO about handling cold+timeout container removal response times but it's much improved. if call.Start is called with < call.timeout/2 time left, then the call will not be executed and return a timeout. we can discuss. this makes async play _a lot_ nicer, specifically. for large timeouts / 2 makes less sense. env is no longer getting upper cased (admittedly, this can look a little weird now). our whole route.Config/app.Config/env/headers stuff probably deserves a whole discussion... sync output no longer has the call id in json if there's an error / timeout. we could add this back to signify that it's _us_ writing these but this was out of place. FN_CALL_ID is still shipped out to get the id for sync calls, and async [server] output remains unchanged. async logs are now an entire raw http request (so that a user can write a 400 or something from their hot async container) async hot now 'just works' cold sync calls can now reply to the client before container removal, which shaves a lot of latency off of those (still eat start). still need to figure out async removal if timeout or something. ----------------------------------------------------------------------------- i've located a number of bugs that were generally inherited, and also added a number of TODOs in the head of the agent.go file according to robustness we probably need to add. this is at least at parity with the previous implementation, to my knowledge (hopefully/likely a good bit ahead). I can memorialize these to github quickly enough, not that anybody searches before adding bugs anyway (sigh). the big thing to work on next imo is async being a lot more robust, specifically to survive fn server failures / network issues. thanks for review (gulp)
This commit is contained in:
committed by
Denis Makogon
parent
1b1b64436f
commit
71a88a991c
@@ -4,8 +4,8 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestAppCreate(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
for i, test := range []struct {
|
||||
mock models.Datastore
|
||||
logDB models.FnLog
|
||||
logDB models.LogStore
|
||||
path string
|
||||
body string
|
||||
expectedCode int
|
||||
@@ -48,7 +48,7 @@ func TestAppCreate(t *testing.T) {
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps", `{ "app": { "name": "teste" } }`, http.StatusOK, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr)
|
||||
router := srv.Router
|
||||
|
||||
body := bytes.NewBuffer([]byte(test.body))
|
||||
@@ -78,7 +78,7 @@ func TestAppDelete(t *testing.T) {
|
||||
|
||||
for i, test := range []struct {
|
||||
ds models.Datastore
|
||||
logDB models.FnLog
|
||||
logDB models.LogStore
|
||||
path string
|
||||
body string
|
||||
expectedCode int
|
||||
@@ -88,11 +88,11 @@ func TestAppDelete(t *testing.T) {
|
||||
{datastore.NewMockInit(
|
||||
[]*models.App{{
|
||||
Name: "myapp",
|
||||
}}, nil, nil, nil,
|
||||
}}, nil, nil,
|
||||
), logs.NewMock(), "/v1/apps/myapp", "", http.StatusOK, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
||||
|
||||
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
||||
|
||||
@@ -122,7 +122,7 @@ func TestAppList(t *testing.T) {
|
||||
defer cancel()
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -159,7 +159,7 @@ func TestAppGet(t *testing.T) {
|
||||
defer cancel()
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -194,7 +194,7 @@ func TestAppUpdate(t *testing.T) {
|
||||
|
||||
for i, test := range []struct {
|
||||
mock models.Datastore
|
||||
logDB models.FnLog
|
||||
logDB models.LogStore
|
||||
path string
|
||||
body string
|
||||
expectedCode int
|
||||
@@ -207,18 +207,18 @@ func TestAppUpdate(t *testing.T) {
|
||||
{datastore.NewMockInit(
|
||||
[]*models.App{{
|
||||
Name: "myapp",
|
||||
}}, nil, nil, nil,
|
||||
}}, nil, nil,
|
||||
), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "config": { "test": "1" } } }`, http.StatusOK, nil},
|
||||
|
||||
// Addresses #380
|
||||
{datastore.NewMockInit(
|
||||
[]*models.App{{
|
||||
Name: "myapp",
|
||||
}}, nil, nil, nil,
|
||||
}}, nil, nil,
|
||||
), logs.NewMock(), "/v1/apps/myapp", `{ "app": { "name": "othername" } }`, http.StatusConflict, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(test.mock, &mqs.Mock{}, test.logDB, rnr)
|
||||
|
||||
body := bytes.NewBuffer([]byte(test.body))
|
||||
_, rec := routerRequest(t, srv.Router, "PATCH", test.path, body)
|
||||
|
||||
@@ -10,8 +10,9 @@ import (
|
||||
func (s *Server) handleCallGet(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
appName := c.MustGet(api.AppName).(string)
|
||||
callID := c.Param(api.Call)
|
||||
callObj, err := s.Datastore.GetTask(ctx, callID)
|
||||
callObj, err := s.Datastore.GetCall(ctx, appName, callID)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
|
||||
@@ -20,7 +20,7 @@ func (s *Server) handleCallList(c *gin.Context) {
|
||||
|
||||
filter := models.CallFilter{AppName: appName, Path: c.Query(api.CRoute)}
|
||||
|
||||
calls, err := s.Datastore.GetTasks(ctx, &filter)
|
||||
calls, err := s.Datastore.GetCalls(ctx, &filter)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
|
||||
@@ -10,8 +10,9 @@ import (
|
||||
func (s *Server) handleCallLogGet(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
appName := c.MustGet(api.AppName).(string)
|
||||
callID := c.Param(api.Call)
|
||||
_, err := s.Datastore.GetTask(ctx, callID)
|
||||
_, err := s.Datastore.GetCall(ctx, appName, callID)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
@@ -29,8 +30,9 @@ func (s *Server) handleCallLogGet(c *gin.Context) {
|
||||
func (s *Server) handleCallLogDelete(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
appName := c.MustGet(api.AppName).(string)
|
||||
callID := c.Param(api.Call)
|
||||
_, err := s.Datastore.GetTask(ctx, callID)
|
||||
_, err := s.Datastore.GetCall(ctx, appName, callID)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
|
||||
@@ -7,10 +7,10 @@ import (
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ErrInternalServerError returned when something exceptional happens.
|
||||
@@ -28,13 +28,12 @@ func handleErrorResponse(c *gin.Context, err error) {
|
||||
func HandleErrorResponse(ctx context.Context, w http.ResponseWriter, err error) {
|
||||
log := common.Logger(ctx)
|
||||
var statuscode int
|
||||
switch e := err.(type) {
|
||||
case models.APIError:
|
||||
if e, ok := err.(models.APIError); ok {
|
||||
if e.Code() >= 500 {
|
||||
log.WithFields(logrus.Fields{"code": e.Code()}).WithError(e).Error("api error")
|
||||
}
|
||||
statuscode = e.Code()
|
||||
default:
|
||||
} else {
|
||||
log.WithError(err).WithFields(logrus.Fields{"stack": string(debug.Stack())}).Error("internal server error")
|
||||
statuscode = http.StatusInternalServerError
|
||||
err = ErrInternalServerError
|
||||
|
||||
@@ -4,8 +4,7 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
|
||||
@@ -45,7 +45,6 @@ func (s *Server) handleRoutesPostPutPatch(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
s.cachedelete(resp.Route.AppName, resp.Route.Path)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,5 @@ func (s *Server) handleRouteDelete(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
s.cachedelete(appName, routePath)
|
||||
c.JSON(http.StatusOK, gin.H{"message": "Route deleted"})
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
|
||||
type routeTestCase struct {
|
||||
ds models.Datastore
|
||||
logDB models.FnLog
|
||||
logDB models.LogStore
|
||||
method string
|
||||
path string
|
||||
body string
|
||||
@@ -24,7 +24,7 @@ type routeTestCase struct {
|
||||
|
||||
func (test *routeTestCase) run(t *testing.T, i int, buf *bytes.Buffer) {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
||||
|
||||
body := bytes.NewBuffer([]byte(test.body))
|
||||
_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
|
||||
@@ -70,7 +70,7 @@ func TestRouteCreate(t *testing.T) {
|
||||
AppName: "a",
|
||||
Path: "/myroute",
|
||||
},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync" } }`, http.StatusConflict, models.ErrRoutesAlreadyExists},
|
||||
|
||||
// success
|
||||
@@ -106,22 +106,22 @@ func TestRoutePut(t *testing.T) {
|
||||
func TestRouteDelete(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
|
||||
routes := models.Routes{{AppName: "a", Path: "/myroute"}}
|
||||
apps := models.Apps{{Name: "a", Routes: routes, Config: nil}}
|
||||
routes := []*models.Route{{AppName: "a", Path: "/myroute"}}
|
||||
apps := []*models.App{{Name: "a", Routes: routes, Config: nil}}
|
||||
|
||||
for i, test := range []struct {
|
||||
ds models.Datastore
|
||||
logDB models.FnLog
|
||||
logDB models.LogStore
|
||||
path string
|
||||
body string
|
||||
expectedCode int
|
||||
expectedError error
|
||||
}{
|
||||
{datastore.NewMock(), logs.NewMock(), "/v1/apps/a/routes/missing", "", http.StatusNotFound, models.ErrRoutesNotFound},
|
||||
{datastore.NewMockInit(apps, routes, nil, nil), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil},
|
||||
{datastore.NewMockInit(apps, routes, nil), logs.NewMock(), "/v1/apps/a/routes/myroute", "", http.StatusOK, nil},
|
||||
} {
|
||||
rnr, cancel := testRunner(t)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(test.ds, &mqs.Mock{}, test.logDB, rnr)
|
||||
_, rec := routerRequest(t, srv.Router, "DELETE", test.path, nil)
|
||||
|
||||
if rec.Code != test.expectedCode {
|
||||
@@ -152,7 +152,7 @@ func TestRouteList(t *testing.T) {
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -191,7 +191,7 @@ func TestRouteGet(t *testing.T) {
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -238,7 +238,7 @@ func TestRouteUpdate(t *testing.T) {
|
||||
AppName: "a",
|
||||
Path: "/myroute/do",
|
||||
},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
), logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "image": "fnproject/hello" } }`, http.StatusOK, nil},
|
||||
|
||||
// Addresses #381
|
||||
@@ -248,7 +248,7 @@ func TestRouteUpdate(t *testing.T) {
|
||||
AppName: "a",
|
||||
Path: "/myroute/do",
|
||||
},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
), logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "path": "/otherpath" } }`, http.StatusConflict, models.ErrRoutesPathImmutable},
|
||||
} {
|
||||
test.run(t, i, buf)
|
||||
|
||||
@@ -3,23 +3,15 @@ package server
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
type runnerResponse struct {
|
||||
@@ -27,15 +19,7 @@ type runnerResponse struct {
|
||||
Error *models.ErrorBody `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func toEnvName(envtype, name string) string {
|
||||
name = strings.ToUpper(strings.Replace(name, "-", "_", -1))
|
||||
if envtype == "" {
|
||||
return name
|
||||
}
|
||||
return fmt.Sprintf("%s_%s", envtype, name)
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
|
||||
func (s *Server) handleRequest(c *gin.Context) {
|
||||
if strings.HasPrefix(c.Request.URL.Path, "/v1") {
|
||||
c.Status(http.StatusNotFound)
|
||||
return
|
||||
@@ -43,22 +27,10 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
reqID := id.New().String()
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID})
|
||||
|
||||
var err error
|
||||
var payload io.Reader
|
||||
|
||||
if c.Request.Method == "POST" {
|
||||
payload = c.Request.Body
|
||||
// Load complete body and close
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, c.Request.Body)
|
||||
c.Request.Body.Close()
|
||||
}()
|
||||
} else if c.Request.Method == "GET" {
|
||||
reqPayload := c.Request.URL.Query().Get("payload")
|
||||
payload = strings.NewReader(reqPayload)
|
||||
if c.Request.Method == "GET" {
|
||||
// TODO we _could_ check the normal body, this is still weird
|
||||
// TODO do we need to flush the original body if we do this? (hint: yes)
|
||||
c.Request.Body = ioutil.NopCloser(strings.NewReader(c.Request.URL.Query().Get("payload")))
|
||||
}
|
||||
|
||||
r, routeExists := c.Get(api.Path)
|
||||
@@ -73,228 +45,65 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
|
||||
|
||||
s.FireBeforeDispatch(ctx, reqRoute)
|
||||
|
||||
appName := reqRoute.AppName
|
||||
path := reqRoute.Path
|
||||
s.serve(c, reqRoute.AppName, reqRoute.Path)
|
||||
|
||||
app, err := s.Datastore.GetApp(ctx, appName)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
} else if app == nil {
|
||||
handleErrorResponse(c, models.ErrAppsNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{"app": appName, "path": path}).Debug("Finding route on datastore")
|
||||
route, err := s.loadroute(ctx, appName, path)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
if route == nil {
|
||||
handleErrorResponse(c, models.ErrRoutesNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
log = log.WithFields(logrus.Fields{"app": appName, "path": route.Path, "image": route.Image})
|
||||
log.Debug("Got route from datastore")
|
||||
|
||||
if s.serve(ctx, c, appName, route, app, path, reqID, payload, enqueue) {
|
||||
s.FireAfterDispatch(ctx, reqRoute)
|
||||
return
|
||||
}
|
||||
|
||||
handleErrorResponse(c, models.ErrRoutesNotFound)
|
||||
s.FireAfterDispatch(ctx, reqRoute)
|
||||
}
|
||||
|
||||
func (s *Server) loadroute(ctx context.Context, appName, path string) (*models.Route, error) {
|
||||
if route, ok := s.cacheget(appName, path); ok {
|
||||
return route, nil
|
||||
}
|
||||
key := routeCacheKey(appName, path)
|
||||
resp, err := s.singleflight.do(
|
||||
key,
|
||||
func() (interface{}, error) {
|
||||
return s.Datastore.GetRoute(ctx, appName, path)
|
||||
},
|
||||
// TODO it would be nice if we could make this have nothing to do with the gin.Context but meh
|
||||
// TODO make async store an *http.Request? would be sexy until we have different api format...
|
||||
func (s *Server) serve(c *gin.Context, appName, path string) {
|
||||
// GetCall can mod headers, assign an id, look up the route/app (cached),
|
||||
// strip params, etc.
|
||||
call, err := s.Agent.GetCall(
|
||||
agent.WithWriter(c.Writer), // XXX (reed): order matters [for now]
|
||||
agent.FromRequest(appName, path, c.Request),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
route := resp.(*models.Route)
|
||||
s.routeCache.Set(key, route, cache.DefaultExpiration)
|
||||
return route, nil
|
||||
}
|
||||
|
||||
// TODO: Should remove *gin.Context from these functions, should use only context.Context
|
||||
func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, route *models.Route, app *models.App, path, reqID string, payload io.Reader, enqueue models.Enqueue) (ok bool) {
|
||||
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"app": appName, "route": route.Path, "image": route.Image})
|
||||
|
||||
params, match := matchRoute(route.Path, path)
|
||||
if !match {
|
||||
return false
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
|
||||
// TODO we could add FireBeforeDispatch right here with Call in hand
|
||||
|
||||
if route.Format == "" {
|
||||
route.Format = "default"
|
||||
}
|
||||
|
||||
// baseVars are the vars on the route & app, not on this specific request [for hot functions]
|
||||
baseVars := make(map[string]string, len(app.Config)+len(route.Config)+3)
|
||||
baseVars["FN_FORMAT"] = route.Format
|
||||
baseVars["APP_NAME"] = appName
|
||||
baseVars["ROUTE"] = route.Path
|
||||
baseVars["MEMORY_MB"] = fmt.Sprintf("%d", route.Memory)
|
||||
|
||||
// app config
|
||||
for k, v := range app.Config {
|
||||
k = toEnvName("", k)
|
||||
baseVars[k] = v
|
||||
}
|
||||
for k, v := range route.Config {
|
||||
k = toEnvName("", k)
|
||||
baseVars[k] = v
|
||||
}
|
||||
|
||||
// envVars contains the full set of env vars, per request + base
|
||||
envVars := make(map[string]string, len(baseVars)+len(params)+len(c.Request.Header)+3)
|
||||
|
||||
for k, v := range baseVars {
|
||||
envVars[k] = v
|
||||
}
|
||||
|
||||
envVars["CALL_ID"] = reqID
|
||||
envVars["METHOD"] = c.Request.Method
|
||||
envVars["REQUEST_URL"] = fmt.Sprintf("%v://%v%v", func() string {
|
||||
if c.Request.TLS == nil {
|
||||
return "http"
|
||||
}
|
||||
return "https"
|
||||
}(), c.Request.Host, c.Request.URL.String())
|
||||
|
||||
// params
|
||||
for _, param := range params {
|
||||
envVars[toEnvName("PARAM", param.Key)] = param.Value
|
||||
}
|
||||
|
||||
// headers
|
||||
for header, value := range c.Request.Header {
|
||||
envVars[toEnvName("HEADER", header)] = strings.Join(value, ", ")
|
||||
}
|
||||
|
||||
cfg := &task.Config{
|
||||
AppName: appName,
|
||||
Path: route.Path,
|
||||
BaseEnv: baseVars,
|
||||
Env: envVars,
|
||||
Format: route.Format,
|
||||
ID: reqID,
|
||||
Image: route.Image,
|
||||
Memory: route.Memory,
|
||||
Stdin: payload,
|
||||
Stdout: &stdout,
|
||||
Timeout: time.Duration(route.Timeout) * time.Second,
|
||||
IdleTimeout: time.Duration(route.IdleTimeout) * time.Second,
|
||||
ReceivedTime: time.Now(),
|
||||
Ready: make(chan struct{}),
|
||||
}
|
||||
|
||||
// ensure valid values
|
||||
if cfg.Timeout <= 0 {
|
||||
cfg.Timeout = runner.DefaultTimeout
|
||||
}
|
||||
if cfg.IdleTimeout <= 0 {
|
||||
cfg.IdleTimeout = runner.DefaultIdleTimeout
|
||||
}
|
||||
|
||||
s.Runner.Enqueue()
|
||||
newTask := task.TaskFromConfig(cfg)
|
||||
|
||||
switch route.Type {
|
||||
case "async":
|
||||
// TODO we should be able to do hot input to async. plumb protocol stuff
|
||||
// TODO enqueue should unravel the payload?
|
||||
|
||||
// Read payload
|
||||
pl, err := ioutil.ReadAll(cfg.Stdin)
|
||||
if model := call.Model(); model.Type == "async" {
|
||||
// TODO we should push this into GetCall somehow (CallOpt maybe) or maybe agent.Queue(Call) ?
|
||||
buf := bytes.NewBuffer(make([]byte, 0, c.Request.ContentLength)) // TODO sync.Pool me
|
||||
_, err := buf.ReadFrom(c.Request.Body)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, models.ErrInvalidPayload)
|
||||
return true
|
||||
return
|
||||
}
|
||||
// Add in payload
|
||||
newTask.Payload = string(pl)
|
||||
model.Payload = buf.String()
|
||||
|
||||
// Push to queue
|
||||
_, err = enqueue(c, s.MQ, newTask)
|
||||
// TODO we should probably add this to the datastore too. consider the plumber!
|
||||
_, err = s.MQ.Push(c.Request.Context(), model)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return true
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("Added new task to queue")
|
||||
c.JSON(http.StatusAccepted, map[string]string{"call_id": cfg.ID})
|
||||
|
||||
default:
|
||||
result, err := s.Runner.RunTrackedTask(newTask, ctx, cfg)
|
||||
if result != nil {
|
||||
waitTime := result.StartTime().Sub(cfg.ReceivedTime)
|
||||
c.Header("XXX-FXLB-WAIT", waitTime.String())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, runnerResponse{
|
||||
RequestID: cfg.ID,
|
||||
Error: &models.ErrorBody{
|
||||
Message: err.Error(),
|
||||
},
|
||||
})
|
||||
log.WithError(err).Error("Failed to run task")
|
||||
break
|
||||
}
|
||||
|
||||
for k, v := range route.Headers {
|
||||
c.Header(k, v[0])
|
||||
}
|
||||
|
||||
// this will help users to track sync execution in a manner of async
|
||||
// FN_CALL_ID is an equivalent of call_id
|
||||
c.Header("FN_CALL_ID", newTask.ID)
|
||||
|
||||
switch result.Status() {
|
||||
case "success":
|
||||
c.Data(http.StatusOK, "", stdout.Bytes())
|
||||
case "timeout":
|
||||
c.JSON(http.StatusGatewayTimeout, runnerResponse{
|
||||
RequestID: cfg.ID,
|
||||
Error: &models.ErrorBody{
|
||||
Message: models.ErrRunnerTimeout.Error(),
|
||||
},
|
||||
})
|
||||
default:
|
||||
c.JSON(http.StatusInternalServerError, runnerResponse{
|
||||
RequestID: cfg.ID,
|
||||
Error: &models.ErrorBody{
|
||||
Message: result.Error(),
|
||||
},
|
||||
})
|
||||
}
|
||||
c.JSON(http.StatusAccepted, map[string]string{"call_id": model.ID})
|
||||
return
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
err = s.Agent.Submit(call)
|
||||
if err != nil {
|
||||
// NOTE if they cancel the request then it will stop the call (kind of cool),
|
||||
// we could filter that error out here too as right now it yells a little
|
||||
|
||||
var fakeHandler = func(http.ResponseWriter, *http.Request, Params) {}
|
||||
|
||||
func matchRoute(baseRoute, route string) (Params, bool) {
|
||||
tree := &node{}
|
||||
tree.addRoute(baseRoute, fakeHandler)
|
||||
handler, p, _ := tree.getValue(route)
|
||||
if handler == nil {
|
||||
return nil, false
|
||||
if err == context.DeadlineExceeded {
|
||||
err = models.ErrCallTimeout // 504 w/ friendly note
|
||||
}
|
||||
// NOTE: if the task wrote the headers already then this will fail to write
|
||||
// a 5xx (and log about it to us) -- that's fine (nice, even!)
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
return p, true
|
||||
// TODO plumb FXLB-WAIT somehow (api?)
|
||||
|
||||
// TODO we need to watch the response writer and if no bytes written
|
||||
// then write a 200 at this point?
|
||||
// c.Data(http.StatusOK)
|
||||
}
|
||||
|
||||
@@ -5,28 +5,23 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, enqueue models.Enqueue) *gin.Engine {
|
||||
func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr agent.Agent) *gin.Engine {
|
||||
ctx := context.Background()
|
||||
|
||||
s := &Server{
|
||||
Runner: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
MQ: mq,
|
||||
Enqueue: enqueue,
|
||||
routeCache: cache.New(60*time.Second, 5*time.Minute),
|
||||
Agent: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
MQ: mq,
|
||||
}
|
||||
|
||||
r := s.Router
|
||||
@@ -46,7 +41,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) {
|
||||
{Type: "async", Path: "/myroute", AppName: "myapp", Image: "fnproject/hello", Config: map[string]string{"test": "true"}},
|
||||
{Type: "async", Path: "/myerror", AppName: "myapp", Image: "fnproject/error", Config: map[string]string{"test": "true"}},
|
||||
{Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "fnproject/hello", Config: map[string]string{"test": "true"}},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
)
|
||||
mq := &mqs.Mock{}
|
||||
|
||||
@@ -75,29 +70,10 @@ func TestRouteRunnerAsyncExecution(t *testing.T) {
|
||||
},
|
||||
} {
|
||||
body := bytes.NewBuffer([]byte(test.body))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
fmt.Println("About to start router")
|
||||
rnr, cancel := testRunner(t)
|
||||
router := testRouterAsync(ds, mq, rnr, func(_ context.Context, _ models.MessageQueue, task *models.Task) (*models.Task, error) {
|
||||
if test.body != task.Payload {
|
||||
t.Errorf("Test %d: Expected task Payload to be the same as the test body", i)
|
||||
}
|
||||
|
||||
if test.expectedEnv != nil {
|
||||
for name, value := range test.expectedEnv {
|
||||
taskName := name
|
||||
if value != task.EnvVars[taskName] {
|
||||
t.Errorf("Test %d: Expected header `%s` to be `%s` but was `%s`",
|
||||
i, name, value, task.EnvVars[taskName])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
return task, nil
|
||||
})
|
||||
rnr, cancel := testRunner(t, ds)
|
||||
router := testRouterAsync(ds, mq, rnr)
|
||||
|
||||
fmt.Println("makeing requests")
|
||||
req, rec := newRouterRequest(t, "POST", test.path, body)
|
||||
@@ -112,8 +88,9 @@ func TestRouteRunnerAsyncExecution(t *testing.T) {
|
||||
t.Errorf("Test %d: Expected status code to be %d but was %d",
|
||||
i, test.expectedCode, rec.Code)
|
||||
}
|
||||
// TODO can test body and headers in the actual mq message w/ an agent that doesn't dequeue?
|
||||
// this just makes sure tasks are submitted (ok)...
|
||||
|
||||
wg.Wait()
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,40 +3,45 @@ package server
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"errors"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
)
|
||||
|
||||
func testRunner(t *testing.T) (*runner.Runner, context.CancelFunc) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
func testRunner(t *testing.T, args ...interface{}) (agent.Agent, context.CancelFunc) {
|
||||
ds := datastore.NewMock()
|
||||
fnl := logs.NewMock()
|
||||
r, err := runner.New(ctx, runner.NewFuncLogger(fnl), ds)
|
||||
if err != nil {
|
||||
t.Fatal("Test: failed to create new runner")
|
||||
var mq models.MessageQueue = &mqs.Mock{}
|
||||
for _, a := range args {
|
||||
switch arg := a.(type) {
|
||||
case models.Datastore:
|
||||
ds = arg
|
||||
case models.MessageQueue:
|
||||
mq = arg
|
||||
}
|
||||
}
|
||||
return r, cancel
|
||||
r := agent.New(ds, mq)
|
||||
return r, func() { r.Close() }
|
||||
}
|
||||
|
||||
func TestRouteRunnerGet(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
rnr, cancel := testRunner(t)
|
||||
defer cancel()
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
}, nil, nil, nil,
|
||||
}, nil, nil,
|
||||
)
|
||||
|
||||
rnr, cancel := testRunner(t, ds)
|
||||
defer cancel()
|
||||
logDB := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -71,16 +76,17 @@ func TestRouteRunnerGet(t *testing.T) {
|
||||
func TestRouteRunnerPost(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
|
||||
rnr, cancel := testRunner(t)
|
||||
defer cancel()
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
}, nil, nil, nil,
|
||||
}, nil, nil,
|
||||
)
|
||||
|
||||
rnr, cancel := testRunner(t, ds)
|
||||
defer cancel()
|
||||
|
||||
fnl := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -117,9 +123,6 @@ func TestRouteRunnerPost(t *testing.T) {
|
||||
func TestRouteRunnerExecution(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
|
||||
rnr, cancelrnr := testRunner(t)
|
||||
defer cancelrnr()
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
@@ -128,11 +131,14 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
{Path: "/", AppName: "myapp", Image: "fnproject/hello", Headers: map[string][]string{"X-Function": {"Test"}}},
|
||||
{Path: "/myroute", AppName: "myapp", Image: "fnproject/hello", Headers: map[string][]string{"X-Function": {"Test"}}},
|
||||
{Path: "/myerror", AppName: "myapp", Image: "fnproject/error", Headers: map[string][]string{"X-Function": {"Test"}}},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
)
|
||||
|
||||
rnr, cancelrnr := testRunner(t, ds)
|
||||
defer cancelrnr()
|
||||
|
||||
fnl := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -172,26 +178,34 @@ func TestRouteRunnerExecution(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// implement models.MQ and models.APIError
|
||||
type errorMQ struct {
|
||||
error
|
||||
code int
|
||||
}
|
||||
|
||||
func (mock *errorMQ) Push(context.Context, *models.Call) (*models.Call, error) { return nil, mock }
|
||||
func (mock *errorMQ) Reserve(context.Context) (*models.Call, error) { return nil, mock }
|
||||
func (mock *errorMQ) Delete(context.Context, *models.Call) error { return mock }
|
||||
func (mock *errorMQ) Code() int { return mock.code }
|
||||
|
||||
func TestFailedEnqueue(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
rnr, cancelrnr := testRunner(t)
|
||||
defer cancelrnr()
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
},
|
||||
[]*models.Route{
|
||||
{Path: "/dummy", AppName: "myapp", Image: "dummy/dummy", Type: "async"},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
)
|
||||
err := errors.New("Unable to push task to queue")
|
||||
mq := &errorMQ{err, http.StatusInternalServerError}
|
||||
fnl := logs.NewMock()
|
||||
rnr, cancelrnr := testRunner(t, ds, mq)
|
||||
defer cancelrnr()
|
||||
|
||||
enqueue := func(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) {
|
||||
return nil, errors.New("Unable to push task to queue")
|
||||
}
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, enqueue)
|
||||
srv := testServer(ds, mq, fnl, rnr)
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
body string
|
||||
@@ -215,19 +229,20 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
||||
t.Skip("doesn't work on old Ubuntu")
|
||||
buf := setLogBuffer()
|
||||
|
||||
rnr, cancelrnr := testRunner(t)
|
||||
defer cancelrnr()
|
||||
|
||||
ds := datastore.NewMockInit(
|
||||
[]*models.App{
|
||||
{Name: "myapp", Config: models.Config{}},
|
||||
},
|
||||
[]*models.Route{
|
||||
{Path: "/sleeper", AppName: "myapp", Image: "fnproject/sleeper", Timeout: 1},
|
||||
}, nil, nil,
|
||||
}, nil,
|
||||
)
|
||||
|
||||
rnr, cancelrnr := testRunner(t, ds)
|
||||
defer cancelrnr()
|
||||
|
||||
fnl := logs.NewMock()
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, fnl, rnr)
|
||||
|
||||
for i, test := range []struct {
|
||||
path string
|
||||
@@ -261,29 +276,29 @@ func TestRouteRunnerTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchRoute(t *testing.T) {
|
||||
buf := setLogBuffer()
|
||||
for i, test := range []struct {
|
||||
baseRoute string
|
||||
route string
|
||||
expectedParams []Param
|
||||
}{
|
||||
{"/myroute/", `/myroute/`, nil},
|
||||
{"/myroute/:mybigparam", `/myroute/1`, []Param{{"mybigparam", "1"}}},
|
||||
{"/:param/*test", `/1/2`, []Param{{"param", "1"}, {"test", "/2"}}},
|
||||
} {
|
||||
if params, match := matchRoute(test.baseRoute, test.route); match {
|
||||
if test.expectedParams != nil {
|
||||
for j, param := range test.expectedParams {
|
||||
if params[j].Key != param.Key || params[j].Value != param.Value {
|
||||
t.Log(buf.String())
|
||||
t.Errorf("Test %d: expected param %d, key = %s, value = %s", i, j, param.Key, param.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
t.Log(buf.String())
|
||||
t.Errorf("Test %d: %s should match %s", i, test.route, test.baseRoute)
|
||||
}
|
||||
}
|
||||
}
|
||||
//func TestMatchRoute(t *testing.T) {
|
||||
//buf := setLogBuffer()
|
||||
//for i, test := range []struct {
|
||||
//baseRoute string
|
||||
//route string
|
||||
//expectedParams []Param
|
||||
//}{
|
||||
//{"/myroute/", `/myroute/`, nil},
|
||||
//{"/myroute/:mybigparam", `/myroute/1`, []Param{{"mybigparam", "1"}}},
|
||||
//{"/:param/*test", `/1/2`, []Param{{"param", "1"}, {"test", "/2"}}},
|
||||
//} {
|
||||
//if params, match := matchRoute(test.baseRoute, test.route); match {
|
||||
//if test.expectedParams != nil {
|
||||
//for j, param := range test.expectedParams {
|
||||
//if params[j].Key != param.Key || params[j].Value != param.Value {
|
||||
//t.Log(buf.String())
|
||||
//t.Errorf("Test %d: expected param %d, key = %s, value = %s", i, j, param.Key, param.Value)
|
||||
//}
|
||||
//}
|
||||
//}
|
||||
//} else {
|
||||
//t.Log(buf.String())
|
||||
//t.Errorf("Test %d: %s should match %s", i, test.route, test.baseRoute)
|
||||
//}
|
||||
//}
|
||||
//}
|
||||
|
||||
@@ -3,31 +3,26 @@ package server
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/ccirello/supervisor"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
"github.com/openzipkin/zipkin-go-opentracing"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
@@ -42,25 +37,17 @@ const (
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
Datastore models.Datastore
|
||||
Runner *runner.Runner
|
||||
Router *gin.Engine
|
||||
Agent agent.Agent
|
||||
Datastore models.Datastore
|
||||
MQ models.MessageQueue
|
||||
Enqueue models.Enqueue
|
||||
LogDB models.FnLog
|
||||
|
||||
apiURL string
|
||||
LogDB models.LogStore
|
||||
|
||||
appListeners []AppListener
|
||||
middlewares []Middleware
|
||||
runnerListeners []RunnerListener
|
||||
|
||||
routeCache *cache.Cache
|
||||
singleflight singleflight // singleflight assists Datastore
|
||||
}
|
||||
|
||||
const cacheSize = 1024
|
||||
|
||||
// NewFromEnv creates a new Functions server based on env vars.
|
||||
func NewFromEnv(ctx context.Context) *Server {
|
||||
ds, err := datastore.New(viper.GetString(EnvDBURL))
|
||||
@@ -73,7 +60,7 @@ func NewFromEnv(ctx context.Context) *Server {
|
||||
logrus.WithError(err).Fatal("Error initializing message queue.")
|
||||
}
|
||||
|
||||
var logDB models.FnLog = ds
|
||||
var logDB models.LogStore = ds
|
||||
if ldb := viper.GetString(EnvLOGDBURL); ldb != "" && ldb != viper.GetString(EnvDBURL) {
|
||||
logDB, err = logs.New(viper.GetString(EnvLOGDBURL))
|
||||
if err != nil {
|
||||
@@ -81,30 +68,17 @@ func NewFromEnv(ctx context.Context) *Server {
|
||||
}
|
||||
}
|
||||
|
||||
apiURL := viper.GetString(EnvAPIURL)
|
||||
|
||||
return New(ctx, ds, mq, logDB, apiURL)
|
||||
return New(ctx, ds, mq, logDB)
|
||||
}
|
||||
|
||||
// New creates a new Functions server with the passed in datastore, message queue and API URL
|
||||
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, apiURL string, opts ...ServerOption) *Server {
|
||||
funcLogger := runner.NewFuncLogger(logDB)
|
||||
|
||||
rnr, err := runner.New(ctx, funcLogger, ds)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalln("Failed to create a runner")
|
||||
return nil
|
||||
}
|
||||
|
||||
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, opts ...ServerOption) *Server {
|
||||
s := &Server{
|
||||
Runner: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
MQ: mq,
|
||||
routeCache: cache.New(5*time.Second, 5*time.Minute),
|
||||
LogDB: logDB,
|
||||
Enqueue: DefaultEnqueue,
|
||||
apiURL: apiURL,
|
||||
Agent: agent.New(ds, mq),
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
MQ: mq,
|
||||
LogDB: logDB,
|
||||
}
|
||||
|
||||
setMachineId()
|
||||
@@ -234,58 +208,8 @@ func loggerWrap(c *gin.Context) {
|
||||
c.Next()
|
||||
}
|
||||
|
||||
func DefaultEnqueue(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) {
|
||||
ctx, _ = common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
|
||||
return mq.Push(ctx, task)
|
||||
}
|
||||
|
||||
func routeCacheKey(appname, path string) string {
|
||||
return fmt.Sprintf("%s_%s", appname, path)
|
||||
}
|
||||
func (s *Server) cacheget(appname, path string) (*models.Route, bool) {
|
||||
route, ok := s.routeCache.Get(routeCacheKey(appname, path))
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return route.(*models.Route), ok
|
||||
}
|
||||
|
||||
func (s *Server) cachedelete(appname, path string) {
|
||||
s.routeCache.Delete(routeCacheKey(appname, path))
|
||||
}
|
||||
|
||||
func (s *Server) handleRunnerRequest(c *gin.Context) {
|
||||
s.handleRequest(c, s.Enqueue)
|
||||
}
|
||||
|
||||
func (s *Server) handleTaskRequest(c *gin.Context) {
|
||||
ctx, _ := common.LoggerWithFields(c, nil)
|
||||
switch c.Request.Method {
|
||||
case "GET":
|
||||
task, err := s.MQ.Reserve(ctx)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, task)
|
||||
case "DELETE":
|
||||
body, err := ioutil.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
var task models.Task
|
||||
if err = json.Unmarshal(body, &task); err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.MQ.Delete(ctx, &task); err != nil {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusAccepted, task)
|
||||
}
|
||||
s.handleRequest(c)
|
||||
}
|
||||
|
||||
func extractFields(c *gin.Context) logrus.Fields {
|
||||
@@ -305,10 +229,6 @@ func (s *Server) startGears(ctx context.Context) {
|
||||
// By default it serves on :8080 unless a
|
||||
// PORT environment variable was defined.
|
||||
listen := fmt.Sprintf(":%d", viper.GetInt(EnvPort))
|
||||
listener, err := net.Listen("tcp", listen)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalln("Failed to serve functions API.")
|
||||
}
|
||||
|
||||
const runHeader = `
|
||||
______
|
||||
@@ -320,29 +240,23 @@ func (s *Server) startGears(ctx context.Context) {
|
||||
fmt.Println(runHeader)
|
||||
logrus.Infof("Serving Functions API on address `%s`", listen)
|
||||
|
||||
svr := &supervisor.Supervisor{
|
||||
MaxRestarts: supervisor.AlwaysRestart,
|
||||
Log: func(msg interface{}) {
|
||||
logrus.Debug("supervisor: ", msg)
|
||||
},
|
||||
server := http.Server{
|
||||
Addr: listen,
|
||||
Handler: s.Router,
|
||||
// TODO we should set read/write timeouts
|
||||
}
|
||||
|
||||
svr.AddFunc(func(ctx context.Context) {
|
||||
go func() {
|
||||
err := http.Serve(listener, s.Router)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Error serving API: %v", err)
|
||||
}
|
||||
}()
|
||||
<-ctx.Done()
|
||||
})
|
||||
go func() {
|
||||
<-ctx.Done() // listening for signals...
|
||||
server.Shutdown(context.Background()) // we can wait
|
||||
}()
|
||||
|
||||
svr.AddFunc(func(ctx context.Context) {
|
||||
runner.RunAsyncRunner(ctx, s.apiURL, s.Runner, s.Datastore)
|
||||
})
|
||||
err := server.ListenAndServe()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("error opening server")
|
||||
}
|
||||
|
||||
svr.Serve(ctx)
|
||||
s.Runner.Wait() // wait for tasks to finish (safe shutdown)
|
||||
s.Agent.Close() // after we stop taking requests, wait for all tasks to finish
|
||||
}
|
||||
|
||||
func (s *Server) bindHandlers(ctx context.Context) {
|
||||
@@ -380,8 +294,6 @@ func (s *Server) bindHandlers(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
engine.DELETE("/tasks", s.handleTaskRequest)
|
||||
engine.GET("/tasks", s.handleTaskRequest)
|
||||
engine.Any("/r/:app", s.handleRunnerRequest)
|
||||
engine.Any("/r/:app/*route", s.handleRunnerRequest)
|
||||
|
||||
@@ -397,8 +309,8 @@ type appResponse struct {
|
||||
}
|
||||
|
||||
type appsResponse struct {
|
||||
Message string `json:"message"`
|
||||
Apps models.Apps `json:"apps"`
|
||||
Message string `json:"message"`
|
||||
Apps []*models.App `json:"apps"`
|
||||
}
|
||||
|
||||
type routeResponse struct {
|
||||
@@ -407,26 +319,21 @@ type routeResponse struct {
|
||||
}
|
||||
|
||||
type routesResponse struct {
|
||||
Message string `json:"message"`
|
||||
Routes models.Routes `json:"routes"`
|
||||
}
|
||||
|
||||
type tasksResponse struct {
|
||||
Message string `json:"message"`
|
||||
Task models.Task `json:"tasksResponse"`
|
||||
Message string `json:"message"`
|
||||
Routes []*models.Route `json:"routes"`
|
||||
}
|
||||
|
||||
type fnCallResponse struct {
|
||||
Message string `json:"message"`
|
||||
Call *models.FnCall `json:"call"`
|
||||
Message string `json:"message"`
|
||||
Call *models.Call `json:"call"`
|
||||
}
|
||||
|
||||
type fnCallsResponse struct {
|
||||
Message string `json:"message"`
|
||||
Calls models.FnCalls `json:"calls"`
|
||||
Calls []*models.Call `json:"calls"`
|
||||
}
|
||||
|
||||
type fnCallLogResponse struct {
|
||||
Message string `json:"message"`
|
||||
Log *models.FnCallLog `json:"log"`
|
||||
Message string `json:"message"`
|
||||
Log *models.CallLog `json:"log"`
|
||||
}
|
||||
|
||||
@@ -10,29 +10,25 @@ import (
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api/agent"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
var tmpDatastoreTests = "/tmp/func_test_datastore.db"
|
||||
|
||||
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.FnLog, rnr *runner.Runner, enqueue models.Enqueue) *Server {
|
||||
func testServer(ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, rnr agent.Agent) *Server {
|
||||
ctx := context.Background()
|
||||
|
||||
s := &Server{
|
||||
Runner: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
LogDB: logDB,
|
||||
MQ: mq,
|
||||
Enqueue: enqueue,
|
||||
routeCache: cache.New(60*time.Second, 5*time.Minute),
|
||||
Agent: rnr,
|
||||
Router: gin.New(),
|
||||
Datastore: ds,
|
||||
LogDB: logDB,
|
||||
MQ: mq,
|
||||
}
|
||||
|
||||
r := s.Router
|
||||
@@ -81,7 +77,7 @@ func getErrorResponse(t *testing.T, rec *httptest.ResponseRecorder) models.Error
|
||||
return errResp
|
||||
}
|
||||
|
||||
func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.FnLog, func()) {
|
||||
func prepareDB(ctx context.Context, t *testing.T) (models.Datastore, models.LogStore, func()) {
|
||||
os.Remove(tmpDatastoreTests)
|
||||
ds, err := datastore.New("sqlite3://" + tmpDatastoreTests)
|
||||
if err != nil {
|
||||
@@ -99,10 +95,10 @@ func TestFullStack(t *testing.T) {
|
||||
ds, logDB, close := prepareDB(ctx, t)
|
||||
defer close()
|
||||
|
||||
rnr, rnrcancel := testRunner(t)
|
||||
rnr, rnrcancel := testRunner(t, ds)
|
||||
defer rnrcancel()
|
||||
|
||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr, DefaultEnqueue)
|
||||
srv := testServer(ds, &mqs.Mock{}, logDB, rnr)
|
||||
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
@@ -110,7 +106,7 @@ func TestFullStack(t *testing.T) {
|
||||
path string
|
||||
body string
|
||||
expectedCode int
|
||||
expectedCacheSize int
|
||||
expectedCacheSize int // TODO kill me
|
||||
}{
|
||||
{"create my app", "POST", "/v1/apps", `{ "app": { "name": "myapp" } }`, http.StatusOK, 0},
|
||||
{"list apps", "GET", "/v1/apps", ``, http.StatusOK, 0},
|
||||
@@ -138,10 +134,5 @@ func TestFullStack(t *testing.T) {
|
||||
t.Errorf("Test \"%s\": Expected status code to be %d but was %d",
|
||||
test.name, test.expectedCode, rec.Code)
|
||||
}
|
||||
if srv.routeCache.ItemCount() != test.expectedCacheSize {
|
||||
t.Log(buf.String())
|
||||
t.Errorf("Test \"%s\": Expected cache size to be %d but was %d",
|
||||
test.name, test.expectedCacheSize, srv.routeCache.ItemCount())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
package server
|
||||
|
||||
// Imported from https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// call is an in-flight or completed do call
|
||||
type call struct {
|
||||
wg sync.WaitGroup
|
||||
val interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
type singleflight struct {
|
||||
mu sync.Mutex // protects m
|
||||
m map[interface{}]*call // lazily initialized
|
||||
}
|
||||
|
||||
// do executes and returns the results of the given function, making
|
||||
// sure that only one execution is in-flight for a given key at a
|
||||
// time. If a duplicate comes in, the duplicate caller waits for the
|
||||
// original to complete and receives the same results.
|
||||
func (g *singleflight) do(key interface{}, fn func() (interface{}, error)) (interface{}, error) {
|
||||
g.mu.Lock()
|
||||
if g.m == nil {
|
||||
g.m = make(map[interface{}]*call)
|
||||
}
|
||||
if c, ok := g.m[key]; ok {
|
||||
g.mu.Unlock()
|
||||
c.wg.Wait()
|
||||
return c.val, c.err
|
||||
}
|
||||
c := new(call)
|
||||
c.wg.Add(1)
|
||||
g.m[key] = c
|
||||
g.mu.Unlock()
|
||||
|
||||
c.val, c.err = fn()
|
||||
c.wg.Done()
|
||||
|
||||
g.mu.Lock()
|
||||
delete(g.m, key)
|
||||
g.mu.Unlock()
|
||||
|
||||
return c.val, c.err
|
||||
}
|
||||
@@ -7,5 +7,5 @@ import (
|
||||
)
|
||||
|
||||
func (s *Server) handleStats(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, s.Runner.Stats())
|
||||
c.JSON(http.StatusOK, s.Agent.Stats())
|
||||
}
|
||||
|
||||
@@ -1,657 +0,0 @@
|
||||
// Copyright 2013 Julien Schmidt. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be found
|
||||
// in the LICENSE file.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
type Handle func(http.ResponseWriter, *http.Request, Params)
|
||||
type Param struct {
|
||||
Key string
|
||||
Value string
|
||||
}
|
||||
type Params []Param
|
||||
|
||||
func min(a, b int) int {
|
||||
if a <= b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func countParams(path string) uint8 {
|
||||
var n uint
|
||||
for i := 0; i < len(path); i++ {
|
||||
if path[i] != ':' && path[i] != '*' {
|
||||
continue
|
||||
}
|
||||
n++
|
||||
}
|
||||
if n >= 255 {
|
||||
return 255
|
||||
}
|
||||
return uint8(n)
|
||||
}
|
||||
|
||||
type nodeType uint8
|
||||
|
||||
const (
|
||||
static nodeType = iota // default
|
||||
root
|
||||
param
|
||||
catchAll
|
||||
)
|
||||
|
||||
type node struct {
|
||||
path string
|
||||
wildChild bool
|
||||
nType nodeType
|
||||
maxParams uint8
|
||||
indices string
|
||||
children []*node
|
||||
handle Handle
|
||||
priority uint32
|
||||
}
|
||||
|
||||
// increments priority of the given child and reorders if necessary
|
||||
func (n *node) incrementChildPrio(pos int) int {
|
||||
n.children[pos].priority++
|
||||
prio := n.children[pos].priority
|
||||
|
||||
// adjust position (move to front)
|
||||
newPos := pos
|
||||
for newPos > 0 && n.children[newPos-1].priority < prio {
|
||||
// swap node positions
|
||||
tmpN := n.children[newPos-1]
|
||||
n.children[newPos-1] = n.children[newPos]
|
||||
n.children[newPos] = tmpN
|
||||
|
||||
newPos--
|
||||
}
|
||||
|
||||
// build new index char string
|
||||
if newPos != pos {
|
||||
n.indices = n.indices[:newPos] + // unchanged prefix, might be empty
|
||||
n.indices[pos:pos+1] + // the index char we move
|
||||
n.indices[newPos:pos] + n.indices[pos+1:] // rest without char at 'pos'
|
||||
}
|
||||
|
||||
return newPos
|
||||
}
|
||||
|
||||
// addRoute adds a node with the given handle to the path.
|
||||
// Not concurrency-safe!
|
||||
func (n *node) addRoute(path string, handle Handle) {
|
||||
fullPath := path
|
||||
n.priority++
|
||||
numParams := countParams(path)
|
||||
|
||||
// non-empty tree
|
||||
if len(n.path) > 0 || len(n.children) > 0 {
|
||||
walk:
|
||||
for {
|
||||
// Update maxParams of the current node
|
||||
if numParams > n.maxParams {
|
||||
n.maxParams = numParams
|
||||
}
|
||||
|
||||
// Find the longest common prefix.
|
||||
// This also implies that the common prefix contains no ':' or '*'
|
||||
// since the existing key can't contain those chars.
|
||||
i := 0
|
||||
max := min(len(path), len(n.path))
|
||||
for i < max && path[i] == n.path[i] {
|
||||
i++
|
||||
}
|
||||
|
||||
// Split edge
|
||||
if i < len(n.path) {
|
||||
child := node{
|
||||
path: n.path[i:],
|
||||
wildChild: n.wildChild,
|
||||
nType: static,
|
||||
indices: n.indices,
|
||||
children: n.children,
|
||||
handle: n.handle,
|
||||
priority: n.priority - 1,
|
||||
}
|
||||
|
||||
// Update maxParams (max of all children)
|
||||
for i := range child.children {
|
||||
if child.children[i].maxParams > child.maxParams {
|
||||
child.maxParams = child.children[i].maxParams
|
||||
}
|
||||
}
|
||||
|
||||
n.children = []*node{&child}
|
||||
// []byte for proper unicode char conversion, see #65
|
||||
n.indices = string([]byte{n.path[i]})
|
||||
n.path = path[:i]
|
||||
n.handle = nil
|
||||
n.wildChild = false
|
||||
}
|
||||
|
||||
// Make new node a child of this node
|
||||
if i < len(path) {
|
||||
path = path[i:]
|
||||
|
||||
if n.wildChild {
|
||||
n = n.children[0]
|
||||
n.priority++
|
||||
|
||||
// Update maxParams of the child node
|
||||
if numParams > n.maxParams {
|
||||
n.maxParams = numParams
|
||||
}
|
||||
numParams--
|
||||
|
||||
// Check if the wildcard matches
|
||||
if len(path) >= len(n.path) && n.path == path[:len(n.path)] {
|
||||
// check for longer wildcard, e.g. :name and :names
|
||||
if len(n.path) >= len(path) || path[len(n.path)] == '/' {
|
||||
continue walk
|
||||
}
|
||||
}
|
||||
|
||||
panic("path segment '" + path +
|
||||
"' conflicts with existing wildcard '" + n.path +
|
||||
"' in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
c := path[0]
|
||||
|
||||
// slash after param
|
||||
if n.nType == param && c == '/' && len(n.children) == 1 {
|
||||
n = n.children[0]
|
||||
n.priority++
|
||||
continue walk
|
||||
}
|
||||
|
||||
// Check if a child with the next path byte exists
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
if c == n.indices[i] {
|
||||
i = n.incrementChildPrio(i)
|
||||
n = n.children[i]
|
||||
continue walk
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise insert it
|
||||
if c != ':' && c != '*' {
|
||||
// []byte for proper unicode char conversion, see #65
|
||||
n.indices += string([]byte{c})
|
||||
child := &node{
|
||||
maxParams: numParams,
|
||||
}
|
||||
n.children = append(n.children, child)
|
||||
n.incrementChildPrio(len(n.indices) - 1)
|
||||
n = child
|
||||
}
|
||||
n.insertChild(numParams, path, fullPath, handle)
|
||||
return
|
||||
|
||||
} else if i == len(path) { // Make node a (in-path) leaf
|
||||
if n.handle != nil {
|
||||
panic("a handle is already registered for path '" + fullPath + "'")
|
||||
}
|
||||
n.handle = handle
|
||||
}
|
||||
return
|
||||
}
|
||||
} else { // Empty tree
|
||||
n.insertChild(numParams, path, fullPath, handle)
|
||||
n.nType = root
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) insertChild(numParams uint8, path, fullPath string, handle Handle) {
|
||||
var offset int // already handled bytes of the path
|
||||
|
||||
// find prefix until first wildcard (beginning with ':'' or '*'')
|
||||
for i, max := 0, len(path); numParams > 0; i++ {
|
||||
c := path[i]
|
||||
if c != ':' && c != '*' {
|
||||
continue
|
||||
}
|
||||
|
||||
// find wildcard end (either '/' or path end)
|
||||
end := i + 1
|
||||
for end < max && path[end] != '/' {
|
||||
switch path[end] {
|
||||
// the wildcard name must not contain ':' and '*'
|
||||
case ':', '*':
|
||||
panic("only one wildcard per path segment is allowed, has: '" +
|
||||
path[i:] + "' in path '" + fullPath + "'")
|
||||
default:
|
||||
end++
|
||||
}
|
||||
}
|
||||
|
||||
// check if this Node existing children which would be
|
||||
// unreachable if we insert the wildcard here
|
||||
if len(n.children) > 0 {
|
||||
panic("wildcard route '" + path[i:end] +
|
||||
"' conflicts with existing children in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
// check if the wildcard has a name
|
||||
if end-i < 2 {
|
||||
panic("wildcards must be named with a non-empty name in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
if c == ':' { // param
|
||||
// split path at the beginning of the wildcard
|
||||
if i > 0 {
|
||||
n.path = path[offset:i]
|
||||
offset = i
|
||||
}
|
||||
|
||||
child := &node{
|
||||
nType: param,
|
||||
maxParams: numParams,
|
||||
}
|
||||
n.children = []*node{child}
|
||||
n.wildChild = true
|
||||
n = child
|
||||
n.priority++
|
||||
numParams--
|
||||
|
||||
// if the path doesn't end with the wildcard, then there
|
||||
// will be another non-wildcard subpath starting with '/'
|
||||
if end < max {
|
||||
n.path = path[offset:end]
|
||||
offset = end
|
||||
|
||||
child := &node{
|
||||
maxParams: numParams,
|
||||
priority: 1,
|
||||
}
|
||||
n.children = []*node{child}
|
||||
n = child
|
||||
}
|
||||
|
||||
} else { // catchAll
|
||||
if end != max || numParams > 1 {
|
||||
panic("catch-all routes are only allowed at the end of the path in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
if len(n.path) > 0 && n.path[len(n.path)-1] == '/' {
|
||||
panic("catch-all conflicts with existing handle for the path segment root in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
// currently fixed width 1 for '/'
|
||||
i--
|
||||
if path[i] != '/' {
|
||||
panic("no / before catch-all in path '" + fullPath + "'")
|
||||
}
|
||||
|
||||
n.path = path[offset:i]
|
||||
|
||||
// first node: catchAll node with empty path
|
||||
child := &node{
|
||||
wildChild: true,
|
||||
nType: catchAll,
|
||||
maxParams: 1,
|
||||
}
|
||||
n.children = []*node{child}
|
||||
n.indices = string(path[i])
|
||||
n = child
|
||||
n.priority++
|
||||
|
||||
// second node: node holding the variable
|
||||
child = &node{
|
||||
path: path[i:],
|
||||
nType: catchAll,
|
||||
maxParams: 1,
|
||||
handle: handle,
|
||||
priority: 1,
|
||||
}
|
||||
n.children = []*node{child}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// insert remaining path part and handle to the leaf
|
||||
n.path = path[offset:]
|
||||
n.handle = handle
|
||||
}
|
||||
|
||||
// Returns the handle registered with the given path (key). The values of
|
||||
// wildcards are saved to a map.
|
||||
// If no handle can be found, a TSR (trailing slash redirect) recommendation is
|
||||
// made if a handle exists with an extra (without the) trailing slash for the
|
||||
// given path.
|
||||
func (n *node) getValue(path string) (handle Handle, p Params, tsr bool) {
|
||||
walk: // outer loop for walking the tree
|
||||
for {
|
||||
if len(path) > len(n.path) {
|
||||
if path[:len(n.path)] == n.path {
|
||||
path = path[len(n.path):]
|
||||
// If this node does not have a wildcard (param or catchAll)
|
||||
// child, we can just look up the next child node and continue
|
||||
// to walk down the tree
|
||||
if !n.wildChild {
|
||||
c := path[0]
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
if c == n.indices[i] {
|
||||
n = n.children[i]
|
||||
continue walk
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing found.
|
||||
// We can recommend to redirect to the same URL without a
|
||||
// trailing slash if a leaf exists for that path.
|
||||
tsr = (path == "/" && n.handle != nil)
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
// handle wildcard child
|
||||
n = n.children[0]
|
||||
switch n.nType {
|
||||
case param:
|
||||
// find param end (either '/' or path end)
|
||||
end := 0
|
||||
for end < len(path) && path[end] != '/' {
|
||||
end++
|
||||
}
|
||||
|
||||
// save param value
|
||||
if p == nil {
|
||||
// lazy allocation
|
||||
p = make(Params, 0, n.maxParams)
|
||||
}
|
||||
i := len(p)
|
||||
p = p[:i+1] // expand slice within preallocated capacity
|
||||
p[i].Key = n.path[1:]
|
||||
p[i].Value = path[:end]
|
||||
|
||||
// we need to go deeper!
|
||||
if end < len(path) {
|
||||
if len(n.children) > 0 {
|
||||
path = path[end:]
|
||||
n = n.children[0]
|
||||
continue walk
|
||||
}
|
||||
|
||||
// ... but we can't
|
||||
tsr = (len(path) == end+1)
|
||||
return
|
||||
}
|
||||
|
||||
if handle = n.handle; handle != nil {
|
||||
return
|
||||
} else if len(n.children) == 1 {
|
||||
// No handle found. Check if a handle for this path + a
|
||||
// trailing slash exists for TSR recommendation
|
||||
n = n.children[0]
|
||||
tsr = (n.path == "/" && n.handle != nil)
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
case catchAll:
|
||||
// save param value
|
||||
if p == nil {
|
||||
// lazy allocation
|
||||
p = make(Params, 0, n.maxParams)
|
||||
}
|
||||
i := len(p)
|
||||
p = p[:i+1] // expand slice within preallocated capacity
|
||||
p[i].Key = n.path[2:]
|
||||
p[i].Value = path
|
||||
|
||||
handle = n.handle
|
||||
return
|
||||
|
||||
default:
|
||||
panic("invalid node type")
|
||||
}
|
||||
}
|
||||
} else if path == n.path {
|
||||
// We should have reached the node containing the handle.
|
||||
// Check if this node has a handle registered.
|
||||
if handle = n.handle; handle != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if path == "/" && n.wildChild && n.nType != root {
|
||||
tsr = true
|
||||
return
|
||||
}
|
||||
|
||||
// No handle found. Check if a handle for this path + a
|
||||
// trailing slash exists for trailing slash recommendation
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
if n.indices[i] == '/' {
|
||||
n = n.children[i]
|
||||
tsr = (len(n.path) == 1 && n.handle != nil) ||
|
||||
(n.nType == catchAll && n.children[0].handle != nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Nothing found. We can recommend to redirect to the same URL with an
|
||||
// extra trailing slash if a leaf exists for that path
|
||||
tsr = (path == "/") ||
|
||||
(len(n.path) == len(path)+1 && n.path[len(path)] == '/' &&
|
||||
path == n.path[:len(n.path)-1] && n.handle != nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Makes a case-insensitive lookup of the given path and tries to find a handler.
|
||||
// It can optionally also fix trailing slashes.
|
||||
// It returns the case-corrected path and a bool indicating whether the lookup
|
||||
// was successful.
|
||||
func (n *node) findCaseInsensitivePath(path string, fixTrailingSlash bool) (ciPath []byte, found bool) {
|
||||
return n.findCaseInsensitivePathRec(
|
||||
path,
|
||||
strings.ToLower(path),
|
||||
make([]byte, 0, len(path)+1), // preallocate enough memory for new path
|
||||
[4]byte{}, // empty rune buffer
|
||||
fixTrailingSlash,
|
||||
)
|
||||
}
|
||||
|
||||
// shift bytes in array by n bytes left
|
||||
func shiftNRuneBytes(rb [4]byte, n int) [4]byte {
|
||||
switch n {
|
||||
case 0:
|
||||
return rb
|
||||
case 1:
|
||||
return [4]byte{rb[1], rb[2], rb[3], 0}
|
||||
case 2:
|
||||
return [4]byte{rb[2], rb[3]}
|
||||
case 3:
|
||||
return [4]byte{rb[3]}
|
||||
default:
|
||||
return [4]byte{}
|
||||
}
|
||||
}
|
||||
|
||||
// recursive case-insensitive lookup function used by n.findCaseInsensitivePath
|
||||
func (n *node) findCaseInsensitivePathRec(path, loPath string, ciPath []byte, rb [4]byte, fixTrailingSlash bool) ([]byte, bool) {
|
||||
loNPath := strings.ToLower(n.path)
|
||||
|
||||
walk: // outer loop for walking the tree
|
||||
for len(loPath) >= len(loNPath) && (len(loNPath) == 0 || loPath[1:len(loNPath)] == loNPath[1:]) {
|
||||
// add common path to result
|
||||
ciPath = append(ciPath, n.path...)
|
||||
|
||||
if path = path[len(n.path):]; len(path) > 0 {
|
||||
loOld := loPath
|
||||
loPath = loPath[len(loNPath):]
|
||||
|
||||
// If this node does not have a wildcard (param or catchAll) child,
|
||||
// we can just look up the next child node and continue to walk down
|
||||
// the tree
|
||||
if !n.wildChild {
|
||||
// skip rune bytes already processed
|
||||
rb = shiftNRuneBytes(rb, len(loNPath))
|
||||
|
||||
if rb[0] != 0 {
|
||||
// old rune not finished
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
if n.indices[i] == rb[0] {
|
||||
// continue with child node
|
||||
n = n.children[i]
|
||||
loNPath = strings.ToLower(n.path)
|
||||
continue walk
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// process a new rune
|
||||
var rv rune
|
||||
|
||||
// find rune start
|
||||
// runes are up to 4 byte long,
|
||||
// -4 would definitely be another rune
|
||||
var off int
|
||||
for max := min(len(loNPath), 3); off < max; off++ {
|
||||
if i := len(loNPath) - off; utf8.RuneStart(loOld[i]) {
|
||||
// read rune from cached lowercase path
|
||||
rv, _ = utf8.DecodeRuneInString(loOld[i:])
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// calculate lowercase bytes of current rune
|
||||
utf8.EncodeRune(rb[:], rv)
|
||||
// skipp already processed bytes
|
||||
rb = shiftNRuneBytes(rb, off)
|
||||
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
// lowercase matches
|
||||
if n.indices[i] == rb[0] {
|
||||
// must use a recursive approach since both the
|
||||
// uppercase byte and the lowercase byte might exist
|
||||
// as an index
|
||||
if out, found := n.children[i].findCaseInsensitivePathRec(
|
||||
path, loPath, ciPath, rb, fixTrailingSlash,
|
||||
); found {
|
||||
return out, true
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// same for uppercase rune, if it differs
|
||||
if up := unicode.ToUpper(rv); up != rv {
|
||||
utf8.EncodeRune(rb[:], up)
|
||||
rb = shiftNRuneBytes(rb, off)
|
||||
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
// uppercase matches
|
||||
if n.indices[i] == rb[0] {
|
||||
// continue with child node
|
||||
n = n.children[i]
|
||||
loNPath = strings.ToLower(n.path)
|
||||
continue walk
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing found. We can recommend to redirect to the same URL
|
||||
// without a trailing slash if a leaf exists for that path
|
||||
return ciPath, (fixTrailingSlash && path == "/" && n.handle != nil)
|
||||
}
|
||||
|
||||
n = n.children[0]
|
||||
switch n.nType {
|
||||
case param:
|
||||
// find param end (either '/' or path end)
|
||||
k := 0
|
||||
for k < len(path) && path[k] != '/' {
|
||||
k++
|
||||
}
|
||||
|
||||
// add param value to case insensitive path
|
||||
ciPath = append(ciPath, path[:k]...)
|
||||
|
||||
// we need to go deeper!
|
||||
if k < len(path) {
|
||||
if len(n.children) > 0 {
|
||||
// continue with child node
|
||||
n = n.children[0]
|
||||
loNPath = strings.ToLower(n.path)
|
||||
loPath = loPath[k:]
|
||||
path = path[k:]
|
||||
continue
|
||||
}
|
||||
|
||||
// ... but we can't
|
||||
if fixTrailingSlash && len(path) == k+1 {
|
||||
return ciPath, true
|
||||
}
|
||||
return ciPath, false
|
||||
}
|
||||
|
||||
if n.handle != nil {
|
||||
return ciPath, true
|
||||
} else if fixTrailingSlash && len(n.children) == 1 {
|
||||
// No handle found. Check if a handle for this path + a
|
||||
// trailing slash exists
|
||||
n = n.children[0]
|
||||
if n.path == "/" && n.handle != nil {
|
||||
return append(ciPath, '/'), true
|
||||
}
|
||||
}
|
||||
return ciPath, false
|
||||
|
||||
case catchAll:
|
||||
return append(ciPath, path...), true
|
||||
|
||||
default:
|
||||
panic("invalid node type")
|
||||
}
|
||||
} else {
|
||||
// We should have reached the node containing the handle.
|
||||
// Check if this node has a handle registered.
|
||||
if n.handle != nil {
|
||||
return ciPath, true
|
||||
}
|
||||
|
||||
// No handle found.
|
||||
// Try to fix the path by adding a trailing slash
|
||||
if fixTrailingSlash {
|
||||
for i := 0; i < len(n.indices); i++ {
|
||||
if n.indices[i] == '/' {
|
||||
n = n.children[i]
|
||||
if (len(n.path) == 1 && n.handle != nil) ||
|
||||
(n.nType == catchAll && n.children[0].handle != nil) {
|
||||
return append(ciPath, '/'), true
|
||||
}
|
||||
return ciPath, false
|
||||
}
|
||||
}
|
||||
}
|
||||
return ciPath, false
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing found.
|
||||
// Try to fix the path by adding / removing a trailing slash
|
||||
if fixTrailingSlash {
|
||||
if path == "/" {
|
||||
return ciPath, true
|
||||
}
|
||||
if len(loPath)+1 == len(loNPath) && loNPath[len(loPath)] == '/' &&
|
||||
loPath[1:] == loNPath[1:len(loPath)] && n.handle != nil {
|
||||
return append(ciPath, n.path...), true
|
||||
}
|
||||
}
|
||||
return ciPath, false
|
||||
}
|
||||
Reference in New Issue
Block a user