• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.ConnectivityState.CONNECTING;
23 import static io.grpc.ConnectivityState.IDLE;
24 import static io.grpc.ConnectivityState.READY;
25 import static io.grpc.ConnectivityState.SHUTDOWN;
26 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Stopwatch;
32 import com.google.protobuf.util.Durations;
33 import io.grpc.Attributes;
34 import io.grpc.ChannelLogger;
35 import io.grpc.ChannelLogger.ChannelLogLevel;
36 import io.grpc.ConnectivityState;
37 import io.grpc.ConnectivityStateInfo;
38 import io.grpc.Context;
39 import io.grpc.EquivalentAddressGroup;
40 import io.grpc.LoadBalancer.CreateSubchannelArgs;
41 import io.grpc.LoadBalancer.Helper;
42 import io.grpc.LoadBalancer.PickResult;
43 import io.grpc.LoadBalancer.PickSubchannelArgs;
44 import io.grpc.LoadBalancer.Subchannel;
45 import io.grpc.LoadBalancer.SubchannelPicker;
46 import io.grpc.LoadBalancer.SubchannelStateListener;
47 import io.grpc.ManagedChannel;
48 import io.grpc.Metadata;
49 import io.grpc.Status;
50 import io.grpc.SynchronizationContext;
51 import io.grpc.SynchronizationContext.ScheduledHandle;
52 import io.grpc.grpclb.SubchannelPool.PooledSubchannelStateListener;
53 import io.grpc.internal.BackoffPolicy;
54 import io.grpc.internal.TimeProvider;
55 import io.grpc.lb.v1.ClientStats;
56 import io.grpc.lb.v1.InitialLoadBalanceRequest;
57 import io.grpc.lb.v1.InitialLoadBalanceResponse;
58 import io.grpc.lb.v1.LoadBalanceRequest;
59 import io.grpc.lb.v1.LoadBalanceResponse;
60 import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase;
61 import io.grpc.lb.v1.LoadBalancerGrpc;
62 import io.grpc.lb.v1.Server;
63 import io.grpc.lb.v1.ServerList;
64 import io.grpc.stub.StreamObserver;
65 import java.net.InetAddress;
66 import java.net.InetSocketAddress;
67 import java.net.UnknownHostException;
68 import java.util.ArrayList;
69 import java.util.Arrays;
70 import java.util.Collections;
71 import java.util.HashMap;
72 import java.util.List;
73 import java.util.Map;
74 import java.util.concurrent.ScheduledExecutorService;
75 import java.util.concurrent.TimeUnit;
76 import java.util.concurrent.atomic.AtomicBoolean;
77 import java.util.concurrent.atomic.AtomicReference;
78 import javax.annotation.Nullable;
79 import javax.annotation.concurrent.NotThreadSafe;
80 
81 /**
82  * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}.  Created when
83  * GrpclbLoadBalancer switches to GRPCLB mode.  Closed and discarded when GrpclbLoadBalancer
84  * switches away from GRPCLB mode.
85  */
86 @NotThreadSafe
87 final class GrpclbState {
// A 10-second fallback timeout, in milliseconds. NOTE(review): presumably the default used when
// building GrpclbConfig; this class itself reads the value from config.getFallbackTimeoutMs().
static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
// Attributes attached to every backend address parsed from a balancer server list, marking it
// with GrpclbConstants.ATTR_LB_PROVIDED_BACKEND.
private static final Attributes LB_PROVIDED_BACKEND_ATTRS =
    Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();

// Temporary workaround to reduce log spam for a grpclb server that incessantly sends updates.
// Tracked by b/198440401. Controlled by the "io.grpc.grpclb.LogServerLists" system property
// (default "true"); when false, received server lists are not included in debug logs.
static final boolean SHOULD_LOG_SERVER_LISTS =
    Boolean.parseBoolean(System.getProperty("io.grpc.grpclb.LogServerLists", "true"));

// Drop result (UNAVAILABLE) for RPCs dropped at the balancer's request.
// NOTE(review): presumably returned by DropEntry, which is defined outside this view.
@VisibleForTesting
static final PickResult DROP_PICK_RESULT =
    PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
// Status describing a balancer response that contained no backends.
@VisibleForTesting
static final Status NO_AVAILABLE_BACKENDS_STATUS =
    Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends");
// Fallback reason used when the fallback timer fires before the balancer becomes usable.
@VisibleForTesting
static final Status BALANCER_TIMEOUT_STATUS =
    Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer");
// Fallback reason used when the balancer explicitly sends a fallback response.
@VisibleForTesting
static final Status BALANCER_REQUESTED_FALLBACK_STATUS =
    Status.UNAVAILABLE.withDescription("Fallback requested by balancer");
// Status describing a fallback attempt for which no fallback addresses exist.
@VisibleForTesting
static final Status NO_FALLBACK_BACKENDS_STATUS =
    Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found");
// This error status should never be propagated to RPC failures, as "no backend or balancer
// addresses found" should be directly handled as a name resolution error. So in cases of no
// balancer address, fallback should never fail.
private static final Status NO_LB_ADDRESS_PROVIDED_STATUS =
    Status.UNAVAILABLE.withDescription("No balancer address found");


// Placeholder pick-list entry whose picks return PickResult.withNoResult() (no decision yet, so
// the RPC stays buffered). The initial picker consists solely of this entry.
@VisibleForTesting
static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
    @Override
    public PickResult picked(Metadata headers) {
      return PickResult.withNoResult();
    }

    @Override
    public String toString() {
      return "BUFFER_ENTRY";
    }
  };
// Appended to the first balancer address's authority override to form the placeholder
// authority passed when creating the balancer OOB channel.
@VisibleForTesting
static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed";

