Hybrid plumby (#585)

* fix configuration of agent and server to be future proof and plumb in the hybrid client agent

* fixes up the tests, turns off /r/ on api nodes

* fix up defaults for runner nodes

* shove the runner async push code down into agent land to use client

* plumb up async-age

* return full call from async dequeue endpoint, since we're storing a whole
call in the MQ we don't need to worry about caching of app/route [for now]
* fast safe shutdown of dequeue looper in runner / tidying of agent
* nice errors for path not found against /r/, /v1/ or other path not found
* removed some stale TODO in agent
* mq backends are only loud mouths in debug mode now

* update tests

* Add caching to hybrid client

* Fix HTTP error handling in hybrid client.

The type switch was on the value rather than a pointer.

* Gofmt.

* Better caching with a nice caching wrapper

* Remove datastore cache which is now unused

* Don't need to manually wrap interface methods

* Go fmt
This commit is contained in:
Reed Allman
2017-12-12 15:54:55 -08:00
committed by GitHub
parent 05ce2e3868
commit bb92547b95
18 changed files with 433 additions and 375 deletions

View File

@@ -4,18 +4,18 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"os"
"path"
"strconv"
"strings"
"syscall"
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/agent/hybrid"
"github.com/fnproject/fn/api/datastore"
"github.com/fnproject/fn/api/datastore/cache"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/logs"
"github.com/fnproject/fn/api/models"
@@ -37,6 +37,7 @@ const (
EnvMQURL = "FN_MQ_URL"
EnvDBURL = "FN_DB_URL"
EnvLOGDBURL = "FN_LOGSTORE_URL"
EnvRunnerURL = "FN_RUNNER_URL"
EnvNodeType = "FN_NODE_TYPE"
EnvPort = "FN_PORT" // be careful, Gin expects this variable to be "port"
EnvAPICORS = "FN_API_CORS"
@@ -80,122 +81,182 @@ func nodeTypeFromString(value string) ServerNodeType {
// NewFromEnv creates a new Functions server based on env vars.
func NewFromEnv(ctx context.Context, opts ...ServerOption) *Server {
return NewFromURLs(ctx,
getEnv(EnvDBURL, fmt.Sprintf("sqlite3://%s/data/fn.db", currDir)),
getEnv(EnvMQURL, fmt.Sprintf("bolt://%s/data/fn.mq", currDir)),
getEnv(EnvLOGDBURL, ""),
nodeTypeFromString(getEnv(EnvNodeType, "")),
opts...,
)
var defaultDB, defaultMQ string
nodeType := nodeTypeFromString(getEnv(EnvNodeType, "")) // default to full
if nodeType != ServerTypeRunner {
// only want to activate these for full and api nodes
defaultDB = fmt.Sprintf("sqlite3://%s/data/fn.db", currDir)
defaultMQ = fmt.Sprintf("bolt://%s/data/fn.mq", currDir)
}
opts = append(opts, WithZipkin(getEnv(EnvZipkinURL, "")))
opts = append(opts, WithDBURL(getEnv(EnvDBURL, defaultDB)))
opts = append(opts, WithMQURL(getEnv(EnvMQURL, defaultMQ)))
opts = append(opts, WithLogURL(getEnv(EnvLOGDBURL, "")))
opts = append(opts, WithRunnerURL(getEnv(EnvRunnerURL, "")))
opts = append(opts, WithType(nodeType))
return New(ctx, opts...)
}
// Create a new server based on the string URLs for each service.
// Sits in the middle of NewFromEnv and New
func NewFromURLs(ctx context.Context, dbURL, mqURL, logstoreURL string, nodeType ServerNodeType, opts ...ServerOption) *Server {
ds, err := datastore.New(dbURL)
if err != nil {
logrus.WithError(err).Fatalln("Error initializing datastore.")
func WithDBURL(dbURL string) ServerOption {
if dbURL != "" {
ds, err := datastore.New(dbURL)
if err != nil {
logrus.WithError(err).Fatalln("Error initializing datastore.")
}
return WithDatastore(ds)
}
return noop
}
mq, err := mqs.New(mqURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing message queue.")
func WithMQURL(mqURL string) ServerOption {
if mqURL != "" {
mq, err := mqs.New(mqURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing message queue.")
}
return WithMQ(mq)
}
return noop
}
var logDB models.LogStore = ds
if ldb := logstoreURL; ldb != "" && ldb != dbURL {
logDB, err = logs.New(logstoreURL)
func WithLogURL(logstoreURL string) ServerOption {
if ldb := logstoreURL; ldb != "" {
logDB, err := logs.New(logstoreURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing logs store.")
}
return WithLogstore(logDB)
}
return New(ctx, ds, mq, logDB, nodeType, opts...)
return noop
}
// New creates a new Functions server with the passed in datastore, message queue and API URL
func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, ls models.LogStore, nodeType ServerNodeType, opts ...ServerOption) *Server {
setTracer()
var tp agent.AgentNodeType
switch nodeType {
case ServerTypeAPI:
tp = agent.AgentTypeAPI
case ServerTypeRunner:
tp = agent.AgentTypeRunner
default:
tp = agent.AgentTypeFull
func WithRunnerURL(runnerURL string) ServerOption {
if runnerURL != "" {
cl, err := hybrid.NewClient(runnerURL)
if err != nil {
logrus.WithError(err).Fatal("Error initializing runner API client.")
}
return WithAgent(agent.New(agent.NewCachedDataAccess(cl)))
}
return noop
}
func noop(s *Server) {}
func WithType(t ServerNodeType) ServerOption {
return func(s *Server) { s.nodeType = t }
}
func WithDatastore(ds models.Datastore) ServerOption {
return func(s *Server) { s.Datastore = ds }
}
func WithMQ(mq models.MessageQueue) ServerOption {
return func(s *Server) { s.MQ = mq }
}
func WithLogstore(ls models.LogStore) ServerOption {
return func(s *Server) { s.LogDB = ls }
}
func WithAgent(agent agent.Agent) ServerOption {
return func(s *Server) { s.Agent = agent }
}
// New creates a new Functions server with the opts given. For convenience, users may
// prefer to use NewFromEnv but New is more flexible if needed.
func New(ctx context.Context, opts ...ServerOption) *Server {
s := &Server{
Agent: agent.New(cache.Wrap(ds), ls, mq, tp), // only add datastore caching to agent
Router: gin.New(),
Datastore: ds,
MQ: mq,
LogDB: ls,
nodeType: nodeType,
Router: gin.New(),
// Almost everything else is configured through opts (see NewFromEnv for ex.) or below
}
// NOTE: testServer() in tests doesn't use these
setMachineID()
s.Router.Use(loggerWrap, traceWrap, panicWrap)
optionalCorsWrap(s.Router)
s.bindHandlers(ctx)
for _, opt := range opts {
if opt == nil {
continue
}
opt(s)
}
if s.LogDB == nil { // TODO seems weird?
s.LogDB = s.Datastore
}
// TODO we maybe should use the agent.DirectDataAccess in the /runner endpoints server side?
switch s.nodeType {
case ServerTypeAPI:
s.Agent = nil
case ServerTypeRunner:
if s.Agent == nil {
logrus.Fatal("No agent started for a runner node, add FN_RUNNER_URL to configuration.")
}
default:
s.nodeType = ServerTypeFull
if s.Datastore == nil || s.LogDB == nil || s.MQ == nil {
logrus.Fatal("Full nodes must configure FN_DB_URL, FN_LOG_URL, FN_MQ_URL.")
}
// TODO force caller to use WithAgent option ?
// TODO for tests we don't want cache, really, if we force WithAgent this can be fixed. cache needs to be moved anyway so that runner nodes can use it...
s.Agent = agent.New(agent.NewCachedDataAccess(agent.NewDirectDataAccess(s.Datastore, s.LogDB, s.MQ)))
}
setMachineID()
s.Router.Use(loggerWrap, traceWrap, panicWrap) // TODO should be opts
optionalCorsWrap(s.Router) // TODO should be an opt
s.bindHandlers(ctx)
return s
}
func setTracer() {
var (
debugMode = false
serviceName = "fnserver"
serviceHostPort = "localhost:8080" // meh
zipkinHTTPEndpoint = getEnv(EnvZipkinURL, "")
// ex: "http://zipkin:9411/api/v1/spans"
)
// TODO this doesn't need to be an option necessarily since it's just setting
// globals but it makes things uniform. change if you've a better idear
func WithZipkin(zipkinURL string) ServerOption {
return func(s *Server) {
var (
debugMode = false
serviceName = "fnserver"
serviceHostPort = "localhost:8080" // meh
zipkinHTTPEndpoint = zipkinURL
// ex: "http://zipkin:9411/api/v1/spans"
)
var collector zipkintracer.Collector
var collector zipkintracer.Collector
// custom Zipkin collector to send tracing spans to Prometheus
promCollector, promErr := NewPrometheusCollector()
if promErr != nil {
logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector")
}
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
if zipkinHTTPEndpoint != "" {
// Custom PrometheusCollector and Zipkin HTTPCollector
httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if zipErr != nil {
logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector")
// custom Zipkin collector to send tracing spans to Prometheus
promCollector, promErr := NewPrometheusCollector()
if promErr != nil {
logrus.WithError(promErr).Fatalln("couldn't start Prometheus trace collector")
}
collector = zipkintracer.MultiCollector{httpCollector, promCollector}
} else {
// Custom PrometheusCollector only
collector = promCollector
logger := zipkintracer.LoggerFunc(func(i ...interface{}) error { logrus.Error(i...); return nil })
if zipkinHTTPEndpoint != "" {
// Custom PrometheusCollector and Zipkin HTTPCollector
httpCollector, zipErr := zipkintracer.NewHTTPCollector(zipkinHTTPEndpoint, zipkintracer.HTTPLogger(logger))
if zipErr != nil {
logrus.WithError(zipErr).Fatalln("couldn't start Zipkin trace collector")
}
collector = zipkintracer.MultiCollector{httpCollector, promCollector}
} else {
// Custom PrometheusCollector only
collector = promCollector
}
ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
zipkintracer.ClientServerSameSpan(true),
zipkintracer.TraceID128Bit(true),
)
if err != nil {
logrus.WithError(err).Fatalln("couldn't start tracer")
}
// wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus
fntracer := NewFnTracer(ziptracer)
opentracing.SetGlobalTracer(fntracer)
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
}
ziptracer, err := zipkintracer.NewTracer(zipkintracer.NewRecorder(collector, debugMode, serviceHostPort, serviceName),
zipkintracer.ClientServerSameSpan(true),
zipkintracer.TraceID128Bit(true),
)
if err != nil {
logrus.WithError(err).Fatalln("couldn't start tracer")
}
// wrap the Zipkin tracer in a FnTracer which will also send spans to Prometheus
fntracer := NewFnTracer(ziptracer)
opentracing.SetGlobalTracer(fntracer)
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
}
func setMachineID() {
@@ -284,7 +345,9 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
logrus.WithError(err).Error("server shutdown error")
}
s.Agent.Close() // after we stop taking requests, wait for all tasks to finish
if s.Agent != nil {
s.Agent.Close() // after we stop taking requests, wait for all tasks to finish
}
}
func (s *Server) bindHandlers(ctx context.Context) {
@@ -335,7 +398,7 @@ func (s *Server) bindHandlers(ctx context.Context) {
}
}
{
if s.nodeType != ServerTypeAPI {
runner := engine.Group("/r")
runner.Use(appWrap)
runner.Any("/:app", s.handleFunctionCall)
@@ -343,8 +406,17 @@ func (s *Server) bindHandlers(ctx context.Context) {
}
engine.NoRoute(func(c *gin.Context) {
logrus.Debugln("not found", c.Request.URL.Path)
c.JSON(http.StatusNotFound, simpleError(errors.New("Path not found")))
var err error
switch {
case s.nodeType == ServerTypeAPI && strings.HasPrefix(c.Request.URL.Path, "/r/"):
err = models.ErrInvokeNotSupported
case s.nodeType == ServerTypeRunner && strings.HasPrefix(c.Request.URL.Path, "/v1/"):
err = models.ErrAPINotSupported
default:
var e models.APIError = models.ErrPathNotFound
err = models.NewAPIError(e.Code(), fmt.Errorf("%v: %s", e.Error(), c.Request.URL.Path))
}
handleErrorResponse(c, err)
})
}