1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "sync" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/connectivity" 27 "google.golang.org/grpc/grpclog" 28 "google.golang.org/grpc/resolver" 29) 30 31// scStateUpdate contains the subConn and the new state it changed to. 32type scStateUpdate struct { 33 sc balancer.SubConn 34 state connectivity.State 35} 36 37// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple. 38// TODO make a general purpose buffer that uses interface{}. 39type scStateUpdateBuffer struct { 40 c chan *scStateUpdate 41 mu sync.Mutex 42 backlog []*scStateUpdate 43} 44 45func newSCStateUpdateBuffer() *scStateUpdateBuffer { 46 return &scStateUpdateBuffer{ 47 c: make(chan *scStateUpdate, 1), 48 } 49} 50 51func (b *scStateUpdateBuffer) put(t *scStateUpdate) { 52 b.mu.Lock() 53 defer b.mu.Unlock() 54 if len(b.backlog) == 0 { 55 select { 56 case b.c <- t: 57 return 58 default: 59 } 60 } 61 b.backlog = append(b.backlog, t) 62} 63 64func (b *scStateUpdateBuffer) load() { 65 b.mu.Lock() 66 defer b.mu.Unlock() 67 if len(b.backlog) > 0 { 68 select { 69 case b.c <- b.backlog[0]: 70 b.backlog[0] = nil 71 b.backlog = b.backlog[1:] 72 default: 73 } 74 } 75} 76 77// get returns the channel that the scStateUpdate will be sent to. 78// 79// Upon receiving, the caller should call load to send another 80// scStateChangeTuple onto the channel if there is any. 81func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate { 82 return b.c 83} 84 85// resolverUpdate contains the new resolved addresses or error if there's 86// any. 87type resolverUpdate struct { 88 addrs []resolver.Address 89 err error 90} 91 92// ccBalancerWrapper is a wrapper on top of cc for balancers. 93// It implements balancer.ClientConn interface. 94type ccBalancerWrapper struct { 95 cc *ClientConn 96 balancer balancer.Balancer 97 stateChangeQueue *scStateUpdateBuffer 98 resolverUpdateCh chan *resolverUpdate 99 done chan struct{} 100 101 mu sync.Mutex 102 subConns map[*acBalancerWrapper]struct{} 103} 104 105func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { 106 ccb := &ccBalancerWrapper{ 107 cc: cc, 108 stateChangeQueue: newSCStateUpdateBuffer(), 109 resolverUpdateCh: make(chan *resolverUpdate, 1), 110 done: make(chan struct{}), 111 subConns: make(map[*acBalancerWrapper]struct{}), 112 } 113 go ccb.watcher() 114 ccb.balancer = b.Build(ccb, bopts) 115 return ccb 116} 117 118// watcher balancer functions sequentially, so the balancer can be implemented 119// lock-free. 120func (ccb *ccBalancerWrapper) watcher() { 121 for { 122 select { 123 case t := <-ccb.stateChangeQueue.get(): 124 ccb.stateChangeQueue.load() 125 select { 126 case <-ccb.done: 127 ccb.balancer.Close() 128 return 129 default: 130 } 131 ccb.balancer.HandleSubConnStateChange(t.sc, t.state) 132 case t := <-ccb.resolverUpdateCh: 133 select { 134 case <-ccb.done: 135 ccb.balancer.Close() 136 return 137 default: 138 } 139 ccb.balancer.HandleResolvedAddrs(t.addrs, t.err) 140 case <-ccb.done: 141 } 142 143 select { 144 case <-ccb.done: 145 ccb.balancer.Close() 146 ccb.mu.Lock() 147 scs := ccb.subConns 148 ccb.subConns = nil 149 ccb.mu.Unlock() 150 for acbw := range scs { 151 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 152 } 153 return 154 default: 155 } 156 } 157} 158 159func (ccb *ccBalancerWrapper) close() { 160 close(ccb.done) 161} 162 163func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 164 // When updating addresses for a SubConn, if the address in use is not in 165 // the new addresses, the old ac will be tearDown() and a new ac will be 166 // created. tearDown() generates a state change with Shutdown state, we 167 // don't want the balancer to receive this state change. So before 168 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and 169 // this function will be called with (nil, Shutdown). We don't need to call 170 // balancer method in this case. 171 if sc == nil { 172 return 173 } 174 ccb.stateChangeQueue.put(&scStateUpdate{ 175 sc: sc, 176 state: s, 177 }) 178} 179 180func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) { 181 select { 182 case <-ccb.resolverUpdateCh: 183 default: 184 } 185 ccb.resolverUpdateCh <- &resolverUpdate{ 186 addrs: addrs, 187 err: err, 188 } 189} 190 191func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { 192 if len(addrs) <= 0 { 193 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") 194 } 195 ccb.mu.Lock() 196 defer ccb.mu.Unlock() 197 if ccb.subConns == nil { 198 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") 199 } 200 ac, err := ccb.cc.newAddrConn(addrs) 201 if err != nil { 202 return nil, err 203 } 204 acbw := &acBalancerWrapper{ac: ac} 205 acbw.ac.mu.Lock() 206 ac.acbw = acbw 207 acbw.ac.mu.Unlock() 208 ccb.subConns[acbw] = struct{}{} 209 return acbw, nil 210} 211 212func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { 213 acbw, ok := sc.(*acBalancerWrapper) 214 if !ok { 215 return 216 } 217 ccb.mu.Lock() 218 defer ccb.mu.Unlock() 219 if ccb.subConns == nil { 220 return 221 } 222 delete(ccb.subConns, acbw) 223 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) 224} 225 226func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { 227 ccb.mu.Lock() 228 defer ccb.mu.Unlock() 229 if ccb.subConns == nil { 230 return 231 } 232 ccb.cc.csMgr.updateState(s) 233 ccb.cc.blockingpicker.updatePicker(p) 234} 235 236func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) { 237 ccb.cc.resolveNow(o) 238} 239 240func (ccb *ccBalancerWrapper) Target() string { 241 return ccb.cc.target 242} 243 244// acBalancerWrapper is a wrapper on top of ac for balancers. 245// It implements balancer.SubConn interface. 246type acBalancerWrapper struct { 247 mu sync.Mutex 248 ac *addrConn 249} 250 251func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { 252 acbw.mu.Lock() 253 defer acbw.mu.Unlock() 254 if len(addrs) <= 0 { 255 acbw.ac.tearDown(errConnDrain) 256 return 257 } 258 if !acbw.ac.tryUpdateAddrs(addrs) { 259 cc := acbw.ac.cc 260 acbw.ac.mu.Lock() 261 // Set old ac.acbw to nil so the Shutdown state update will be ignored 262 // by balancer. 263 // 264 // TODO(bar) the state transition could be wrong when tearDown() old ac 265 // and creating new ac, fix the transition. 266 acbw.ac.acbw = nil 267 acbw.ac.mu.Unlock() 268 acState := acbw.ac.getState() 269 acbw.ac.tearDown(errConnDrain) 270 271 if acState == connectivity.Shutdown { 272 return 273 } 274 275 ac, err := cc.newAddrConn(addrs) 276 if err != nil { 277 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) 278 return 279 } 280 acbw.ac = ac 281 ac.mu.Lock() 282 ac.acbw = acbw 283 ac.mu.Unlock() 284 if acState != connectivity.Idle { 285 ac.connect() 286 } 287 } 288} 289 290func (acbw *acBalancerWrapper) Connect() { 291 acbw.mu.Lock() 292 defer acbw.mu.Unlock() 293 acbw.ac.connect() 294} 295 296func (acbw *acBalancerWrapper) getAddrConn() *addrConn { 297 acbw.mu.Lock() 298 defer acbw.mu.Unlock() 299 return acbw.ac 300} 301