// How backends are used. ROUND_ROBIN: one Subchannel per backend address, obtained from the
// SubchannelPool. PICK_FIRST: a single Subchannel holding all backend addresses.
enum Mode {
  ROUND_ROBIN,
  PICK_FIRST,
}
138 
// Name sent to the balancer in the initial request: the config's service name, or the channel
// authority when the config does not set one.
private final String serviceName;
// How long to wait for the balancer before falling back; from config.getFallbackTimeoutMs().
private final long fallbackTimeoutMs;
private final Helper helper;
// Context attached while starting each balancer RPC.
private final Context context;
private final SynchronizationContext syncContext;
// Non-null only in ROUND_ROBIN mode.
@Nullable
private final SubchannelPool subchannelPool;
private final TimeProvider time;
// Reset and restarted whenever a balancer RPC is started.
private final Stopwatch stopwatch;
private final ScheduledExecutorService timerService;

// Key under which each Subchannel's attributes hold its latest recorded connectivity state.
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
    Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
private final BackoffPolicy.Provider backoffPolicyProvider;
private final ChannelLogger logger;

// Scheduled only once.  Never reset: cancelFallbackTimer() cancels it but keeps the handle, so
// handleAddresses() will not schedule it again.
@Nullable
private ScheduledHandle fallbackTimer;
// Backend addresses from the name resolver, used while in fallback mode.
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
private boolean usingFallbackBackends;
// Reason for falling back. It is used as the RPCs' error message if falling back fails (e.g., no
// fallback addresses found).
@Nullable
private Status fallbackReason;
// True once the current balancer has returned a server list. Reset to false when the connection
// to the balancer is lost.
private boolean balancerWorking;
// Backoff state and pending timer for retrying the balancer RPC.
@Nullable
private BackoffPolicy lbRpcRetryPolicy;
@Nullable
private ScheduledHandle lbRpcRetryTimer;

// OOB channel to the balancer; created by startLbComm() and shut down by shutdownLbComm().
@Nullable
private ManagedChannel lbCommChannel;

// Active balancer stream, or null when there is none.
@Nullable
private LbStream lbStream;
// Backend Subchannels keyed by their address list. ROUND_ROBIN keeps one entry per backend
// address; PICK_FIRST keeps at most one entry covering all addresses.
private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap();
private final GrpclbConfig config;

// Has the same size as the round-robin list from the balancer.
// A drop entry from the round-robin list becomes a DropEntry here.
// A backend entry from the round-robin list becomes a null here.
private List<DropEntry> dropList = Collections.emptyList();
// Contains only non-drop, i.e., backends from the round-robin list from the balancer.
private List<BackendEntry> backendList = Collections.emptyList();
// Picker currently in effect; initially buffers every pick.
private RoundRobinPicker currentPicker =
    new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
// Set by requestConnection() when no idle Subchannel was available to connect; honored when the
// PICK_FIRST Subchannel is next created.
private boolean requestConnectionPending;
189 
/**
 * Creates the state for one GRPCLB session.
 *
 * <p>In ROUND_ROBIN mode a non-null {@code subchannelPool} is required, and this object
 * registers itself to receive state changes of pooled Subchannels. In PICK_FIRST mode the pool
 * argument is ignored.
 */
