Files
fn-serverless/api/server/server.go
Reed Allman a8a3e143c7 unexport all data abstractions on Server (#618)
this patch has no behavior changes, changes are:

* server.Datastore() -> server.datastore
* server.MQ -> server.mq
* server.LogDB -> server.logstore
* server.Agent -> server.agent

these were at a minimum not uniform. further, it's probably better to force
configuration through initialization in `server.New` to ensure thread safety
of referencing if someone does want to modify these as well as forcing things
into our initialization path and reducing the surface area of the Server
abstraction.
2017-12-21 13:21:02 -06:00

500 lines
13 KiB
Go

package server
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
"syscall"
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/agent/hybrid"
"github.com/fnproject/fn/api/datastore"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/logs"
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/api/mqs"
"github.com/fnproject/fn/api/version"
"github.com/fnproject/fn/fnext"
"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/openzipkin/zipkin-go-opentracing"
"github.com/sirupsen/logrus"
)
const (
EnvLogLevel = "FN_LOG_LEVEL"
EnvMQURL = "FN_MQ_URL"
EnvDBURL = "FN_DB_URL"
EnvLOGDBURL = "FN_LOGSTORE_URL"
EnvRunnerURL = "FN_RUNNER_API_URL"
EnvNodeType = "FN_NODE_TYPE"
EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port"
EnvAPICORS = "FN_API_CORS"
EnvZipkinURL = "FN_ZIPKIN_URL"
// Defaults
DefaultLogLevel = "info"
DefaultPort = 8080
)
type ServerNodeType int32
const (
ServerTypeFull ServerNodeType = iota
ServerTypeAPI
ServerTypeRunner
)
func (s ServerNodeType) String() string {
switch s {
default:
return "full"
case ServerTypeAPI:
return "api"
case ServerTypeRunner:
return "runner"
}
}
type Server struct {
// TODO this one maybe we have `AddRoute` in extensions?
Router *gin.Engine
agent agent.Agent
datastore models.Datastore
mq models.MessageQueue
logstore models.LogStore
nodeType ServerNodeType
appListeners []fnext.AppListener
rootMiddlewares []fnext.Middleware
apiMiddlewares []fnext.Middleware
}
func nodeTypeFromString(value string) ServerNodeType {
switch value {
case "api":
return ServerTypeAPI
case "runner":
return ServerTypeRunner
default:
return ServerTypeFull
}
}
// NewFromEnv creates a new Functions server based on env vars.
func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
curDir := pwd()
var defaultDB, defaultMQ string
nodeType := nodeTypeFromString(getEnv(EnvNodeType, "")) // default to full
if nodeType != ServerTypeRunner {
// only want to activate these for full and api nodes
defaultDB = fmt.Sprintf("sqlite3://%s/data/fn.db", curDir)
defaultMQ = fmt.Sprintf("bolt://%s/data/fn.mq", curDir)
}
opts = append(opts, WithDBURL(getEnv(EnvDBURL, defaultDB)))
opts = append(opts, WithMQURL(getEnv(EnvMQURL, defaultMQ)))
opts = append(opts, WithLogURL(getEnv(EnvLOGDBURL, "")))
opts = append(opts, WithRunnerURL(getEnv(EnvRunnerURL, "")))
opts = append(opts, WithType(nodeType))
return New(ctx, opts...)
}
func pwd() string {
cwd, err := os.Getwd()
if err != nil {
logrus.WithError(err).Fatalln("couldn't get working directory, possibly unsupported platform?")
}
// Replace forward slashes in case this is windows, URL parser errors
return strings.Replace(cwd, "\\", "/", -1)
}
func WithDBURL(dbURL string) ServerOption {
if dbURL != "" {
ds, err := datastore.New(dbURL)
if err != nil {
logrus.WithError(err).Fatalln("Error initializing datastore.")
}
return WithDatastore(ds)
}
return noop
}
func WithMQURL(mqURL string) ServerOption {
if mqURL != "" {
mq, err := mqs.New(mqURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing message queue.")
}
return WithMQ(mq)
}
return noop
}
func WithLogURL(logstoreURL string) ServerOption {
if ldb := logstoreURL; ldb != "" {
logDB, err := logs.New(logstoreURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing logs store.")
}
return WithLogstore(logDB)
}
return noop
}
func WithRunnerURL(runnerURL string) ServerOption {
if runnerURL != "" {
cl, err := hybrid.NewClient(runnerURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing runner API client.")
}
return WithAgent(agent.New(agent.NewCachedDataAccess(cl)))
}
return noop
}
func noop(s *Server) {}
func WithType(t ServerNodeType) ServerOption {
return func(s *Server) { s.nodeType = t }
}
func WithDatastore(ds models.Datastore) ServerOption {
return func(s *Server) { s.datastore = ds }
}
func WithMQ(mq models.MessageQueue) ServerOption {
return func(s *Server) { s.mq = mq }
}
func WithLogstore(ls models.LogStore) ServerOption {
return func(s *Server) { s.logstore = ls }
}
func WithAgent(agent agent.Agent) ServerOption {
return func(s *Server) { s.agent = agent }
}
// New creates a new Functions server with the opts given. For convenience, users may
// prefer to use NewFromEnv but New is more flexible if needed.
func New(ctx context.Context, opts ...ServerOption) *Server {
s := &Server{
Router: gin.New(),
// Almost everything else is configured through opts (see NewFromEnv for ex.) or below
}
for _, opt := range opts {
if opt == nil {
continue
}
opt(s)
}
if s.logstore == nil { // TODO seems weird?
s.logstore = s.datastore
}
// TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side?
switch s.nodeType {
case ServerTypeAPI:
s.agent = nil
case ServerTypeRunner:
if s.agent == nil {
logrus.Fatal("No agent started for a runner node, add FN_RUNNER_API_URL to configuration.")
}
default:
s.nodeType = ServerTypeFull
if s.datastore == nil || s.logstore == nil || s.mq == nil {
logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
}
// TODO force caller to use WithAgent option ?
// TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it...
s.agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.datastore, s.logstore, s.mq)))
}
setMachineID()
s.Router.Use(loggerWrap, traceWrap, panicWrap) // TODO should be opts
optionalCorsWrap(s.Router) // TODO should be an opt
s.bindHandlers(ctx)
return s
}
// TODO need to fix this to handle the nil case better
func setupTracer(zipkinURL string) {
var (
debugMode = false
serviceName = "fnserver"
serviceHostPort = "localhost:8080" // meh
zipkinHTTPEndpoint = zipkinURL
// ex: "http://zipkin:9411/api/v1/spans"
)
var collector zipkintracer.Collector
// custom Zipkin collector to send tracing spans to Prometheus
promCollector, promErr := NewPrometheusCollector()
if promErr != nil {
logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector")
}
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
if zipkinHTTPEndpoint != "" {
// Custom PrometheusCollector and Zipkin HTTPCollector
httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if zipErr != nil {
logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector")
}
collector = zipkintracer.MultiCollector{httpCollector, promCollector}
} else {
// Custom PrometheusCollector only
collector = promCollector
}
ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
zipkintracer.ClientServerSameSpan(true),
zipkintracer.TraceID128Bit(true),
)
if err != nil {
logrus.WithError(err).Fatalln("couldn't start tracer")
}
// wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus
fntracer := NewFnTracer(ziptracer)
opentracing.SetGlobalTracer(fntracer)
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
}
func setMachineID() {
port := uint16(getEnvInt(EnvPort, DefaultPort))
addr := whoAmI().To4()
if addr == nil {
addr = net.ParseIP("127.0.0.1").To4()
logrus.Warn("could not find non-local ipv4 address to use, using '127.0.0.1' for ids, if this is a cluster beware of duplicate ids!")
}
id.SetMachineIdHost(addr, port)
}
// whoAmI searches for a non-local address on any network interface, returning
// the first one it finds. it could be expanded to search eth0 or en0 only but
// to date this has been unnecessary.
func whoAmI() net.IP {
ints, _ := net.Interfaces()
for _, i := range ints {
if i.Name == "docker0" || i.Name == "lo" {
// not perfect
continue
}
addrs, _ := i.Addrs()
for _, a := range addrs {
ip, _, err := net.ParseCIDR(a.String())
if a.Network() == "ip+net" && err == nil && ip.To4() != nil {
if !bytes.Equal(ip, net.ParseIP("127.0.0.1")) {
return ip
}
}
}
}
return nil
}
func extractFields(c *gin.Context) logrus.Fields {
fields := logrus.Fields{"action": path.Base(c.HandlerName())}
for _, param := range c.Params {
fields[param.Key] = param.Value
}
return fields
}
func (s *Server) Start(ctx context.Context) {
newctx, cancel := contextWithSignal(ctx, os.Interrupt, syscall.SIGTERM)
s.startGears(newctx, cancel)
}
func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
// By default it serves on :8080 unless a
// FN_PORT environment variable was defined.
listen := fmt.Sprintf(":%d", getEnvInt(EnvPort, DefaultPort))
const runHeader = `
______
/ ____/___
/ /_ / __ \
/ __/ / / / /
/_/ /_/ /_/`
fmt.Println(runHeader)
fmt.Printf(" v%s\n\n", version.Version)
logrus.WithField("type", s.nodeType).Infof("Fn serving on `%v`", listen)
server := http.Server{
Addr: listen,
Handler: s.Router,
// TODO we should set read/write timeouts
}
go func() {
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("server error")
cancel()
} else {
logrus.Info("server stopped")
}
}()
// listening for signals or listener errors...
<-ctx.Done()
// TODO: do not wait forever during graceful shutdown (add graceful shutdown timeout)
if err := server.Shutdown(context.Background()); err != nil {
logrus.WithError(err).Error("server shutdown error")
}
if s.agent != nil {
s.agent.Close() // after we stop taking requests, wait for all tasks to finish
}
}
func (s *Server) bindHandlers(ctx context.Context) {
engine := s.Router
// now for extendible middleware
engine.Use(s.rootMiddlewareWrapper())
engine.GET("/", handlePing)
engine.GET("/version", handleVersion)
// TODO: move the following under v1
engine.GET("/stats", s.handleStats)
engine.GET("/metrics", s.handlePrometheusMetrics)
if s.nodeType != ServerTypeRunner {
v1 := engine.Group("/v1")
v1.Use(setAppNameInCtx)
v1.Use(s.apiMiddlewareWrapper())
v1.GET("/apps", s.handleAppList)
v1.POST("/apps", s.handleAppCreate)
{
apps := v1.Group("/apps/:app")
apps.Use(appNameCheck)
apps.GET("", s.handleAppGet)
apps.PATCH("", s.handleAppUpdate)
apps.DELETE("", s.handleAppDelete)
apps.GET("/routes", s.handleRouteList)
apps.POST("/routes", s.handleRoutesPostPutPatch)
apps.GET("/routes/:route", s.handleRouteGet)
apps.PATCH("/routes/*route", s.handleRoutesPostPutPatch)
apps.PUT("/routes/*route", s.handleRoutesPostPutPatch)
apps.DELETE("/routes/*route", s.handleRouteDelete)
apps.GET("/calls", s.handleCallList)
apps.GET("/calls/:call", s.handleCallGet)
apps.GET("/calls/:call/log", s.handleCallLogGet)
}
{
runner := v1.Group("/runner")
runner.PUT("/async", s.handleRunnerEnqueue)
runner.GET("/async", s.handleRunnerDequeue)
runner.POST("/start", s.handleRunnerStart)
runner.POST("/finish", s.handleRunnerFinish)
}
}
if s.nodeType != ServerTypeAPI {
runner := engine.Group("/r")
runner.Use(appNameCheck)
runner.Any("/:app", s.handleFunctionCall)
runner.Any("/:app/*route", s.handleFunctionCall)
}
engine.NoRoute(func(c *gin.Context) {
var err error
switch {
case s.nodeType == ServerTypeAPI && strings.HasPrefix(c.Request.URL.Path, "/r/"):
err = models.ErrInvokeNotSupported
case s.nodeType == ServerTypeRunner && strings.HasPrefix(c.Request.URL.Path, "/v1/"):
err = models.ErrAPINotSupported
default:
var e models.APIError = models.ErrPathNotFound
err = models.NewAPIError(e.Code(), fmt.Errorf("%v: %s", e.Error(), c.Request.URL.Path))
}
handleErrorResponse(c, err)
})
}
// implements fnext.ExtServer
func (s *Server) Datastore() models.Datastore {
return s.datastore
}
// returns the unescaped ?cursor and ?perPage values
// pageParams clamps 0 < ?perPage <= 100 and defaults to 30 if 0
// ignores parsing errors and falls back to defaults.
func pageParams(c *gin.Context, base64d bool) (cursor string, perPage int) {
cursor = c.Query("cursor")
if base64d {
cbytes, _ := base64.RawURLEncoding.DecodeString(cursor)
cursor = string(cbytes)
}
perPage, _ = strconv.Atoi(c.Query("per_page"))
if perPage > 100 {
perPage = 100
} else if perPage <= 0 {
perPage = 30
}
return cursor, perPage
}
type appResponse struct {
Message string `json:"message"`
App *models.App `json:"app"`
}
type appsResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Apps []*models.App `json:"apps"`
}
type routeResponse struct {
Message string `json:"message"`
Route *models.Route `json:"route"`
}
type routesResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Routes []*models.Route `json:"routes"`
}
type callResponse struct {
Message string `json:"message"`
Call *models.Call `json:"call"`
}
type callsResponse struct {
Message string `json:"message"`
NextCursor string `json:"next_cursor"`
Calls []*models.Call `json:"calls"`
}
type callLogResponse struct {
Message string `json:"message"`
Log *models.CallLog `json:"log"`
}