• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.CONNECTING;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.READY;
25 import static io.grpc.ConnectivityState.SHUTDOWN;
26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Objects;
31 import com.google.protobuf.util.Durations;
32 import io.grpc.Attributes;
33 import io.grpc.ConnectivityState;
34 import io.grpc.ConnectivityStateInfo;
35 import io.grpc.EquivalentAddressGroup;
36 import io.grpc.InternalLogId;
37 import io.grpc.LoadBalancer.Helper;
38 import io.grpc.LoadBalancer.PickResult;
39 import io.grpc.LoadBalancer.PickSubchannelArgs;
40 import io.grpc.LoadBalancer.Subchannel;
41 import io.grpc.LoadBalancer.SubchannelPicker;
42 import io.grpc.ManagedChannel;
43 import io.grpc.Metadata;
44 import io.grpc.Status;
45 import io.grpc.internal.BackoffPolicy;
46 import io.grpc.internal.GrpcAttributes;
47 import io.grpc.internal.TimeProvider;
48 import io.grpc.lb.v1.ClientStats;
49 import io.grpc.lb.v1.InitialLoadBalanceRequest;
50 import io.grpc.lb.v1.InitialLoadBalanceResponse;
51 import io.grpc.lb.v1.LoadBalanceRequest;
52 import io.grpc.lb.v1.LoadBalanceResponse;
53 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase;
54 import io.grpc.lb.v1.LoadBalancerGrpc;
55 import io.grpc.lb.v1.Server;
56 import io.grpc.lb.v1.ServerList;
57 import io.grpc.stub.StreamObserver;
58 import java.net.InetAddress;
59 import java.net.InetSocketAddress;
60 import java.net.SocketAddress;
61 import java.net.UnknownHostException;
62 import java.util.ArrayList;
63 import java.util.Arrays;
64 import java.util.Collections;
65 import java.util.HashMap;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Map.Entry;
69 import java.util.concurrent.ScheduledExecutorService;
70 import java.util.concurrent.ScheduledFuture;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.atomic.AtomicReference;
73 import java.util.logging.Level;
74 import java.util.logging.Logger;
75 import javax.annotation.Nullable;
76 import javax.annotation.concurrent.NotThreadSafe;
77 
78 /**
79  * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}.  Created when
80  * GrpclbLoadBalancer switches to GRPCLB mode.  Closed and discarded when GrpclbLoadBalancer
81  * switches away from GRPCLB mode.
82  */
83 @NotThreadSafe
84 final class GrpclbState {
85   private static final Logger logger = Logger.getLogger(GrpclbState.class.getName());
86 
87   static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
88   private static final Attributes LB_PROVIDED_BACKEND_ATTRS =
89       Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_PROVIDED_BACKEND, true).build();
90 
91   @VisibleForTesting
92   static final PickResult DROP_PICK_RESULT =
93       PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
94 
95   @VisibleForTesting
96   static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
97       @Override
98       public PickResult picked(Metadata headers) {
99         return PickResult.withNoResult();
100       }
101 
102       @Override
103       public String toString() {
104         return "BUFFER_ENTRY";
105       }
106     };
107 
108   private final InternalLogId logId;
109   private final String serviceName;
110   private final Helper helper;
111   private final SubchannelPool subchannelPool;
112   private final TimeProvider time;
113   private final ScheduledExecutorService timerService;
114 
115   private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
116       Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
117   private final BackoffPolicy.Provider backoffPolicyProvider;
118 
119   // Scheduled only once.  Never reset.
120   @Nullable
121   private FallbackModeTask fallbackTimer;
122   private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
123   private boolean usingFallbackBackends;
124   // True if the current balancer has returned a serverlist.  Will be reset to false when lost
125   // connection to a balancer.
126   private boolean balancerWorking;
127   @Nullable
128   private BackoffPolicy lbRpcRetryPolicy;
129   @Nullable
130   private LbRpcRetryTask lbRpcRetryTimer;
131   private long prevLbRpcStartNanos;
132 
133   @Nullable
134   private ManagedChannel lbCommChannel;
135 
136   @Nullable
137   private LbStream lbStream;
138   private Map<EquivalentAddressGroup, Subchannel> subchannels = Collections.emptyMap();
139 
140   // Has the same size as the round-robin list from the balancer.
141   // A drop entry from the round-robin list becomes a DropEntry here.
142   // A backend entry from the robin-robin list becomes a null here.
143   private List<DropEntry> dropList = Collections.emptyList();
144   // Contains only non-drop, i.e., backends from the round-robin list from the balancer.
145   private List<BackendEntry> backendList = Collections.emptyList();
146   private RoundRobinPicker currentPicker =
147       new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
148 
GrpclbState( Helper helper, SubchannelPool subchannelPool, TimeProvider time, ScheduledExecutorService timerService, BackoffPolicy.Provider backoffPolicyProvider, InternalLogId logId)149   GrpclbState(
150       Helper helper,
151       SubchannelPool subchannelPool,
152       TimeProvider time,
153       ScheduledExecutorService timerService,
154       BackoffPolicy.Provider backoffPolicyProvider,
155       InternalLogId logId) {
156     this.helper = checkNotNull(helper, "helper");
157     this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
158     this.time = checkNotNull(time, "time provider");
159     this.timerService = checkNotNull(timerService, "timerService");
160     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
161     this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
162     this.logId = checkNotNull(logId, "logId");
163   }
164 
handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)165   void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
166     if (newState.getState() == SHUTDOWN || !(subchannels.values().contains(subchannel))) {
167       return;
168     }
169     if (newState.getState() == IDLE) {
170       subchannel.requestConnection();
171     }
172     subchannel.getAttributes().get(STATE_INFO).set(newState);
173     maybeUseFallbackBackends();
174     maybeUpdatePicker();
175   }
176 
177   /**
178    * Handle new addresses of the balancer and backends from the resolver, and create connection if
179    * not yet connected.
180    */
handleAddresses( List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)181   void handleAddresses(
182       List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
183     if (newLbAddressGroups.isEmpty()) {
184       propagateError(Status.UNAVAILABLE.withDescription(
185               "NameResolver returned no LB address while asking for GRPCLB"));
186       return;
187     }
188     LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
189     startLbComm(newLbAddressGroup);
190     // Avoid creating a new RPC just because the addresses were updated, as it can cause a
191     // stampeding herd. The current RPC may be on a connection to an address not present in
192     // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
193     // outdated backend, we could choose to re-create the RPC.
194     if (lbStream == null) {
195       startLbRpc();
196     }
197     fallbackBackendList = newBackendServers;
198     // Start the fallback timer if it's never started
199     if (fallbackTimer == null) {
200       logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
201       fallbackTimer = new FallbackModeTask();
202       fallbackTimer.schedule();
203     }
204     if (usingFallbackBackends) {
205       // Populate the new fallback backends to round-robin list.
206       useFallbackBackends();
207     }
208     maybeUpdatePicker();
209   }
210 
maybeUseFallbackBackends()211   private void maybeUseFallbackBackends() {
212     if (balancerWorking) {
213       return;
214     }
215     if (usingFallbackBackends) {
216       return;
217     }
218     int numReadySubchannels = 0;
219     for (Subchannel subchannel : subchannels.values()) {
220       if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
221         numReadySubchannels++;
222       }
223     }
224     if (numReadySubchannels > 0) {
225       return;
226     }
227     // Fallback contiditions met
228     useFallbackBackends();
229   }
230 
231   /**
232    * Populate the round-robin lists with the fallback backends.
233    */
useFallbackBackends()234   private void useFallbackBackends() {
235     usingFallbackBackends = true;
236     logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList});
237 
238     List<DropEntry> newDropList = new ArrayList<>();
239     List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
240     for (EquivalentAddressGroup eag : fallbackBackendList) {
241       newDropList.add(null);
242       newBackendAddrList.add(new BackendAddressGroup(eag, null));
243     }
244     useRoundRobinLists(newDropList, newBackendAddrList, null);
245   }
246 
shutdownLbComm()247   private void shutdownLbComm() {
248     if (lbCommChannel != null) {
249       lbCommChannel.shutdown();
250       lbCommChannel = null;
251     }
252     shutdownLbRpc();
253   }
254 
shutdownLbRpc()255   private void shutdownLbRpc() {
256     if (lbStream != null) {
257       lbStream.close(null);
258       // lbStream will be set to null in LbStream.cleanup()
259     }
260   }
261 
startLbComm(LbAddressGroup lbAddressGroup)262   private void startLbComm(LbAddressGroup lbAddressGroup) {
263     checkNotNull(lbAddressGroup, "lbAddressGroup");
264     if (lbCommChannel == null) {
265       lbCommChannel = helper.createOobChannel(
266           lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
267     } else if (lbAddressGroup.getAuthority().equals(lbCommChannel.authority())) {
268       helper.updateOobChannelAddresses(lbCommChannel, lbAddressGroup.getAddresses());
269     } else {
270       // Full restart of channel
271       shutdownLbComm();
272       lbCommChannel = helper.createOobChannel(
273           lbAddressGroup.getAddresses(), lbAddressGroup.getAuthority());
274     }
275   }
276 
startLbRpc()277   private void startLbRpc() {
278     checkState(lbStream == null, "previous lbStream has not been cleared yet");
279     LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
280     lbStream = new LbStream(stub);
281     lbStream.start();
282     prevLbRpcStartNanos = time.currentTimeNanos();
283 
284     LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
285         .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
286             .setName(serviceName).build())
287         .build();
288     try {
289       lbStream.lbRequestWriter.onNext(initRequest);
290     } catch (Exception e) {
291       lbStream.close(e);
292     }
293   }
294 
cancelFallbackTimer()295   private void cancelFallbackTimer() {
296     if (fallbackTimer != null) {
297       fallbackTimer.cancel();
298     }
299   }
300 
cancelLbRpcRetryTimer()301   private void cancelLbRpcRetryTimer() {
302     if (lbRpcRetryTimer != null) {
303       lbRpcRetryTimer.cancel();
304     }
305   }
306 
shutdown()307   void shutdown() {
308     shutdownLbComm();
309     // We close the subchannels through subchannelPool instead of helper just for convenience of
310     // testing.
311     for (Subchannel subchannel : subchannels.values()) {
312       subchannelPool.returnSubchannel(subchannel);
313     }
314     subchannels = Collections.emptyMap();
315     subchannelPool.clear();
316     cancelFallbackTimer();
317     cancelLbRpcRetryTimer();
318   }
319 
propagateError(Status status)320   void propagateError(Status status) {
321     logger.log(Level.FINE, "[{0}] Had an error: {1}; dropList={2}; backendList={3}",
322         new Object[] {logId, status, dropList, backendList});
323     if (backendList.isEmpty()) {
324       maybeUpdatePicker(
325           TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status))));
326     }
327   }
328 
329   @VisibleForTesting
330   @Nullable
getLoadRecorder()331   GrpclbClientLoadRecorder getLoadRecorder() {
332     if (lbStream == null) {
333       return null;
334     }
335     return lbStream.loadRecorder;
336   }
337 
338   /**
339    * Populate the round-robin lists with the given values.
340    */
useRoundRobinLists( List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder)341   private void useRoundRobinLists(
342       List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
343       @Nullable GrpclbClientLoadRecorder loadRecorder) {
344     logger.log(Level.FINE, "[{0}] Using round-robin list: {1}, droplist={2}",
345          new Object[] {logId, newBackendAddrList, newDropList});
346     HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
347         new HashMap<EquivalentAddressGroup, Subchannel>();
348     List<BackendEntry> newBackendList = new ArrayList<>();
349 
350     for (BackendAddressGroup backendAddr : newBackendAddrList) {
351       EquivalentAddressGroup eag = backendAddr.getAddresses();
352       Subchannel subchannel = newSubchannelMap.get(eag);
353       if (subchannel == null) {
354         subchannel = subchannels.get(eag);
355         if (subchannel == null) {
356           Attributes subchannelAttrs = Attributes.newBuilder()
357               .set(STATE_INFO,
358                   new AtomicReference<ConnectivityStateInfo>(
359                       ConnectivityStateInfo.forNonError(IDLE)))
360               .build();
361           subchannel = subchannelPool.takeOrCreateSubchannel(eag, subchannelAttrs);
362           subchannel.requestConnection();
363         }
364         newSubchannelMap.put(eag, subchannel);
365       }
366       BackendEntry entry;
367       // Only picks with tokens are reported to LoadRecorder
368       if (backendAddr.getToken() == null) {
369         entry = new BackendEntry(subchannel);
370       } else {
371         entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
372       }
373       newBackendList.add(entry);
374     }
375 
376     // Close Subchannels whose addresses have been delisted
377     for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
378       EquivalentAddressGroup eag = entry.getKey();
379       if (!newSubchannelMap.containsKey(eag)) {
380         subchannelPool.returnSubchannel(entry.getValue());
381       }
382     }
383 
384     subchannels = Collections.unmodifiableMap(newSubchannelMap);
385     dropList = Collections.unmodifiableList(newDropList);
386     backendList = Collections.unmodifiableList(newBackendList);
387   }
388 
389   @VisibleForTesting
390   class FallbackModeTask implements Runnable {
391     private ScheduledFuture<?> scheduledFuture;
392 
393     @Override
run()394     public void run() {
395       helper.runSerialized(new Runnable() {
396           @Override
397           public void run() {
398             checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
399             maybeUseFallbackBackends();
400             maybeUpdatePicker();
401           }
402         });
403     }
404 
cancel()405     void cancel() {
406       scheduledFuture.cancel(false);
407     }
408 
schedule()409     void schedule() {
410       checkState(scheduledFuture == null, "FallbackModeTask already scheduled");
411       scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
412     }
413   }
414 
415   @VisibleForTesting
416   class LbRpcRetryTask implements Runnable {
417     private ScheduledFuture<?> scheduledFuture;
418 
419     @Override
run()420     public void run() {
421       helper.runSerialized(new Runnable() {
422           @Override
423           public void run() {
424             checkState(
425                 lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch");
426             startLbRpc();
427           }
428         });
429     }
430 
cancel()431     void cancel() {
432       scheduledFuture.cancel(false);
433     }
434 
schedule(long delayNanos)435     void schedule(long delayNanos) {
436       checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled");
437       scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS);
438     }
439   }
440 
441   @VisibleForTesting
442   class LoadReportingTask implements Runnable {
443     private final LbStream stream;
444 
LoadReportingTask(LbStream stream)445     LoadReportingTask(LbStream stream) {
446       this.stream = stream;
447     }
448 
449     @Override
run()450     public void run() {
451       helper.runSerialized(new Runnable() {
452           @Override
453           public void run() {
454             stream.loadReportFuture = null;
455             stream.sendLoadReport();
456           }
457         });
458     }
459   }
460 
461   private class LbStream implements StreamObserver<LoadBalanceResponse> {
462     final GrpclbClientLoadRecorder loadRecorder;
463     final LoadBalancerGrpc.LoadBalancerStub stub;
464     StreamObserver<LoadBalanceRequest> lbRequestWriter;
465 
466     // These fields are only accessed from helper.runSerialized()
467     boolean initialResponseReceived;
468     boolean closed;
469     long loadReportIntervalMillis = -1;
470     ScheduledFuture<?> loadReportFuture;
471 
LbStream(LoadBalancerGrpc.LoadBalancerStub stub)472     LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
473       this.stub = checkNotNull(stub, "stub");
474       // Stats data only valid for current LbStream.  We do not carry over data from previous
475       // stream.
476       loadRecorder = new GrpclbClientLoadRecorder(time);
477     }
478 
start()479     void start() {
480       lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
481     }
482 
onNext(final LoadBalanceResponse response)483     @Override public void onNext(final LoadBalanceResponse response) {
484       helper.runSerialized(new Runnable() {
485           @Override
486           public void run() {
487             handleResponse(response);
488           }
489         });
490     }
491 
onError(final Throwable error)492     @Override public void onError(final Throwable error) {
493       helper.runSerialized(new Runnable() {
494           @Override
495           public void run() {
496             handleStreamClosed(Status.fromThrowable(error)
497                 .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
498           }
499         });
500     }
501 
onCompleted()502     @Override public void onCompleted() {
503       helper.runSerialized(new Runnable() {
504           @Override
505           public void run() {
506             handleStreamClosed(
507                 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed"));
508           }
509         });
510     }
511 
512     // Following methods must be run in helper.runSerialized()
513 
sendLoadReport()514     private void sendLoadReport() {
515       if (closed) {
516         return;
517       }
518       ClientStats stats = loadRecorder.generateLoadReport();
519       // TODO(zhangkun83): flow control?
520       try {
521         lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build());
522         scheduleNextLoadReport();
523       } catch (Exception e) {
524         close(e);
525       }
526     }
527 
scheduleNextLoadReport()528     private void scheduleNextLoadReport() {
529       if (loadReportIntervalMillis > 0) {
530         loadReportFuture = timerService.schedule(
531             new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS);
532       }
533     }
534 
handleResponse(LoadBalanceResponse response)535     private void handleResponse(LoadBalanceResponse response) {
536       if (closed) {
537         return;
538       }
539       logger.log(Level.FINER, "[{0}] Got an LB response: {1}", new Object[] {logId, response});
540 
541       LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase();
542       if (!initialResponseReceived) {
543         if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) {
544           logger.log(
545               Level.WARNING,
546               "[{0}] : Did not receive response with type initial response: {1}",
547               new Object[] {logId, response});
548           return;
549         }
550         initialResponseReceived = true;
551         InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
552         loadReportIntervalMillis =
553             Durations.toMillis(initialResponse.getClientStatsReportInterval());
554         scheduleNextLoadReport();
555         return;
556       }
557 
558       if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
559         logger.log(
560             Level.WARNING,
561             "[{0}] : Ignoring unexpected response type: {1}",
562             new Object[] {logId, response});
563         return;
564       }
565 
566       balancerWorking = true;
567       // TODO(zhangkun83): handle delegate from initialResponse
568       ServerList serverList = response.getServerList();
569       List<DropEntry> newDropList = new ArrayList<>();
570       List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
571       // Construct the new collections. Create new Subchannels when necessary.
572       for (Server server : serverList.getServersList()) {
573         String token = server.getLoadBalanceToken();
574         if (server.getDrop()) {
575           newDropList.add(new DropEntry(loadRecorder, token));
576         } else {
577           newDropList.add(null);
578           InetSocketAddress address;
579           try {
580             address = new InetSocketAddress(
581                 InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
582           } catch (UnknownHostException e) {
583             propagateError(
584                 Status.UNAVAILABLE
585                     .withDescription("Host for server not found: " + server)
586                     .withCause(e));
587             continue;
588           }
589           // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of
590           // TLS, with Netty.
591           EquivalentAddressGroup eag =
592               new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS);
593           newBackendAddrList.add(new BackendAddressGroup(eag, token));
594         }
595       }
596       // Stop using fallback backends as soon as a new server list is received from the balancer.
597       usingFallbackBackends = false;
598       cancelFallbackTimer();
599       useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
600       maybeUpdatePicker();
601     }
602 
handleStreamClosed(Status error)603     private void handleStreamClosed(Status error) {
604       checkArgument(!error.isOk(), "unexpected OK status");
605       if (closed) {
606         return;
607       }
608       closed = true;
609       cleanUp();
610       propagateError(error);
611       balancerWorking = false;
612       maybeUseFallbackBackends();
613       maybeUpdatePicker();
614 
615       long delayNanos = 0;
616       if (initialResponseReceived || lbRpcRetryPolicy == null) {
617         // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
618         // has never been initialized.
619         lbRpcRetryPolicy = backoffPolicyProvider.get();
620       }
621       // Backoff only when balancer wasn't working previously.
622       if (!initialResponseReceived) {
623         // The back-off policy determines the interval between consecutive RPC upstarts, thus the
624         // actual delay may be smaller than the value from the back-off policy, or even negative,
625         // depending how much time was spent in the previous RPC.
626         delayNanos =
627             prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos();
628       }
629       if (delayNanos <= 0) {
630         startLbRpc();
631       } else {
632         lbRpcRetryTimer = new LbRpcRetryTask();
633         lbRpcRetryTimer.schedule(delayNanos);
634       }
635     }
636 
close(@ullable Exception error)637     void close(@Nullable Exception error) {
638       if (closed) {
639         return;
640       }
641       closed = true;
642       cleanUp();
643       try {
644         if (error == null) {
645           lbRequestWriter.onCompleted();
646         } else {
647           lbRequestWriter.onError(error);
648         }
649       } catch (Exception e) {
650         // Don't care
651       }
652     }
653 
cleanUp()654     private void cleanUp() {
655       if (loadReportFuture != null) {
656         loadReportFuture.cancel(false);
657         loadReportFuture = null;
658       }
659       if (lbStream == this) {
660         lbStream = null;
661       }
662     }
663   }
664 
665   /**
666    * Make and use a picker out of the current lists and the states of subchannels if they have
667    * changed since the last picker created.
668    */
maybeUpdatePicker()669   private void maybeUpdatePicker() {
670     List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size());
671     Status error = null;
672     boolean hasIdle = false;
673     for (BackendEntry entry : backendList) {
674       Subchannel subchannel = entry.result.getSubchannel();
675       Attributes attrs = subchannel.getAttributes();
676       ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
677       if (stateInfo.getState() == READY) {
678         pickList.add(entry);
679       } else if (stateInfo.getState() == TRANSIENT_FAILURE) {
680         error = stateInfo.getStatus();
681       } else if (stateInfo.getState() == IDLE) {
682         hasIdle = true;
683       }
684     }
685     ConnectivityState state;
686     if (pickList.isEmpty()) {
687       if (error != null && !hasIdle) {
688         logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
689             new Object[] {logId, error});
690         pickList.add(new ErrorEntry(error));
691         state = TRANSIENT_FAILURE;
692       } else {
693         logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId);
694         pickList.add(BUFFER_ENTRY);
695         state = CONNECTING;
696       }
697     } else {
698       logger.log(
699           Level.FINE, "[{0}] Using drop list {1} and pick list {2}",
700           new Object[] {logId, dropList, pickList});
701       state = READY;
702     }
703     maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
704   }
705 
706   /**
707    * Update the given picker to the helper if it's different from the current one.
708    */
maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)709   private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
710     // Discard the new picker if we are sure it won't make any difference, in order to save
711     // re-processing pending streams, and avoid unnecessary resetting of the pointer in
712     // RoundRobinPicker.
713     if (picker.dropList.equals(currentPicker.dropList)
714         && picker.pickList.equals(currentPicker.pickList)) {
715       return;
716     }
717     // No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
718     // stream thus no time is wasted in re-process.
719     currentPicker = picker;
720     helper.updateBalancingState(state, picker);
721   }
722 
flattenLbAddressGroups(List<LbAddressGroup> groupList)723   private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) {
724     assert !groupList.isEmpty();
725     List<EquivalentAddressGroup> eags = new ArrayList<>(groupList.size());
726     String authority = groupList.get(0).getAuthority();
727     for (LbAddressGroup group : groupList) {
728       if (!authority.equals(group.getAuthority())) {
729         // TODO(ejona): Allow different authorities for different addresses. Requires support from
730         // Helper.
731         logger.log(Level.WARNING,
732             "[{0}] Multiple authorities found for LB. "
733             + "Skipping addresses for {0} in preference to {1}",
734             new Object[] {logId, group.getAuthority(), authority});
735       } else {
736         eags.add(group.getAddresses());
737       }
738     }
739     // ALTS code can use the presence of ATTR_LB_ADDR_AUTHORITY to select ALTS instead of TLS, with
740     // Netty.
741     // TODO(ejona): The process here is a bit of a hack because ATTR_LB_ADDR_AUTHORITY isn't
742     // actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow
743     // this to be more obvious.
744     Attributes attrs = Attributes.newBuilder()
745         .set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
746         .build();
747     return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority);
748   }
749 
750   /**
751    * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
752    */
flattenEquivalentAddressGroup( List<EquivalentAddressGroup> groupList, Attributes attrs)753   private static EquivalentAddressGroup flattenEquivalentAddressGroup(
754       List<EquivalentAddressGroup> groupList, Attributes attrs) {
755     List<SocketAddress> addrs = new ArrayList<>();
756     for (EquivalentAddressGroup group : groupList) {
757       addrs.addAll(group.getAddresses());
758     }
759     return new EquivalentAddressGroup(addrs, attrs);
760   }
761 
762   @VisibleForTesting
763   static final class DropEntry {
764     private final GrpclbClientLoadRecorder loadRecorder;
765     private final String token;
766 
DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)767     DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
768       this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
769       this.token = checkNotNull(token, "token");
770     }
771 
picked()772     PickResult picked() {
773       loadRecorder.recordDroppedRequest(token);
774       return DROP_PICK_RESULT;
775     }
776 
777     @Override
toString()778     public String toString() {
779       return MoreObjects.toStringHelper(this)
780           .add("loadRecorder", loadRecorder)
781           .add("token", token)
782           .toString();
783     }
784 
785     @Override
hashCode()786     public int hashCode() {
787       return Objects.hashCode(loadRecorder, token);
788     }
789 
790     @Override
equals(Object other)791     public boolean equals(Object other) {
792       if (!(other instanceof DropEntry)) {
793         return false;
794       }
795       DropEntry that = (DropEntry) other;
796       return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
797     }
798   }
799 
800   private interface RoundRobinEntry {
picked(Metadata headers)801     PickResult picked(Metadata headers);
802   }
803 
804   @VisibleForTesting
805   static final class BackendEntry implements RoundRobinEntry {
806     @VisibleForTesting
807     final PickResult result;
808     @Nullable
809     private final GrpclbClientLoadRecorder loadRecorder;
810     @Nullable
811     private final String token;
812 
813     /**
814      * Creates a BackendEntry whose usage will be reported to load recorder.
815      */
BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)816     BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
817       this.result = PickResult.withSubchannel(subchannel, loadRecorder);
818       this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
819       this.token = checkNotNull(token, "token");
820     }
821 
822     /**
823      * Creates a BackendEntry whose usage will not be reported.
824      */
BackendEntry(Subchannel subchannel)825     BackendEntry(Subchannel subchannel) {
826       this.result = PickResult.withSubchannel(subchannel);
827       this.loadRecorder = null;
828       this.token = null;
829     }
830 
831     @Override
picked(Metadata headers)832     public PickResult picked(Metadata headers) {
833       headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
834       if (token != null) {
835         headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
836       }
837       return result;
838     }
839 
840     @Override
toString()841     public String toString() {
842       return MoreObjects.toStringHelper(this)
843           .add("result", result)
844           .add("loadRecorder", loadRecorder)
845           .add("token", token)
846           .toString();
847     }
848 
849     @Override
hashCode()850     public int hashCode() {
851       return Objects.hashCode(loadRecorder, result, token);
852     }
853 
854     @Override
equals(Object other)855     public boolean equals(Object other) {
856       if (!(other instanceof BackendEntry)) {
857         return false;
858       }
859       BackendEntry that = (BackendEntry) other;
860       return Objects.equal(result, that.result) && Objects.equal(token, that.token)
861           && Objects.equal(loadRecorder, that.loadRecorder);
862     }
863   }
864 
865   @VisibleForTesting
866   static final class ErrorEntry implements RoundRobinEntry {
867     final PickResult result;
868 
ErrorEntry(Status status)869     ErrorEntry(Status status) {
870       result = PickResult.withError(status);
871     }
872 
873     @Override
picked(Metadata headers)874     public PickResult picked(Metadata headers) {
875       return result;
876     }
877 
878     @Override
hashCode()879     public int hashCode() {
880       return Objects.hashCode(result);
881     }
882 
883     @Override
equals(Object other)884     public boolean equals(Object other) {
885       if (!(other instanceof ErrorEntry)) {
886         return false;
887       }
888       return Objects.equal(result, ((ErrorEntry) other).result);
889     }
890 
891     @Override
toString()892     public String toString() {
893       return MoreObjects.toStringHelper(this)
894           .add("result", result)
895           .toString();
896     }
897   }
898 
899   @VisibleForTesting
900   static final class RoundRobinPicker extends SubchannelPicker {
901     @VisibleForTesting
902     final List<DropEntry> dropList;
903     private int dropIndex;
904 
905     @VisibleForTesting
906     final List<? extends RoundRobinEntry> pickList;
907     private int pickIndex;
908 
909     // dropList can be empty, which means no drop.
910     // pickList must not be empty.
RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)911     RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
912       this.dropList = checkNotNull(dropList, "dropList");
913       this.pickList = checkNotNull(pickList, "pickList");
914       checkArgument(!pickList.isEmpty(), "pickList is empty");
915     }
916 
917     @Override
pickSubchannel(PickSubchannelArgs args)918     public PickResult pickSubchannel(PickSubchannelArgs args) {
919       synchronized (pickList) {
920         // Two-level round-robin.
921         // First round-robin on dropList. If a drop entry is selected, request will be dropped.  If
922         // a non-drop entry is selected, then round-robin on pickList.  This makes sure requests are
923         // dropped at the same proportion as the drop entries appear on the round-robin list from
924         // the balancer, while only READY backends (that make up pickList) are selected for the
925         // non-drop cases.
926         if (!dropList.isEmpty()) {
927           DropEntry drop = dropList.get(dropIndex);
928           dropIndex++;
929           if (dropIndex == dropList.size()) {
930             dropIndex = 0;
931           }
932           if (drop != null) {
933             return drop.picked();
934           }
935         }
936 
937         RoundRobinEntry pick = pickList.get(pickIndex);
938         pickIndex++;
939         if (pickIndex == pickList.size()) {
940           pickIndex = 0;
941         }
942         return pick.picked(args.getHeaders());
943       }
944     }
945   }
946 }
947