/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/transport"
)

const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// getMinConnectTimeout returns minConnectTimeout. We use an accessor so
	// that minConnectTimeout can be atomically read and updated while testing.
	getMinConnectTimeout = func() time.Duration {
		return minConnectTimeout
	}
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., oauth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
)

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial; each field is written by the With* option (or
// FailOnNonTempDialError) that names it.
type dialOptions struct {
	unaryInt    UnaryClientInterceptor
	streamInt   StreamClientInterceptor
	cp          Compressor
	dc          Decompressor
	bs          backoff.Strategy
	block       bool
	insecure    bool
	timeout     time.Duration
	scChan      <-chan ServiceConfig
	copts       transport.ConnectOptions
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// This is to support grpclb.
	resolverBuilder      resolver.Builder
	waitForHandshake     bool
	channelzParentID     int64
	disableServiceConfig bool
}

// Default per-call message size limits for clients.
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
)

// RegisterChannelz turns on channelz service.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
	channelz.TurnOn()
}

// DialOption configures how we set up the connection. It is applied to the
// dialOptions of the ClientConn being created.
type DialOption func(*dialOptions)

// WithWaitForHandshake blocks until the initial settings frame is received from the
// server before assigning RPCs to the connection.
// Experimental API.
func WithWaitForHandshake() DialOption {
	return func(o *dialOptions) {
		o.waitForHandshake = true
	}
}

// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
146func WithWriteBufferSize(s int) DialOption { 147 return func(o *dialOptions) { 148 o.copts.WriteBufferSize = s 149 } 150} 151 152// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most 153// for each read syscall. 154func WithReadBufferSize(s int) DialOption { 155 return func(o *dialOptions) { 156 o.copts.ReadBufferSize = s 157 } 158} 159 160// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. 161// The lower bound for window size is 64K and any value smaller than that will be ignored. 162func WithInitialWindowSize(s int32) DialOption { 163 return func(o *dialOptions) { 164 o.copts.InitialWindowSize = s 165 } 166} 167 168// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. 169// The lower bound for window size is 64K and any value smaller than that will be ignored. 170func WithInitialConnWindowSize(s int32) DialOption { 171 return func(o *dialOptions) { 172 o.copts.InitialConnWindowSize = s 173 } 174} 175 176// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. 177// 178// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. 179func WithMaxMsgSize(s int) DialOption { 180 return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) 181} 182 183// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection. 184func WithDefaultCallOptions(cos ...CallOption) DialOption { 185 return func(o *dialOptions) { 186 o.callOptions = append(o.callOptions, cos...) 187 } 188} 189 190// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. 191// 192// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead. 
193func WithCodec(c Codec) DialOption { 194 return WithDefaultCallOptions(CallCustomCodec(c)) 195} 196 197// WithCompressor returns a DialOption which sets a Compressor to use for 198// message compression. It has lower priority than the compressor set by 199// the UseCompressor CallOption. 200// 201// Deprecated: use UseCompressor instead. 202func WithCompressor(cp Compressor) DialOption { 203 return func(o *dialOptions) { 204 o.cp = cp 205 } 206} 207 208// WithDecompressor returns a DialOption which sets a Decompressor to use for 209// incoming message decompression. If incoming response messages are encoded 210// using the decompressor's Type(), it will be used. Otherwise, the message 211// encoding will be used to look up the compressor registered via 212// encoding.RegisterCompressor, which will then be used to decompress the 213// message. If no compressor is registered for the encoding, an Unimplemented 214// status error will be returned. 215// 216// Deprecated: use encoding.RegisterCompressor instead. 217func WithDecompressor(dc Decompressor) DialOption { 218 return func(o *dialOptions) { 219 o.dc = dc 220 } 221} 222 223// WithBalancer returns a DialOption which sets a load balancer with the v1 API. 224// Name resolver will be ignored if this DialOption is specified. 225// 226// Deprecated: use the new balancer APIs in balancer package and WithBalancerName. 227func WithBalancer(b Balancer) DialOption { 228 return func(o *dialOptions) { 229 o.balancerBuilder = &balancerWrapperBuilder{ 230 b: b, 231 } 232 } 233} 234 235// WithBalancerName sets the balancer that the ClientConn will be initialized 236// with. Balancer registered with balancerName will be used. This function 237// panics if no balancer was registered by balancerName. 238// 239// The balancer cannot be overridden by balancer option specified by service 240// config. 241// 242// This is an EXPERIMENTAL API. 
243func WithBalancerName(balancerName string) DialOption { 244 builder := balancer.Get(balancerName) 245 if builder == nil { 246 panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName)) 247 } 248 return func(o *dialOptions) { 249 o.balancerBuilder = builder 250 } 251} 252 253// withResolverBuilder is only for grpclb. 254func withResolverBuilder(b resolver.Builder) DialOption { 255 return func(o *dialOptions) { 256 o.resolverBuilder = b 257 } 258} 259 260// WithServiceConfig returns a DialOption which has a channel to read the service configuration. 261// 262// Deprecated: service config should be received through name resolver, as specified here. 263// https://github.com/grpc/grpc/blob/master/doc/service_config.md 264func WithServiceConfig(c <-chan ServiceConfig) DialOption { 265 return func(o *dialOptions) { 266 o.scChan = c 267 } 268} 269 270// WithBackoffMaxDelay configures the dialer to use the provided maximum delay 271// when backing off after failed connection attempts. 272func WithBackoffMaxDelay(md time.Duration) DialOption { 273 return WithBackoffConfig(BackoffConfig{MaxDelay: md}) 274} 275 276// WithBackoffConfig configures the dialer to use the provided backoff 277// parameters after connection failures. 278// 279// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up 280// for use. 281func WithBackoffConfig(b BackoffConfig) DialOption { 282 283 return withBackoff(backoff.Exponential{ 284 MaxDelay: b.MaxDelay, 285 }) 286} 287 288// withBackoff sets the backoff strategy used for connectRetryNum after a 289// failed connection attempt. 290// 291// This can be exported if arbitrary backoff strategies are allowed by gRPC. 292func withBackoff(bs backoff.Strategy) DialOption { 293 return func(o *dialOptions) { 294 o.bs = bs 295 } 296} 297 298// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying 299// connection is up. 
Without this, Dial returns immediately and connecting the server 300// happens in background. 301func WithBlock() DialOption { 302 return func(o *dialOptions) { 303 o.block = true 304 } 305} 306 307// WithInsecure returns a DialOption which disables transport security for this ClientConn. 308// Note that transport security is required unless WithInsecure is set. 309func WithInsecure() DialOption { 310 return func(o *dialOptions) { 311 o.insecure = true 312 } 313} 314 315// WithTransportCredentials returns a DialOption which configures a 316// connection level security credentials (e.g., TLS/SSL). 317func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { 318 return func(o *dialOptions) { 319 o.copts.TransportCredentials = creds 320 } 321} 322 323// WithPerRPCCredentials returns a DialOption which sets 324// credentials and places auth state on each outbound RPC. 325func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { 326 return func(o *dialOptions) { 327 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) 328 } 329} 330 331// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn 332// initially. This is valid if and only if WithBlock() is present. 333// 334// Deprecated: use DialContext and context.WithTimeout instead. 335func WithTimeout(d time.Duration) DialOption { 336 return func(o *dialOptions) { 337 o.timeout = d 338 } 339} 340 341func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption { 342 return func(o *dialOptions) { 343 o.copts.Dialer = f 344 } 345} 346 347func init() { 348 internal.WithContextDialer = withContextDialer 349 internal.WithResolverBuilder = withResolverBuilder 350} 351 352// WithDialer returns a DialOption that specifies a function to use for dialing network addresses. 
353// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's 354// Temporary() method to decide if it should try to reconnect to the network address. 355func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { 356 return withContextDialer( 357 func(ctx context.Context, addr string) (net.Conn, error) { 358 if deadline, ok := ctx.Deadline(); ok { 359 return f(addr, deadline.Sub(time.Now())) 360 } 361 return f(addr, 0) 362 }) 363} 364 365// WithStatsHandler returns a DialOption that specifies the stats handler 366// for all the RPCs and underlying network connections in this ClientConn. 367func WithStatsHandler(h stats.Handler) DialOption { 368 return func(o *dialOptions) { 369 o.copts.StatsHandler = h 370 } 371} 372 373// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors. 374// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network 375// address and won't try to reconnect. 376// The default value of FailOnNonTempDialError is false. 377// This is an EXPERIMENTAL API. 378func FailOnNonTempDialError(f bool) DialOption { 379 return func(o *dialOptions) { 380 o.copts.FailOnNonTempDialError = f 381 } 382} 383 384// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. 385func WithUserAgent(s string) DialOption { 386 return func(o *dialOptions) { 387 o.copts.UserAgent = s 388 } 389} 390 391// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport. 392func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { 393 return func(o *dialOptions) { 394 o.copts.KeepaliveParams = kp 395 } 396} 397 398// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. 
399func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { 400 return func(o *dialOptions) { 401 o.unaryInt = f 402 } 403} 404 405// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs. 406func WithStreamInterceptor(f StreamClientInterceptor) DialOption { 407 return func(o *dialOptions) { 408 o.streamInt = f 409 } 410} 411 412// WithAuthority returns a DialOption that specifies the value to be used as 413// the :authority pseudo-header. This value only works with WithInsecure and 414// has no effect if TransportCredentials are present. 415func WithAuthority(a string) DialOption { 416 return func(o *dialOptions) { 417 o.copts.Authority = a 418 } 419} 420 421// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's 422// parent. This function is used in nested channel creation (e.g. grpclb dial). 423func WithChannelzParentID(id int64) DialOption { 424 return func(o *dialOptions) { 425 o.channelzParentID = id 426 } 427} 428 429// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any 430// service config provided by the resolver and provides a hint to the resolver 431// to not fetch service configs. 432func WithDisableServiceConfig() DialOption { 433 return func(o *dialOptions) { 434 o.disableServiceConfig = true 435 } 436} 437 438// Dial creates a client connection to the given target. 439func Dial(target string, opts ...DialOption) (*ClientConn, error) { 440 return DialContext(context.Background(), target, opts...) 441} 442 443// DialContext creates a client connection to the given target. By default, it's 444// a non-blocking dial (the function won't wait for connections to be 445// established, and connecting happens in the background). To make it a blocking 446// dial, use WithBlock() dial option. 447// 448// In the non-blocking case, the ctx does not act against the connection. It 449// only controls the setup steps. 
450// 451// In the blocking case, ctx can be used to cancel or expire the pending 452// connection. Once this function returns, the cancellation and expiration of 453// ctx will be noop. Users should call ClientConn.Close to terminate all the 454// pending operations after this function returns. 455// 456// The target name syntax is defined in 457// https://github.com/grpc/grpc/blob/master/doc/naming.md. 458// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. 459func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 460 cc := &ClientConn{ 461 target: target, 462 csMgr: &connectivityStateManager{}, 463 conns: make(map[*addrConn]struct{}), 464 465 blockingpicker: newPickerWrapper(), 466 } 467 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 468 469 for _, opt := range opts { 470 opt(&cc.dopts) 471 } 472 473 if channelz.IsOn() { 474 if cc.dopts.channelzParentID != 0 { 475 cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target) 476 } else { 477 cc.channelzID = channelz.RegisterChannel(cc, 0, target) 478 } 479 } 480 481 if !cc.dopts.insecure { 482 if cc.dopts.copts.TransportCredentials == nil { 483 return nil, errNoTransportSecurity 484 } 485 } else { 486 if cc.dopts.copts.TransportCredentials != nil { 487 return nil, errCredentialsConflict 488 } 489 for _, cd := range cc.dopts.copts.PerRPCCredentials { 490 if cd.RequireTransportSecurity() { 491 return nil, errTransportCredentialsMissing 492 } 493 } 494 } 495 496 cc.mkp = cc.dopts.copts.KeepaliveParams 497 498 if cc.dopts.copts.Dialer == nil { 499 cc.dopts.copts.Dialer = newProxyDialer( 500 func(ctx context.Context, addr string) (net.Conn, error) { 501 network, addr := parseDialTarget(addr) 502 return dialContext(ctx, network, addr) 503 }, 504 ) 505 } 506 507 if cc.dopts.copts.UserAgent != "" { 508 cc.dopts.copts.UserAgent += " " + grpcUA 509 } else { 510 cc.dopts.copts.UserAgent = grpcUA 511 } 512 513 if 
cc.dopts.timeout > 0 { 514 var cancel context.CancelFunc 515 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) 516 defer cancel() 517 } 518 519 defer func() { 520 select { 521 case <-ctx.Done(): 522 conn, err = nil, ctx.Err() 523 default: 524 } 525 526 if err != nil { 527 cc.Close() 528 } 529 }() 530 531 scSet := false 532 if cc.dopts.scChan != nil { 533 // Try to get an initial service config. 534 select { 535 case sc, ok := <-cc.dopts.scChan: 536 if ok { 537 cc.sc = sc 538 scSet = true 539 } 540 default: 541 } 542 } 543 if cc.dopts.bs == nil { 544 cc.dopts.bs = backoff.Exponential{ 545 MaxDelay: DefaultBackoffConfig.MaxDelay, 546 } 547 } 548 if cc.dopts.resolverBuilder == nil { 549 // Only try to parse target when resolver builder is not already set. 550 cc.parsedTarget = parseTarget(cc.target) 551 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme) 552 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 553 if cc.dopts.resolverBuilder == nil { 554 // If resolver builder is still nil, the parse target's scheme is 555 // not registered. Fallback to default resolver and set Endpoint to 556 // the original unparsed target. 557 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) 558 cc.parsedTarget = resolver.Target{ 559 Scheme: resolver.GetDefaultScheme(), 560 Endpoint: target, 561 } 562 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 563 } 564 } else { 565 cc.parsedTarget = resolver.Target{Endpoint: target} 566 } 567 creds := cc.dopts.copts.TransportCredentials 568 if creds != nil && creds.Info().ServerName != "" { 569 cc.authority = creds.Info().ServerName 570 } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { 571 cc.authority = cc.dopts.copts.Authority 572 } else { 573 // Use endpoint from "scheme://authority/endpoint" as the default 574 // authority for ClientConn. 
575 cc.authority = cc.parsedTarget.Endpoint 576 } 577 578 if cc.dopts.scChan != nil && !scSet { 579 // Blocking wait for the initial service config. 580 select { 581 case sc, ok := <-cc.dopts.scChan: 582 if ok { 583 cc.sc = sc 584 } 585 case <-ctx.Done(): 586 return nil, ctx.Err() 587 } 588 } 589 if cc.dopts.scChan != nil { 590 go cc.scWatcher() 591 } 592 593 var credsClone credentials.TransportCredentials 594 if creds := cc.dopts.copts.TransportCredentials; creds != nil { 595 credsClone = creds.Clone() 596 } 597 cc.balancerBuildOpts = balancer.BuildOptions{ 598 DialCreds: credsClone, 599 Dialer: cc.dopts.copts.Dialer, 600 ChannelzParentID: cc.channelzID, 601 } 602 603 // Build the resolver. 604 cc.resolverWrapper, err = newCCResolverWrapper(cc) 605 if err != nil { 606 return nil, fmt.Errorf("failed to build resolver: %v", err) 607 } 608 // Start the resolver wrapper goroutine after resolverWrapper is created. 609 // 610 // If the goroutine is started before resolverWrapper is ready, the 611 // following may happen: The goroutine sends updates to cc. cc forwards 612 // those to balancer. Balancer creates new addrConn. addrConn fails to 613 // connect, and calls resolveNow(). resolveNow() tries to use the non-ready 614 // resolverWrapper. 615 cc.resolverWrapper.start() 616 617 // A blocking dial blocks until the clientConn is ready. 618 if cc.dopts.block { 619 for { 620 s := cc.GetState() 621 if s == connectivity.Ready { 622 break 623 } 624 if !cc.WaitForStateChange(ctx, s) { 625 // ctx got timeout or canceled. 626 return nil, ctx.Err() 627 } 628 } 629 } 630 631 return cc, nil 632} 633 634// connectivityStateManager keeps the connectivity.State of ClientConn. 635// This struct will eventually be exported so the balancers can access it. 636type connectivityStateManager struct { 637 mu sync.Mutex 638 state connectivity.State 639 notifyChan chan struct{} 640} 641 642// updateState updates the connectivity.State of ClientConn. 
643// If there's a change it notifies goroutines waiting on state change to 644// happen. 645func (csm *connectivityStateManager) updateState(state connectivity.State) { 646 csm.mu.Lock() 647 defer csm.mu.Unlock() 648 if csm.state == connectivity.Shutdown { 649 return 650 } 651 if csm.state == state { 652 return 653 } 654 csm.state = state 655 if csm.notifyChan != nil { 656 // There are other goroutines waiting on this channel. 657 close(csm.notifyChan) 658 csm.notifyChan = nil 659 } 660} 661 662func (csm *connectivityStateManager) getState() connectivity.State { 663 csm.mu.Lock() 664 defer csm.mu.Unlock() 665 return csm.state 666} 667 668func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 669 csm.mu.Lock() 670 defer csm.mu.Unlock() 671 if csm.notifyChan == nil { 672 csm.notifyChan = make(chan struct{}) 673 } 674 return csm.notifyChan 675} 676 677// ClientConn represents a client connection to an RPC server. 678type ClientConn struct { 679 ctx context.Context 680 cancel context.CancelFunc 681 682 target string 683 parsedTarget resolver.Target 684 authority string 685 dopts dialOptions 686 csMgr *connectivityStateManager 687 688 balancerBuildOpts balancer.BuildOptions 689 resolverWrapper *ccResolverWrapper 690 blockingpicker *pickerWrapper 691 692 mu sync.RWMutex 693 sc ServiceConfig 694 scRaw string 695 conns map[*addrConn]struct{} 696 // Keepalive parameter can be updated if a GoAway is received. 697 mkp keepalive.ClientParameters 698 curBalancerName string 699 preBalancerName string // previous balancer name. 700 curAddresses []resolver.Address 701 balancerWrapper *ccBalancerWrapper 702 703 channelzID int64 // channelz unique identification number 704 czmu sync.RWMutex 705 callsStarted int64 706 callsSucceeded int64 707 callsFailed int64 708 lastCallStartedTime time.Time 709} 710 711// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 712// ctx expires. 
A true value is returned in former case and false in latter. 713// This is an EXPERIMENTAL API. 714func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 715 ch := cc.csMgr.getNotifyChan() 716 if cc.csMgr.getState() != sourceState { 717 return true 718 } 719 select { 720 case <-ctx.Done(): 721 return false 722 case <-ch: 723 return true 724 } 725} 726 727// GetState returns the connectivity.State of ClientConn. 728// This is an EXPERIMENTAL API. 729func (cc *ClientConn) GetState() connectivity.State { 730 return cc.csMgr.getState() 731} 732 733func (cc *ClientConn) scWatcher() { 734 for { 735 select { 736 case sc, ok := <-cc.dopts.scChan: 737 if !ok { 738 return 739 } 740 cc.mu.Lock() 741 // TODO: load balance policy runtime change is ignored. 742 // We may revist this decision in the future. 743 cc.sc = sc 744 cc.scRaw = "" 745 cc.mu.Unlock() 746 case <-cc.ctx.Done(): 747 return 748 } 749 } 750} 751 752func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { 753 cc.mu.Lock() 754 defer cc.mu.Unlock() 755 if cc.conns == nil { 756 // cc was closed. 757 return 758 } 759 760 if reflect.DeepEqual(cc.curAddresses, addrs) { 761 return 762 } 763 764 cc.curAddresses = addrs 765 766 if cc.dopts.balancerBuilder == nil { 767 // Only look at balancer types and switch balancer if balancer dial 768 // option is not set. 769 var isGRPCLB bool 770 for _, a := range addrs { 771 if a.Type == resolver.GRPCLB { 772 isGRPCLB = true 773 break 774 } 775 } 776 var newBalancerName string 777 if isGRPCLB { 778 newBalancerName = grpclbName 779 } else { 780 // Address list doesn't contain grpclb address. Try to pick a 781 // non-grpclb balancer. 782 newBalancerName = cc.curBalancerName 783 // If current balancer is grpclb, switch to the previous one. 
784 if newBalancerName == grpclbName { 785 newBalancerName = cc.preBalancerName 786 } 787 // The following could be true in two cases: 788 // - the first time handling resolved addresses 789 // (curBalancerName="") 790 // - the first time handling non-grpclb addresses 791 // (curBalancerName="grpclb", preBalancerName="") 792 if newBalancerName == "" { 793 newBalancerName = PickFirstBalancerName 794 } 795 } 796 cc.switchBalancer(newBalancerName) 797 } else if cc.balancerWrapper == nil { 798 // Balancer dial option was set, and this is the first time handling 799 // resolved addresses. Build a balancer with dopts.balancerBuilder. 800 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 801 } 802 803 cc.balancerWrapper.handleResolvedAddrs(addrs, nil) 804} 805 806// switchBalancer starts the switching from current balancer to the balancer 807// with the given name. 808// 809// It will NOT send the current address list to the new balancer. If needed, 810// caller of this function should send address list to the new balancer after 811// this function returns. 812// 813// Caller must hold cc.mu. 814func (cc *ClientConn) switchBalancer(name string) { 815 if cc.conns == nil { 816 return 817 } 818 819 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) { 820 return 821 } 822 823 grpclog.Infof("ClientConn switching balancer to %q", name) 824 if cc.dopts.balancerBuilder != nil { 825 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") 826 return 827 } 828 // TODO(bar switching) change this to two steps: drain and close. 829 // Keep track of sc in wrapper. 830 if cc.balancerWrapper != nil { 831 cc.balancerWrapper.close() 832 } 833 // Clear all stickiness state. 
834 cc.blockingpicker.clearStickinessState() 835 836 builder := balancer.Get(name) 837 if builder == nil { 838 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) 839 builder = newPickfirstBuilder() 840 } 841 cc.preBalancerName = cc.curBalancerName 842 cc.curBalancerName = builder.Name() 843 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 844} 845 846func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 847 cc.mu.Lock() 848 if cc.conns == nil { 849 cc.mu.Unlock() 850 return 851 } 852 // TODO(bar switching) send updates to all balancer wrappers when balancer 853 // gracefully switching is supported. 854 cc.balancerWrapper.handleSubConnStateChange(sc, s) 855 cc.mu.Unlock() 856} 857 858// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 859// 860// Caller needs to make sure len(addrs) > 0. 861func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { 862 ac := &addrConn{ 863 cc: cc, 864 addrs: addrs, 865 dopts: cc.dopts, 866 } 867 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 868 // Track ac in cc. This needs to be done before any getTransport(...) is called. 869 cc.mu.Lock() 870 if cc.conns == nil { 871 cc.mu.Unlock() 872 return nil, ErrClientConnClosing 873 } 874 if channelz.IsOn() { 875 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 876 } 877 cc.conns[ac] = struct{}{} 878 cc.mu.Unlock() 879 return ac, nil 880} 881 882// removeAddrConn removes the addrConn in the subConn from clientConn. 883// It also tears down the ac with the given error. 884func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 885 cc.mu.Lock() 886 if cc.conns == nil { 887 cc.mu.Unlock() 888 return 889 } 890 delete(cc.conns, ac) 891 cc.mu.Unlock() 892 ac.tearDown(err) 893} 894 895// ChannelzMetric returns ChannelInternalMetric of current ClientConn. 896// This is an EXPERIMENTAL API. 
897func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric { 898 state := cc.GetState() 899 cc.czmu.RLock() 900 defer cc.czmu.RUnlock() 901 return &channelz.ChannelInternalMetric{ 902 State: state, 903 Target: cc.target, 904 CallsStarted: cc.callsStarted, 905 CallsSucceeded: cc.callsSucceeded, 906 CallsFailed: cc.callsFailed, 907 LastCallStartedTimestamp: cc.lastCallStartedTime, 908 } 909} 910 911func (cc *ClientConn) incrCallsStarted() { 912 cc.czmu.Lock() 913 cc.callsStarted++ 914 // TODO(yuxuanli): will make this a time.Time pointer improve performance? 915 cc.lastCallStartedTime = time.Now() 916 cc.czmu.Unlock() 917} 918 919func (cc *ClientConn) incrCallsSucceeded() { 920 cc.czmu.Lock() 921 cc.callsSucceeded++ 922 cc.czmu.Unlock() 923} 924 925func (cc *ClientConn) incrCallsFailed() { 926 cc.czmu.Lock() 927 cc.callsFailed++ 928 cc.czmu.Unlock() 929} 930 931// connect starts to creating transport and also starts the transport monitor 932// goroutine for this ac. 933// It does nothing if the ac is not IDLE. 934// TODO(bar) Move this to the addrConn section. 935// This was part of resetAddrConn, keep it here to make the diff look clean. 936func (ac *addrConn) connect() error { 937 ac.mu.Lock() 938 if ac.state == connectivity.Shutdown { 939 ac.mu.Unlock() 940 return errConnClosing 941 } 942 if ac.state != connectivity.Idle { 943 ac.mu.Unlock() 944 return nil 945 } 946 ac.state = connectivity.Connecting 947 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 948 ac.mu.Unlock() 949 950 // Start a goroutine connecting to the server asynchronously. 951 go func() { 952 if err := ac.resetTransport(); err != nil { 953 grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err) 954 if err != errConnClosing { 955 // Keep this ac in cc.conns, to get the reason it's torn down. 
956 ac.tearDown(err) 957 } 958 return 959 } 960 ac.transportMonitor() 961 }() 962 return nil 963} 964 965// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 966// 967// It checks whether current connected address of ac is in the new addrs list. 968// - If true, it updates ac.addrs and returns true. The ac will keep using 969// the existing connection. 970// - If false, it does nothing and returns false. 971func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 972 ac.mu.Lock() 973 defer ac.mu.Unlock() 974 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 975 if ac.state == connectivity.Shutdown { 976 ac.addrs = addrs 977 return true 978 } 979 980 var curAddrFound bool 981 for _, a := range addrs { 982 if reflect.DeepEqual(ac.curAddr, a) { 983 curAddrFound = true 984 break 985 } 986 } 987 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 988 if curAddrFound { 989 ac.addrs = addrs 990 ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list. 991 } 992 993 return curAddrFound 994} 995 996// GetMethodConfig gets the method config of the input method. 997// If there's an exact match for input method (i.e. /service/method), we return 998// the corresponding MethodConfig. 999// If there isn't an exact match for the input method, we look for the default config 1000// under the service (i.e /service/). If there is a default MethodConfig for 1001// the service, we return it. 1002// Otherwise, we return an empty MethodConfig. 1003func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 1004 // TODO: Avoid the locking here. 
1005 cc.mu.RLock() 1006 defer cc.mu.RUnlock() 1007 m, ok := cc.sc.Methods[method] 1008 if !ok { 1009 i := strings.LastIndex(method, "/") 1010 m = cc.sc.Methods[method[:i+1]] 1011 } 1012 return m 1013} 1014 1015func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) { 1016 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{}) 1017 if err != nil { 1018 return nil, nil, toRPCErr(err) 1019 } 1020 return t, done, nil 1021} 1022 1023// handleServiceConfig parses the service config string in JSON format to Go native 1024// struct ServiceConfig, and store both the struct and the JSON string in ClientConn. 1025func (cc *ClientConn) handleServiceConfig(js string) error { 1026 if cc.dopts.disableServiceConfig { 1027 return nil 1028 } 1029 sc, err := parseServiceConfig(js) 1030 if err != nil { 1031 return err 1032 } 1033 cc.mu.Lock() 1034 cc.scRaw = js 1035 cc.sc = sc 1036 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config. 1037 if cc.curBalancerName == grpclbName { 1038 // If current balancer is grpclb, there's at least one grpclb 1039 // balancer address in the resolved list. Don't switch the balancer, 1040 // but change the previous balancer name, so if a new resolved 1041 // address list doesn't contain grpclb address, balancer will be 1042 // switched to *sc.LB. 
1043 cc.preBalancerName = *sc.LB 1044 } else { 1045 cc.switchBalancer(*sc.LB) 1046 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil) 1047 } 1048 } 1049 1050 if envConfigStickinessOn { 1051 var newStickinessMDKey string 1052 if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" { 1053 newStickinessMDKey = *sc.stickinessMetadataKey 1054 } 1055 // newStickinessMDKey is "" if one of the following happens: 1056 // - stickinessMetadataKey is set to "" 1057 // - stickinessMetadataKey field doesn't exist in service config 1058 cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey)) 1059 } 1060 1061 cc.mu.Unlock() 1062 return nil 1063} 1064 1065func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { 1066 cc.mu.RLock() 1067 r := cc.resolverWrapper 1068 cc.mu.RUnlock() 1069 if r == nil { 1070 return 1071 } 1072 go r.resolveNow(o) 1073} 1074 1075// Close tears down the ClientConn and all underlying connections. 1076func (cc *ClientConn) Close() error { 1077 defer cc.cancel() 1078 1079 cc.mu.Lock() 1080 if cc.conns == nil { 1081 cc.mu.Unlock() 1082 return ErrClientConnClosing 1083 } 1084 conns := cc.conns 1085 cc.conns = nil 1086 cc.csMgr.updateState(connectivity.Shutdown) 1087 1088 rWrapper := cc.resolverWrapper 1089 cc.resolverWrapper = nil 1090 bWrapper := cc.balancerWrapper 1091 cc.balancerWrapper = nil 1092 cc.mu.Unlock() 1093 1094 cc.blockingpicker.close() 1095 1096 if rWrapper != nil { 1097 rWrapper.close() 1098 } 1099 if bWrapper != nil { 1100 bWrapper.close() 1101 } 1102 1103 for ac := range conns { 1104 ac.tearDown(ErrClientConnClosing) 1105 } 1106 if channelz.IsOn() { 1107 channelz.RemoveEntry(cc.channelzID) 1108 } 1109 return nil 1110} 1111 1112// addrConn is a network connection to a given address. 
1113type addrConn struct { 1114 ctx context.Context 1115 cancel context.CancelFunc 1116 1117 cc *ClientConn 1118 addrs []resolver.Address 1119 dopts dialOptions 1120 events trace.EventLog 1121 acbw balancer.SubConn 1122 1123 mu sync.Mutex 1124 curAddr resolver.Address 1125 reconnectIdx int // The index in addrs list to start reconnecting from. 1126 state connectivity.State 1127 // ready is closed and becomes nil when a new transport is up or failed 1128 // due to timeout. 1129 ready chan struct{} 1130 transport transport.ClientTransport 1131 1132 // The reason this addrConn is torn down. 1133 tearDownErr error 1134 1135 connectRetryNum int 1136 // backoffDeadline is the time until which resetTransport needs to 1137 // wait before increasing connectRetryNum count. 1138 backoffDeadline time.Time 1139 // connectDeadline is the time by which all connection 1140 // negotiations must complete. 1141 connectDeadline time.Time 1142 1143 channelzID int64 // channelz unique identification number 1144 czmu sync.RWMutex 1145 callsStarted int64 1146 callsSucceeded int64 1147 callsFailed int64 1148 lastCallStartedTime time.Time 1149} 1150 1151// adjustParams updates parameters used to create transports upon 1152// receiving a GoAway. 1153func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1154 switch r { 1155 case transport.GoAwayTooManyPings: 1156 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1157 ac.cc.mu.Lock() 1158 if v > ac.cc.mkp.Time { 1159 ac.cc.mkp.Time = v 1160 } 1161 ac.cc.mu.Unlock() 1162 } 1163} 1164 1165// printf records an event in ac's event log, unless ac has been closed. 1166// REQUIRES ac.mu is held. 1167func (ac *addrConn) printf(format string, a ...interface{}) { 1168 if ac.events != nil { 1169 ac.events.Printf(format, a...) 1170 } 1171} 1172 1173// errorf records an error in ac's event log, unless ac has been closed. 1174// REQUIRES ac.mu is held. 
1175func (ac *addrConn) errorf(format string, a ...interface{}) { 1176 if ac.events != nil { 1177 ac.events.Errorf(format, a...) 1178 } 1179} 1180 1181// resetTransport recreates a transport to the address for ac. The old 1182// transport will close itself on error or when the clientconn is closed. 1183// The created transport must receive initial settings frame from the server. 1184// In case that doesn't happen, transportMonitor will kill the newly created 1185// transport after connectDeadline has expired. 1186// In case there was an error on the transport before the settings frame was 1187// received, resetTransport resumes connecting to backends after the one that 1188// was previously connected to. In case end of the list is reached, resetTransport 1189// backs off until the original deadline. 1190// If the DialOption WithWaitForHandshake was set, resetTrasport returns 1191// successfully only after server settings are received. 1192// 1193// TODO(bar) make sure all state transitions are valid. 1194func (ac *addrConn) resetTransport() error { 1195 ac.mu.Lock() 1196 if ac.state == connectivity.Shutdown { 1197 ac.mu.Unlock() 1198 return errConnClosing 1199 } 1200 if ac.ready != nil { 1201 close(ac.ready) 1202 ac.ready = nil 1203 } 1204 ac.transport = nil 1205 ridx := ac.reconnectIdx 1206 ac.mu.Unlock() 1207 ac.cc.mu.RLock() 1208 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1209 ac.cc.mu.RUnlock() 1210 var backoffDeadline, connectDeadline time.Time 1211 for connectRetryNum := 0; ; connectRetryNum++ { 1212 ac.mu.Lock() 1213 if ac.backoffDeadline.IsZero() { 1214 // This means either a successful HTTP2 connection was established 1215 // or this is the first time this addrConn is trying to establish a 1216 // connection. 1217 backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration. 1218 // This will be the duration that dial gets to finish. 
1219 dialDuration := getMinConnectTimeout() 1220 if backoffFor > dialDuration { 1221 // Give dial more time as we keep failing to connect. 1222 dialDuration = backoffFor 1223 } 1224 start := time.Now() 1225 backoffDeadline = start.Add(backoffFor) 1226 connectDeadline = start.Add(dialDuration) 1227 ridx = 0 // Start connecting from the beginning. 1228 } else { 1229 // Continue trying to connect with the same deadlines. 1230 connectRetryNum = ac.connectRetryNum 1231 backoffDeadline = ac.backoffDeadline 1232 connectDeadline = ac.connectDeadline 1233 ac.backoffDeadline = time.Time{} 1234 ac.connectDeadline = time.Time{} 1235 ac.connectRetryNum = 0 1236 } 1237 if ac.state == connectivity.Shutdown { 1238 ac.mu.Unlock() 1239 return errConnClosing 1240 } 1241 ac.printf("connecting") 1242 if ac.state != connectivity.Connecting { 1243 ac.state = connectivity.Connecting 1244 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1245 } 1246 // copy ac.addrs in case of race 1247 addrsIter := make([]resolver.Address, len(ac.addrs)) 1248 copy(addrsIter, ac.addrs) 1249 copts := ac.dopts.copts 1250 ac.mu.Unlock() 1251 connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts) 1252 if err != nil { 1253 return err 1254 } 1255 if connected { 1256 return nil 1257 } 1258 } 1259} 1260 1261// createTransport creates a connection to one of the backends in addrs. 1262// It returns true if a connection was established. 
1263func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) { 1264 for i := ridx; i < len(addrs); i++ { 1265 addr := addrs[i] 1266 target := transport.TargetInfo{ 1267 Addr: addr.Addr, 1268 Metadata: addr.Metadata, 1269 Authority: ac.cc.authority, 1270 } 1271 done := make(chan struct{}) 1272 onPrefaceReceipt := func() { 1273 ac.mu.Lock() 1274 close(done) 1275 if !ac.backoffDeadline.IsZero() { 1276 // If we haven't already started reconnecting to 1277 // other backends. 1278 // Note, this can happen when writer notices an error 1279 // and triggers resetTransport while at the same time 1280 // reader receives the preface and invokes this closure. 1281 ac.backoffDeadline = time.Time{} 1282 ac.connectDeadline = time.Time{} 1283 ac.connectRetryNum = 0 1284 } 1285 ac.mu.Unlock() 1286 } 1287 // Do not cancel in the success path because of 1288 // this issue in Go1.6: https://github.com/golang/go/issues/15078. 1289 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1290 if channelz.IsOn() { 1291 copts.ChannelzParentID = ac.channelzID 1292 } 1293 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) 1294 if err != nil { 1295 cancel() 1296 ac.cc.blockingpicker.updateConnectionError(err) 1297 ac.mu.Lock() 1298 if ac.state == connectivity.Shutdown { 1299 // ac.tearDown(...) has been invoked. 1300 ac.mu.Unlock() 1301 return false, errConnClosing 1302 } 1303 ac.mu.Unlock() 1304 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1305 continue 1306 } 1307 if ac.dopts.waitForHandshake { 1308 select { 1309 case <-done: 1310 case <-connectCtx.Done(): 1311 // Didn't receive server preface, must kill this new transport now. 
1312 grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.") 1313 newTr.Close() 1314 break 1315 case <-ac.ctx.Done(): 1316 } 1317 } 1318 ac.mu.Lock() 1319 if ac.state == connectivity.Shutdown { 1320 ac.mu.Unlock() 1321 // ac.tearDonn(...) has been invoked. 1322 newTr.Close() 1323 return false, errConnClosing 1324 } 1325 ac.printf("ready") 1326 ac.state = connectivity.Ready 1327 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1328 ac.transport = newTr 1329 ac.curAddr = addr 1330 if ac.ready != nil { 1331 close(ac.ready) 1332 ac.ready = nil 1333 } 1334 select { 1335 case <-done: 1336 // If the server has responded back with preface already, 1337 // don't set the reconnect parameters. 1338 default: 1339 ac.connectRetryNum = connectRetryNum 1340 ac.backoffDeadline = backoffDeadline 1341 ac.connectDeadline = connectDeadline 1342 ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list. 1343 } 1344 ac.mu.Unlock() 1345 return true, nil 1346 } 1347 ac.mu.Lock() 1348 if ac.state == connectivity.Shutdown { 1349 ac.mu.Unlock() 1350 return false, errConnClosing 1351 } 1352 ac.state = connectivity.TransientFailure 1353 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1354 ac.cc.resolveNow(resolver.ResolveNowOption{}) 1355 if ac.ready != nil { 1356 close(ac.ready) 1357 ac.ready = nil 1358 } 1359 ac.mu.Unlock() 1360 timer := time.NewTimer(backoffDeadline.Sub(time.Now())) 1361 select { 1362 case <-timer.C: 1363 case <-ac.ctx.Done(): 1364 timer.Stop() 1365 return false, ac.ctx.Err() 1366 } 1367 return false, nil 1368} 1369 1370// Run in a goroutine to track the error in transport and create the 1371// new transport if an error happens. It returns when the channel is closing. 
1372func (ac *addrConn) transportMonitor() { 1373 for { 1374 var timer *time.Timer 1375 var cdeadline <-chan time.Time 1376 ac.mu.Lock() 1377 t := ac.transport 1378 if !ac.connectDeadline.IsZero() { 1379 timer = time.NewTimer(ac.connectDeadline.Sub(time.Now())) 1380 cdeadline = timer.C 1381 } 1382 ac.mu.Unlock() 1383 // Block until we receive a goaway or an error occurs. 1384 select { 1385 case <-t.GoAway(): 1386 done := t.Error() 1387 cleanup := t.Close 1388 // Since this transport will be orphaned (won't have a transportMonitor) 1389 // we need to launch a goroutine to keep track of clientConn.Close() 1390 // happening since it might not be noticed by any other goroutine for a while. 1391 go func() { 1392 <-done 1393 cleanup() 1394 }() 1395 case <-t.Error(): 1396 // In case this is triggered because clientConn.Close() 1397 // was called, we want to immeditately close the transport 1398 // since no other goroutine might notice it for a while. 1399 t.Close() 1400 case <-cdeadline: 1401 ac.mu.Lock() 1402 // This implies that client received server preface. 1403 if ac.backoffDeadline.IsZero() { 1404 ac.mu.Unlock() 1405 continue 1406 } 1407 ac.mu.Unlock() 1408 timer = nil 1409 // No server preface received until deadline. 1410 // Kill the connection. 1411 grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.") 1412 t.Close() 1413 } 1414 if timer != nil { 1415 timer.Stop() 1416 } 1417 // If a GoAway happened, regardless of error, adjust our keepalive 1418 // parameters as appropriate. 1419 select { 1420 case <-t.GoAway(): 1421 ac.adjustParams(t.GetGoAwayReason()) 1422 default: 1423 } 1424 ac.mu.Lock() 1425 if ac.state == connectivity.Shutdown { 1426 ac.mu.Unlock() 1427 return 1428 } 1429 // Set connectivity state to TransientFailure before calling 1430 // resetTransport. Transition READY->CONNECTING is not valid. 
1431 ac.state = connectivity.TransientFailure 1432 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1433 ac.cc.resolveNow(resolver.ResolveNowOption{}) 1434 ac.curAddr = resolver.Address{} 1435 ac.mu.Unlock() 1436 if err := ac.resetTransport(); err != nil { 1437 ac.mu.Lock() 1438 ac.printf("transport exiting: %v", err) 1439 ac.mu.Unlock() 1440 grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err) 1441 if err != errConnClosing { 1442 // Keep this ac in cc.conns, to get the reason it's torn down. 1443 ac.tearDown(err) 1444 } 1445 return 1446 } 1447 } 1448} 1449 1450// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or 1451// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. 1452func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { 1453 for { 1454 ac.mu.Lock() 1455 switch { 1456 case ac.state == connectivity.Shutdown: 1457 if failfast || !hasBalancer { 1458 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. 1459 err := ac.tearDownErr 1460 ac.mu.Unlock() 1461 return nil, err 1462 } 1463 ac.mu.Unlock() 1464 return nil, errConnClosing 1465 case ac.state == connectivity.Ready: 1466 ct := ac.transport 1467 ac.mu.Unlock() 1468 return ct, nil 1469 case ac.state == connectivity.TransientFailure: 1470 if failfast || hasBalancer { 1471 ac.mu.Unlock() 1472 return nil, errConnUnavailable 1473 } 1474 } 1475 ready := ac.ready 1476 if ready == nil { 1477 ready = make(chan struct{}) 1478 ac.ready = ready 1479 } 1480 ac.mu.Unlock() 1481 select { 1482 case <-ctx.Done(): 1483 return nil, toRPCErr(ctx.Err()) 1484 // Wait until the new transport is ready or failed. 1485 case <-ready: 1486 } 1487 } 1488} 1489 1490// getReadyTransport returns the transport if ac's state is READY. 1491// Otherwise it returns nil, false. 1492// If ac's state is IDLE, it will trigger ac to connect. 
1493func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1494 ac.mu.Lock() 1495 if ac.state == connectivity.Ready { 1496 t := ac.transport 1497 ac.mu.Unlock() 1498 return t, true 1499 } 1500 var idle bool 1501 if ac.state == connectivity.Idle { 1502 idle = true 1503 } 1504 ac.mu.Unlock() 1505 // Trigger idle ac to connect. 1506 if idle { 1507 ac.connect() 1508 } 1509 return nil, false 1510} 1511 1512// tearDown starts to tear down the addrConn. 1513// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1514// some edge cases (e.g., the caller opens and closes many addrConn's in a 1515// tight loop. 1516// tearDown doesn't remove ac from ac.cc.conns. 1517func (ac *addrConn) tearDown(err error) { 1518 ac.cancel() 1519 ac.mu.Lock() 1520 defer ac.mu.Unlock() 1521 if ac.state == connectivity.Shutdown { 1522 return 1523 } 1524 ac.curAddr = resolver.Address{} 1525 if err == errConnDrain && ac.transport != nil { 1526 // GracefulClose(...) may be executed multiple times when 1527 // i) receiving multiple GoAway frames from the server; or 1528 // ii) there are concurrent name resolver/Balancer triggered 1529 // address removal and GoAway. 
1530 ac.transport.GracefulClose() 1531 } 1532 ac.state = connectivity.Shutdown 1533 ac.tearDownErr = err 1534 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 1535 if ac.events != nil { 1536 ac.events.Finish() 1537 ac.events = nil 1538 } 1539 if ac.ready != nil { 1540 close(ac.ready) 1541 ac.ready = nil 1542 } 1543 if channelz.IsOn() { 1544 channelz.RemoveEntry(ac.channelzID) 1545 } 1546} 1547 1548func (ac *addrConn) getState() connectivity.State { 1549 ac.mu.Lock() 1550 defer ac.mu.Unlock() 1551 return ac.state 1552} 1553 1554func (ac *addrConn) getCurAddr() (ret resolver.Address) { 1555 ac.mu.Lock() 1556 ret = ac.curAddr 1557 ac.mu.Unlock() 1558 return 1559} 1560 1561func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1562 ac.mu.Lock() 1563 addr := ac.curAddr.Addr 1564 ac.mu.Unlock() 1565 state := ac.getState() 1566 ac.czmu.RLock() 1567 defer ac.czmu.RUnlock() 1568 return &channelz.ChannelInternalMetric{ 1569 State: state, 1570 Target: addr, 1571 CallsStarted: ac.callsStarted, 1572 CallsSucceeded: ac.callsSucceeded, 1573 CallsFailed: ac.callsFailed, 1574 LastCallStartedTimestamp: ac.lastCallStartedTime, 1575 } 1576} 1577 1578func (ac *addrConn) incrCallsStarted() { 1579 ac.czmu.Lock() 1580 ac.callsStarted++ 1581 ac.lastCallStartedTime = time.Now() 1582 ac.czmu.Unlock() 1583} 1584 1585func (ac *addrConn) incrCallsSucceeded() { 1586 ac.czmu.Lock() 1587 ac.callsSucceeded++ 1588 ac.czmu.Unlock() 1589} 1590 1591func (ac *addrConn) incrCallsFailed() { 1592 ac.czmu.Lock() 1593 ac.callsFailed++ 1594 ac.czmu.Unlock() 1595} 1596 1597// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1598// underlying connections within the specified timeout. 1599// 1600// Deprecated: This error is never returned by grpc and should not be 1601// referenced by users. 1602var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1603