Files
fn-serverless/api/server/server.go
Reed Allman 337e962416 add pagination to all list endpoints
calls, apps, and routes listing were previously returning the entire data set,
which just won't scale. this adds pagination with cursoring forward to each of
these endpoints (see the [docs](docs/definitions.md)).

the patch is really mostly tests, shouldn't be that bad to pick through.

some blarble about implementation is in order:

calls are sorted by ids but allow searching within certain `created_at` ranges
(finally). this is because sorting by `created_at` isn't feasible when
combined with paging, as `created_at` is not guaranteed to be unique -- id's
are (eliding theoreticals). i.e. on a page boundary, if there are 200 calls
with the same `created_at`, providing a `cursor` of that `created_at` will
skip over the remaining N calls with that `created_at`.  also using id will be
better on the index anyway (well, less of them). yay having sortable ids! I
can't discern any issues doing this, as even if 200 calls have the same
created_at, they will have different ids, and the sort should allow paginating
them just fine. ids are also url safe, so the id works as the cursor value
just fine.

apps and routes are sorted by alphabetical order. as they aren't guaranteed to
be url safe, we are base64'ing them in the front end to a url safe format and
then returning them, and then base64 decoding them when we get them. this does
mean that they can be relatively large if the path/app is long, but if we
don't want to add ids then they were going to be pretty big anyway. a bonus
that this kind of obscures them. if somebody has better idea on formatting, by
all means.

notably, we are not using the sql paging facilities, and we are baking our own
based on cursors, which ends up being much more efficient for querying longer
lists of resources. this also should be easy to implement in other non-sql dbs
and the cursoring formats we can change on the fly since we are just exposing
them as opaque strings. the front end deals with the base64 / formatting, etc
and the back end is taking raw values (strfmt.DateTime or the id for calls).
the cursor that is being passed to/by the user is simply the last resource on the
previous page, so in theory we don't even need to return it, but it does make
it a little easier to use, also, cursor being blank on the last page depends
on page full-ness, so sometimes users will get a cursor when there are no
results on next page (1/N chance, and it's not really end of world -- actually
searching for the next thing would make things more complex). there are ample
tests for this behavior.

I've turned off all query parameters allowing `LIKE` queries on certain listing
endpoints, as we should not expose sql behavior through our API in the event
that we end up not using a sql db down the road. I think we should only allow
prefix matching, which sql can support as well as other types of databases
relatively cheaply, but this is not hooked up here as it didn't 'just work'
when I was fiddling with it (can add later, they're unnecessary and weren't
wired in before in front end).

