mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: upgrade to gRPC 1.16.0 (#1352)
This commit is contained in:
2
go.mod
2
go.mod
@@ -39,7 +39,7 @@ require (
|
||||
golang.org/x/sys v0.0.0-20181019160139-8e24a49d80f8
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
|
||||
google.golang.org/api v0.0.0-20181019000435-7fb5a8353b60 // indirect
|
||||
google.golang.org/grpc v1.15.0
|
||||
google.golang.org/grpc v1.16.0
|
||||
gopkg.in/go-playground/validator.v8 v8.18.2 // indirect
|
||||
)
|
||||
|
||||
|
||||
2
go.sum
2
go.sum
@@ -251,6 +251,8 @@ google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e/go.mod h1:JiN7NxoA
|
||||
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
||||
google.golang.org/grpc v1.15.0 h1:Az/KuahOM4NAidTEuJCv/RonAA7rYsTPkqXVjr+8OOw=
|
||||
google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
|
||||
google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY=
|
||||
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
|
||||
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
|
||||
3
vendor/google.golang.org/grpc/.travis.yml
generated
vendored
3
vendor/google.golang.org/grpc/.travis.yml
generated
vendored
@@ -26,7 +26,8 @@ before_install:
|
||||
- if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then VET_SKIP_PROTO=1; fi
|
||||
|
||||
install:
|
||||
- if [[ "${GO111MODULE}" = "on" ]]; then go mod download; else make testdeps; fi
|
||||
- try3() { eval "$*" || eval "$*" || eval "$*"; }
|
||||
- try3 'if [[ "${GO111MODULE}" = "on" ]]; then go mod download; else make testdeps; fi'
|
||||
- if [[ "${GAE}" = 1 ]]; then source ./install_gae.sh; make testappenginedeps; fi
|
||||
- if [[ "${VET}" = 1 ]]; then ./vet.sh -install; fi
|
||||
|
||||
|
||||
4
vendor/google.golang.org/grpc/Makefile
generated
vendored
4
vendor/google.golang.org/grpc/Makefile
generated
vendored
@@ -17,10 +17,10 @@ proto:
|
||||
go generate google.golang.org/grpc/...
|
||||
|
||||
test: testdeps
|
||||
go test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
|
||||
go test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
|
||||
|
||||
testappengine: testappenginedeps
|
||||
goapp test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
|
||||
goapp test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
|
||||
|
||||
testappenginedeps:
|
||||
goapp get -d -v -t -tags 'appengine appenginevm' google.golang.org/grpc/...
|
||||
|
||||
22
vendor/google.golang.org/grpc/README.md
generated
vendored
22
vendor/google.golang.org/grpc/README.md
generated
vendored
@@ -43,3 +43,25 @@ Please update proto package, gRPC package and rebuild the proto files:
|
||||
- `go get -u github.com/golang/protobuf/{proto,protoc-gen-go}`
|
||||
- `go get -u google.golang.org/grpc`
|
||||
- `protoc --go_out=plugins=grpc:. *.proto`
|
||||
|
||||
#### How to turn on logging
|
||||
|
||||
The default logger is controlled by the environment variables. Turn everything
|
||||
on by setting:
|
||||
|
||||
```
|
||||
GRPC_GO_LOG_VERBOSITY_LEVEL=99 GRPC_GO_LOG_SEVERITY_LEVEL=info
|
||||
```
|
||||
|
||||
#### The RPC failed with error `"code = Unavailable desc = transport is closing"`
|
||||
|
||||
This error means the connection the RPC is using was closed, and there are many
|
||||
possible reasons, including:
|
||||
1. mis-configured transport credentials, connection failed on handshaking
|
||||
1. bytes disrupted, possibly by a proxy in between
|
||||
1. server shutdown
|
||||
|
||||
It can be tricky to debug this because the error happens on the client side but
|
||||
the root cause of the connection being closed is on the server side. Turn on
|
||||
logging on __both client and server__, and see if there are any transport
|
||||
errors.
|
||||
|
||||
15
vendor/google.golang.org/grpc/balancer/balancer.go
generated
vendored
15
vendor/google.golang.org/grpc/balancer/balancer.go
generated
vendored
@@ -28,6 +28,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
@@ -88,7 +89,12 @@ type SubConn interface {
|
||||
}
|
||||
|
||||
// NewSubConnOptions contains options to create new SubConn.
|
||||
type NewSubConnOptions struct{}
|
||||
type NewSubConnOptions struct {
|
||||
// CredsBundle is the credentials bundle that will be used in the created
|
||||
// SubConn. If it's nil, the original creds from grpc DialOptions will be
|
||||
// used.
|
||||
CredsBundle credentials.Bundle
|
||||
}
|
||||
|
||||
// ClientConn represents a gRPC ClientConn.
|
||||
//
|
||||
@@ -125,6 +131,8 @@ type BuildOptions struct {
|
||||
// use to dial to a remote load balancer server. The Balancer implementations
|
||||
// can ignore this if it does not need to talk to another party securely.
|
||||
DialCreds credentials.TransportCredentials
|
||||
// CredsBundle is the credentials bundle that the Balancer can use.
|
||||
CredsBundle credentials.Bundle
|
||||
// Dialer is the custom dialer the Balancer implementation can use to dial
|
||||
// to a remote load balancer server. The Balancer implementations
|
||||
// can ignore this if it doesn't need to talk to remote balancer.
|
||||
@@ -147,12 +155,17 @@ type PickOptions struct {
|
||||
// FullMethodName is the method name that NewClientStream() is called
|
||||
// with. The canonical format is /service/Method.
|
||||
FullMethodName string
|
||||
// Header contains the metadata from the RPC's client header. The metadata
|
||||
// should not be modified; make a copy first if needed.
|
||||
Header metadata.MD
|
||||
}
|
||||
|
||||
// DoneInfo contains additional information for done.
|
||||
type DoneInfo struct {
|
||||
// Err is the rpc error the RPC finished with. It could be nil.
|
||||
Err error
|
||||
// Trailer contains the metadata from the RPC's trailer, if present.
|
||||
Trailer metadata.MD
|
||||
// BytesSent indicates if any bytes have been sent to the server.
|
||||
BytesSent bool
|
||||
// BytesReceived indicates if any byte has been received from the server.
|
||||
|
||||
5
vendor/google.golang.org/grpc/balancer_conn_wrappers.go
generated
vendored
5
vendor/google.golang.org/grpc/balancer_conn_wrappers.go
generated
vendored
@@ -197,7 +197,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
|
||||
if ccb.subConns == nil {
|
||||
return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
|
||||
}
|
||||
ac, err := ccb.cc.newAddrConn(addrs)
|
||||
ac, err := ccb.cc.newAddrConn(addrs, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -257,6 +257,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
|
||||
}
|
||||
if !acbw.ac.tryUpdateAddrs(addrs) {
|
||||
cc := acbw.ac.cc
|
||||
opts := acbw.ac.scopts
|
||||
acbw.ac.mu.Lock()
|
||||
// Set old ac.acbw to nil so the Shutdown state update will be ignored
|
||||
// by balancer.
|
||||
@@ -272,7 +273,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
|
||||
return
|
||||
}
|
||||
|
||||
ac, err := cc.newAddrConn(addrs)
|
||||
ac, err := cc.newAddrConn(addrs, opts)
|
||||
if err != nil {
|
||||
grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
|
||||
return
|
||||
|
||||
701
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
701
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@@ -30,7 +30,6 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/balancer"
|
||||
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -41,6 +40,7 @@ import (
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
|
||||
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
|
||||
@@ -80,6 +80,9 @@ var (
|
||||
// being set for ClientConn. Users should either set one or explicitly
|
||||
// call WithInsecure DialOption to disable security.
|
||||
errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
|
||||
// errTransportCredsAndBundle indicates that creds bundle is used together
|
||||
// with other individual Transport Credentials.
|
||||
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
|
||||
// errTransportCredentialsMissing indicates that users want to transmit security
|
||||
// information (e.g., oauth2 token) which requires secure connection on an insecure
|
||||
// connection.
|
||||
@@ -137,17 +140,33 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
if channelz.IsOn() {
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
cc.csMgr.channelzID = cc.channelzID
|
||||
}
|
||||
|
||||
if !cc.dopts.insecure {
|
||||
if cc.dopts.copts.TransportCredentials == nil {
|
||||
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
|
||||
return nil, errNoTransportSecurity
|
||||
}
|
||||
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
|
||||
return nil, errTransportCredsAndBundle
|
||||
}
|
||||
} else {
|
||||
if cc.dopts.copts.TransportCredentials != nil {
|
||||
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
|
||||
return nil, errCredentialsConflict
|
||||
}
|
||||
for _, cd := range cc.dopts.copts.PerRPCCredentials {
|
||||
@@ -260,6 +279,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
}
|
||||
cc.balancerBuildOpts = balancer.BuildOptions{
|
||||
DialCreds: credsClone,
|
||||
CredsBundle: cc.dopts.copts.CredsBundle,
|
||||
Dialer: cc.dopts.copts.Dialer,
|
||||
ChannelzParentID: cc.channelzID,
|
||||
}
|
||||
@@ -308,6 +328,7 @@ type connectivityStateManager struct {
|
||||
mu sync.Mutex
|
||||
state connectivity.State
|
||||
notifyChan chan struct{}
|
||||
channelzID int64
|
||||
}
|
||||
|
||||
// updateState updates the connectivity.State of ClientConn.
|
||||
@@ -323,6 +344,12 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
|
||||
return
|
||||
}
|
||||
csm.state = state
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
if csm.notifyChan != nil {
|
||||
// There are other goroutines waiting on this channel.
|
||||
close(csm.notifyChan)
|
||||
@@ -500,10 +527,26 @@ func (cc *ClientConn) switchBalancer(name string) {
|
||||
}
|
||||
|
||||
builder := balancer.Get(name)
|
||||
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
|
||||
// we reuse previous one?
|
||||
if channelz.IsOn() {
|
||||
if builder == nil {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
|
||||
Severity: channelz.CtWarning,
|
||||
})
|
||||
} else {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
}
|
||||
if builder == nil {
|
||||
grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
|
||||
builder = newPickfirstBuilder()
|
||||
}
|
||||
|
||||
cc.preBalancerName = cc.curBalancerName
|
||||
cc.curBalancerName = builder.Name()
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
||||
@@ -524,13 +567,15 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
|
||||
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
|
||||
//
|
||||
// Caller needs to make sure len(addrs) > 0.
|
||||
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
|
||||
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
|
||||
ac := &addrConn{
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
dopts: cc.dopts,
|
||||
czData: new(channelzData),
|
||||
resetBackoff: make(chan struct{}),
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
scopts: opts,
|
||||
dopts: cc.dopts,
|
||||
czData: new(channelzData),
|
||||
successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
|
||||
resetBackoff: make(chan struct{}),
|
||||
}
|
||||
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
||||
// Track ac in cc. This needs to be done before any getTransport(...) is called.
|
||||
@@ -541,6 +586,14 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Subchannel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
}
|
||||
cc.conns[ac] = struct{}{}
|
||||
cc.mu.Unlock()
|
||||
@@ -590,11 +643,9 @@ func (cc *ClientConn) incrCallsFailed() {
|
||||
atomic.AddInt64(&cc.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
// connect starts to creating transport and also starts the transport monitor
|
||||
// goroutine for this ac.
|
||||
// connect starts creating a transport.
|
||||
// It does nothing if the ac is not IDLE.
|
||||
// TODO(bar) Move this to the addrConn section.
|
||||
// This was part of resetAddrConn, keep it here to make the diff look clean.
|
||||
func (ac *addrConn) connect() error {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
@@ -605,22 +656,12 @@ func (ac *addrConn) connect() error {
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
ac.state = connectivity.Connecting
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Start a goroutine connecting to the server asynchronously.
|
||||
go func() {
|
||||
if err := ac.resetTransport(); err != nil {
|
||||
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
|
||||
if err != errConnClosing {
|
||||
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||
ac.tearDown(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
ac.transportMonitor()
|
||||
}()
|
||||
go ac.resetTransport(false)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -649,7 +690,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
||||
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
|
||||
if curAddrFound {
|
||||
ac.addrs = addrs
|
||||
ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
|
||||
ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
|
||||
}
|
||||
|
||||
return curAddrFound
|
||||
@@ -675,8 +716,10 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
|
||||
}
|
||||
|
||||
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
|
||||
hdr, _ := metadata.FromOutgoingContext(ctx)
|
||||
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
|
||||
FullMethodName: method,
|
||||
Header: hdr,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, toRPCErr(err)
|
||||
@@ -690,11 +733,29 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
|
||||
if cc.dopts.disableServiceConfig {
|
||||
return nil
|
||||
}
|
||||
if cc.scRaw == js {
|
||||
return nil
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
|
||||
// for human consumption.
|
||||
Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
sc, err := parseServiceConfig(js)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cc.mu.Lock()
|
||||
// Check if the ClientConn is already closed. Some fields (e.g.
|
||||
// balancerWrapper) are set to nil when closing the ClientConn, and could
|
||||
// cause nil pointer panic if we don't have this check.
|
||||
if cc.conns == nil {
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
cc.scRaw = js
|
||||
cc.sc = sc
|
||||
|
||||
@@ -788,6 +849,19 @@ func (cc *ClientConn) Close() error {
|
||||
ac.tearDown(ErrClientConnClosing)
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
ted := &channelz.TraceEventDesc{
|
||||
Desc: "Channel Deleted",
|
||||
Severity: channelz.CtINFO,
|
||||
}
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
ted.Parent = &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
}
|
||||
}
|
||||
channelz.AddTraceEvent(cc.channelzID, ted)
|
||||
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
|
||||
// the entity beng deleted, and thus prevent it from being deleted right away.
|
||||
channelz.RemoveEntry(cc.channelzID)
|
||||
}
|
||||
return nil
|
||||
@@ -799,26 +873,25 @@ type addrConn struct {
|
||||
cancel context.CancelFunc
|
||||
|
||||
cc *ClientConn
|
||||
addrs []resolver.Address
|
||||
dopts dialOptions
|
||||
events trace.EventLog
|
||||
acbw balancer.SubConn
|
||||
scopts balancer.NewSubConnOptions
|
||||
|
||||
mu sync.Mutex
|
||||
curAddr resolver.Address
|
||||
reconnectIdx int // The index in addrs list to start reconnecting from.
|
||||
state connectivity.State
|
||||
// ready is closed and becomes nil when a new transport is up or failed
|
||||
// due to timeout.
|
||||
ready chan struct{}
|
||||
transport transport.ClientTransport
|
||||
transport transport.ClientTransport // The current transport.
|
||||
|
||||
// The reason this addrConn is torn down.
|
||||
tearDownErr error
|
||||
mu sync.Mutex
|
||||
addrIdx int // The index in addrs list to start reconnecting from.
|
||||
curAddr resolver.Address // The current address.
|
||||
addrs []resolver.Address // All addresses that the resolver resolved to.
|
||||
|
||||
connectRetryNum int
|
||||
// Use updateConnectivityState for updating addrConn's connectivity state.
|
||||
state connectivity.State
|
||||
|
||||
tearDownErr error // The reason this addrConn is torn down.
|
||||
|
||||
backoffIdx int
|
||||
// backoffDeadline is the time until which resetTransport needs to
|
||||
// wait before increasing connectRetryNum count.
|
||||
// wait before increasing backoffIdx count.
|
||||
backoffDeadline time.Time
|
||||
// connectDeadline is the time by which all connection
|
||||
// negotiations must complete.
|
||||
@@ -828,6 +901,19 @@ type addrConn struct {
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
|
||||
successfulHandshake bool
|
||||
}
|
||||
|
||||
// Note: this requires a lock on ac.mu.
|
||||
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
|
||||
ac.state = s
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// adjustParams updates parameters used to create transports upon
|
||||
@@ -844,301 +930,328 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
|
||||
}
|
||||
}
|
||||
|
||||
// printf records an event in ac's event log, unless ac has been closed.
|
||||
// REQUIRES ac.mu is held.
|
||||
func (ac *addrConn) printf(format string, a ...interface{}) {
|
||||
if ac.events != nil {
|
||||
ac.events.Printf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// resetTransport recreates a transport to the address for ac. The old
|
||||
// transport will close itself on error or when the clientconn is closed.
|
||||
// The created transport must receive initial settings frame from the server.
|
||||
// In case that doesn't happen, transportMonitor will kill the newly created
|
||||
// transport after connectDeadline has expired.
|
||||
// In case there was an error on the transport before the settings frame was
|
||||
// received, resetTransport resumes connecting to backends after the one that
|
||||
// was previously connected to. In case end of the list is reached, resetTransport
|
||||
// backs off until the original deadline.
|
||||
// If the DialOption WithWaitForHandshake was set, resetTrasport returns
|
||||
// successfully only after server settings are received.
|
||||
// resetTransport makes sure that a healthy ac.transport exists.
|
||||
//
|
||||
// TODO(bar) make sure all state transitions are valid.
|
||||
func (ac *addrConn) resetTransport() error {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
}
|
||||
ac.transport = nil
|
||||
ridx := ac.reconnectIdx
|
||||
ac.mu.Unlock()
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
var backoffDeadline, connectDeadline time.Time
|
||||
var resetBackoff chan struct{}
|
||||
for connectRetryNum := 0; ; connectRetryNum++ {
|
||||
ac.mu.Lock()
|
||||
if ac.backoffDeadline.IsZero() {
|
||||
// This means either a successful HTTP2 connection was established
|
||||
// or this is the first time this addrConn is trying to establish a
|
||||
// connection.
|
||||
backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
|
||||
resetBackoff = ac.resetBackoff
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := getMinConnectTimeout()
|
||||
if backoffFor > dialDuration {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
start := time.Now()
|
||||
backoffDeadline = start.Add(backoffFor)
|
||||
connectDeadline = start.Add(dialDuration)
|
||||
ridx = 0 // Start connecting from the beginning.
|
||||
} else {
|
||||
// Continue trying to connect with the same deadlines.
|
||||
connectRetryNum = ac.connectRetryNum
|
||||
backoffDeadline = ac.backoffDeadline
|
||||
connectDeadline = ac.connectDeadline
|
||||
ac.backoffDeadline = time.Time{}
|
||||
ac.connectDeadline = time.Time{}
|
||||
ac.connectRetryNum = 0
|
||||
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
|
||||
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
|
||||
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
|
||||
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
|
||||
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
|
||||
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
|
||||
//
|
||||
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
|
||||
func (ac *addrConn) resetTransport(resolveNow bool) {
|
||||
for {
|
||||
// If this is the first in a line of resets, we want to resolve immediately. The only other time we
|
||||
// want to reset is if we have tried all the addresses handed to us.
|
||||
if resolveNow {
|
||||
ac.mu.Lock()
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
return
|
||||
}
|
||||
ac.printf("connecting")
|
||||
if ac.state != connectivity.Connecting {
|
||||
ac.state = connectivity.Connecting
|
||||
|
||||
// If the connection is READY, a failure must have occurred.
|
||||
// Otherwise, we'll consider this is a transient failure when:
|
||||
// We've exhausted all addresses
|
||||
// We're in CONNECTING
|
||||
// And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
|
||||
if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
// copy ac.addrs in case of race
|
||||
addrsIter := make([]resolver.Address, len(ac.addrs))
|
||||
copy(addrsIter, ac.addrs)
|
||||
copts := ac.dopts.copts
|
||||
ac.transport = nil
|
||||
ac.mu.Unlock()
|
||||
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts, resetBackoff)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
if err := ac.nextAddr(); err != nil {
|
||||
return
|
||||
}
|
||||
if connected {
|
||||
return nil
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
backoffIdx := ac.backoffIdx
|
||||
backoffFor := ac.dopts.bs.Backoff(backoffIdx)
|
||||
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := getMinConnectTimeout()
|
||||
if backoffFor > dialDuration {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
start := time.Now()
|
||||
connectDeadline := start.Add(dialDuration)
|
||||
ac.backoffDeadline = start.Add(backoffFor)
|
||||
ac.connectDeadline = connectDeadline
|
||||
|
||||
ac.mu.Unlock()
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if ac.state != connectivity.Connecting {
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
|
||||
addr := ac.addrs[ac.addrIdx]
|
||||
copts := ac.dopts.copts
|
||||
if ac.scopts.CredsBundle != nil {
|
||||
copts.CredsBundle = ac.scopts.CredsBundle
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
|
||||
if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// createTransport creates a connection to one of the backends in addrs.
|
||||
// It returns true if a connection was established.
|
||||
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) {
|
||||
for i := ridx; i < len(addrs); i++ {
|
||||
addr := addrs[i]
|
||||
target := transport.TargetInfo{
|
||||
Addr: addr.Addr,
|
||||
Metadata: addr.Metadata,
|
||||
Authority: ac.cc.authority,
|
||||
func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
|
||||
oneReset := sync.Once{}
|
||||
skipReset := make(chan struct{})
|
||||
allowedToReset := make(chan struct{})
|
||||
prefaceReceived := make(chan struct{})
|
||||
onCloseCalled := make(chan struct{})
|
||||
|
||||
var prefaceMu sync.Mutex
|
||||
var serverPrefaceReceived bool
|
||||
var clientPrefaceWrote bool
|
||||
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
||||
return
|
||||
case <-allowedToReset: // We're in the clear to reset.
|
||||
go oneReset.Do(func() { ac.resetTransport(false) })
|
||||
}
|
||||
done := make(chan struct{})
|
||||
onPrefaceReceipt := func() {
|
||||
ac.mu.Lock()
|
||||
close(done)
|
||||
if !ac.backoffDeadline.IsZero() {
|
||||
// If we haven't already started reconnecting to
|
||||
// other backends.
|
||||
// Note, this can happen when writer notices an error
|
||||
// and triggers resetTransport while at the same time
|
||||
// reader receives the preface and invokes this closure.
|
||||
ac.backoffDeadline = time.Time{}
|
||||
ac.connectDeadline = time.Time{}
|
||||
ac.connectRetryNum = 0
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
|
||||
|
||||
onClose := func() {
|
||||
close(onCloseCalled)
|
||||
prefaceTimer.Stop()
|
||||
|
||||
select {
|
||||
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
||||
return
|
||||
case <-allowedToReset: // We're in the clear to reset.
|
||||
oneReset.Do(func() { ac.resetTransport(false) })
|
||||
}
|
||||
// Do not cancel in the success path because of
|
||||
// this issue in Go1.6: https://github.com/golang/go/issues/15078.
|
||||
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
|
||||
if channelz.IsOn() {
|
||||
copts.ChannelzParentID = ac.channelzID
|
||||
}
|
||||
|
||||
target := transport.TargetInfo{
|
||||
Addr: addr.Addr,
|
||||
Metadata: addr.Metadata,
|
||||
Authority: ac.cc.authority,
|
||||
}
|
||||
|
||||
onPrefaceReceipt := func() {
|
||||
close(prefaceReceived)
|
||||
prefaceTimer.Stop()
|
||||
|
||||
// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
|
||||
ac.mu.Lock()
|
||||
|
||||
prefaceMu.Lock()
|
||||
serverPrefaceReceived = true
|
||||
if clientPrefaceWrote {
|
||||
ac.successfulHandshake = true
|
||||
ac.backoffDeadline = time.Time{}
|
||||
ac.connectDeadline = time.Time{}
|
||||
ac.addrIdx = 0
|
||||
ac.backoffIdx = 0
|
||||
}
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
|
||||
if err != nil {
|
||||
cancel()
|
||||
ac.cc.blockingpicker.updateConnectionError(err)
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
return false, errConnClosing
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
|
||||
continue
|
||||
prefaceMu.Unlock()
|
||||
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
|
||||
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
|
||||
if channelz.IsOn() {
|
||||
copts.ChannelzParentID = ac.channelzID
|
||||
}
|
||||
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
|
||||
|
||||
if err == nil {
|
||||
prefaceMu.Lock()
|
||||
clientPrefaceWrote = true
|
||||
if serverPrefaceReceived {
|
||||
ac.successfulHandshake = true
|
||||
}
|
||||
prefaceMu.Unlock()
|
||||
|
||||
if ac.dopts.waitForHandshake {
|
||||
select {
|
||||
case <-done:
|
||||
case <-connectCtx.Done():
|
||||
// Didn't receive server preface, must kill this new transport now.
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
|
||||
case <-prefaceTimer.C:
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close()
|
||||
continue
|
||||
case <-ac.ctx.Done():
|
||||
err = errors.New("timed out waiting for server handshake")
|
||||
case <-prefaceReceived:
|
||||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
go func() {
|
||||
select {
|
||||
case <-prefaceTimer.C:
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close()
|
||||
case <-prefaceReceived:
|
||||
// We got the preface just in the nick of time - huzzah!
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// newTr is either nil, or closed.
|
||||
cancel()
|
||||
ac.cc.blockingpicker.updateConnectionError(err)
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
// ac.tearDonn(...) has been invoked.
|
||||
newTr.Close()
|
||||
return false, errConnClosing
|
||||
}
|
||||
ac.printf("ready")
|
||||
ac.state = connectivity.Ready
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.transport = newTr
|
||||
ac.curAddr = addr
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
// If the server has responded back with preface already,
|
||||
// don't set the reconnect parameters.
|
||||
default:
|
||||
ac.connectRetryNum = connectRetryNum
|
||||
ac.backoffDeadline = backoffDeadline
|
||||
ac.connectDeadline = connectDeadline
|
||||
ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
|
||||
|
||||
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
|
||||
// in resetTransport take care of reconnecting.
|
||||
close(skipReset)
|
||||
|
||||
return errConnClosing
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
return true, nil
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
|
||||
|
||||
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
|
||||
// in resetTransport take care of reconnecting.
|
||||
close(skipReset)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return false, errConnClosing
|
||||
|
||||
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
|
||||
// in resetTransport take care of reconnecting.
|
||||
close(skipReset)
|
||||
|
||||
newTr.Close()
|
||||
return errConnClosing
|
||||
}
|
||||
ac.state = connectivity.TransientFailure
|
||||
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
ac.transport = newTr
|
||||
ac.curAddr = addr
|
||||
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
|
||||
// goroutine failing races with all the code in this method that sets the connection to "ready".
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
}
|
||||
|
||||
// nextAddr increments the addrIdx if there are more addresses to try. If
|
||||
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
|
||||
// increment the backoffIdx.
|
||||
//
|
||||
// nextAddr must be called without ac.mu being held.
|
||||
func (ac *addrConn) nextAddr() error {
|
||||
ac.mu.Lock()
|
||||
|
||||
// If a handshake has been observed, we expect the counters to have manually
|
||||
// been reset so we'll just return, since we want the next usage to start
|
||||
// at index 0.
|
||||
if ac.successfulHandshake {
|
||||
ac.successfulHandshake = false
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if ac.addrIdx < len(ac.addrs)-1 {
|
||||
ac.addrIdx++
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
ac.addrIdx = 0
|
||||
ac.backoffIdx++
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
backoffDeadline := ac.backoffDeadline
|
||||
b := ac.resetBackoff
|
||||
ac.mu.Unlock()
|
||||
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-resetBackoff:
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return false, ac.ctx.Err()
|
||||
return ac.ctx.Err()
|
||||
}
|
||||
return false, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ac *addrConn) resetConnectBackoff() {
|
||||
ac.mu.Lock()
|
||||
close(ac.resetBackoff)
|
||||
ac.backoffIdx = 0
|
||||
ac.resetBackoff = make(chan struct{})
|
||||
ac.connectRetryNum = 0
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
// Run in a goroutine to track the error in transport and create the
|
||||
// new transport if an error happens. It returns when the channel is closing.
|
||||
func (ac *addrConn) transportMonitor() {
|
||||
for {
|
||||
var timer *time.Timer
|
||||
var cdeadline <-chan time.Time
|
||||
ac.mu.Lock()
|
||||
t := ac.transport
|
||||
if !ac.connectDeadline.IsZero() {
|
||||
timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
|
||||
cdeadline = timer.C
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
// Block until we receive a goaway or an error occurs.
|
||||
select {
|
||||
case <-t.GoAway():
|
||||
done := t.Error()
|
||||
cleanup := t.Close
|
||||
// Since this transport will be orphaned (won't have a transportMonitor)
|
||||
// we need to launch a goroutine to keep track of clientConn.Close()
|
||||
// happening since it might not be noticed by any other goroutine for a while.
|
||||
go func() {
|
||||
<-done
|
||||
cleanup()
|
||||
}()
|
||||
case <-t.Error():
|
||||
// In case this is triggered because clientConn.Close()
|
||||
// was called, we want to immeditately close the transport
|
||||
// since no other goroutine might notice it for a while.
|
||||
t.Close()
|
||||
case <-cdeadline:
|
||||
ac.mu.Lock()
|
||||
// This implies that client received server preface.
|
||||
if ac.backoffDeadline.IsZero() {
|
||||
ac.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
timer = nil
|
||||
// No server preface received until deadline.
|
||||
// Kill the connection.
|
||||
grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
|
||||
t.Close()
|
||||
}
|
||||
if timer != nil {
|
||||
timer.Stop()
|
||||
}
|
||||
// If a GoAway happened, regardless of error, adjust our keepalive
|
||||
// parameters as appropriate.
|
||||
select {
|
||||
case <-t.GoAway():
|
||||
ac.adjustParams(t.GetGoAwayReason())
|
||||
default:
|
||||
}
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Set connectivity state to TransientFailure before calling
|
||||
// resetTransport. Transition READY->CONNECTING is not valid.
|
||||
ac.state = connectivity.TransientFailure
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
ac.curAddr = resolver.Address{}
|
||||
ac.mu.Unlock()
|
||||
if err := ac.resetTransport(); err != nil {
|
||||
ac.mu.Lock()
|
||||
ac.printf("transport exiting: %v", err)
|
||||
ac.mu.Unlock()
|
||||
grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
|
||||
if err != errConnClosing {
|
||||
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||
ac.tearDown(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getReadyTransport returns the transport if ac's state is READY.
|
||||
// Otherwise it returns nil, false.
|
||||
// If ac's state is IDLE, it will trigger ac to connect.
|
||||
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Ready {
|
||||
if ac.state == connectivity.Ready && ac.transport != nil {
|
||||
t := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return t, true
|
||||
@@ -1161,34 +1274,42 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
|
||||
// tight loop.
|
||||
// tearDown doesn't remove ac from ac.cc.conns.
|
||||
func (ac *addrConn) tearDown(err error) {
|
||||
ac.cancel()
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// We have to set the state to Shutdown before anything else to prevent races
|
||||
// between setting the state and logic that waits on context cancelation / etc.
|
||||
ac.updateConnectivityState(connectivity.Shutdown)
|
||||
ac.cancel()
|
||||
ac.tearDownErr = err
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.curAddr = resolver.Address{}
|
||||
if err == errConnDrain && ac.transport != nil {
|
||||
// GracefulClose(...) may be executed multiple times when
|
||||
// i) receiving multiple GoAway frames from the server; or
|
||||
// ii) there are concurrent name resolver/Balancer triggered
|
||||
// address removal and GoAway.
|
||||
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
|
||||
ac.mu.Unlock()
|
||||
ac.transport.GracefulClose()
|
||||
}
|
||||
ac.state = connectivity.Shutdown
|
||||
ac.tearDownErr = err
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
if ac.events != nil {
|
||||
ac.events.Finish()
|
||||
ac.events = nil
|
||||
}
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
ac.mu.Lock()
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Subchannel Deleted",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
|
||||
// the entity beng deleted, and thus prevent it from being deleted right away.
|
||||
channelz.RemoveEntry(ac.channelzID)
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ac *addrConn) getState() connectivity.State {
|
||||
|
||||
19
vendor/google.golang.org/grpc/credentials/credentials.go
generated
vendored
19
vendor/google.golang.org/grpc/credentials/credentials.go
generated
vendored
@@ -108,6 +108,25 @@ type TransportCredentials interface {
|
||||
OverrideServerName(string) error
|
||||
}
|
||||
|
||||
// Bundle is a combination of TransportCredentials and PerRPCCredentials.
|
||||
//
|
||||
// It also contains a mode switching method, so it can be used as a combination
|
||||
// of different credential policies.
|
||||
//
|
||||
// Bundle cannot be used together with individual TransportCredentials.
|
||||
// PerRPCCredentials from Bundle will be appended to other PerRPCCredentials.
|
||||
//
|
||||
// This API is experimental.
|
||||
type Bundle interface {
|
||||
TransportCredentials() TransportCredentials
|
||||
PerRPCCredentials() PerRPCCredentials
|
||||
// NewWithMode should make a copy of Bundle, and switch mode. Modifying the
|
||||
// existing Bundle may cause races.
|
||||
//
|
||||
// NewWithMode returns nil if the requested mode is not supported.
|
||||
NewWithMode(mode string) (Bundle, error)
|
||||
}
|
||||
|
||||
// TLSInfo contains the auth information for a TLS authenticated connection.
|
||||
// It implements the AuthInfo interface.
|
||||
type TLSInfo struct {
|
||||
|
||||
14
vendor/google.golang.org/grpc/dialoptions.go
generated
vendored
14
vendor/google.golang.org/grpc/dialoptions.go
generated
vendored
@@ -286,7 +286,8 @@ func WithInsecure() DialOption {
|
||||
}
|
||||
|
||||
// WithTransportCredentials returns a DialOption which configures a connection
|
||||
// level security credentials (e.g., TLS/SSL).
|
||||
// level security credentials (e.g., TLS/SSL). This should not be used together
|
||||
// with WithCredentialsBundle.
|
||||
func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
|
||||
return newFuncDialOption(func(o *dialOptions) {
|
||||
o.copts.TransportCredentials = creds
|
||||
@@ -301,6 +302,17 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
|
||||
})
|
||||
}
|
||||
|
||||
// WithCredentialsBundle returns a DialOption to set a credentials bundle for
|
||||
// the ClientConn.WithCreds. This should not be used together with
|
||||
// WithTransportCredentials.
|
||||
//
|
||||
// This API is experimental.
|
||||
func WithCredentialsBundle(b credentials.Bundle) DialOption {
|
||||
return newFuncDialOption(func(o *dialOptions) {
|
||||
o.copts.CredsBundle = b
|
||||
})
|
||||
}
|
||||
|
||||
// WithTimeout returns a DialOption that configures a timeout for dialing a
|
||||
// ClientConn initially. This is valid if and only if WithBlock() is present.
|
||||
//
|
||||
|
||||
2
vendor/google.golang.org/grpc/install_gae.sh
generated
vendored
2
vendor/google.golang.org/grpc/install_gae.sh
generated
vendored
@@ -1,6 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
TMP=$(mktemp -d /tmp/sdk.XXX) \
|
||||
&& curl -o $TMP.zip "https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.64.zip" \
|
||||
&& curl -o $TMP.zip "https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.68.zip" \
|
||||
&& unzip -q $TMP.zip -d $TMP \
|
||||
&& export PATH="$PATH:$TMP/go_appengine"
|
||||
|
||||
103
vendor/google.golang.org/grpc/internal/channelz/funcs.go
generated
vendored
103
vendor/google.golang.org/grpc/internal/channelz/funcs.go
generated
vendored
@@ -27,16 +27,22 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxTraceEntry int32 = 30
|
||||
)
|
||||
|
||||
var (
|
||||
db dbWrapper
|
||||
idGen idGenerator
|
||||
// EntryPerPage defines the number of channelz entries to be shown on a web page.
|
||||
EntryPerPage = 50
|
||||
curState int32
|
||||
EntryPerPage = 50
|
||||
curState int32
|
||||
maxTraceEntry = defaultMaxTraceEntry
|
||||
)
|
||||
|
||||
// TurnOn turns on channelz data collection.
|
||||
@@ -52,6 +58,22 @@ func IsOn() bool {
|
||||
return atomic.CompareAndSwapInt32(&curState, 1, 1)
|
||||
}
|
||||
|
||||
// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
|
||||
// Setting it to 0 will disable channel tracing.
|
||||
func SetMaxTraceEntry(i int32) {
|
||||
atomic.StoreInt32(&maxTraceEntry, i)
|
||||
}
|
||||
|
||||
// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
|
||||
func ResetMaxTraceEntryToDefault() {
|
||||
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
|
||||
}
|
||||
|
||||
func getMaxTraceEntry() int {
|
||||
i := atomic.LoadInt32(&maxTraceEntry)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
// dbWarpper wraps around a reference to internal channelz data storage, and
|
||||
// provide synchronized functionality to set and get the reference.
|
||||
type dbWrapper struct {
|
||||
@@ -146,6 +168,7 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
|
||||
nestedChans: make(map[int64]string),
|
||||
id: id,
|
||||
pid: pid,
|
||||
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
|
||||
}
|
||||
if pid == 0 {
|
||||
db.get().addChannel(id, cn, true, pid, ref)
|
||||
@@ -170,6 +193,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
|
||||
sockets: make(map[int64]string),
|
||||
id: id,
|
||||
pid: pid,
|
||||
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
|
||||
}
|
||||
db.get().addSubChannel(id, sc, pid, ref)
|
||||
return id
|
||||
@@ -226,6 +250,24 @@ func RemoveEntry(id int64) {
|
||||
db.get().removeEntry(id)
|
||||
}
|
||||
|
||||
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
|
||||
// to the channel trace.
|
||||
// The Parent field is optional. It is used for event that will be recorded in the entity's parent
|
||||
// trace also.
|
||||
type TraceEventDesc struct {
|
||||
Desc string
|
||||
Severity Severity
|
||||
Parent *TraceEventDesc
|
||||
}
|
||||
|
||||
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
|
||||
func AddTraceEvent(id int64, desc *TraceEventDesc) {
|
||||
if getMaxTraceEntry() == 0 {
|
||||
return
|
||||
}
|
||||
db.get().traceEvent(id, desc)
|
||||
}
|
||||
|
||||
// channelMap is the storage data structure for channelz.
|
||||
// Methods of channelMap can be divided in two two categories with respect to locking.
|
||||
// 1. Methods acquire the global lock.
|
||||
@@ -251,6 +293,7 @@ func (c *channelMap) addServer(id int64, s *server) {
|
||||
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
|
||||
c.mu.Lock()
|
||||
cn.cm = c
|
||||
cn.trace.cm = c
|
||||
c.channels[id] = cn
|
||||
if isTopChannel {
|
||||
c.topLevelChannels[id] = struct{}{}
|
||||
@@ -263,6 +306,7 @@ func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid in
|
||||
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
|
||||
c.mu.Lock()
|
||||
sc.cm = c
|
||||
sc.trace.cm = c
|
||||
c.subChannels[id] = sc
|
||||
c.findEntry(pid).addChild(id, sc)
|
||||
c.mu.Unlock()
|
||||
@@ -284,16 +328,25 @@ func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// removeEntry triggers the removal of an entry, which may not indeed delete the
|
||||
// entry, if it has to wait on the deletion of its children, or may lead to a chain
|
||||
// of entry deletion. For example, deleting the last socket of a gracefully shutting
|
||||
// down server will lead to the server being also deleted.
|
||||
// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
|
||||
// wait on the deletion of its children and until no other entity's channel trace references it.
|
||||
// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
|
||||
// shutting down server will lead to the server being also deleted.
|
||||
func (c *channelMap) removeEntry(id int64) {
|
||||
c.mu.Lock()
|
||||
c.findEntry(id).triggerDelete()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// c.mu must be held by the caller
|
||||
func (c *channelMap) decrTraceRefCount(id int64) {
|
||||
e := c.findEntry(id)
|
||||
if v, ok := e.(tracedChannel); ok {
|
||||
v.decrTraceRefCount()
|
||||
e.deleteSelfIfReady()
|
||||
}
|
||||
}
|
||||
|
||||
// c.mu must be held by the caller.
|
||||
func (c *channelMap) findEntry(id int64) entry {
|
||||
var v entry
|
||||
@@ -347,6 +400,39 @@ func (c *channelMap) deleteEntry(id int64) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
|
||||
c.mu.Lock()
|
||||
child := c.findEntry(id)
|
||||
childTC, ok := child.(tracedChannel)
|
||||
if !ok {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
|
||||
if desc.Parent != nil {
|
||||
parent := c.findEntry(child.getParentID())
|
||||
var chanType RefChannelType
|
||||
switch child.(type) {
|
||||
case *channel:
|
||||
chanType = RefChannel
|
||||
case *subChannel:
|
||||
chanType = RefSubChannel
|
||||
}
|
||||
if parentTC, ok := parent.(tracedChannel); ok {
|
||||
parentTC.getChannelTrace().append(&TraceEvent{
|
||||
Desc: desc.Parent.Desc,
|
||||
Severity: desc.Parent.Severity,
|
||||
Timestamp: time.Now(),
|
||||
RefID: id,
|
||||
RefName: childTC.getRefName(),
|
||||
RefType: chanType,
|
||||
})
|
||||
childTC.incrTraceRefCount()
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
type int64Slice []int64
|
||||
|
||||
func (s int64Slice) Len() int { return len(s) }
|
||||
@@ -408,6 +494,7 @@ func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
|
||||
t[i].ChannelData = cn.c.ChannelzMetric()
|
||||
t[i].ID = cn.id
|
||||
t[i].RefName = cn.refName
|
||||
t[i].Trace = cn.trace.dumpData()
|
||||
}
|
||||
return t, end
|
||||
}
|
||||
@@ -470,7 +557,7 @@ func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric,
|
||||
for k := range svrskts {
|
||||
ids = append(ids, k)
|
||||
}
|
||||
sort.Sort((int64Slice(ids)))
|
||||
sort.Sort(int64Slice(ids))
|
||||
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
|
||||
count := 0
|
||||
var end bool
|
||||
@@ -518,6 +605,7 @@ func (c *channelMap) GetChannel(id int64) *ChannelMetric {
|
||||
cm.ChannelData = cn.c.ChannelzMetric()
|
||||
cm.ID = cn.id
|
||||
cm.RefName = cn.refName
|
||||
cm.Trace = cn.trace.dumpData()
|
||||
return cm
|
||||
}
|
||||
|
||||
@@ -536,6 +624,7 @@ func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
|
||||
cm.ChannelData = sc.c.ChannelzMetric()
|
||||
cm.ID = sc.id
|
||||
cm.RefName = sc.refName
|
||||
cm.Trace = sc.trace.dumpData()
|
||||
return cm
|
||||
}
|
||||
|
||||
|
||||
311
vendor/google.golang.org/grpc/internal/channelz/types.go
generated
vendored
311
vendor/google.golang.org/grpc/internal/channelz/types.go
generated
vendored
@@ -20,6 +20,8 @@ package channelz
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/connectivity"
|
||||
@@ -40,6 +42,8 @@ type entry interface {
|
||||
// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child
|
||||
// list is now empty. If both conditions are met, then delete self from database.
|
||||
deleteSelfIfReady()
|
||||
// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
|
||||
getParentID() int64
|
||||
}
|
||||
|
||||
// dummyEntry is a fake entry to handle entry not found case.
|
||||
@@ -73,6 +77,10 @@ func (*dummyEntry) deleteSelfIfReady() {
|
||||
// code should not reach here. deleteSelfIfReady is always called on an existing entry.
|
||||
}
|
||||
|
||||
func (*dummyEntry) getParentID() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// ChannelMetric defines the info channelz provides for a specific Channel, which
|
||||
// includes ChannelInternalMetric and channelz-specific data, such as channelz id,
|
||||
// child list, etc.
|
||||
@@ -95,6 +103,8 @@ type ChannelMetric struct {
|
||||
// Note current grpc implementation doesn't allow channel having sockets directly,
|
||||
// therefore, this is field is unused.
|
||||
Sockets map[int64]string
|
||||
// Trace contains the most recent traced events.
|
||||
Trace *ChannelTrace
|
||||
}
|
||||
|
||||
// SubChannelMetric defines the info channelz provides for a specific SubChannel,
|
||||
@@ -121,6 +131,8 @@ type SubChannelMetric struct {
|
||||
// Sockets tracks the socket type children of this subchannel in the format of a map
|
||||
// from socket channelz id to corresponding reference string.
|
||||
Sockets map[int64]string
|
||||
// Trace contains the most recent traced events.
|
||||
Trace *ChannelTrace
|
||||
}
|
||||
|
||||
// ChannelInternalMetric defines the struct that the implementor of Channel interface
|
||||
@@ -138,7 +150,35 @@ type ChannelInternalMetric struct {
|
||||
CallsFailed int64
|
||||
// The last time a call was started on the channel.
|
||||
LastCallStartedTimestamp time.Time
|
||||
//TODO: trace
|
||||
}
|
||||
|
||||
// ChannelTrace stores traced events on a channel/subchannel and related info.
|
||||
type ChannelTrace struct {
|
||||
// EventNum is the number of events that ever got traced (i.e. including those that have been deleted)
|
||||
EventNum int64
|
||||
// CreationTime is the creation time of the trace.
|
||||
CreationTime time.Time
|
||||
// Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the
|
||||
// oldest one)
|
||||
Events []*TraceEvent
|
||||
}
|
||||
|
||||
// TraceEvent represent a single trace event
|
||||
type TraceEvent struct {
|
||||
// Desc is a simple description of the trace event.
|
||||
Desc string
|
||||
// Severity states the severity of this trace event.
|
||||
Severity Severity
|
||||
// Timestamp is the event time.
|
||||
Timestamp time.Time
|
||||
// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
|
||||
// involved in this event.
|
||||
// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
|
||||
RefID int64
|
||||
// RefName is the reference name for the entity that gets referenced in the event.
|
||||
RefName string
|
||||
// RefType indicates the referenced entity type, i.e Channel or SubChannel.
|
||||
RefType RefChannelType
|
||||
}
|
||||
|
||||
// Channel is the interface that should be satisfied in order to be tracked by
|
||||
@@ -147,6 +187,12 @@ type Channel interface {
|
||||
ChannelzMetric() *ChannelInternalMetric
|
||||
}
|
||||
|
||||
type dummyChannel struct{}
|
||||
|
||||
func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric {
|
||||
return &ChannelInternalMetric{}
|
||||
}
|
||||
|
||||
type channel struct {
|
||||
refName string
|
||||
c Channel
|
||||
@@ -156,6 +202,10 @@ type channel struct {
|
||||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
trace *channelTrace
|
||||
// traceRefCount is the number of trace events that reference this channel.
|
||||
// Non-zero traceRefCount means the trace of this channel cannot be deleted.
|
||||
traceRefCount int32
|
||||
}
|
||||
|
||||
func (c *channel) addChild(id int64, e entry) {
|
||||
@@ -180,25 +230,96 @@ func (c *channel) triggerDelete() {
|
||||
c.deleteSelfIfReady()
|
||||
}
|
||||
|
||||
func (c *channel) deleteSelfIfReady() {
|
||||
func (c *channel) getParentID() int64 {
|
||||
return c.pid
|
||||
}
|
||||
|
||||
// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
|
||||
// deleting the channel reference from its parent's child list.
|
||||
//
|
||||
// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
|
||||
// corresponding grpc object has been invoked, and the channel does not have any children left.
|
||||
//
|
||||
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
|
||||
func (c *channel) deleteSelfFromTree() (deleted bool) {
|
||||
if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
|
||||
return
|
||||
return false
|
||||
}
|
||||
c.cm.deleteEntry(c.id)
|
||||
// not top channel
|
||||
if c.pid != 0 {
|
||||
c.cm.findEntry(c.pid).deleteChild(c.id)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
|
||||
// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
|
||||
// channel, and its memory will be garbage collected.
|
||||
//
|
||||
// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
|
||||
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
|
||||
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
|
||||
// by grpc, the reference to the grpc object is reset to a dummy object.
|
||||
//
|
||||
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
|
||||
//
|
||||
// It returns a bool to indicate whether the channel can be safely deleted from map.
|
||||
func (c *channel) deleteSelfFromMap() (delete bool) {
|
||||
if c.getTraceRefCount() != 0 {
|
||||
c.c = &dummyChannel{}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
|
||||
// parent's child list.
|
||||
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
|
||||
// will return entry not found error.
|
||||
func (c *channel) deleteSelfIfReady() {
|
||||
if !c.deleteSelfFromTree() {
|
||||
return
|
||||
}
|
||||
if !c.deleteSelfFromMap() {
|
||||
return
|
||||
}
|
||||
c.cm.deleteEntry(c.id)
|
||||
c.trace.clear()
|
||||
}
|
||||
|
||||
func (c *channel) getChannelTrace() *channelTrace {
|
||||
return c.trace
|
||||
}
|
||||
|
||||
func (c *channel) incrTraceRefCount() {
|
||||
atomic.AddInt32(&c.traceRefCount, 1)
|
||||
}
|
||||
|
||||
func (c *channel) decrTraceRefCount() {
|
||||
atomic.AddInt32(&c.traceRefCount, -1)
|
||||
}
|
||||
|
||||
func (c *channel) getTraceRefCount() int {
|
||||
i := atomic.LoadInt32(&c.traceRefCount)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
func (c *channel) getRefName() string {
|
||||
return c.refName
|
||||
}
|
||||
|
||||
type subChannel struct {
|
||||
refName string
|
||||
c Channel
|
||||
closeCalled bool
|
||||
sockets map[int64]string
|
||||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
refName string
|
||||
c Channel
|
||||
closeCalled bool
|
||||
sockets map[int64]string
|
||||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
trace *channelTrace
|
||||
traceRefCount int32
|
||||
}
|
||||
|
||||
func (sc *subChannel) addChild(id int64, e entry) {
|
||||
@@ -219,12 +340,82 @@ func (sc *subChannel) triggerDelete() {
|
||||
sc.deleteSelfIfReady()
|
||||
}
|
||||
|
||||
func (sc *subChannel) deleteSelfIfReady() {
|
||||
func (sc *subChannel) getParentID() int64 {
|
||||
return sc.pid
|
||||
}
|
||||
|
||||
// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
|
||||
// means deleting the subchannel reference from its parent's child list.
|
||||
//
|
||||
// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
|
||||
// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
|
||||
//
|
||||
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
|
||||
func (sc *subChannel) deleteSelfFromTree() (deleted bool) {
|
||||
if !sc.closeCalled || len(sc.sockets) != 0 {
|
||||
return false
|
||||
}
|
||||
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
|
||||
// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
|
||||
// the subchannel, and its memory will be garbage collected.
|
||||
//
|
||||
// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
|
||||
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
|
||||
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
|
||||
// by grpc, the reference to the grpc object is reset to a dummy object.
|
||||
//
|
||||
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
|
||||
//
|
||||
// It returns a bool to indicate whether the channel can be safely deleted from map.
|
||||
func (sc *subChannel) deleteSelfFromMap() (delete bool) {
|
||||
if sc.getTraceRefCount() != 0 {
|
||||
// free the grpc struct (i.e. addrConn)
|
||||
sc.c = &dummyChannel{}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
|
||||
// its parent's child list.
|
||||
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
|
||||
// by id will return entry not found error.
|
||||
func (sc *subChannel) deleteSelfIfReady() {
|
||||
if !sc.deleteSelfFromTree() {
|
||||
return
|
||||
}
|
||||
if !sc.deleteSelfFromMap() {
|
||||
return
|
||||
}
|
||||
sc.cm.deleteEntry(sc.id)
|
||||
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
|
||||
sc.trace.clear()
|
||||
}
|
||||
|
||||
func (sc *subChannel) getChannelTrace() *channelTrace {
|
||||
return sc.trace
|
||||
}
|
||||
|
||||
func (sc *subChannel) incrTraceRefCount() {
|
||||
atomic.AddInt32(&sc.traceRefCount, 1)
|
||||
}
|
||||
|
||||
func (sc *subChannel) decrTraceRefCount() {
|
||||
atomic.AddInt32(&sc.traceRefCount, -1)
|
||||
}
|
||||
|
||||
func (sc *subChannel) getTraceRefCount() int {
|
||||
i := atomic.LoadInt32(&sc.traceRefCount)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
func (sc *subChannel) getRefName() string {
|
||||
return sc.refName
|
||||
}
|
||||
|
||||
// SocketMetric defines the info channelz provides for a specific Socket, which
|
||||
@@ -318,6 +509,10 @@ func (ls *listenSocket) deleteSelfIfReady() {
|
||||
grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket")
|
||||
}
|
||||
|
||||
func (ls *listenSocket) getParentID() int64 {
|
||||
return ls.pid
|
||||
}
|
||||
|
||||
type normalSocket struct {
|
||||
refName string
|
||||
s Socket
|
||||
@@ -343,6 +538,10 @@ func (ns *normalSocket) deleteSelfIfReady() {
|
||||
grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket")
|
||||
}
|
||||
|
||||
func (ns *normalSocket) getParentID() int64 {
|
||||
return ns.pid
|
||||
}
|
||||
|
||||
// ServerMetric defines the info channelz provides for a specific Server, which
|
||||
// includes ServerInternalMetric and channelz-specific data, such as channelz id,
|
||||
// child list, etc.
|
||||
@@ -370,7 +569,6 @@ type ServerInternalMetric struct {
|
||||
CallsFailed int64
|
||||
// The last time a call was started on the server.
|
||||
LastCallStartedTimestamp time.Time
|
||||
//TODO: trace
|
||||
}
|
||||
|
||||
// Server is the interface to be satisfied in order to be tracked by channelz as
|
||||
@@ -417,3 +615,88 @@ func (s *server) deleteSelfIfReady() {
|
||||
}
|
||||
s.cm.deleteEntry(s.id)
|
||||
}
|
||||
|
||||
func (s *server) getParentID() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
type tracedChannel interface {
|
||||
getChannelTrace() *channelTrace
|
||||
incrTraceRefCount()
|
||||
decrTraceRefCount()
|
||||
getRefName() string
|
||||
}
|
||||
|
||||
type channelTrace struct {
|
||||
cm *channelMap
|
||||
createdTime time.Time
|
||||
eventCount int64
|
||||
mu sync.Mutex
|
||||
events []*TraceEvent
|
||||
}
|
||||
|
||||
func (c *channelTrace) append(e *TraceEvent) {
|
||||
c.mu.Lock()
|
||||
if len(c.events) == getMaxTraceEntry() {
|
||||
del := c.events[0]
|
||||
c.events = c.events[1:]
|
||||
if del.RefID != 0 {
|
||||
// start recursive cleanup in a goroutine to not block the call originated from grpc.
|
||||
go func() {
|
||||
// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
|
||||
c.cm.mu.Lock()
|
||||
c.cm.decrTraceRefCount(del.RefID)
|
||||
c.cm.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
}
|
||||
e.Timestamp = time.Now()
|
||||
c.events = append(c.events, e)
|
||||
c.eventCount++
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *channelTrace) clear() {
|
||||
c.mu.Lock()
|
||||
for _, e := range c.events {
|
||||
if e.RefID != 0 {
|
||||
// caller should have already held the c.cm.mu lock.
|
||||
c.cm.decrTraceRefCount(e.RefID)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Severity is the severity level of a trace event.
|
||||
// The canonical enumeration of all valid values is here:
|
||||
// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
|
||||
type Severity int
|
||||
|
||||
const (
|
||||
// CtUNKNOWN indicates unknown severity of a trace event.
|
||||
CtUNKNOWN Severity = iota
|
||||
// CtINFO indicates info level severity of a trace event.
|
||||
CtINFO
|
||||
// CtWarning indicates warning level severity of a trace event.
|
||||
CtWarning
|
||||
// CtError indicates error level severity of a trace event.
|
||||
CtError
|
||||
)
|
||||
|
||||
// RefChannelType is the type of the entity being referenced in a trace event.
|
||||
type RefChannelType int
|
||||
|
||||
const (
|
||||
// RefChannel indicates the referenced entity is a Channel.
|
||||
RefChannel RefChannelType = iota
|
||||
// RefSubChannel indicates the referenced entity is a SubChannel.
|
||||
RefSubChannel
|
||||
)
|
||||
|
||||
func (c *channelTrace) dumpData() *ChannelTrace {
|
||||
c.mu.Lock()
|
||||
ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}
|
||||
ct.Events = c.events[:len(c.events)]
|
||||
c.mu.Unlock()
|
||||
return ct
|
||||
}
|
||||
|
||||
16
vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
generated
vendored
16
vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
generated
vendored
@@ -20,11 +20,13 @@
|
||||
|
||||
package channelz
|
||||
|
||||
import "google.golang.org/grpc/grpclog"
|
||||
import (
|
||||
"sync"
|
||||
|
||||
func init() {
|
||||
grpclog.Infof("Channelz: socket options are not supported on non-linux os and appengine.")
|
||||
}
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
|
||||
// SocketOptionData defines the struct to hold socket option data, and related
|
||||
// getter function to obtain info from fd.
|
||||
@@ -35,4 +37,8 @@ type SocketOptionData struct {
|
||||
// Getsockopt defines the function to get socket options requested by channelz.
|
||||
// It is to be passed to syscall.RawConn.Control().
|
||||
// Windows OS doesn't support Socket Option
|
||||
func (s *SocketOptionData) Getsockopt(fd uintptr) {}
|
||||
func (s *SocketOptionData) Getsockopt(fd uintptr) {
|
||||
once.Do(func() {
|
||||
grpclog.Warningln("Channelz: socket options are not supported on non-linux os and appengine.")
|
||||
})
|
||||
}
|
||||
|
||||
15
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
15
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
@@ -20,9 +20,24 @@
|
||||
// symbols to avoid circular dependencies.
|
||||
package internal
|
||||
|
||||
import "golang.org/x/net/context"
|
||||
|
||||
var (
|
||||
// WithContextDialer is exported by clientconn.go
|
||||
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
|
||||
// WithResolverBuilder is exported by clientconn.go
|
||||
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
|
||||
// HealthCheckFunc is used to provide client-side LB channel health checking
|
||||
HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
|
||||
)
|
||||
|
||||
const (
|
||||
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
|
||||
CredsBundleModeFallback = "fallback"
|
||||
// CredsBundleModeBalancer switches GoogleDefaultCreds to grpclb balancer
|
||||
// mode.
|
||||
CredsBundleModeBalancer = "balancer"
|
||||
// CredsBundleModeBackendFromBalancer switches GoogleDefaultCreds to mode
|
||||
// that supports backend returned by grpclb balancer.
|
||||
CredsBundleModeBackendFromBalancer = "backend-from-balancer"
|
||||
)
|
||||
|
||||
57
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
57
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@@ -73,7 +73,7 @@ type http2Client struct {
|
||||
|
||||
isSecure bool
|
||||
|
||||
creds []credentials.PerRPCCredentials
|
||||
perRPCCreds []credentials.PerRPCCredentials
|
||||
|
||||
// Boolean to keep track of reading activity on transport.
|
||||
// 1 is true and 0 is false.
|
||||
@@ -112,6 +112,9 @@ type http2Client struct {
|
||||
// Fields below are for channelz metric collection.
|
||||
channelzID int64 // channelz unique identification number
|
||||
czData *channelzData
|
||||
|
||||
onGoAway func(GoAwayReason)
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
|
||||
@@ -140,7 +143,7 @@ func isTemporary(err error) bool {
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) {
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
|
||||
scheme := "http"
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
@@ -166,9 +169,20 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
isSecure bool
|
||||
authInfo credentials.AuthInfo
|
||||
)
|
||||
if creds := opts.TransportCredentials; creds != nil {
|
||||
transportCreds := opts.TransportCredentials
|
||||
perRPCCreds := opts.PerRPCCredentials
|
||||
|
||||
if b := opts.CredsBundle; b != nil {
|
||||
if t := b.TransportCredentials(); t != nil {
|
||||
transportCreds = t
|
||||
}
|
||||
if t := b.PerRPCCredentials(); t != nil {
|
||||
perRPCCreds = append(perRPCCreds, t)
|
||||
}
|
||||
}
|
||||
if transportCreds != nil {
|
||||
scheme = "https"
|
||||
conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
|
||||
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
|
||||
if err != nil {
|
||||
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
|
||||
}
|
||||
@@ -213,7 +227,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
scheme: scheme,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
isSecure: isSecure,
|
||||
creds: opts.PerRPCCredentials,
|
||||
perRPCCreds: perRPCCreds,
|
||||
kp: kp,
|
||||
statsHandler: opts.StatsHandler,
|
||||
initialWindowSize: initialWindowSize,
|
||||
@@ -223,6 +237,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
streamQuota: defaultMaxStreamsClient,
|
||||
streamsQuotaAvailable: make(chan struct{}, 1),
|
||||
czData: new(channelzData),
|
||||
onGoAway: onGoAway,
|
||||
onClose: onClose,
|
||||
}
|
||||
t.controlBuf = newControlBuffer(t.ctxDone)
|
||||
if opts.InitialWindowSize >= defaultWindowSize {
|
||||
@@ -259,6 +275,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
// a dedicated goroutine which reads HTTP2 frame from network. Then it
|
||||
// dispatches the frame to the corresponding stream entity.
|
||||
go t.reader()
|
||||
|
||||
// Send connection preface to server.
|
||||
n, err := t.conn.Write(clientPreface)
|
||||
if err != nil {
|
||||
@@ -295,6 +312,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
||||
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.framer.writer.Flush()
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
@@ -443,7 +461,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
|
||||
func (t *http2Client) createAudience(callHdr *CallHdr) string {
|
||||
// Create an audience string only if needed.
|
||||
if len(t.creds) == 0 && callHdr.Creds == nil {
|
||||
if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
|
||||
return ""
|
||||
}
|
||||
// Construct URI required to get auth request metadata.
|
||||
@@ -458,7 +476,7 @@ func (t *http2Client) createAudience(callHdr *CallHdr) string {
|
||||
|
||||
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
|
||||
authData := map[string]string{}
|
||||
for _, c := range t.creds {
|
||||
for _, c := range t.perRPCCreds {
|
||||
data, err := c.GetRequestMetadata(ctx, audience)
|
||||
if err != nil {
|
||||
if _, ok := status.FromError(err); ok {
|
||||
@@ -664,7 +682,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||
// Set stream status to done.
|
||||
if s.swapState(streamDone) == streamDone {
|
||||
// If it was already done, return.
|
||||
// If it was already done, return. If multiple closeStream calls
|
||||
// happen simultaneously, wait for the first to finish.
|
||||
<-s.done
|
||||
return
|
||||
}
|
||||
// status and trailers can be updated here without any synchronization because the stream goroutine will
|
||||
@@ -678,8 +698,6 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
// This will unblock reads eventually.
|
||||
s.write(recvMsg{err: err})
|
||||
}
|
||||
// This will unblock write.
|
||||
close(s.done)
|
||||
// If headerChan isn't closed, then close it.
|
||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||
s.noHeaders = true
|
||||
@@ -715,11 +733,17 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||
return true
|
||||
}
|
||||
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
|
||||
// This will unblock write.
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// Close kicks off the shutdown process of the transport. This should be called
|
||||
// only once on a transport. Once it is called, the transport should not be
|
||||
// accessed any more.
|
||||
//
|
||||
// This method blocks until the addrConn that initiated this transport is
|
||||
// re-connected. This happens because t.onClose() begins reconnect logic at the
|
||||
// addrConn level and blocks until the addrConn is successfully connected.
|
||||
func (t *http2Client) Close() error {
|
||||
t.mu.Lock()
|
||||
// Make sure we only Close once.
|
||||
@@ -747,6 +771,7 @@ func (t *http2Client) Close() error {
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
go t.onClose()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1043,6 +1068,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
close(t.goAway)
|
||||
t.state = draining
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
|
||||
// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
|
||||
t.onGoAway(t.goAwayReason)
|
||||
}
|
||||
// All streams with IDs greater than the GoAwayId
|
||||
// and smaller than the previous GoAway ID should be killed.
|
||||
@@ -1145,7 +1173,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
if !endStream {
|
||||
return
|
||||
}
|
||||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
|
||||
// if client received END_STREAM from server while stream was still active, send RST_STREAM
|
||||
rst := s.getState() == streamActive
|
||||
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
|
||||
}
|
||||
|
||||
// reader runs as a separate goroutine in charge of reading data from network
|
||||
@@ -1159,15 +1189,16 @@ func (t *http2Client) reader() {
|
||||
// Check the validity of server preface.
|
||||
frame, err := t.framer.fr.ReadFrame()
|
||||
if err != nil {
|
||||
t.Close()
|
||||
t.Close() // this kicks off resetTransport, so must be last before return
|
||||
return
|
||||
}
|
||||
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
|
||||
if t.keepaliveEnabled {
|
||||
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
||||
}
|
||||
sf, ok := frame.(*http2.SettingsFrame)
|
||||
if !ok {
|
||||
t.Close()
|
||||
t.Close() // this kicks off resetTransport, so must be last before return
|
||||
return
|
||||
}
|
||||
t.onSuccess()
|
||||
|
||||
10
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
10
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@@ -24,6 +24,7 @@ import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@@ -435,6 +436,10 @@ func decodeTimeout(s string) (time.Duration, error) {
|
||||
if size < 2 {
|
||||
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
|
||||
}
|
||||
if size > 9 {
|
||||
// Spec allows for 8 digits plus the unit.
|
||||
return 0, fmt.Errorf("transport: timeout string is too long: %q", s)
|
||||
}
|
||||
unit := timeoutUnit(s[size-1])
|
||||
d, ok := timeoutUnitToDuration(unit)
|
||||
if !ok {
|
||||
@@ -444,6 +449,11 @@ func decodeTimeout(s string) (time.Duration, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
const maxHours = math.MaxInt64 / int64(time.Hour)
|
||||
if d == time.Hour && t > maxHours {
|
||||
// This timeout would overflow math.MaxInt64; clamp it.
|
||||
return time.Duration(math.MaxInt64), nil
|
||||
}
|
||||
return d * time.Duration(t), nil
|
||||
}
|
||||
|
||||
|
||||
10
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
10
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@@ -465,8 +465,12 @@ type ConnectOptions struct {
|
||||
FailOnNonTempDialError bool
|
||||
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
|
||||
PerRPCCredentials []credentials.PerRPCCredentials
|
||||
// TransportCredentials stores the Authenticator required to setup a client connection.
|
||||
// TransportCredentials stores the Authenticator required to setup a client
|
||||
// connection. Only one of TransportCredentials and CredsBundle is non-nil.
|
||||
TransportCredentials credentials.TransportCredentials
|
||||
// CredsBundle is the credentials bundle to be used. Only one of
|
||||
// TransportCredentials and CredsBundle is non-nil.
|
||||
CredsBundle credentials.Bundle
|
||||
// KeepaliveParams stores the keepalive parameters.
|
||||
KeepaliveParams keepalive.ClientParameters
|
||||
// StatsHandler stores the handler for stats.
|
||||
@@ -494,8 +498,8 @@ type TargetInfo struct {
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
|
||||
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
|
||||
62
vendor/google.golang.org/grpc/keepalive/keepalive.go
generated
vendored
62
vendor/google.golang.org/grpc/keepalive/keepalive.go
generated
vendored
@@ -16,7 +16,8 @@
|
||||
*
|
||||
*/
|
||||
|
||||
// Package keepalive defines configurable parameters for point-to-point healthcheck.
|
||||
// Package keepalive defines configurable parameters for point-to-point
|
||||
// healthcheck.
|
||||
package keepalive
|
||||
|
||||
import (
|
||||
@@ -24,42 +25,59 @@ import (
|
||||
)
|
||||
|
||||
// ClientParameters is used to set keepalive parameters on the client-side.
|
||||
// These configure how the client will actively probe to notice when a connection is broken
|
||||
// and send pings so intermediaries will be aware of the liveness of the connection.
|
||||
// Make sure these parameters are set in coordination with the keepalive policy on the server,
|
||||
// as incompatible settings can result in closing of connection.
|
||||
// These configure how the client will actively probe to notice when a
|
||||
// connection is broken and send pings so intermediaries will be aware of the
|
||||
// liveness of the connection. Make sure these parameters are set in
|
||||
// coordination with the keepalive policy on the server, as incompatible
|
||||
// settings can result in closing of connection.
|
||||
type ClientParameters struct {
|
||||
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
|
||||
// After a duration of this time if the client doesn't see any activity it
|
||||
// pings the server to see if the transport is still alive.
|
||||
Time time.Duration // The current default value is infinity.
|
||||
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
|
||||
// the connection is closed.
|
||||
// After having pinged for keepalive check, the client waits for a duration
|
||||
// of Timeout and if no activity is seen even after that the connection is
|
||||
// closed.
|
||||
Timeout time.Duration // The current default value is 20 seconds.
|
||||
// If true, client runs keepalive checks even with no active RPCs.
|
||||
// If true, client sends keepalive pings even with no active RPCs. If false,
|
||||
// when there are no active RPCs, Time and Timeout will be ignored and no
|
||||
// keepalive pings will be sent.
|
||||
PermitWithoutStream bool // false by default.
|
||||
}
|
||||
|
||||
// ServerParameters is used to set keepalive and max-age parameters on the server-side.
|
||||
// ServerParameters is used to set keepalive and max-age parameters on the
|
||||
// server-side.
|
||||
type ServerParameters struct {
|
||||
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
|
||||
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
|
||||
// MaxConnectionIdle is a duration for the amount of time after which an
|
||||
// idle connection would be closed by sending a GoAway. Idleness duration is
|
||||
// defined since the most recent time the number of outstanding RPCs became
|
||||
// zero or the connection establishment.
|
||||
MaxConnectionIdle time.Duration // The current default value is infinity.
|
||||
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
|
||||
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
|
||||
// MaxConnectionAge is a duration for the maximum amount of time a
|
||||
// connection may exist before it will be closed by sending a GoAway. A
|
||||
// random jitter of +/-10% will be added to MaxConnectionAge to spread out
|
||||
// connection storms.
|
||||
MaxConnectionAge time.Duration // The current default value is infinity.
|
||||
// MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
|
||||
// MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after
|
||||
// which the connection will be forcibly closed.
|
||||
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
|
||||
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
|
||||
// After a duration of this time if the server doesn't see any activity it
|
||||
// pings the client to see if the transport is still alive.
|
||||
Time time.Duration // The current default value is 2 hours.
|
||||
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
|
||||
// the connection is closed.
|
||||
// After having pinged for keepalive check, the server waits for a duration
|
||||
// of Timeout and if no activity is seen even after that the connection is
|
||||
// closed.
|
||||
Timeout time.Duration // The current default value is 20 seconds.
|
||||
}
|
||||
|
||||
// EnforcementPolicy is used to set keepalive enforcement policy on the server-side.
|
||||
// Server will close connection with a client that violates this policy.
|
||||
// EnforcementPolicy is used to set keepalive enforcement policy on the
|
||||
// server-side. Server will close connection with a client that violates this
|
||||
// policy.
|
||||
type EnforcementPolicy struct {
|
||||
// MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
|
||||
// MinTime is the minimum amount of time a client should wait before sending
|
||||
// a keepalive ping.
|
||||
MinTime time.Duration // The current default value is 5 minutes.
|
||||
// If true, server expects keepalive pings even when there are no active streams(RPCs).
|
||||
// If true, server allows keepalive pings even when there are no active
|
||||
// streams(RPCs). If false, and client sends ping when there are no active
|
||||
// streams, server will send GOAWAY and close the connection.
|
||||
PermitWithoutStream bool // false by default.
|
||||
}
|
||||
|
||||
1
vendor/google.golang.org/grpc/pickfirst.go
generated
vendored
1
vendor/google.golang.org/grpc/pickfirst.go
generated
vendored
@@ -56,6 +56,7 @@ func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err er
|
||||
if b.sc == nil {
|
||||
b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
|
||||
if err != nil {
|
||||
//TODO(yuxuanli): why not change the cc state to Idle?
|
||||
grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
43
vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
generated
vendored
43
vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
generated
vendored
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2017 gRPC authors.
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -73,10 +73,7 @@ type dnsBuilder struct {
|
||||
|
||||
// Build creates and starts a DNS resolver that watches the name resolution of the target.
|
||||
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
|
||||
if target.Authority != "" {
|
||||
return nil, fmt.Errorf("default DNS resolver does not support custom DNS server")
|
||||
}
|
||||
host, port, err := parseTarget(target.Endpoint)
|
||||
host, port, err := parseTarget(target.Endpoint, defaultPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -111,6 +108,15 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
|
||||
disableServiceConfig: opts.DisableServiceConfig,
|
||||
}
|
||||
|
||||
if target.Authority == "" {
|
||||
d.resolver = defaultResolver
|
||||
} else {
|
||||
d.resolver, err = customAuthorityResolver(target.Authority)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
d.wg.Add(1)
|
||||
go d.watcher()
|
||||
return d, nil
|
||||
@@ -121,6 +127,12 @@ func (b *dnsBuilder) Scheme() string {
|
||||
return "dns"
|
||||
}
|
||||
|
||||
type netResolver interface {
|
||||
LookupHost(ctx context.Context, host string) (addrs []string, err error)
|
||||
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
|
||||
LookupTXT(ctx context.Context, name string) (txts []string, err error)
|
||||
}
|
||||
|
||||
// ipResolver watches for the name resolution update for an IP address.
|
||||
type ipResolver struct {
|
||||
cc resolver.ClientConn
|
||||
@@ -161,6 +173,7 @@ type dnsResolver struct {
|
||||
retryCount int
|
||||
host string
|
||||
port string
|
||||
resolver netResolver
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
cc resolver.ClientConn
|
||||
@@ -218,13 +231,13 @@ func (d *dnsResolver) watcher() {
|
||||
|
||||
func (d *dnsResolver) lookupSRV() []resolver.Address {
|
||||
var newAddrs []resolver.Address
|
||||
_, srvs, err := lookupSRV(d.ctx, "grpclb", "tcp", d.host)
|
||||
_, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
|
||||
if err != nil {
|
||||
grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
|
||||
return nil
|
||||
}
|
||||
for _, s := range srvs {
|
||||
lbAddrs, err := lookupHost(d.ctx, s.Target)
|
||||
lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
|
||||
if err != nil {
|
||||
grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
|
||||
continue
|
||||
@@ -243,7 +256,7 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
|
||||
}
|
||||
|
||||
func (d *dnsResolver) lookupTXT() string {
|
||||
ss, err := lookupTXT(d.ctx, d.host)
|
||||
ss, err := d.resolver.LookupTXT(d.ctx, d.host)
|
||||
if err != nil {
|
||||
grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
|
||||
return ""
|
||||
@@ -263,7 +276,7 @@ func (d *dnsResolver) lookupTXT() string {
|
||||
|
||||
func (d *dnsResolver) lookupHost() []resolver.Address {
|
||||
var newAddrs []resolver.Address
|
||||
addrs, err := lookupHost(d.ctx, d.host)
|
||||
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
|
||||
if err != nil {
|
||||
grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
|
||||
return nil
|
||||
@@ -305,16 +318,16 @@ func formatIP(addr string) (addrIP string, ok bool) {
|
||||
return "[" + addr + "]", true
|
||||
}
|
||||
|
||||
// parseTarget takes the user input target string, returns formatted host and port info.
|
||||
// parseTarget takes the user input target string and default port, returns formatted host and port info.
|
||||
// If target doesn't specify a port, set the port to be the defaultPort.
|
||||
// If target is in IPv6 format and host-name is enclosed in sqarue brackets, brackets
|
||||
// are strippd when setting the host.
|
||||
// examples:
|
||||
// target: "www.google.com" returns host: "www.google.com", port: "443"
|
||||
// target: "ipv4-host:80" returns host: "ipv4-host", port: "80"
|
||||
// target: "[ipv6-host]" returns host: "ipv6-host", port: "443"
|
||||
// target: ":80" returns host: "localhost", port: "80"
|
||||
func parseTarget(target string) (host, port string, err error) {
|
||||
// target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"
|
||||
// target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80"
|
||||
// target: "[ipv6-host]" defaultPort: "443" returns host: "ipv6-host", port: "443"
|
||||
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
|
||||
func parseTarget(target, defaultPort string) (host, port string, err error) {
|
||||
if target == "" {
|
||||
return "", "", errMissingAddr
|
||||
}
|
||||
|
||||
35
vendor/google.golang.org/grpc/resolver/dns/go17.go
generated
vendored
35
vendor/google.golang.org/grpc/resolver/dns/go17.go
generated
vendored
@@ -1,35 +0,0 @@
|
||||
// +build go1.6, !go1.8
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2017 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package dns
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
lookupHost = func(ctx context.Context, host string) ([]string, error) { return net.LookupHost(host) }
|
||||
lookupSRV = func(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) {
|
||||
return net.LookupSRV(service, proto, name)
|
||||
}
|
||||
lookupTXT = func(ctx context.Context, name string) ([]string, error) { return net.LookupTXT(name) }
|
||||
)
|
||||
29
vendor/google.golang.org/grpc/resolver/dns/go18.go
generated
vendored
29
vendor/google.golang.org/grpc/resolver/dns/go18.go
generated
vendored
@@ -1,29 +0,0 @@
|
||||
// +build go1.8
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2017 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package dns
|
||||
|
||||
import "net"
|
||||
|
||||
var (
|
||||
lookupHost = net.DefaultResolver.LookupHost
|
||||
lookupSRV = net.DefaultResolver.LookupSRV
|
||||
lookupTXT = net.DefaultResolver.LookupTXT
|
||||
)
|
||||
54
vendor/google.golang.org/grpc/resolver/dns/go19.go
generated
vendored
Normal file
54
vendor/google.golang.org/grpc/resolver/dns/go19.go
generated
vendored
Normal file
@@ -0,0 +1,54 @@
|
||||
// +build go1.9
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package dns
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultResolver netResolver = net.DefaultResolver
|
||||
)
|
||||
|
||||
const defaultDNSSvrPort = "53"
|
||||
|
||||
var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
return func(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
var dialer net.Dialer
|
||||
return dialer.DialContext(ctx, network, authority)
|
||||
}
|
||||
}
|
||||
|
||||
var customAuthorityResolver = func(authority string) (netResolver, error) {
|
||||
host, port, err := parseTarget(authority, defaultDNSSvrPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
authorityWithPort := net.JoinHostPort(host, port)
|
||||
|
||||
return &net.Resolver{
|
||||
PreferGo: true,
|
||||
Dial: customAuthorityDialler(authorityWithPort),
|
||||
}, nil
|
||||
}
|
||||
51
vendor/google.golang.org/grpc/resolver/dns/pre_go19.go
generated
vendored
Normal file
51
vendor/google.golang.org/grpc/resolver/dns/pre_go19.go
generated
vendored
Normal file
@@ -0,0 +1,51 @@
|
||||
// +build go1.6, !go1.9
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2018 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package dns
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultResolver netResolver = &preGo19Resolver{}
|
||||
)
|
||||
|
||||
type preGo19Resolver struct {
|
||||
}
|
||||
|
||||
func (*preGo19Resolver) LookupHost(ctx context.Context, host string) ([]string, error) {
|
||||
return net.LookupHost(host)
|
||||
}
|
||||
|
||||
func (*preGo19Resolver) LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) {
|
||||
return net.LookupSRV(service, proto, name)
|
||||
}
|
||||
|
||||
func (*preGo19Resolver) LookupTXT(ctx context.Context, name string) ([]string, error) {
|
||||
return net.LookupTXT(name)
|
||||
}
|
||||
|
||||
var customAuthorityResolver = func(authority string) (netResolver, error) {
|
||||
return nil, fmt.Errorf("Default DNS resolver does not support custom DNS server with go < 1.9")
|
||||
}
|
||||
41
vendor/google.golang.org/grpc/resolver_conn_wrapper.go
generated
vendored
41
vendor/google.golang.org/grpc/resolver_conn_wrapper.go
generated
vendored
@@ -23,17 +23,19 @@ import (
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
// ccResolverWrapper is a wrapper on top of cc for resolvers.
|
||||
// It implements resolver.ClientConnection interface.
|
||||
type ccResolverWrapper struct {
|
||||
cc *ClientConn
|
||||
resolver resolver.Resolver
|
||||
addrCh chan []resolver.Address
|
||||
scCh chan string
|
||||
done chan struct{}
|
||||
cc *ClientConn
|
||||
resolver resolver.Resolver
|
||||
addrCh chan []resolver.Address
|
||||
scCh chan string
|
||||
done chan struct{}
|
||||
lastAddressesCount int
|
||||
}
|
||||
|
||||
// split2 returns the values from strings.SplitN(s, sep, 2).
|
||||
@@ -114,6 +116,9 @@ func (ccr *ccResolverWrapper) watcher() {
|
||||
default:
|
||||
}
|
||||
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
|
||||
if channelz.IsOn() {
|
||||
ccr.addChannelzTraceEvent(addrs)
|
||||
}
|
||||
ccr.cc.handleResolvedAddrs(addrs, nil)
|
||||
case sc := <-ccr.scCh:
|
||||
select {
|
||||
@@ -156,3 +161,29 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
|
||||
}
|
||||
ccr.scCh <- sc
|
||||
}
|
||||
|
||||
func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
|
||||
if len(addrs) == 0 && ccr.lastAddressesCount != 0 {
|
||||
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Resolver returns an empty address list",
|
||||
Severity: channelz.CtWarning,
|
||||
})
|
||||
} else if len(addrs) != 0 && ccr.lastAddressesCount == 0 {
|
||||
var s string
|
||||
for i, a := range addrs {
|
||||
if a.ServerName != "" {
|
||||
s += a.Addr + "(" + a.ServerName + ")"
|
||||
} else {
|
||||
s += a.Addr
|
||||
}
|
||||
if i != len(addrs)-1 {
|
||||
s += " "
|
||||
}
|
||||
}
|
||||
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
ccr.lastAddressesCount = len(addrs)
|
||||
}
|
||||
|
||||
33
vendor/google.golang.org/grpc/rpc_util.go
generated
vendored
33
vendor/google.golang.org/grpc/rpc_util.go
generated
vendored
@@ -531,7 +531,10 @@ func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte,
|
||||
}
|
||||
cbuf := &bytes.Buffer{}
|
||||
if compressor != nil {
|
||||
z, _ := compressor.Compress(cbuf)
|
||||
z, err := compressor.Compress(cbuf)
|
||||
if err != nil {
|
||||
return nil, wrapErr(err)
|
||||
}
|
||||
if _, err := z.Write(in); err != nil {
|
||||
return nil, wrapErr(err)
|
||||
}
|
||||
@@ -595,20 +598,17 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool
|
||||
return nil
|
||||
}
|
||||
|
||||
// For the two compressor parameters, both should not be set, but if they are,
|
||||
// dc takes precedence over compressor.
|
||||
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
|
||||
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
|
||||
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) ([]byte, error) {
|
||||
pf, d, err := p.recvMsg(maxReceiveMessageSize)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(d)
|
||||
}
|
||||
|
||||
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
|
||||
return st.Err()
|
||||
return nil, st.Err()
|
||||
}
|
||||
|
||||
if pf == compressionMade {
|
||||
@@ -617,23 +617,34 @@ func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interf
|
||||
if dc != nil {
|
||||
d, err = dc.Do(bytes.NewReader(d))
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
}
|
||||
} else {
|
||||
dcReader, err := compressor.Decompress(bytes.NewReader(d))
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
}
|
||||
d, err = ioutil.ReadAll(dcReader)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(d) > maxReceiveMessageSize {
|
||||
// TODO: Revisit the error code. Currently keep it consistent with java
|
||||
// implementation.
|
||||
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
|
||||
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// For the two compressor parameters, both should not be set, but if they are,
|
||||
// dc takes precedence over compressor.
|
||||
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
|
||||
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
|
||||
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, inPayload, compressor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.Unmarshal(d, m); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
|
||||
|
||||
111
vendor/google.golang.org/grpc/server.go
generated
vendored
111
vendor/google.golang.org/grpc/server.go
generated
vendored
@@ -19,7 +19,6 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -33,8 +32,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"io/ioutil"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
|
||||
@@ -901,76 +898,32 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
}
|
||||
|
||||
p := &parser{r: stream}
|
||||
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
|
||||
if err == io.EOF {
|
||||
// The entire stream is done (for unary RPC only).
|
||||
return err
|
||||
}
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
||||
var inPayload *stats.InPayload
|
||||
if sh != nil {
|
||||
inPayload = &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
}
|
||||
}
|
||||
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, inPayload, decomp)
|
||||
if err != nil {
|
||||
if st, ok := status.FromError(err); ok {
|
||||
if e := t.WriteStatus(stream, st); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
} else {
|
||||
switch st := err.(type) {
|
||||
case transport.ConnectionError:
|
||||
// Nothing to do here.
|
||||
default:
|
||||
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
t.IncrMsgRecv()
|
||||
}
|
||||
if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
|
||||
if e := t.WriteStatus(stream, st); e != nil {
|
||||
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
return st.Err()
|
||||
}
|
||||
var inPayload *stats.InPayload
|
||||
if sh != nil {
|
||||
inPayload = &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
}
|
||||
}
|
||||
df := func(v interface{}) error {
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(req)
|
||||
}
|
||||
if pf == compressionMade {
|
||||
var err error
|
||||
if dc != nil {
|
||||
req, err = dc.Do(bytes.NewReader(req))
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
} else {
|
||||
tmp, _ := decomp.Decompress(bytes.NewReader(req))
|
||||
req, err = ioutil.ReadAll(tmp)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(req) > s.opts.maxReceiveMessageSize {
|
||||
// TODO: Revisit the error code. Currently keep it consistent with
|
||||
// java implementation.
|
||||
return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
|
||||
}
|
||||
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
|
||||
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.Payload = v
|
||||
inPayload.Data = req
|
||||
inPayload.Length = len(req)
|
||||
inPayload.Data = d
|
||||
inPayload.Length = len(d)
|
||||
sh.HandleRPC(stream.Context(), inPayload)
|
||||
}
|
||||
if trInfo != nil {
|
||||
@@ -1180,47 +1133,27 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
}
|
||||
service := sm[:pos]
|
||||
method := sm[pos+1:]
|
||||
srv, ok := s.m[service]
|
||||
if !ok {
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
|
||||
if srv, ok := s.m[service]; ok {
|
||||
if md, ok := srv.md[method]; ok {
|
||||
s.processUnaryRPC(t, stream, srv, md, trInfo)
|
||||
return
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
||||
trInfo.tr.SetError()
|
||||
if sd, ok := srv.sd[method]; ok {
|
||||
s.processStreamingRPC(t, stream, srv, sd, trInfo)
|
||||
return
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown service %v", service)
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.Finish()
|
||||
}
|
||||
return
|
||||
}
|
||||
// Unary RPC or Streaming RPC?
|
||||
if md, ok := srv.md[method]; ok {
|
||||
s.processUnaryRPC(t, stream, srv, md, trInfo)
|
||||
return
|
||||
}
|
||||
if sd, ok := srv.sd[method]; ok {
|
||||
s.processStreamingRPC(t, stream, srv, sd, trInfo)
|
||||
return
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
// Unknown service, or known server unknown method.
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
return
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown method %v", method)
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown service %v", service)
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
|
||||
3
vendor/google.golang.org/grpc/service_config.go
generated
vendored
3
vendor/google.golang.org/grpc/service_config.go
generated
vendored
@@ -229,6 +229,9 @@ type jsonSC struct {
|
||||
}
|
||||
|
||||
func parseServiceConfig(js string) (ServiceConfig, error) {
|
||||
if len(js) == 0 {
|
||||
return ServiceConfig{}, fmt.Errorf("no JSON service config provided")
|
||||
}
|
||||
var rsc jsonSC
|
||||
err := json.Unmarshal([]byte(js), &rsc)
|
||||
if err != nil {
|
||||
|
||||
8
vendor/google.golang.org/grpc/status/status.go
generated
vendored
8
vendor/google.golang.org/grpc/status/status.go
generated
vendored
@@ -126,7 +126,9 @@ func FromError(err error) (s *Status, ok bool) {
|
||||
if err == nil {
|
||||
return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
|
||||
}
|
||||
if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
|
||||
if se, ok := err.(interface {
|
||||
GRPCStatus() *Status
|
||||
}); ok {
|
||||
return se.GRPCStatus(), true
|
||||
}
|
||||
return New(codes.Unknown, err.Error()), false
|
||||
@@ -182,7 +184,9 @@ func Code(err error) codes.Code {
|
||||
if err == nil {
|
||||
return codes.OK
|
||||
}
|
||||
if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
|
||||
if se, ok := err.(interface {
|
||||
GRPCStatus() *Status
|
||||
}); ok {
|
||||
return se.GRPCStatus().Code()
|
||||
}
|
||||
return codes.Unknown
|
||||
|
||||
12
vendor/google.golang.org/grpc/stream.go
generated
vendored
12
vendor/google.golang.org/grpc/stream.go
generated
vendored
@@ -660,7 +660,14 @@ func (cs *clientStream) CloseSend() error {
|
||||
return nil
|
||||
}
|
||||
cs.sentLast = true
|
||||
op := func(a *csAttempt) error { return a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) }
|
||||
op := func(a *csAttempt) error {
|
||||
a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
|
||||
// Always return nil; io.EOF is the only error that might make sense
|
||||
// instead, but there is no need to signal the client to call RecvMsg
|
||||
// as the only use left for the stream after CloseSend is to call
|
||||
// RecvMsg. This also matches historical behavior.
|
||||
return nil
|
||||
}
|
||||
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
|
||||
// We never returned an error here for reasons.
|
||||
return nil
|
||||
@@ -809,11 +816,14 @@ func (a *csAttempt) finish(err error) {
|
||||
|
||||
if a.done != nil {
|
||||
br := false
|
||||
var tr metadata.MD
|
||||
if a.s != nil {
|
||||
br = a.s.BytesReceived()
|
||||
tr = a.s.Trailer()
|
||||
}
|
||||
a.done(balancer.DoneInfo{
|
||||
Err: err,
|
||||
Trailer: tr,
|
||||
BytesSent: a.s != nil,
|
||||
BytesReceived: br,
|
||||
})
|
||||
|
||||
2
vendor/google.golang.org/grpc/version.go
generated
vendored
2
vendor/google.golang.org/grpc/version.go
generated
vendored
@@ -19,4 +19,4 @@
|
||||
package grpc
|
||||
|
||||
// Version is the current grpc version.
|
||||
const Version = "1.15.0"
|
||||
const Version = "1.16.0"
|
||||
|
||||
11
vendor/google.golang.org/grpc/vet.sh
generated
vendored
11
vendor/google.golang.org/grpc/vet.sh
generated
vendored
@@ -71,7 +71,7 @@ fi
|
||||
|
||||
git ls-files "*.go" | xargs grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" 2>&1 | tee /dev/stderr | (! read)
|
||||
git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand') | tee /dev/stderr | (! read)
|
||||
git ls-files | xargs dirname | sort | uniq | xargs go run go_vet/vet.go | tee /dev/stderr | (! read)
|
||||
git ls-files | xargs dirname | sort | uniq | xargs go run test/go_vet/vet.go | tee /dev/stderr | (! read)
|
||||
gofmt -s -d -l . 2>&1 | tee /dev/stderr | (! read)
|
||||
goimports -l . 2>&1 | tee /dev/stderr | (! read)
|
||||
golint ./... 2>&1 | (grep -vE "(_mock|\.pb)\.go:" || true) | tee /dev/stderr | (! read)
|
||||
@@ -80,7 +80,14 @@ golint ./... 2>&1 | (grep -vE "(_mock|\.pb)\.go:" || true) | tee /dev/stderr | (
|
||||
# TODO: Remove this mangling once "context" is imported directly (grpc/grpc-go#711).
|
||||
git ls-files "*.go" | xargs sed -i 's:"golang.org/x/net/context":"context":'
|
||||
set +o pipefail # vet exits with non-zero error if issues are found
|
||||
go tool vet -all . 2>&1 | grep -vE 'clientconn.go:.*cancel (function|var)' | tee /dev/stderr | (! read)
|
||||
|
||||
# TODO(deklerk) remove when we drop Go 1.6 support
|
||||
go tool vet -all . 2>&1 | \
|
||||
grep -vE 'clientconn.go:.*cancel (function|var)' | \
|
||||
grep -vE '.*transport_test.go:.*cancel' | \
|
||||
tee /dev/stderr | \
|
||||
(! read)
|
||||
|
||||
set -o pipefail
|
||||
git reset --hard HEAD
|
||||
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -221,7 +221,7 @@ google.golang.org/api/support/bundler
|
||||
google.golang.org/appengine/cloudsql
|
||||
# google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e
|
||||
google.golang.org/genproto/googleapis/rpc/status
|
||||
# google.golang.org/grpc v1.15.0
|
||||
# google.golang.org/grpc v1.16.0
|
||||
google.golang.org/grpc
|
||||
google.golang.org/grpc/codes
|
||||
google.golang.org/grpc/credentials
|
||||
|
||||
Reference in New Issue
Block a user