GrpclbState(
    GrpclbConfig config,
    Helper helper,
    Context context,
    SubchannelPool subchannelPool,
    TimeProvider time,
    Stopwatch stopwatch,
    BackoffPolicy.Provider backoffPolicyProvider) {
  this.config = checkNotNull(config, "config");
  this.helper = checkNotNull(helper, "helper");
  this.context = checkNotNull(context, "context");
  this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
  boolean roundRobin = config.getMode() == Mode.ROUND_ROBIN;
  if (!roundRobin) {
    // PICK_FIRST manages its single Subchannel directly; no pool is used.
    this.subchannelPool = null;
  } else {
    SubchannelPool pool = checkNotNull(subchannelPool, "subchannelPool");
    this.subchannelPool = pool;
    pool.registerListener(
        new PooledSubchannelStateListener() {
          @Override
          public void onSubchannelState(Subchannel pooled, ConnectivityStateInfo state) {
            handleSubchannelState(pooled, state);
          }
        });
  }
  this.time = checkNotNull(time, "time provider");
  this.stopwatch = checkNotNull(stopwatch, "stopwatch");
  this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
  this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
  // Prefer the configured service name; otherwise fall back to the channel authority.
  String configuredServiceName = config.getServiceName();
  this.serviceName = configuredServiceName != null
      ? configuredServiceName
      : checkNotNull(helper.getAuthority(), "helper returns null authority");
  this.fallbackTimeoutMs = config.getFallbackTimeoutMs();
  this.logger = checkNotNull(helper.getChannelLogger(), "logger");
  logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName);
}
228 
handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState)229   void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
230     if (newState.getState() == SHUTDOWN || !subchannels.containsValue(subchannel)) {
231       return;
232     }
233     if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
234       subchannel.requestConnection();
235     }
236     if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
237       helper.refreshNameResolution();
238     }
239 
240     AtomicReference<ConnectivityStateInfo> stateInfoRef =
241         subchannel.getAttributes().get(STATE_INFO);
242     // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at
243     // every moment which causes RR to stay in CONNECTING. It's better to keep the TRANSIENT_FAILURE
244     // state in that case so that fail-fast RPCs can fail fast.
245     boolean keepState =
246         config.getMode() == Mode.ROUND_ROBIN
247         && stateInfoRef.get().getState() == TRANSIENT_FAILURE
248         && (newState.getState() == CONNECTING || newState.getState() == IDLE);
249     if (!keepState) {
250       stateInfoRef.set(newState);
251       maybeUseFallbackBackends();
252       maybeUpdatePicker();
253     }
254   }
255 
256   /**
257    * Handle new addresses of the balancer and backends from the resolver, and create connection if
258    * not yet connected.
259    */
handleAddresses( List<EquivalentAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers)260   void handleAddresses(
261       List<EquivalentAddressGroup> newLbAddressGroups,
262       List<EquivalentAddressGroup> newBackendServers) {
263     logger.log(
264         ChannelLogLevel.DEBUG,
265         "[grpclb-<{0}>] Resolved addresses: lb addresses {1}, backends: {2}",
266         serviceName,
267         newLbAddressGroups,
268         newBackendServers);
269     fallbackBackendList = newBackendServers;
270     if (newLbAddressGroups.isEmpty()) {
271       // No balancer address: close existing balancer connection and prepare to enter fallback
272       // mode. If there is no successful backend connection, it enters fallback mode immediately.
273       // Otherwise, fallback does not happen until backend connections are lost. This behavior
274       // might be different from other languages (e.g., existing balancer connection is not
275       // closed in C-core), but we aren't changing it at this time.
276       shutdownLbComm();
277       if (!usingFallbackBackends) {
278         fallbackReason = NO_LB_ADDRESS_PROVIDED_STATUS;
279         cancelFallbackTimer();
280         maybeUseFallbackBackends();
281       }
282     } else {
283       startLbComm(newLbAddressGroups);
284       // Avoid creating a new RPC just because the addresses were updated, as it can cause a
285       // stampeding herd. The current RPC may be on a connection to an address not present in
286       // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
287       // outdated backend, we could choose to re-create the RPC.
288       if (lbStream == null) {
289         cancelLbRpcRetryTimer();
290         startLbRpc();
291       }
292       // Start the fallback timer if it's never started and we are not already using fallback
293       // backends.
294       if (fallbackTimer == null && !usingFallbackBackends) {
295         fallbackTimer =
296             syncContext.schedule(
297                 new FallbackModeTask(BALANCER_TIMEOUT_STATUS),
298                 fallbackTimeoutMs,
299                 TimeUnit.MILLISECONDS,
300                 timerService);
301       }
302     }
303     if (usingFallbackBackends) {
304       // Populate the new fallback backends to round-robin list.
305       useFallbackBackends();
306     }
307     maybeUpdatePicker();
308   }
309 
requestConnection()310   void requestConnection() {
311     requestConnectionPending = true;
312     for (RoundRobinEntry entry : currentPicker.pickList) {
313       if (entry instanceof IdleSubchannelEntry) {
314         ((IdleSubchannelEntry) entry).subchannel.requestConnection();
315         requestConnectionPending = false;
316       }
317     }
318   }
319 
maybeUseFallbackBackends()320   private void maybeUseFallbackBackends() {
321     if (balancerWorking || usingFallbackBackends) {
322       return;
323     }
324     // Balancer RPC should have either been broken or timed out.
325     checkState(fallbackReason != null, "no reason to fallback");
326     for (Subchannel subchannel : subchannels.values()) {
327       ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
328       if (stateInfo.getState() == READY) {
329         return;
330       }
331       // If we do have balancer-provided backends, use one of its error in the error message if
332       // fail to fallback.
333       if (stateInfo.getState() == TRANSIENT_FAILURE) {
334         fallbackReason = stateInfo.getStatus();
335       }
336     }
337     // Fallback conditions met
338     useFallbackBackends();
339   }
340 
341   /**
342    * Populate backend servers to be used from the fallback backends.
343    */
useFallbackBackends()344   private void useFallbackBackends() {
345     usingFallbackBackends = true;
346     logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Using fallback backends", serviceName);
347 
348     List<DropEntry> newDropList = new ArrayList<>();
349     List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
350     for (EquivalentAddressGroup eag : fallbackBackendList) {
351       newDropList.add(null);
352       newBackendAddrList.add(new BackendAddressGroup(eag, null));
353     }
354     updateServerList(newDropList, newBackendAddrList, null);
355   }
356 
shutdownLbComm()357   private void shutdownLbComm() {
358     if (lbCommChannel != null) {
359       lbCommChannel.shutdown();
360       lbCommChannel = null;
361     }
362     shutdownLbRpc();
363   }
364 
shutdownLbRpc()365   private void shutdownLbRpc() {
366     if (lbStream != null) {
367       lbStream.close(Status.CANCELLED.withDescription("balancer shutdown").asException());
368       // lbStream will be set to null in LbStream.cleanup()
369     }
370   }
371 
startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags)372   private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) {
373     checkNotNull(overrideAuthorityEags, "overrideAuthorityEags");
374     assert !overrideAuthorityEags.isEmpty();
375     String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes()
376         .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX;
377     if (lbCommChannel == null) {
378       lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority);
379       logger.log(
380           ChannelLogLevel.DEBUG,
381           "[grpclb-<{0}>] Created grpclb channel: EAG={1}",
382           serviceName,
383           overrideAuthorityEags);
384     } else {
385       helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags);
386     }
387   }
388 
startLbRpc()389   private void startLbRpc() {
390     checkState(lbStream == null, "previous lbStream has not been cleared yet");
391     LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
392     lbStream = new LbStream(stub);
393     Context prevContext = context.attach();
394     try {
395       lbStream.start();
396     } finally {
397       context.detach(prevContext);
398     }
399     stopwatch.reset().start();
400 
401     LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
402         .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
403             .setName(serviceName).build())
404         .build();
405     logger.log(
406         ChannelLogLevel.DEBUG,
407         "[grpclb-<{0}>] Sent initial grpclb request {1}", serviceName, initRequest);
408     try {
409       lbStream.lbRequestWriter.onNext(initRequest);
410     } catch (Exception e) {
411       lbStream.close(e);
412     }
413   }
414 
cancelFallbackTimer()415   private void cancelFallbackTimer() {
416     if (fallbackTimer != null) {
417       fallbackTimer.cancel();
418     }
419   }
420 
cancelLbRpcRetryTimer()421   private void cancelLbRpcRetryTimer() {
422     if (lbRpcRetryTimer != null) {
423       lbRpcRetryTimer.cancel();
424       lbRpcRetryTimer = null;
425     }
426   }
427 
shutdown()428   void shutdown() {
429     logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Shutdown", serviceName);
430     shutdownLbComm();
431     switch (config.getMode()) {
432       case ROUND_ROBIN:
433         // We close the subchannels through subchannelPool instead of helper just for convenience of
434         // testing.
435         for (Subchannel subchannel : subchannels.values()) {
436           returnSubchannelToPool(subchannel);
437         }
438         subchannelPool.clear();
439         break;
440       case PICK_FIRST:
441         if (!subchannels.isEmpty()) {
442           checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
443           subchannels.values().iterator().next().shutdown();
444         }
445         break;
446       default:
447         throw new AssertionError("Missing case for " + config.getMode());
448     }
449     subchannels = Collections.emptyMap();
450     cancelFallbackTimer();
451     cancelLbRpcRetryTimer();
452   }
453 
propagateError(Status status)454   void propagateError(Status status) {
455     logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status);
456     if (backendList.isEmpty()) {
457       Status error =
458           Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription());
459       maybeUpdatePicker(
460           TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error))));
461     }
462   }
463 
returnSubchannelToPool(Subchannel subchannel)464   private void returnSubchannelToPool(Subchannel subchannel) {
465     subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get());
466   }
467 
468   @VisibleForTesting
469   @Nullable
getLoadRecorder()470   GrpclbClientLoadRecorder getLoadRecorder() {
471     if (lbStream == null) {
472       return null;
473     }
474     return lbStream.loadRecorder;
475   }
476 
/**
 * Populate backend servers to be used based on the given list of addresses.
 *
 * <p>Replaces {@code subchannels}, {@code dropList} and {@code backendList}. This method does
 * not rebuild the picker itself.
 *
 * @param newDropList one slot per entry of the round-robin list: a DropEntry for drop entries
 *     and {@code null} for backend entries
 * @param newBackendAddrList the non-drop backends, each with an optional load-balance token
 * @param loadRecorder records picks that carry a token; useFallbackBackends() passes
 *     {@code null}
 */
