fn: add http.Server options for web/admin/grpc services in server (#1191)

* fn: add httpç to Server

This allows to time limit slow/malicious clients when
reading HTTP headers. In GetBody() buffering, same timeout
can be used to time limit to give consistent I/O wait
limits for the service in addition to per handler
imposed limits we already have.

* fn: generic http Server settings for services
This commit is contained in:
Tolga Ceylan
2018-09-12 11:41:06 -07:00
committed by GitHub
parent dcc94e514e
commit ef5c35c6f0
2 changed files with 61 additions and 47 deletions

View File

@@ -155,7 +155,7 @@ func apiMetricsWrap(s *Server) {
r := s.Router r := s.Router
r.Use(measure(r)) r.Use(measure(r))
if s.webListenPort != s.adminListenPort { if s.svcConfigs[WebServer].Addr != s.svcConfigs[AdminServer].Addr {
a := s.AdminRouter a := s.AdminRouter
a.Use(measure(a)) a.Use(measure(a))
} }

View File

@@ -150,13 +150,16 @@ const (
ServerTypePureRunner ServerTypePureRunner
) )
// Configuration keys to identify grpc, admin, web services:
const ( const (
// TLS Configuration for the GRPC service // For backwards compat, TLS-prefix configuration-keys:
TLSGRPCServer = "TLSgRPCServer" TLSGRPCServer = "gRPCServer"
// TLS Configuration for the admin service TLSAdminServer = "AdminServer"
TLSAdminServer = "TLSAdminServer" TLSWebServer = "WebServer"
// TLS Configuration for the web service
TLSWebServer = "TLSWebServer" AdminServer = "AdminServer"
WebServer = "WebServer"
GRPCServer = "gRPCServer"
) )
func (s NodeType) String() string { func (s NodeType) String() string {
@@ -182,15 +185,17 @@ type Server struct {
Router *gin.Engine Router *gin.Engine
AdminRouter *gin.Engine AdminRouter *gin.Engine
webListenPort int agent agent.Agent
adminListenPort int datastore models.Datastore
grpcListenPort int mq models.MessageQueue
agent agent.Agent logstore models.LogStore
datastore models.Datastore nodeType NodeType
mq models.MessageQueue
logstore models.LogStore // Service Settings for Admin/Web/gRPC. Note that for gRPC only
nodeType NodeType // TLSConfig and Addr are transferrable from http.Server to GRPC service.
tlsConfigs map[string]*tls.Config // TODO: extend this to cover gRPC options.
svcConfigs map[string]*http.Server
// Agent enqueue and read stores // Agent enqueue and read stores
lbEnqueue agent.EnqueueDataAccess lbEnqueue agent.EnqueueDataAccess
lbReadAccess agent.ReadDataAccess lbReadAccess agent.ReadDataAccess
@@ -207,6 +212,7 @@ type Server struct {
promExporter *prometheus.Exporter promExporter *prometheus.Exporter
triggerAnnotator TriggerAnnotator triggerAnnotator TriggerAnnotator
fnAnnotator FnAnnotator fnAnnotator FnAnnotator
// Extensions can append to this list of contexts so that cancellations are properly handled. // Extensions can append to this list of contexts so that cancellations are properly handled.
extraCtxs []context.Context extraCtxs []context.Context
} }
@@ -289,10 +295,10 @@ func pwd() string {
// WithWebPort maps EnvPort // WithWebPort maps EnvPort
func WithWebPort(port int) Option { func WithWebPort(port int) Option {
return func(ctx context.Context, s *Server) error { return func(ctx context.Context, s *Server) error {
if s.adminListenPort == s.webListenPort { if s.svcConfigs[AdminServer].Addr == s.svcConfigs[WebServer].Addr {
s.adminListenPort = port s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", port)
} }
s.webListenPort = port s.svcConfigs[WebServer].Addr = fmt.Sprintf(":%d", port)
return nil return nil
} }
} }
@@ -300,7 +306,7 @@ func WithWebPort(port int) Option {
// WithGRPCPort maps EnvGRPCPort // WithGRPCPort maps EnvGRPCPort
func WithGRPCPort(port int) Option { func WithGRPCPort(port int) Option {
return func(ctx context.Context, s *Server) error { return func(ctx context.Context, s *Server) error {
s.grpcListenPort = port s.svcConfigs[GRPCServer].Addr = fmt.Sprintf(":%d", port)
return nil return nil
} }
} }
@@ -391,7 +397,7 @@ func WithType(t NodeType) Option {
// WithTLS configures a service with a provided TLS configuration // WithTLS configures a service with a provided TLS configuration
func WithTLS(service string, tlsCfg *tls.Config) Option { func WithTLS(service string, tlsCfg *tls.Config) Option {
return func(ctx context.Context, s *Server) error { return func(ctx context.Context, s *Server) error {
s.tlsConfigs[service] = tlsCfg s.svcConfigs[service].TLSConfig = tlsCfg
return nil return nil
} }
} }
@@ -515,9 +521,8 @@ func WithAgentFromEnv() Option {
if err != nil { if err != nil {
return err return err
} }
grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort)
cancelCtx, cancel := context.WithCancel(ctx) cancelCtx, cancel := context.WithCancel(ctx)
prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, ds, s.tlsConfigs[TLSGRPCServer]) prAgent, err := agent.DefaultPureRunner(cancel, s.svcConfigs[GRPCServer].Addr, ds, s.svcConfigs[GRPCServer].TLSConfig)
if err != nil { if err != nil {
return err return err
} }
@@ -596,7 +601,14 @@ func WithFnAnnotator(provider FnAnnotator) Option {
func WithAdminServer(port int) Option { func WithAdminServer(port int) Option {
return func(ctx context.Context, s *Server) error { return func(ctx context.Context, s *Server) error {
s.AdminRouter = gin.New() s.AdminRouter = gin.New()
s.adminListenPort = port s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", port)
return nil
}
}
func WithHTTPConfig(service string, cfg *http.Server) Option {
return func(ctx context.Context, s *Server) error {
s.svcConfigs[service] = cfg
return nil return nil
} }
} }
@@ -612,12 +624,12 @@ func New(ctx context.Context, opts ...Option) *Server {
s := &Server{ s := &Server{
Router: engine, Router: engine,
AdminRouter: engine, AdminRouter: engine,
// Add default ports lbEnqueue: agent.NewUnsupportedAsyncEnqueueAccess(),
webListenPort: DefaultPort, svcConfigs: map[string]*http.Server{
adminListenPort: DefaultPort, WebServer: &http.Server{},
grpcListenPort: DefaultGRPCPort, AdminServer: &http.Server{},
lbEnqueue: agent.NewUnsupportedAsyncEnqueueAccess(), GRPCServer: &http.Server{},
tlsConfigs: make(map[string]*tls.Config), },
// Almost everything else is configured through opts (see NewFromEnv for ex.) or below // Almost everything else is configured through opts (see NewFromEnv for ex.) or below
} }
@@ -631,6 +643,16 @@ func New(ctx context.Context, opts ...Option) *Server {
} }
} }
if s.svcConfigs[WebServer].Addr == "" {
s.svcConfigs[WebServer].Addr = fmt.Sprintf(":%d", DefaultPort)
}
if s.svcConfigs[AdminServer].Addr == "" {
s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", DefaultPort)
}
if s.svcConfigs[GRPCServer].Addr == "" {
s.svcConfigs[GRPCServer].Addr = fmt.Sprintf(":%d", DefaultGRPCPort)
}
requireConfigSet := func(id string, val interface{}) { requireConfigSet := func(id string, val interface{}) {
if val == nil { if val == nil {
log.Fatalf("Invalid configuration for server type %s, %s must be configured during startup", s.nodeType, id) log.Fatalf("Invalid configuration for server type %s, %s must be configured during startup", s.nodeType, id)
@@ -946,9 +968,6 @@ func (s *Server) Start(ctx context.Context) {
} }
func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
// By default it serves on :8080 unless a
// FN_PORT environment variable was defined.
listen := fmt.Sprintf(":%d", s.webListenPort)
const runHeader = ` const runHeader = `
______ ______
@@ -959,16 +978,13 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
fmt.Println(runHeader) fmt.Println(runHeader)
fmt.Printf(" v%s\n\n", version.Version) fmt.Printf(" v%s\n\n", version.Version)
logrus.WithField("type", s.nodeType).Infof("Fn serving on `%v`", listen) logrus.WithField("type", s.nodeType).Infof("Fn serving on `%v`", s.svcConfigs[WebServer].Addr)
installChildReaper() installChildReaper()
server := http.Server{ server := s.svcConfigs[WebServer]
Addr: listen, if server.Handler == nil {
Handler: &ochttp.Handler{Handler: s.Router}, server.Handler = &ochttp.Handler{Handler: s.Router}
TLSConfig: s.tlsConfigs[TLSWebServer],
// TODO we should set read/write timeouts
} }
go func() { go func() {
@@ -986,13 +1002,11 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) {
} }
}() }()
if s.webListenPort != s.adminListenPort { if s.svcConfigs[WebServer].Addr != s.svcConfigs[AdminServer].Addr {
adminListen := fmt.Sprintf(":%d", s.adminListenPort) logrus.WithField("type", s.nodeType).Infof("Fn Admin serving on `%v`", s.svcConfigs[AdminServer].Addr)
logrus.WithField("type", s.nodeType).Infof("Fn Admin serving on `%v`", adminListen) adminServer := s.svcConfigs[AdminServer]
adminServer := http.Server{ if adminServer.Handler == nil {
Addr: adminListen, adminServer.Handler = &ochttp.Handler{Handler: s.AdminRouter}
Handler: &ochttp.Handler{Handler: s.AdminRouter},
TLSConfig: s.tlsConfigs[TLSAdminServer],
} }
go func() { go func() {