From ef5c35c6f0a8ae8e6db5f6e94ac513883f64e6f7 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Wed, 12 Sep 2018 11:41:06 -0700 Subject: [PATCH] fn: add http.Server options for web/admin/grpc services in server (#1191) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fn: add httpç to Server This allows to time limit slow/malicious clients when reading HTTP headers. In GetBody() buffering, same timeout can be used to time limit to give consistent I/O wait limits for the service in addition to per handler imposed limits we already have. * fn: generic http Server settings for services --- api/server/gin_middlewares.go | 2 +- api/server/server.go | 106 +++++++++++++++++++--------------- 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/api/server/gin_middlewares.go b/api/server/gin_middlewares.go index 26e360959..3686ecfb4 100644 --- a/api/server/gin_middlewares.go +++ b/api/server/gin_middlewares.go @@ -155,7 +155,7 @@ func apiMetricsWrap(s *Server) { r := s.Router r.Use(measure(r)) - if s.webListenPort != s.adminListenPort { + if s.svcConfigs[WebServer].Addr != s.svcConfigs[AdminServer].Addr { a := s.AdminRouter a.Use(measure(a)) } diff --git a/api/server/server.go b/api/server/server.go index 24cfb971b..be441888b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -150,13 +150,16 @@ const ( ServerTypePureRunner ) +// Configuration keys to identify grpc, admin, web services: const ( - // TLS Configuration for the GRPC service - TLSGRPCServer = "TLSgRPCServer" - // TLS Configuration for the admin service - TLSAdminServer = "TLSAdminServer" - // TLS Configuration for the web service - TLSWebServer = "TLSWebServer" + // For backwards compat, TLS-prefix configuration-keys: + TLSGRPCServer = "gRPCServer" + TLSAdminServer = "AdminServer" + TLSWebServer = "WebServer" + + AdminServer = "AdminServer" + WebServer = "WebServer" + GRPCServer = "gRPCServer" ) func (s NodeType) String() string { @@ -182,15 +185,17 @@ type Server struct { Router *gin.Engine AdminRouter *gin.Engine - webListenPort int - adminListenPort int - grpcListenPort int - agent agent.Agent - datastore models.Datastore - mq models.MessageQueue - logstore models.LogStore - nodeType NodeType - tlsConfigs map[string]*tls.Config + agent agent.Agent + datastore models.Datastore + mq models.MessageQueue + logstore models.LogStore + nodeType NodeType + + // Service Settings for Admin/Web/gRPC. Note that for gRPC only + // TLSConfig and Addr are transferrable from http.Server to GRPC service. + // TODO: extend this to cover gRPC options. + svcConfigs map[string]*http.Server + // Agent enqueue and read stores lbEnqueue agent.EnqueueDataAccess lbReadAccess agent.ReadDataAccess @@ -207,6 +212,7 @@ type Server struct { promExporter *prometheus.Exporter triggerAnnotator TriggerAnnotator fnAnnotator FnAnnotator + // Extensions can append to this list of contexts so that cancellations are properly handled. extraCtxs []context.Context } @@ -289,10 +295,10 @@ func pwd() string { // WithWebPort maps EnvPort func WithWebPort(port int) Option { return func(ctx context.Context, s *Server) error { - if s.adminListenPort == s.webListenPort { - s.adminListenPort = port + if s.svcConfigs[AdminServer].Addr == s.svcConfigs[WebServer].Addr { + s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", port) } - s.webListenPort = port + s.svcConfigs[WebServer].Addr = fmt.Sprintf(":%d", port) return nil } } @@ -300,7 +306,7 @@ func WithWebPort(port int) Option { // WithGRPCPort maps EnvGRPCPort func WithGRPCPort(port int) Option { return func(ctx context.Context, s *Server) error { - s.grpcListenPort = port + s.svcConfigs[GRPCServer].Addr = fmt.Sprintf(":%d", port) return nil } } @@ -391,7 +397,7 @@ func WithType(t NodeType) Option { // WithTLS configures a service with a provided TLS configuration func WithTLS(service string, tlsCfg *tls.Config) Option { return func(ctx context.Context, s *Server) error { - s.tlsConfigs[service] = tlsCfg + s.svcConfigs[service].TLSConfig = tlsCfg return nil } } @@ -515,9 +521,8 @@ func WithAgentFromEnv() Option { if err != nil { return err } - grpcAddr := fmt.Sprintf(":%d", s.grpcListenPort) cancelCtx, cancel := context.WithCancel(ctx) - prAgent, err := agent.DefaultPureRunner(cancel, grpcAddr, ds, s.tlsConfigs[TLSGRPCServer]) + prAgent, err := agent.DefaultPureRunner(cancel, s.svcConfigs[GRPCServer].Addr, ds, s.svcConfigs[GRPCServer].TLSConfig) if err != nil { return err } @@ -596,7 +601,14 @@ func WithFnAnnotator(provider FnAnnotator) Option { func WithAdminServer(port int) Option { return func(ctx context.Context, s *Server) error { s.AdminRouter = gin.New() - s.adminListenPort = port + s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", port) + return nil + } +} + +func WithHTTPConfig(service string, cfg *http.Server) Option { + return func(ctx context.Context, s *Server) error { + s.svcConfigs[service] = cfg return nil } } @@ -612,12 +624,12 @@ func New(ctx context.Context, opts ...Option) *Server { s := &Server{ Router: engine, AdminRouter: engine, - // Add default ports - webListenPort: DefaultPort, - adminListenPort: DefaultPort, - grpcListenPort: DefaultGRPCPort, - lbEnqueue: agent.NewUnsupportedAsyncEnqueueAccess(), - tlsConfigs: make(map[string]*tls.Config), + lbEnqueue: agent.NewUnsupportedAsyncEnqueueAccess(), + svcConfigs: map[string]*http.Server{ + WebServer: &http.Server{}, + AdminServer: &http.Server{}, + GRPCServer: &http.Server{}, + }, // Almost everything else is configured through opts (see NewFromEnv for ex.) or below } @@ -631,6 +643,16 @@ func New(ctx context.Context, opts ...Option) *Server { } } + if s.svcConfigs[WebServer].Addr == "" { + s.svcConfigs[WebServer].Addr = fmt.Sprintf(":%d", DefaultPort) + } + if s.svcConfigs[AdminServer].Addr == "" { + s.svcConfigs[AdminServer].Addr = fmt.Sprintf(":%d", DefaultPort) + } + if s.svcConfigs[GRPCServer].Addr == "" { + s.svcConfigs[GRPCServer].Addr = fmt.Sprintf(":%d", DefaultGRPCPort) + } + requireConfigSet := func(id string, val interface{}) { if val == nil { log.Fatalf("Invalid configuration for server type %s, %s must be configured during startup", s.nodeType, id) @@ -946,9 +968,6 @@ func (s *Server) Start(ctx context.Context) { } func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { - // By default it serves on :8080 unless a - // FN_PORT environment variable was defined. - listen := fmt.Sprintf(":%d", s.webListenPort) const runHeader = ` ______ @@ -959,16 +978,13 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { fmt.Println(runHeader) fmt.Printf(" v%s\n\n", version.Version) - logrus.WithField("type", s.nodeType).Infof("Fn serving on `%v`", listen) + logrus.WithField("type", s.nodeType).Infof("Fn serving on `%v`", s.svcConfigs[WebServer].Addr) installChildReaper() - server := http.Server{ - Addr: listen, - Handler: &ochttp.Handler{Handler: s.Router}, - TLSConfig: s.tlsConfigs[TLSWebServer], - - // TODO we should set read/write timeouts + server := s.svcConfigs[WebServer] + if server.Handler == nil { + server.Handler = &ochttp.Handler{Handler: s.Router} } go func() { @@ -986,13 +1002,11 @@ func (s *Server) startGears(ctx context.Context, cancel context.CancelFunc) { } }() - if s.webListenPort != s.adminListenPort { - adminListen := fmt.Sprintf(":%d", s.adminListenPort) - logrus.WithField("type", s.nodeType).Infof("Fn Admin serving on `%v`", adminListen) - adminServer := http.Server{ - Addr: adminListen, - Handler: &ochttp.Handler{Handler: s.AdminRouter}, - TLSConfig: s.tlsConfigs[TLSAdminServer], + if s.svcConfigs[WebServer].Addr != s.svcConfigs[AdminServer].Addr { + logrus.WithField("type", s.nodeType).Infof("Fn Admin serving on `%v`", s.svcConfigs[AdminServer].Addr) + adminServer := s.svcConfigs[AdminServer] + if adminServer.Handler == nil { + adminServer.Handler = &ochttp.Handler{Handler: s.AdminRouter} } go func() {