private void updateServerList(
    List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
    @Nullable GrpclbClientLoadRecorder loadRecorder) {
  HashMap<List<EquivalentAddressGroup>, Subchannel> newSubchannelMap =
      new HashMap<>();
  List<BackendEntry> newBackendList = new ArrayList<>();

  switch (config.getMode()) {
    case ROUND_ROBIN:
      // One Subchannel per distinct backend address. Repeated addresses within the list share
      // a Subchannel (via newSubchannelMap), and Subchannels already in use are reused.
      for (BackendAddressGroup backendAddr : newBackendAddrList) {
        EquivalentAddressGroup eag = backendAddr.getAddresses();
        List<EquivalentAddressGroup> eagAsList = Collections.singletonList(eag);
        Subchannel subchannel = newSubchannelMap.get(eagAsList);
        if (subchannel == null) {
          subchannel = subchannels.get(eagAsList);
          if (subchannel == null) {
            // Not currently in use: obtain one from the pool and ask it to connect.
            subchannel = subchannelPool.takeOrCreateSubchannel(eag, createSubchannelAttrs());
            subchannel.requestConnection();
          }
          newSubchannelMap.put(eagAsList, subchannel);
        }
        BackendEntry entry;
        // Only picks with tokens are reported to LoadRecorder
        if (backendAddr.getToken() == null) {
          entry = new BackendEntry(subchannel);
        } else {
          entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
        }
        newBackendList.add(entry);
      }
      // Close Subchannels whose addresses have been delisted
      for (Map.Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
        List<EquivalentAddressGroup> eagList = entry.getKey();
        if (!newSubchannelMap.containsKey(eagList)) {
          returnSubchannelToPool(entry.getValue());
        }
      }
      subchannels = Collections.unmodifiableMap(newSubchannelMap);
      break;
    case PICK_FIRST:
      checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels);
      // final so the anonymous state listener below can capture it.
      final Subchannel subchannel;
      if (newBackendAddrList.isEmpty()) {
        // No backends: shut down the existing Subchannel, if any, and track none.
        if (subchannels.size() == 1) {
          subchannel = subchannels.values().iterator().next();
          subchannel.shutdown();
          subchannels = Collections.emptyMap();
        }
        break;
      }
      List<EquivalentAddressGroup> eagList = new ArrayList<>();
      // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
      // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
      // headers.
      //
      // The PICK_FIRST code path doesn't cache Subchannels.
      for (BackendAddressGroup bag : newBackendAddrList) {
        EquivalentAddressGroup origEag = bag.getAddresses();
        Attributes eagAttrs = origEag.getAttributes();
        if (bag.getToken() != null) {
          eagAttrs = eagAttrs.toBuilder()
              .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build();
        }
        eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
      }
      if (subchannels.isEmpty()) {
        // First backend list: create and start the single Subchannel.
        subchannel =
            helper.createSubchannel(
                CreateSubchannelArgs.newBuilder()
                    .setAddresses(eagList)
                    .setAttributes(createSubchannelAttrs())
                    .build());
        subchannel.start(new SubchannelStateListener() {
          @Override
          public void onSubchannelState(ConnectivityStateInfo newState) {
            handleSubchannelState(subchannel, newState);
          }
        });
        // Honor a requestConnection() call that arrived while no Subchannel existed.
        if (requestConnectionPending) {
          subchannel.requestConnection();
          requestConnectionPending = false;
        }
      } else {
        // Reuse the existing Subchannel, swapping in the new (token-tagged) addresses.
        subchannel = subchannels.values().iterator().next();
        subchannel.updateAddresses(eagList);
      }
      subchannels = Collections.singletonMap(eagList, subchannel);
      newBackendList.add(
          new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder)));
      break;
    default:
      throw new AssertionError("Missing case for " + config.getMode());
  }

  dropList = Collections.unmodifiableList(newDropList);
  backendList = Collections.unmodifiableList(newBackendList);
}
577 
578   @VisibleForTesting
579   class FallbackModeTask implements Runnable {
580     private final Status reason;
581 
FallbackModeTask(Status reason)582     private FallbackModeTask(Status reason) {
583       this.reason = reason;
584     }
585 
586     @Override
run()587     public void run() {
588       // Timer should have been cancelled if entered fallback early.
589       checkState(!usingFallbackBackends, "already in fallback");
590       fallbackReason = reason;
591       maybeUseFallbackBackends();
592       maybeUpdatePicker();
593     }
594   }
595 
596   @VisibleForTesting
597   class LbRpcRetryTask implements Runnable {
598     @Override
run()599     public void run() {
600       startLbRpc();
601     }
602   }
603 
604   @VisibleForTesting
605   static class LoadReportingTask implements Runnable {
606     private final LbStream stream;
607 
LoadReportingTask(LbStream stream)608     LoadReportingTask(LbStream stream) {
609       this.stream = stream;
610     }
611 
612     @Override
run()613     public void run() {
614       stream.loadReportTimer = null;
615       stream.sendLoadReport();
616     }
617   }
618 
619   private class LbStream implements StreamObserver<LoadBalanceResponse> {
// Per-stream load statistics; a new recorder is created for each stream.
final GrpclbClientLoadRecorder loadRecorder;
final LoadBalancerGrpc.LoadBalancerStub stub;
// Request side of the balanceLoad call; assigned by start().
StreamObserver<LoadBalanceRequest> lbRequestWriter;

// These fields are only accessed from syncContext
boolean initialResponseReceived;
boolean closed;
// -1 until the initial response sets it; reports are scheduled only when it is > 0.
long loadReportIntervalMillis = -1;
// Pending load-report timer, if any; LoadReportingTask clears it when it fires.
ScheduledHandle loadReportTimer;
629 
/** Creates a not-yet-started stream bound to {@code stub}, with a fresh load recorder. */
LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
  this.stub = checkNotNull(stub, "stub");
  // Load stats belong to this stream only; nothing is carried over from earlier streams.
  this.loadRecorder = new GrpclbClientLoadRecorder(GrpclbState.this.time);
}
636 
start()637     void start() {
638       lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
639     }
640 
onNext(final LoadBalanceResponse response)641     @Override public void onNext(final LoadBalanceResponse response) {
642       syncContext.execute(new Runnable() {
643           @Override
644           public void run() {
645             handleResponse(response);
646           }
647         });
648     }
649 
onError(final Throwable error)650     @Override public void onError(final Throwable error) {
651       syncContext.execute(new Runnable() {
652           @Override
653           public void run() {
654             handleStreamClosed(Status.fromThrowable(error)
655                 .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
656           }
657         });
658     }
659 
onCompleted()660     @Override public void onCompleted() {
661       syncContext.execute(new Runnable() {
662           @Override
663           public void run() {
664             handleStreamClosed(
665                 Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed"));
666           }
667         });
668     }
669 
// The following methods must be run in syncContext.
671 
sendLoadReport()672     private void sendLoadReport() {
673       if (closed) {
674         return;
675       }
676       ClientStats stats = loadRecorder.generateLoadReport();
677       // TODO(zhangkun83): flow control?
678       try {
679         lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build());
680         scheduleNextLoadReport();
681       } catch (Exception e) {
682         close(e);
683       }
684     }
685 
scheduleNextLoadReport()686     private void scheduleNextLoadReport() {
687       if (loadReportIntervalMillis > 0) {
688         loadReportTimer = syncContext.schedule(
689             new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS,
690             timerService);
691       }
692     }
693 
/**
 * Processes one response from the balancer stream.
 *
 * <p>The first response must be an initial response, which only configures load reporting.
 * Later responses either force fallback (FALLBACK_RESPONSE) or replace the backend list
 * (SERVER_LIST). Any other type is logged and ignored. Responses are ignored once the stream is
 * closed.
 */
private void handleResponse(LoadBalanceResponse response) {
  if (closed) {
    return;
  }

  LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase();
  if (!initialResponseReceived) {
    logger.log(
        ChannelLogLevel.INFO,
        "[grpclb-<{0}>] Got an LB initial response: {1}", serviceName, response);
    if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) {
      // NOTE(review): initialResponseReceived stays false, so later responses also end up here
      // until an INITIAL_RESPONSE arrives.
      logger.log(
          ChannelLogLevel.WARNING,
          "[grpclb-<{0}>] Received a response without initial response",
          serviceName);
      return;
    }
    initialResponseReceived = true;
    InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
    // Start periodic load reporting at the interval requested by the balancer.
    loadReportIntervalMillis =
        Durations.toMillis(initialResponse.getClientStatsReportInterval());
    scheduleNextLoadReport();
    return;
  }
  // SHOULD_LOG_SERVER_LISTS allows omitting (potentially large, frequent) server lists from logs.
  if (SHOULD_LOG_SERVER_LISTS) {
    logger.log(
        ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response: {1}", serviceName, response);
  } else {
    logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response", serviceName);
  }

  if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
    // Force entering fallback requested by balancer.
    cancelFallbackTimer();
    fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS;
    useFallbackBackends();
    maybeUpdatePicker();
    return;
  } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
    logger.log(
        ChannelLogLevel.WARNING,
        "[grpclb-<{0}>] Ignoring unexpected response type: {1}",
        serviceName,
        typeCase);
    return;
  }

  // A server list marks the balancer as working, which prevents maybeUseFallbackBackends()
  // from entering fallback.
  balancerWorking = true;
  // TODO(zhangkun83): handle delegate from initialResponse
  ServerList serverList = response.getServerList();
  List<DropEntry> newDropList = new ArrayList<>();
  List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
  // Construct the new collections. Subchannels are created, when necessary, by
  // updateServerList().
  for (Server server : serverList.getServersList()) {
    String token = server.getLoadBalanceToken();
    if (server.getDrop()) {
      newDropList.add(new DropEntry(loadRecorder, token));
    } else {
      // The null slot is added before the address is parsed, so a backend skipped below for an
      // invalid address still occupies a slot in newDropList.
      newDropList.add(null);
      InetSocketAddress address;
      try {
        address = new InetSocketAddress(
            InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
      } catch (UnknownHostException e) {
        propagateError(
            Status.UNAVAILABLE
                .withDescription("Invalid backend address: " + server)
                .withCause(e));
        continue;
      }
      // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of
      // TLS, with Netty.
      EquivalentAddressGroup eag =
          new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS);
      newBackendAddrList.add(new BackendAddressGroup(eag, token));
    }
  }
  // Exit fallback as soon as a new server list is received from the balancer.
  usingFallbackBackends = false;
  fallbackReason = null;
  cancelFallbackTimer();
  updateServerList(newDropList, newBackendAddrList, loadRecorder);
  maybeUpdatePicker();
}
778 
    /**
     * Handles termination of this LB stream with a non-OK status.
     *
     * <p>If this stream was already closed, this is a no-op. Otherwise it marks the stream closed,
     * runs {@code cleanUp()} (cancelling the load-report timer and detaching this stream), calls
     * {@code propagateError(error)}, records {@code error} as the fallback reason, calls
     * {@code maybeUseFallbackBackends()} and {@code maybeUpdatePicker()}, then restarts the LB RPC
     * either immediately or via an {@code LbRpcRetryTask} scheduled after a back-off delay. Finally
     * it asks the helper to refresh name resolution.
     *
     * @param error the status the stream closed with; must not be OK
     * @throws IllegalArgumentException if {@code error} is OK
     */
    private void handleStreamClosed(Status error) {
      checkArgument(!error.isOk(), "unexpected OK status");
      if (closed) {
        return;
      }
      closed = true;
      cleanUp();
      propagateError(error);
      balancerWorking = false;
      // The stream failure becomes the reason reported if fallback backends end up being used.
      fallbackReason = error;
      cancelFallbackTimer();
      maybeUseFallbackBackends();
      maybeUpdatePicker();

      // Zero means "restart the LB RPC right away".
      long delayNanos = 0;
      if (initialResponseReceived || lbRpcRetryPolicy == null) {
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
        // has never been initialized.
        lbRpcRetryPolicy = backoffPolicyProvider.get();
      }
      // Back off only if this stream never got the balancer's initial response; a stream that did
      // is retried immediately.
      if (!initialResponseReceived) {
        // The back-off policy determines the interval between consecutive RPC upstarts, thus the
        // actual delay may be smaller than the value from the back-off policy, or even negative,
        // depending how much time was spent in the previous RPC.
        // NOTE(review): assumes `stopwatch` was (re)started when the previous LB RPC began —
        // confirm in startLbRpc().
        delayNanos =
            lbRpcRetryPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
      }
      if (delayNanos <= 0) {
        startLbRpc();
      } else {
        lbRpcRetryTimer =
            syncContext.schedule(new LbRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS,
                timerService);
      }

      helper.refreshNameResolution();
    }
