mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
1326 lines
38 KiB
Go
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"time"

	"io/ioutil"

	"golang.org/x/net/context"
	"golang.org/x/net/http2"
	"golang.org/x/net/trace"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
	"google.golang.org/grpc/transport"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts options

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[io.Closer]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit     chan struct{}
	done     chan struct{}
	quitOnce sync.Once
	doneOnce sync.Once
	serveWG  sync.WaitGroup // counts active Serve goroutines for GracefulStop
}

type options struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	useHandlerImpl        bool // use http.Handler-based server
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
}

var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize lets you set the size of the write buffer; this determines how much data can be batched
// before doing a write on the wire.
func WriteBufferSize(s int) ServerOption {
	return func(o *options) {
		o.writeBufferSize = s
	}
}

// ReadBufferSize lets you set the size of the read buffer; this determines how much data can be read at most
// for one read syscall.
func ReadBufferSize(s int) ServerOption {
	return func(o *options) {
		o.readBufferSize = s
	}
}

// InitialWindowSize returns a ServerOption that sets the window size for a stream.
// The lower bound for the window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialWindowSize = s
	}
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a connection.
// The lower bound for the window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialConnWindowSize = s
	}
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	return func(o *options) {
		o.keepaliveParams = kp
	}
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return func(o *options) {
		o.keepalivePolicy = kep
	}
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
	return func(o *options) {
		o.codec = codec
	}
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
	return func(o *options) {
		o.cp = cp
	}
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
	return func(o *options) {
		o.dc = dc
	}
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxReceiveMessageSize = m
	}
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default math.MaxInt32.
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return func(o *options) {
		o.maxConcurrentStreams = n
	}
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return func(o *options) {
		o.creds = c
	}
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	}
}

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return func(o *options) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	}
}

// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return func(o *options) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	}
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return func(o *options) {
		o.statsHandler = h
	}
}

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return func(o *options) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	}
}

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
	return func(o *options) {
		o.connectionTimeout = d
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o(&opts)
	}
	s := &Server{
		lis:   make(map[net.Listener]bool),
		opts:  opts,
		conns: make(map[io.Closer]bool),
		m:     make(map[string]*service),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}
	return s
}

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	ht := reflect.TypeOf(sd.HandlerType).Elem()
	st := reflect.TypeOf(ss)
	if !st.Implements(ht) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
	}
	s.register(sd, ss)
}

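// Illustration only, not part of this package: RegisterService checks at run
// time that the supplied implementation satisfies the interface that
// HandlerType points to. The standalone sketch below reproduces that reflect
// check. Greeter, goodImpl, badImpl and satisfies are hypothetical names
// introduced for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// Greeter is a hypothetical service interface, standing in for the
// interface that generated code would supply.
type Greeter interface {
	Hello(name string) string
}

// goodImpl implements Greeter.
type goodImpl struct{}

func (goodImpl) Hello(name string) string { return "hello " + name }

// badImpl has no methods, so it does not implement Greeter.
type badImpl struct{}

// satisfies reports whether impl implements the interface that handlerType
// points to. handlerType must be a (typically nil) pointer to an interface,
// for example (*Greeter)(nil); impl must be non-nil.
func satisfies(handlerType interface{}, impl interface{}) bool {
	ht := reflect.TypeOf(handlerType).Elem() // pointer type -> interface type
	st := reflect.TypeOf(impl)
	return st.Implements(ht)
}

func main() {
	fmt.Println(satisfies((*Greeter)(nil), goodImpl{}))
	fmt.Println(satisfies((*Greeter)(nil), badImpl{}))
}
```

// Passing a nil pointer to the interface lets the descriptor carry the
// interface type without needing a value of it.
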
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.m {
		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
		for m := range srv.md {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.sd {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	s.lis[lis] = true
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[lis] {
			lis.Close()
			delete(s.lis, lis)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	var serve func()
	c := conn.(io.Closer)
	if s.opts.useHandlerImpl {
		serve = func() { s.serveUsingHandler(conn) }
	} else {
		// Finish handshaking (HTTP2)
		st := s.newHTTP2Transport(conn, authInfo)
		if st == nil {
			return
		}
		c = st
		serve = func() { s.serveStreams(st) }
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(c) {
		return
	}
	go func() {
		serve()
		s.removeConn(c)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}
	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// serveUsingHandler is called from handleRawConn when s is configured
// to handle requests via the http.Handler interface. It sets up a
// net/http.Server to handle the just-accepted conn. The http.Server
// is configured to route all incoming requests (all HTTP/2 streams)
// to ServeHTTP, which creates a new ServerTransport for each stream.
// serveUsingHandler blocks until conn closes.
//
// This codepath is only used when Server.TestingUseHandlerImpl has
// been configured. This lets the end2end tests exercise the ServeHTTP
// method as one of the environment types.
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(conn net.Conn) {
	h2s := &http2.Server{
		MaxConcurrentStreams: s.opts.maxConcurrentStreams,
	}
	h2s.ServeConn(conn, &http2.ServeConnOpts{
		Handler: s,
	})
}

// ServeHTTP implements the Go standard library's http.Handler
|
|
// interface by responding to the gRPC request r, by looking up
|
|
// the requested gRPC method in the gRPC server s.
|
|
//
|
|
// The provided HTTP request must have arrived on an HTTP/2
|
|
// connection. When using the Go standard library's server,
|
|
// practically this means that the Request must also have arrived
|
|
// over TLS.
|
|
//
|
|
// To share one port (such as 443 for https) between gRPC and an
|
|
// existing http.Handler, use a root http.Handler such as:
|
|
//
|
|
// if r.ProtoMajor == 2 && strings.HasPrefix(
|
|
// r.Header.Get("Content-Type"), "application/grpc") {
|
|
// grpcServer.ServeHTTP(w, r)
|
|
// } else {
|
|
// yourMux.ServeHTTP(w, r)
|
|
// }
|
|
//
|
|
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
|
|
// separate from grpc-go's HTTP/2 server. Performance and features may vary
|
|
// between the two paths. ServeHTTP does not support some gRPC features
|
|
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
|
|
// and subject to change.
|
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if !s.addConn(st) {
|
|
return
|
|
}
|
|
defer s.removeConn(st)
|
|
s.serveStreams(st)
|
|
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
	}
	trInfo.firstLine.client = false
	trInfo.firstLine.remoteAddr = st.RemoteAddr()

	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = dl.Sub(time.Now())
	}
	return trInfo
}

func (s *Server) addConn(c io.Closer) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		c.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		c.(transport.ServerTransport).Drain()
	}
	s.conns[c] = true
	return true
}

func (s *Server) removeConn(c io.Closer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, c)
		s.cv.Broadcast()
	}
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	var (
		outPayload *stats.OutPayload
	)
	if s.opts.statsHandler != nil {
		outPayload = &stats.OutPayload{}
	}
	hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp)
	if err != nil {
		grpclog.Errorln("grpc: server failed to encode response: ", err)
		return err
	}
	if len(data) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, data, opts)
	if err == nil && outPayload != nil {
		outPayload.SentTime = time.Now()
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
	}
	return err
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil {
		begin := &stats.Begin{
			BeginTime: time.Now(),
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				EndTime: time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	if trInfo != nil {
		defer trInfo.tr.Finish()
		trInfo.firstLine.client = false
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			if err != nil && err != io.EOF {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
		}()
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	p := &parser{r: stream}
	pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
	if err == io.EOF {
		// The entire stream is done (for unary RPC only).
		return err
	}
	if err == io.ErrUnexpectedEOF {
		err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
	}
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			case transport.StreamError:
				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
				}
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
			}
		}
		return err
	}
	if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
		if e := t.WriteStatus(stream, st); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
		}
		return st.Err()
	}
	var inPayload *stats.InPayload
	if sh != nil {
		inPayload = &stats.InPayload{
			RecvTime: time.Now(),
		}
	}
	df := func(v interface{}) error {
		if inPayload != nil {
			inPayload.WireLength = len(req)
		}
		if pf == compressionMade {
			var err error
			if dc != nil {
				req, err = dc.Do(bytes.NewReader(req))
				if err != nil {
					// Use status.Error so the message is not interpreted as a format string.
					return status.Error(codes.Internal, err.Error())
				}
			} else {
				// Check the Decompress error before reading: tmp may not be
				// usable when Decompress fails.
				tmp, derr := decomp.Decompress(bytes.NewReader(req))
				if derr != nil {
					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", derr)
				}
				req, err = ioutil.ReadAll(tmp)
				if err != nil {
					return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
				}
			}
		}
		if len(req) > s.opts.maxReceiveMessageSize {
			// TODO: Revisit the error code. Currently keep it consistent with
			// java implementation.
			return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
		}
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if inPayload != nil {
			inPayload.Payload = v
			inPayload.Data = req
			inPayload.Length = len(req)
			sh.HandleRPC(stream.Context(), inPayload)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(convertCode(appErr), appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{
		Last:  true,
		Delay: false,
	}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			case transport.StreamError:
				if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
					grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
				}
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		return err
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	return t.WriteStatus(stream, status.New(codes.OK, ""))
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil {
		begin := &stats.Begin{
			BeginTime: time.Now(),
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				EndTime: time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	ss := &serverStream{
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			switch err := appErr.(type) {
			case transport.StreamError:
				appStatus = status.New(err.Code, err.Desc)
			default:
				appStatus = status.New(convertCode(appErr), appErr.Error())
			}
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]
	srv, ok := s.m[service]
	if !ok {
		if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
			s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
			return
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("unknown service %v", service)
		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	// Unary RPC or Streaming RPC?
	if md, ok := srv.md[method]; ok {
		s.processUnaryRPC(t, stream, srv, md, trInfo)
		return
	}
	if sd, ok := srv.sd[method]; ok {
		s.processStreamingRPC(t, stream, srv, sd, trInfo)
		return
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
		trInfo.tr.SetError()
	}
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	errDesc := fmt.Sprintf("unknown method %v", method)
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}
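
/*
The method-name parsing at the top of handleStream can be tried in isolation.
The sketch below copies that logic into a standalone helper; the name
splitFullMethod is hypothetical and is not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// splitFullMethod (hypothetical helper) splits a full method name such as
// "/pkg.Service/Method" the way handleStream does: drop one leading slash,
// then split at the last remaining slash. ok is false when no slash remains,
// which handleStream reports as a malformed method name.
func splitFullMethod(full string) (service, method string, ok bool) {
	if full != "" && full[0] == '/' {
		full = full[1:]
	}
	pos := strings.LastIndex(full, "/")
	if pos == -1 {
		return "", "", false
	}
	return full[:pos], full[pos+1:], true
}

func main() {
	for _, m := range []string{"/helloworld.Greeter/SayHello", "SayHello", ""} {
		service, method, ok := splitFullMethod(m)
		fmt.Printf("%q -> service=%q method=%q ok=%v\n", m, service, method, ok)
	}
}
```

Because the split happens at the last slash, a service name that itself
contains a slash still ends up entirely in the service part.
*/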

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.serveWG.Wait()
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for c := range s.conns {
			c.(transport.ServerTransport).Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

func init() {
	internal.TestingUseHandlerImpl = func(arg interface{}) {
		arg.(*Server).opts.useHandlerImpl = true
	}
}

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream, ok := transport.StreamFromContext(ctx)
	if !ok {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream, ok := transport.StreamFromContext(ctx)
	if !ok {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	t := stream.ServerTransport()
	if t == nil {
		grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
	}
	if err := t.WriteHeader(stream, md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream, ok := transport.StreamFromContext(ctx)
	if !ok {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}