Files
fn-serverless/api/server/server.go
Reed Allman f335d34636 add server option to limit request size (#320)
we're going to want to do this in our service version of this thing, but
adding this here so that it's usable by everyone. just an option, can add it
to server configuration, but response is nicely formatted, etc.

closes #277
2017-09-18 22:34:19 -07:00

341 lines
8.9 KiB
Go

package server
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path"
"github.com/fnproject/fn/api"
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/datastore"
"github.com/fnproject/fn/api/datastore/cache"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/logs"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/api/mqs"
"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/openzipkin/zipkin-go-opentracing"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// Configuration keys looked up through viper by this package.
const (
	// EnvLogLevel is the key for the log level setting (not read in this file).
	EnvLogLevel = "log_level"
	// EnvMQURL is the key holding the message queue URL handed to mqs.New.
	EnvMQURL = "mq_url"
	// EnvDBURL is the key holding the datastore URL handed to datastore.New.
	EnvDBURL = "db_url"
	// EnvLOGDBURL is the key holding the log store URL; when it is empty or
	// equal to the datastore URL, the datastore doubles as the log store.
	EnvLOGDBURL = "logstore_url"
	// EnvPort is the key for the listen port.
	// Be careful, Gin expects this variable to be "port".
	EnvPort = "port"
	// EnvAPIURL is the key for the API URL (not read in this file).
	EnvAPIURL = "api_url"
	// EnvZipkinURL is the key holding the zipkin HTTP collector endpoint;
	// tracing stays disabled while it is empty.
	EnvZipkinURL = "zipkin_url"
)
// Server groups the HTTP router with the backing services the API handlers
// operate on. Instances are built by New or NewFromEnv.
type Server struct {
// Router is the gin engine on which all middleware and routes are registered.
Router *gin.Engine
// Agent is constructed in New on top of a cache-wrapped datastore and the
// message queue; it is closed when the server stops serving.
Agent agent.Agent
// Datastore is the datastore passed to New (without the caching wrapper).
Datastore models.Datastore
// MQ is the message queue passed to New.
MQ models.MessageQueue
// LogDB is the log store; NewFromEnv falls back to the datastore for it.
LogDB models.LogStore
// Extension hooks; presumably populated through ServerOptions or other
// methods defined outside this file — TODO confirm.
appListeners []AppListener
middlewares []Middleware
runnerListeners []RunnerListener
}
// NewFromEnv builds a Functions server from viper configuration: it opens the
// datastore (EnvDBURL) and the message queue (EnvMQURL), picks a log store,
// and hands everything to New together with opts.
//
// The log store is the datastore itself unless EnvLOGDBURL is set to a
// non-empty value that differs from EnvDBURL, in which case a dedicated log
// store is opened. Any initialization failure is fatal (logrus Fatal).
func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
	dbURL := viper.GetString(EnvDBURL)
	ds, err := datastore.New(dbURL)
	if err != nil {
		logrus.WithError(err).Fatalln("Error initializing datastore.")
	}

	mq, err := mqs.New(viper.GetString(EnvMQURL))
	if err != nil {
		logrus.WithError(err).Fatal("Error initializing message queue.")
	}

	// Logs live in the main datastore unless a distinct store is configured.
	var logDB models.LogStore = ds
	logURL := viper.GetString(EnvLOGDBURL)
	if logURL != "" && logURL != dbURL {
		logDB, err = logs.New(logURL)
		if err != nil {
			logrus.WithError(err).Fatal("Error initializing logs store.")
		}
	}

	return New(ctx, ds, mq, logDB, opts...)
}
// New assembles a Functions server from an already-opened datastore, message
// queue and log store.
//
// It sets the machine id and (optionally) the global tracer, installs the
// logger, trace and panic middleware, registers all routes, and finally
// applies opts in order. Nil options are skipped.
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB models.LogStore, opts ...ServerOption) *Server {
	srv := &Server{
		// Only the agent gets the caching layer; the server keeps the raw datastore.
		Agent:     agent.New(cache.Wrap(ds), mq),
		Router:    gin.New(),
		Datastore: ds,
		MQ:        mq,
		LogDB:     logDB,
	}

	setMachineId()
	setTracer()

	srv.Router.Use(loggerWrap, traceWrap, panicWrap)
	srv.bindHandlers(ctx)

	// Options run last, after the default middleware and routes are in place.
	for i := range opts {
		if apply := opts[i]; apply != nil {
			apply(srv)
		}
	}
	return srv
}
// traceWrap is gin middleware that runs the rest of the handler chain inside
// an opentracing span named "serve_http", tagged with the request path. The
// span is stored on the request context and finished once the chain returns.
//
// NOTE: the original author's remark here was "we should use http grr".
func traceWrap(c *gin.Context) {
	// Try to pick up a span propagated by a calling service. The error is
	// ignored on purpose: without a usable parent we simply start a root span.
	carrier := opentracing.HTTPHeadersCarrier(c.Request.Header)
	parent, _ := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, carrier)

	// TODO we should add more tags?
	span := opentracing.StartSpan(
		"serve_http",
		ext.RPCServerOption(parent),
		opentracing.Tag{Key: "path", Value: c.Request.URL.Path},
	)
	defer span.Finish()

	c.Request = c.Request.WithContext(opentracing.ContextWithSpan(c.Request.Context(), span))
	c.Next()
}
// setTracer installs a zipkin-backed global opentracing tracer when
// EnvZipkinURL is configured, and does nothing when it is empty. Failing to
// create the collector or the tracer is fatal.
func setTracer() {
	// ex: "http://zipkin:9411/api/v1/spans"
	endpoint := viper.GetString(EnvZipkinURL)
	if endpoint == "" {
		return
	}

	const (
		debugMode       = false
		serviceName     = "fn-server"
		serviceHostPort = "localhost:8080" // meh
	)

	// Route the collector's own log output through logrus at error level.
	collectorLog := zipkintracer.LoggerFunc(func(args ...interface{}) error {
		logrus.Error(args...)
		return nil
	})

	collector, err := zipkintracer.NewHTTPCollector(endpoint, zipkintracer.HTTPLogger(collectorLog))
	if err != nil {
		logrus.WithError(err).Fatalln("couldn't start trace collector")
	}

	recorder := zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName)
	tracer, err := zipkintracer.NewTracer(
		recorder,
		zipkintracer.ClientServerSameSpan(true),
		zipkintracer.TraceID128Bit(true),
	)
	if err != nil {
		logrus.WithError(err).Fatalln("couldn't start tracer")
	}

	opentracing.SetGlobalTracer(tracer)
	logrus.WithFields(logrus.Fields{"url": endpoint}).Info("started tracer")
}
func setMachineId() {
port := uint16(viper.GetInt(EnvPort))
addr := whoAmI().To4()
if addr == nil {
addr = net.ParseIP("127.0.0.1").To4()
logrus.Warn("could not find non-local ipv4 address to use, using '127.0.0.1' for ids, if this is a cluster beware of duplicate ids!")
}
id.SetMachineIdHost(addr, port)
}
// whoAmI walks the host's network interfaces and returns the first IPv4
// address that is not 127.0.0.1, or nil when there is none. Interfaces named
// "docker0" and "lo" are skipped. It could be narrowed to eth0 or en0 only,
// but to date that has been unnecessary.
//
// Errors from enumerating interfaces or addresses are ignored; the affected
// interface simply contributes no candidates.
func whoAmI() net.IP {
	loopback := net.ParseIP("127.0.0.1")

	ifaces, _ := net.Interfaces()
	for _, iface := range ifaces {
		switch iface.Name {
		case "docker0", "lo":
			// not perfect
			continue
		}

		addrs, _ := iface.Addrs()
		for _, addr := range addrs {
			if addr.Network() != "ip+net" {
				continue
			}
			ip, _, err := net.ParseCIDR(addr.String())
			if err != nil || ip.To4() == nil {
				continue
			}
			if bytes.Equal(ip, loopback) {
				continue
			}
			return ip
		}
	}
	return nil
}
// panicWrap is gin middleware that recovers from a panic raised further down
// the handler chain, reports it through handleErrorResponse and aborts the
// request. Panic values that are not errors are wrapped as "fn: <value>".
func panicWrap(c *gin.Context) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}

		err, isErr := rec.(error)
		if !isErr {
			err = fmt.Errorf("fn: %v", rec)
		}
		handleErrorResponse(c, err)
		c.Abort()
	}()

	c.Next()
}
// loggerWrap is gin middleware that prepares the request context for the
// handlers that follow: it attaches a logger carrying the fields from
// extractFields and, when the matched route has them, records the app name
// and route path both on the gin context (c.Set) and on the request context.
//
// All values are layered onto a single derived context that is installed on
// the request once. Installing the logger context last, on top of values
// added to c.Request separately, would discard those values because the
// logger context descends from the original request context.
func loggerWrap(c *gin.Context) {
	ctx, _ := common.LoggerWithFields(c.Request.Context(), extractFields(c))

	if appName := c.Param(api.CApp); appName != "" {
		c.Set(api.AppName, appName)
		ctx = context.WithValue(ctx, api.AppName, appName)
	}

	if routePath := c.Param(api.CRoute); routePath != "" {
		c.Set(api.Path, routePath)
		ctx = context.WithValue(ctx, api.Path, routePath)
	}

	c.Request = c.Request.WithContext(ctx)
	c.Next()
}
// handleRunnerRequest is the gin handler registered for the /r/:app and
// /r/:app/*route routes; it delegates directly to s.handleRequest.
func (s *Server) handleRunnerRequest(c *gin.Context) {
s.handleRequest(c)
}
// extractFields builds the log fields for a request: "action" is the base
// name of the gin handler, and every route parameter is added under its own
// key (a parameter named "action" would therefore replace the handler name).
func extractFields(c *gin.Context) logrus.Fields {
	fields := make(logrus.Fields, len(c.Params)+1)
	fields["action"] = path.Base(c.HandlerName())

	for i := range c.Params {
		p := c.Params[i]
		fields[p.Key] = p.Value
	}
	return fields
}
// Start serves the API and blocks until serving stops. The context handed to
// startGears comes from contextWithSignal with os.Interrupt (presumably it is
// cancelled when that signal arrives — the helper is defined elsewhere).
func (s *Server) Start(ctx context.Context) {
	s.startGears(contextWithSignal(ctx, os.Interrupt))
}
// startGears prints the banner, serves s.Router on the configured port and
// blocks until the HTTP server has stopped, then closes the agent.
//
// Cancelling ctx triggers a graceful http.Server.Shutdown. ListenAndServe
// returns http.ErrServerClosed as soon as Shutdown begins, so that value is
// not reported as a failure, and this function waits for Shutdown itself to
// return before closing the agent. If the listener fails on its own (for
// example the port is taken), the helper goroutine is released instead of
// lingering until ctx is cancelled.
func (s *Server) startGears(ctx context.Context) {
	// By default it serves on :8080 unless a
	// PORT environment variable was defined.
	listen := fmt.Sprintf(":%d", viper.GetInt(EnvPort))

	const runHeader = `
______
/ ____/___
/ /_ / __ \
/ __/ / / / /
/_/ /_/ /_/
`
	fmt.Println(runHeader)
	logrus.Infof("Serving Functions API on address `%s`", listen)

	server := http.Server{
		Addr:    listen,
		Handler: s.Router,
		// TODO we should set read/write timeouts
	}

	serveReturned := make(chan struct{}) // closed once ListenAndServe has returned
	shutdownDone := make(chan struct{})  // closed once the helper goroutine is finished
	go func() {
		defer close(shutdownDone)
		select {
		case <-ctx.Done(): // listening for signals...
			// we can wait: no deadline, in-flight requests are allowed to finish
			if err := server.Shutdown(context.Background()); err != nil {
				logrus.WithError(err).Error("error shutting down server")
			}
		case <-serveReturned:
			// the server stopped without a shutdown request; nothing to drain
		}
	}()

	err := server.ListenAndServe()
	close(serveReturned)
	if err != nil && err != http.ErrServerClosed {
		logrus.WithError(err).Error("error opening server")
	}

	// Shutdown may still be draining connections after ListenAndServe returns.
	<-shutdownDone

	s.Agent.Close() // after we stop taking requests, wait for all tasks to finish
}
// bindHandlers registers every route on s.Router: the unversioned service
// endpoints, the /v1 management API for apps, routes and calls, the /r runner
// endpoints, and a JSON 404 fallback for anything else.
func (s *Server) bindHandlers(ctx context.Context) {
	router := s.Router

	// Unversioned service endpoints.
	router.GET("/", handlePing)
	router.GET("/version", handleVersion)
	router.GET("/stats", s.handleStats)

	// Management API. The wrapper middleware is attached to the group before
	// any of its routes are added.
	mgmt := router.Group("/v1")
	mgmt.Use(s.middlewareWrapperFunc(ctx))

	mgmt.GET("/apps", s.handleAppList)
	mgmt.POST("/apps", s.handleAppCreate)

	mgmt.GET("/apps/:app", s.handleAppGet)
	mgmt.PATCH("/apps/:app", s.handleAppUpdate)
	mgmt.DELETE("/apps/:app", s.handleAppDelete)

	// Resources nested under a single app.
	perApp := mgmt.Group("/apps/:app")

	perApp.GET("/routes", s.handleRouteList)
	perApp.POST("/routes", s.handleRoutesPostPutPatch)
	perApp.GET("/routes/*route", s.handleRouteGet)
	perApp.PATCH("/routes/*route", s.handleRoutesPostPutPatch)
	perApp.PUT("/routes/*route", s.handleRoutesPostPutPatch)
	perApp.DELETE("/routes/*route", s.handleRouteDelete)

	perApp.GET("/calls", s.handleCallList)
	perApp.GET("/calls/:call", s.handleCallGet)
	perApp.GET("/calls/:call/log", s.handleCallLogGet)
	perApp.DELETE("/calls/:call/log", s.handleCallLogDelete)

	// Runner endpoints accept any HTTP method.
	router.Any("/r/:app", s.handleRunnerRequest)
	router.Any("/r/:app/*route", s.handleRunnerRequest)

	// Everything else gets a JSON-formatted 404.
	router.NoRoute(func(c *gin.Context) {
		logrus.Debugln("not found", c.Request.URL.Path)
		c.JSON(http.StatusNotFound, simpleError(errors.New("Path not found")))
	})
}
// The types below are JSON envelopes that pair a human-readable message with
// a payload. They are presumably what the API handlers serialize as response
// bodies; the handlers themselves are not in this file.

// appResponse carries a single app under the "app" key.
type appResponse struct {
Message string `json:"message"`
App *models.App `json:"app"`
}
// appsResponse carries a list of apps under the "apps" key.
type appsResponse struct {
Message string `json:"message"`
Apps []*models.App `json:"apps"`
}
// routeResponse carries a single route under the "route" key.
type routeResponse struct {
Message string `json:"message"`
Route *models.Route `json:"route"`
}
// routesResponse carries a list of routes under the "routes" key.
type routesResponse struct {
Message string `json:"message"`
Routes []*models.Route `json:"routes"`
}
// fnCallResponse carries a single call under the "call" key.
type fnCallResponse struct {
Message string `json:"message"`
Call *models.Call `json:"call"`
}
// fnCallsResponse carries a list of calls under the "calls" key.
type fnCallsResponse struct {
Message string `json:"message"`
Calls []*models.Call `json:"calls"`
}
// fnCallLogResponse carries a call's log under the "log" key.
type fnCallLogResponse struct {
Message string `json:"message"`
Log *models.CallLog `json:"log"`
}