817 
close(Exception error)818     void close(Exception error) {
819       if (closed) {
820         return;
821       }
822       closed = true;
823       cleanUp();
824       lbRequestWriter.onError(error);
825     }
826 
cleanUp()827     private void cleanUp() {
828       if (loadReportTimer != null) {
829         loadReportTimer.cancel();
830         loadReportTimer = null;
831       }
832       if (lbStream == this) {
833         lbStream = null;
834       }
835     }
836   }
837 
  /**
   * Builds a picker from the current drop list, backend list and subchannel states. It hands the
   * picker to {@code maybeUpdatePicker(ConnectivityState, RoundRobinPicker)}, which discards it if
   * it is equivalent to the picker already in use.
   *
   * <p>With an empty backend list:
   * <ul>
   *   <li>in fallback, or while the balancer is working, it produces an error picker in
   *       TRANSIENT_FAILURE;</li>
   *   <li>while still waiting for the balancer, it produces a buffering picker in CONNECTING.</li>
   * </ul>
   * Otherwise the result depends on {@code config.getMode()}. ROUND_ROBIN picks among READY
   * backends. PICK_FIRST reports the state of its single subchannel.
   *
   * @throws IllegalStateException in PICK_FIRST mode if the backend list has more than one entry
   * @throws AssertionError if the configured mode is not handled
   */
  private void maybeUpdatePicker() {
    List<RoundRobinEntry> pickList;
    ConnectivityState state;
    if (backendList.isEmpty()) {
      // Check fallback first: even a working balancer may force the use of fallback backends, and
      // that fallback may itself yield no backends.
      if (usingFallbackBackends) {
        // Report why fallback was entered, attached to the "no fallback backends" status.
        Status error =
            NO_FALLBACK_BACKENDS_STATUS
                .withCause(fallbackReason.getCause())
                .augmentDescription(fallbackReason.getDescription());
        pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(error));
        state = TRANSIENT_FAILURE;
      } else if (balancerWorking)  {
        // The balancer answered but gave no usable backends.
        pickList =
            Collections.<RoundRobinEntry>singletonList(
                new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
        state = TRANSIENT_FAILURE;
      } else {  // still waiting for balancer
        pickList = Collections.singletonList(BUFFER_ENTRY);
        state = CONNECTING;
      }
      maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
      return;
    }
    switch (config.getMode()) {
      case ROUND_ROBIN:
        pickList = new ArrayList<>(backendList.size());
        // Last TRANSIENT_FAILURE status seen; only used when no backend is READY or pending.
        Status error = null;
        // True if some backend is neither READY nor TRANSIENT_FAILURE.
        boolean hasPending = false;
        for (BackendEntry entry : backendList) {
          Subchannel subchannel = entry.subchannel;
          Attributes attrs = subchannel.getAttributes();
          ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
          if (stateInfo.getState() == READY) {
            pickList.add(entry);
          } else if (stateInfo.getState() == TRANSIENT_FAILURE) {
            error = stateInfo.getStatus();
          } else {
            hasPending = true;
          }
        }
        if (pickList.isEmpty()) {
          // No READY backend. Buffer while some backend may still become READY. Otherwise every
          // backend is in TRANSIENT_FAILURE, so `error` is non-null here.
          if (hasPending) {
            pickList.add(BUFFER_ENTRY);
            state = CONNECTING;
          } else {
            pickList.add(new ErrorEntry(error));
            state = TRANSIENT_FAILURE;
          }
        } else {
          state = READY;
        }
        break;
      case PICK_FIRST: {
        checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList);
        BackendEntry onlyEntry = backendList.get(0);
        ConnectivityStateInfo stateInfo =
            onlyEntry.subchannel.getAttributes().get(STATE_INFO).get();
        // The picker's state mirrors the single subchannel's state as-is.
        state = stateInfo.getState();
        switch (state) {
          case READY:
            pickList = Collections.<RoundRobinEntry>singletonList(onlyEntry);
            break;
          case TRANSIENT_FAILURE:
            pickList =
                Collections.<RoundRobinEntry>singletonList(new ErrorEntry(stateInfo.getStatus()));
            break;
          case CONNECTING:
            pickList = Collections.singletonList(BUFFER_ENTRY);
            break;
          default:
            // Remaining states (IDLE, SHUTDOWN): the first pick requests a connection.
            pickList = Collections.<RoundRobinEntry>singletonList(
                new IdleSubchannelEntry(onlyEntry.subchannel, syncContext));
        }
        break;
      }
      default:
        throw new AssertionError("Missing case for " + config.getMode());
    }
    maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
  }