* remove route listing across apps (unused)
* fix panic when doing `/app//`. this is prob possible for other types of
endpoints, out of scope here. added a guard in front of all endpoints for this
* adds `from_time` and `to_time` query parameters to calls, so you can e.g.
list the last hour of tasks. these are not required and default to
oldest/newest.
* hooked back up the datastore tests to the sql db, only run with sqlite atm,
but these are useful, added a lot to them too.
* added a bunch of tests to the front end, so pretty sure this all works now.
* added to swagger, we'll need to re-gen. also wrote some words about
pagination workings, I'm not sure how best to link to these, feedback welcome.
* not sure how we want to manage indexes, but we may need to add some (looking
at created_at, mostly)
* `?route` changed to `?path` in routes listing, to keep consistency with
everything else
* don't 404 when searching for calls where the route doesn't exist, just
return an empty list (it's a query param ffs)

closes #141
2017-09-20 06:50:49 -07:00

380 lines
9.8 KiB
Go

package server
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"os"
"path"
"strconv"
"github.com/fnproject/fn/api"
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/datastore"
"github.com/fnproject/fn/api/datastore/cache"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/logs"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/api/mqs"
"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/openzipkin/zipkin-go-opentracing"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
const (
EnvLogLevel = "log_level"
EnvMQURL = "mq_url"
EnvDBURL = "db_url"
EnvLOGDBURL = "logstore_url"
EnvPort = "port" // be careful, Gin expects this variable to be "port"
EnvAPIURL = "api_url"
EnvZipkinURL = "zipkin_url"
)
type Server struct {
Router *gin.Engine
Agent agent.Agent
Datastore models.Datastore
MQ models.MessageQueue
LogDB models.LogStore
appListeners []AppListener
middlewares []Middleware
runnerListeners []RunnerListener
}
// NewFromEnv creates a new Functions server based on env vars.
func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
ds, err := datastore.New(viper.GetString(EnvDBURL))
if err != nil {
logrus.WithError(err).Fatalln("Error initializing datastore.")
}
mq, err := mqs.New(viper.GetString(EnvMQURL))
if err != nil {
logrus.WithError(err).Fatal("Error initializing message queue.")
}
var logDB models.LogStore = ds
if ldb := viper.GetString(EnvLOGDBURL); ldb != "" && ldb != viper.GetString(EnvDBURL) {
logDB, err = logs.New(viper.GetString(EnvLOGDBURL))
if err != nil {
logrus.WithError(err).Fatal("Error initializing logs store.")
}
}
return New(ctx, ds, mq, logDB, opts...)
}
// New creates a new Functions server with the passed in datastore, message queue and API URL
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, opts ...ServerOption) *Server {
s := &Server{
Agent: agent.New(cache.Wrap(ds), mq), // only add datastore caching to agent
Router: gin.New(),
Datastore: ds,
MQ: mq,
LogDB: logDB,
}
setMachineId()
setTracer()
s.Router.Use(loggerWrap, traceWrap, panicWrap)
s.bindHandlers(ctx)
for _, opt := range opts {
if opt == nil {
continue
}
opt(s)
}
return s
}
// we should use http grr
func traceWrap(c *gin.Context) {
// try to grab a span from the request if made from another service, ignore err if not
wireContext, _ := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(c.Request.Header))
// Create the span referring to the RPC client if available.
// If wireContext == nil, a root span will be created.
// TODO we should add more tags?
serverSpan := opentracing.StartSpan("serve_http", ext.RPCServerOption(wireContext), opentracing.Tag{"path", c.Request.URL.Path})
defer serverSpan.Finish()
ctx := opentracing.ContextWithSpan(c.Request.Context(), serverSpan)
c.Request = c.Request.WithContext(ctx)
c.Next()
}
func setTracer() {
var (
debugMode = false
serviceName = "fn-server"
serviceHostPort = "localhost:8080" // meh
zipkinHTTPEndpoint = viper.GetString(EnvZipkinURL)
// ex: "http://zipkin:9411/api/v1/spans"
)
if zipkinHTTPEndpoint == "" {
return
}
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
collector, err := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if err != nil {
logrus.WithError(err).Fatalln("couldn't start trace collector")
}
tracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
zipkintracer.ClientServerSameSpan(true),
zipkintracer.TraceID128Bit(true),
)
if err != nil {
logrus.WithError(err).Fatalln("couldn't start tracer")
}
opentracing.SetGlobalTracer(tracer)
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
}
func setMachineId() {
port := uint16(viper.GetInt(EnvPort))
addr := whoAmI().To4()
if addr == nil {
addr = net.ParseIP("127.0.0.1").To4()
logrus.Warn("could not find non-local ipv4 address to use, using '127.0.0.1' for ids, if this is a cluster beware of duplicate ids!")
}
id.SetMachineIdHost(addr, port)
}
// whoAmI searches for a non-local address on any network interface, returning
// the first one it finds. it could be expanded to search eth0 or en0 only but
// to date this has been unnecessary.
func whoAmI() net.IP {
ints, _ := net.Interfaces()
for _, i := range ints {
if i.Name == "docker0" || i.Name == "lo" {
// not perfect
continue
}
addrs, _ := i.Addrs()
for _, a := range addrs {
ip, _, err := net.ParseCIDR(a.String())
if a.Network() == "ip+net" && err == nil && ip.To4() != nil {
if !bytes.Equal(ip, net.ParseIP("127.0.0.1")) {
return ip
}
}
}
}
return nil
}
func panicWrap(c *gin.Context) {
defer func(c *gin.Context) {
if rec := recover(); rec != nil {
err, ok := rec.(error)
if !ok {
err = fmt.Errorf("fn: %v", rec)
}
handleErrorResponse(c, err)
c.Abort()
}
}(c)
c.Next()
}
func loggerWrap(c *gin.Context) {
ctx, _ := common.LoggerWithFields(c.Request.Context(), extractFields(c))
if appName := c.Param(api.CApp); appName != "" {
c.Set(api.AppName, appName)
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), api.AppName, appName))
}
if routePath := c.Param(api.CRoute); routePath != "" {
c.Set(api.Path, routePath)
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), api.Path, routePath))
}
c.Request = c.Request.WithContext(ctx)
c.Next()
}
func appWrap(c *gin.Context) {
appName := c.GetString(api.AppName)
if appName == "" {
handleErrorResponse(c, models.ErrMissingAppName)
c.Abort()
return
}
c.Next()
}
func (s *Server) handleRunnerRequest(c *gin.Context) {
s.handleRequest(c)
}
func extractFields(c *gin.Context) logrus.Fields {
fields := logrus.Fields{"action": path.Base(c.HandlerName())}
for _, param := range c.Params {
fields[param.Key] = param.Value
}
return fields
}
func (s *Server) Start(ctx context.Context) {
ctx = contextWithSignal(ctx, os.Interrupt)
s.startGears(ctx)
}
func (s *Server) startGears(ctx context.Context) {
// By default it serves on :8080 unless a
// PORT environment variable was defined.
listen := fmt.Sprintf(":%d", viper.GetInt(EnvPort))
const runHeader = `
______
/ ____/___
/ /_ / __ \
/ __/ / / / /
/_/ /_/ /_/
`
fmt.Println(runHeader)
logrus.Infof("Serving Functions API on address `%s`", listen)
server := http.Server{
Addr: listen,
Handler: s.Router,
// TODO we should set read/write timeouts
}
go func() {
<-ctx.Done() // listening for signals...
server.Shutdown(context.Background()) // we can wait
}()
err := server.ListenAndServe()
if err != nil {
logrus.WithError(err).Error("error opening server")
}
s.Agent.Close() // after we stop taking requests, wait for all tasks to finish
}
func (s *Server) bindHandlers(ctx context.Context) {
engine := s.Router
engine.GET("/", handlePing)
engine.GET("/version", handleVersion)
engine.GET("/stats", s.handleStats)
{
v1 := engine.Group("/v1")
v1.Use(s.middlewareWrapperFunc(ctx))
v1.GET("/apps", s.handleAppList)
v1.POST("/apps", s.handleAppCreate)
{
apps := v1.Group("/apps/:app")
apps.Use(appWrap)
apps.GET("", s.handleAppGet)
apps.PATCH("", s.handleAppUpdate)
apps.DELETE("", s.handleAppDelete)
apps.GET("/routes", s.handleRouteList)
apps.POST("/routes", s.handleRoutesPostPutPatch)
apps.GET("/routes/*route", s.handleRouteGet)
apps.PATCH("/routes/*route", s.handleRoutesPostPutPatch)
apps.PUT("/routes/*route", s.handleRoutesPostPutPatch)
apps.DELETE("/routes/*route", s.handleRouteDelete)
apps.GET("/calls", s.handleCallList)
apps.GET("/calls/:call", s.handleCallGet)
apps.GET("/calls/:call/log", s.handleCallLogGet)
apps.DELETE("/calls/:call/log", s.handleCallLogDelete)
}
}
{
runner := engine.Group("/r")
runner.Use(appWrap)
runner.Any("/:app", s.handleRunnerRequest)
runner.Any("/:app/*route", s.handleRunnerRequest)
}
engine.NoRoute(func(c *gin.Context) {
logrus.Debugln("not found", c.Request.URL.Path)
c.JSON(http.StatusNotFound, simpleError(errors.New("Path not found")))
})
}
// returns the unescaped ?cursor and ?perPage values
// pageParams clamps 0 < ?perPage <= 100 and defaults to 30 if 0
// ignores parsing errors and falls back to defaults.
func pageParams(c *gin.Context, base64d bool) (cursor string, perPage int) {
cursor = c.Query("cursor")
if base64d {
cbytes, _ := base64.RawURLEncoding.DecodeString(cursor)
cursor = string(cbytes)
}
perPage, _ = strconv.Atoi(c.Query("per_page"))
if perPage > 100 {
perPage = 100
} else if perPage <= 0 {
perPage = 30
}
return cursor, perPage
}
type appResponse struct {
Message string `json:"message"`
App *models.App `json:"app"`
}
type appsResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Apps []*models.App `json:"apps"`
}
type routeResponse struct {
Message string `json:"message"`
Route *models.Route `json:"route"`
}
type routesResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Routes []*models.Route `json:"routes"`
}
type callResponse struct {
Message string `json:"message"`
Call *models.Call `json:"call"`
}
type callsResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Calls []*models.Call `json:"calls"`
}
type callLogResponse struct {
Message string `json:"message"`
Log *models.CallLog `json:"log"`
}