924 
925   /**
926    * Update the given picker to the helper if it's different from the current one.
927    */
maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker)928   private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
929     // Discard the new picker if we are sure it won't make any difference, in order to save
930     // re-processing pending streams, and avoid unnecessary resetting of the pointer in
931     // RoundRobinPicker.
932     if (picker.dropList.equals(currentPicker.dropList)
933         && picker.pickList.equals(currentPicker.pickList)) {
934       return;
935     }
936     currentPicker = picker;
937     helper.updateBalancingState(state, picker);
938   }
939 
createSubchannelAttrs()940   private static Attributes createSubchannelAttrs() {
941     return Attributes.newBuilder()
942         .set(STATE_INFO,
943             new AtomicReference<>(
944                 ConnectivityStateInfo.forNonError(IDLE)))
945         .build();
946   }
947 
948   @VisibleForTesting
949   static final class DropEntry {
950     private final GrpclbClientLoadRecorder loadRecorder;
951     private final String token;
952 
DropEntry(GrpclbClientLoadRecorder loadRecorder, String token)953     DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
954       this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
955       this.token = checkNotNull(token, "token");
956     }
957 
picked()958     PickResult picked() {
959       loadRecorder.recordDroppedRequest(token);
960       return DROP_PICK_RESULT;
961     }
962 
963     @Override
toString()964     public String toString() {
965       // This is printed in logs.  Only include useful information.
966       return "drop(" + token + ")";
967     }
968 
969     @Override
hashCode()970     public int hashCode() {
971       return Objects.hashCode(loadRecorder, token);
972     }
973 
974     @Override
equals(Object other)975     public boolean equals(Object other) {
976       if (!(other instanceof DropEntry)) {
977         return false;
978       }
979       DropEntry that = (DropEntry) other;
980       return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
981     }
982   }
983 
984   @VisibleForTesting
985   interface RoundRobinEntry {
picked(Metadata headers)986     PickResult picked(Metadata headers);
987   }
988 
989   @VisibleForTesting
990   static final class BackendEntry implements RoundRobinEntry {
991     final Subchannel subchannel;
992     @VisibleForTesting
993     final PickResult result;
994     @Nullable
995     private final String token;
996 
997     /**
998      * For ROUND_ROBIN: creates a BackendEntry whose usage will be reported to load recorder.
999      */
BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token)1000     BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
1001       this.subchannel = checkNotNull(subchannel, "subchannel");
1002       this.result =
1003           PickResult.withSubchannel(subchannel, checkNotNull(loadRecorder, "loadRecorder"));
1004       this.token = checkNotNull(token, "token");
1005     }
1006 
1007     /**
1008      * For ROUND_ROBIN/PICK_FIRST: creates a BackendEntry whose usage will not be reported.
1009      */
BackendEntry(Subchannel subchannel)1010     BackendEntry(Subchannel subchannel) {
1011       this.subchannel = checkNotNull(subchannel, "subchannel");
1012       this.result = PickResult.withSubchannel(subchannel);
1013       this.token = null;
1014     }
1015 
1016     /**
1017      * For PICK_FIRST: creates a BackendEntry that includes all addresses.
1018      */
BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory)1019     BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory) {
1020       this.subchannel = checkNotNull(subchannel, "subchannel");
1021       this.result =
1022           PickResult.withSubchannel(subchannel, checkNotNull(tracerFactory, "tracerFactory"));
1023       this.token = null;
1024     }
1025 
1026     @Override
picked(Metadata headers)1027     public PickResult picked(Metadata headers) {
1028       headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
1029       if (token != null) {
1030         headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
1031       }
1032       return result;
1033     }
1034 
1035     @Override
toString()1036     public String toString() {
1037       // This is printed in logs.  Only give out useful information.
1038       return "[" + subchannel.getAllAddresses().toString() + "(" + token + ")]";
1039     }
1040 
1041     @Override
hashCode()1042     public int hashCode() {
1043       return Objects.hashCode(result, token);
1044     }
1045 
1046     @Override
equals(Object other)1047     public boolean equals(Object other) {
1048       if (!(other instanceof BackendEntry)) {
1049         return false;
1050       }
1051       BackendEntry that = (BackendEntry) other;
1052       return Objects.equal(result, that.result) && Objects.equal(token, that.token);
1053     }
1054   }
1055 
1056   @VisibleForTesting
1057   static final class IdleSubchannelEntry implements RoundRobinEntry {
1058     private final SynchronizationContext syncContext;
1059     private final Subchannel subchannel;
1060     private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1061 
IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext)1062     IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) {
1063       this.subchannel = checkNotNull(subchannel, "subchannel");
1064       this.syncContext = checkNotNull(syncContext, "syncContext");
1065     }
1066 
1067     @Override
picked(Metadata headers)1068     public PickResult picked(Metadata headers) {
1069       if (connectionRequested.compareAndSet(false, true)) {
1070         syncContext.execute(new Runnable() {
1071             @Override
1072             public void run() {
1073               subchannel.requestConnection();
1074             }
1075           });
1076       }
1077       return PickResult.withNoResult();
1078     }
1079 
1080     @Override
toString()1081     public String toString() {
1082       // This is printed in logs.  Only give out useful information.
1083       return "(idle)[" + subchannel.getAllAddresses().toString() + "]";
1084     }
1085 
1086     @Override
hashCode()1087     public int hashCode() {
1088       return Objects.hashCode(subchannel, syncContext);
1089     }
1090 
1091     @Override
equals(Object other)1092     public boolean equals(Object other) {
1093       if (!(other instanceof IdleSubchannelEntry)) {
1094         return false;
1095       }
1096       IdleSubchannelEntry that = (IdleSubchannelEntry) other;
1097       return Objects.equal(subchannel, that.subchannel)
1098           && Objects.equal(syncContext, that.syncContext);
1099     }
1100   }
1101 
1102   @VisibleForTesting
1103   static final class ErrorEntry implements RoundRobinEntry {
1104     final PickResult result;
1105 
ErrorEntry(Status status)1106     ErrorEntry(Status status) {
1107       result = PickResult.withError(status);
1108     }
1109 
1110     @Override
picked(Metadata headers)1111     public PickResult picked(Metadata headers) {
1112       return result;
1113     }
1114 
1115     @Override
hashCode()1116     public int hashCode() {
1117       return Objects.hashCode(result);
1118     }
1119 
1120     @Override
equals(Object other)1121     public boolean equals(Object other) {
1122       if (!(other instanceof ErrorEntry)) {
1123         return false;
1124       }
1125       return Objects.equal(result, ((ErrorEntry) other).result);
1126     }
1127 
1128     @Override
toString()1129     public String toString() {
1130       // This is printed in logs.  Only include useful information.
1131       return result.getStatus().toString();
1132     }
1133   }
1134 
1135   @VisibleForTesting
1136   static final class RoundRobinPicker extends SubchannelPicker {
1137     @VisibleForTesting
1138     final List<DropEntry> dropList;
1139     private int dropIndex;
1140 
1141     @VisibleForTesting
1142     final List<? extends RoundRobinEntry> pickList;
1143     private int pickIndex;
1144 
1145     // dropList can be empty, which means no drop.
1146     // pickList must not be empty.
RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList)1147     RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
1148       this.dropList = checkNotNull(dropList, "dropList");
1149       this.pickList = checkNotNull(pickList, "pickList");
1150       checkArgument(!pickList.isEmpty(), "pickList is empty");
1151     }
1152 
1153     @Override
pickSubchannel(PickSubchannelArgs args)1154     public PickResult pickSubchannel(PickSubchannelArgs args) {
1155       synchronized (pickList) {
1156         // Two-level round-robin.
1157         // First round-robin on dropList. If a drop entry is selected, request will be dropped.  If
1158         // a non-drop entry is selected, then round-robin on pickList.  This makes sure requests are
1159         // dropped at the same proportion as the drop entries appear on the round-robin list from
1160         // the balancer, while only backends from pickList are selected for the non-drop cases.
1161         if (!dropList.isEmpty()) {
1162           DropEntry drop = dropList.get(dropIndex);
1163           dropIndex++;
1164           if (dropIndex == dropList.size()) {
1165             dropIndex = 0;
1166           }
1167           if (drop != null) {
1168             return drop.picked();
1169           }
1170         }
1171 
1172         RoundRobinEntry pick = pickList.get(pickIndex);
1173         pickIndex++;
1174         if (pickIndex == pickList.size()) {
1175           pickIndex = 0;
1176         }
1177         return pick.picked(args.getHeaders());
1178       }
1179     }
1180 
1181     @Override
toString()1182     public String toString() {
1183       if (SHOULD_LOG_SERVER_LISTS) {
1184         return MoreObjects.toStringHelper(RoundRobinPicker.class)
1185             .add("dropList", dropList)
1186             .add("pickList", pickList)
1187             .toString();
1188       }
1189       return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
1190     }
1191   }
1192 }
1193