• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.MoreObjects;
23 import com.google.common.base.Preconditions;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.SettableFuture;
26 import io.grpc.Attributes;
27 import io.grpc.CallOptions;
28 import io.grpc.ClientCall;
29 import io.grpc.ClientStreamTracer;
30 import io.grpc.ConnectivityState;
31 import io.grpc.ConnectivityStateInfo;
32 import io.grpc.Context;
33 import io.grpc.EquivalentAddressGroup;
34 import io.grpc.InternalChannelz;
35 import io.grpc.InternalChannelz.ChannelStats;
36 import io.grpc.InternalChannelz.ChannelTrace;
37 import io.grpc.InternalInstrumented;
38 import io.grpc.InternalLogId;
39 import io.grpc.InternalWithLogId;
40 import io.grpc.LoadBalancer;
41 import io.grpc.LoadBalancer.PickResult;
42 import io.grpc.LoadBalancer.PickSubchannelArgs;
43 import io.grpc.LoadBalancer.Subchannel;
44 import io.grpc.LoadBalancer.SubchannelPicker;
45 import io.grpc.ManagedChannel;
46 import io.grpc.Metadata;
47 import io.grpc.MethodDescriptor;
48 import io.grpc.Status;
49 import io.grpc.SynchronizationContext;
50 import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
51 import java.util.Collections;
52 import java.util.List;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.Executor;
55 import java.util.concurrent.ScheduledExecutorService;
56 import java.util.concurrent.TimeUnit;
57 import java.util.logging.Level;
58 import java.util.logging.Logger;
59 import javax.annotation.concurrent.ThreadSafe;
60 
61 /**
62  * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer}
63  * to its own RPC needs.
64  */
65 @ThreadSafe
66 final class OobChannel extends ManagedChannel implements InternalInstrumented<ChannelStats> {
67   private static final Logger log = Logger.getLogger(OobChannel.class.getName());
68 
69   private InternalSubchannel subchannel;
70   private AbstractSubchannel subchannelImpl;
71   private SubchannelPicker subchannelPicker;
72 
73   private final InternalLogId logId;
74   private final String authority;
75   private final DelayedClientTransport delayedTransport;
76   private final InternalChannelz channelz;
77   private final ObjectPool<? extends Executor> executorPool;
78   private final Executor executor;
79   private final ScheduledExecutorService deadlineCancellationExecutor;
80   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
81   private volatile boolean shutdown;
82   private final CallTracer channelCallsTracer;
83   private final ChannelTracer channelTracer;
84   private final TimeProvider timeProvider;
85 
86   private final ClientStreamProvider transportProvider = new ClientStreamProvider() {
87     @Override
88     public ClientStream newStream(MethodDescriptor<?, ?> method,
89         CallOptions callOptions, Metadata headers, Context context) {
90       ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
91           callOptions, headers, 0, /* isTransparentRetry= */ false);
92       Context origContext = context.attach();
93       // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
94       // matter here because OOB communication should be sparse, and it's not on application RPC's
95       // critical path.
96       try {
97         return delayedTransport.newStream(method, headers, callOptions, tracers);
98       } finally {
99         context.detach(origContext);
100       }
101     }
102   };
103 
OobChannel( String authority, ObjectPool<? extends Executor> executorPool, ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext, CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz, TimeProvider timeProvider)104   OobChannel(
105       String authority, ObjectPool<? extends Executor> executorPool,
106       ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext,
107       CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz,
108       TimeProvider timeProvider) {
109     this.authority = checkNotNull(authority, "authority");
110     this.logId = InternalLogId.allocate(getClass(), authority);
111     this.executorPool = checkNotNull(executorPool, "executorPool");
112     this.executor = checkNotNull(executorPool.getObject(), "executor");
113     this.deadlineCancellationExecutor = checkNotNull(
114         deadlineCancellationExecutor, "deadlineCancellationExecutor");
115     this.delayedTransport = new DelayedClientTransport(executor, syncContext);
116     this.channelz = Preconditions.checkNotNull(channelz);
117     this.delayedTransport.start(new ManagedClientTransport.Listener() {
118         @Override
119         public void transportShutdown(Status s) {
120           // Don't care
121         }
122 
123         @Override
124         public void transportTerminated() {
125           subchannelImpl.shutdown();
126         }
127 
128         @Override
129         public void transportReady() {
130           // Don't care
131         }
132 
133         @Override
134         public void transportInUse(boolean inUse) {
135           // Don't care
136         }
137       });
138     this.channelCallsTracer = callsTracer;
139     this.channelTracer = checkNotNull(channelTracer, "channelTracer");
140     this.timeProvider = checkNotNull(timeProvider, "timeProvider");
141   }
142 
143   // Must be called only once, right after the OobChannel is created.
setSubchannel(final InternalSubchannel subchannel)144   void setSubchannel(final InternalSubchannel subchannel) {
145     log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel});
146     this.subchannel = subchannel;
147     subchannelImpl = new AbstractSubchannel() {
148         @Override
149         public void shutdown() {
150           subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown"));
151         }
152 
153         @Override
154         InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
155           return subchannel;
156         }
157 
158         @Override
159         public void requestConnection() {
160           subchannel.obtainActiveTransport();
161         }
162 
163         @Override
164         public List<EquivalentAddressGroup> getAllAddresses() {
165           return subchannel.getAddressGroups();
166         }
167 
168         @Override
169         public Attributes getAttributes() {
170           return Attributes.EMPTY;
171         }
172 
173         @Override
174         public Object getInternalSubchannel() {
175           return subchannel;
176         }
177     };
178 
179     final class OobSubchannelPicker extends SubchannelPicker {
180       final PickResult result = PickResult.withSubchannel(subchannelImpl);
181 
182       @Override
183       public PickResult pickSubchannel(PickSubchannelArgs args) {
184         return result;
185       }
186 
187       @Override
188       public String toString() {
189         return MoreObjects.toStringHelper(OobSubchannelPicker.class)
190             .add("result", result)
191             .toString();
192       }
193     }
194 
195     subchannelPicker = new OobSubchannelPicker();
196     delayedTransport.reprocess(subchannelPicker);
197   }
198 
updateAddresses(List<EquivalentAddressGroup> eag)199   void updateAddresses(List<EquivalentAddressGroup> eag) {
200     subchannel.updateAddresses(eag);
201   }
202 
203   @Override
newCall( MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions)204   public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
205       MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
206     return new ClientCallImpl<>(methodDescriptor,
207         callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
208         callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, null);
209   }
210 
211   @Override
authority()212   public String authority() {
213     return authority;
214   }
215 
216   @Override
isTerminated()217   public boolean isTerminated() {
218     return terminatedLatch.getCount() == 0;
219   }
220 
221   @Override
awaitTermination(long time, TimeUnit unit)222   public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
223     return terminatedLatch.await(time, unit);
224   }
225 
226   @Override
getState(boolean requestConnectionIgnored)227   public ConnectivityState getState(boolean requestConnectionIgnored) {
228     if (subchannel == null) {
229       return ConnectivityState.IDLE;
230     }
231     return subchannel.getState();
232   }
233 
234   @Override
shutdown()235   public ManagedChannel shutdown() {
236     shutdown = true;
237     delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called"));
238     return this;
239   }
240 
241   @Override
isShutdown()242   public boolean isShutdown() {
243     return shutdown;
244   }
245 
246   @Override
shutdownNow()247   public ManagedChannel shutdownNow() {
248     shutdown = true;
249     delayedTransport.shutdownNow(
250         Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called"));
251     return this;
252   }
253 
handleSubchannelStateChange(final ConnectivityStateInfo newState)254   void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
255     channelTracer.reportEvent(
256         new ChannelTrace.Event.Builder()
257             .setDescription("Entering " + newState.getState() + " state")
258             .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
259             .setTimestampNanos(timeProvider.currentTimeNanos())
260             .build());
261     switch (newState.getState()) {
262       case READY:
263       case IDLE:
264         delayedTransport.reprocess(subchannelPicker);
265         break;
266       case TRANSIENT_FAILURE:
267         final class OobErrorPicker extends SubchannelPicker {
268           final PickResult errorResult = PickResult.withError(newState.getStatus());
269 
270           @Override
271           public PickResult pickSubchannel(PickSubchannelArgs args) {
272             return errorResult;
273           }
274 
275           @Override
276           public String toString() {
277             return MoreObjects.toStringHelper(OobErrorPicker.class)
278                 .add("errorResult", errorResult)
279                 .toString();
280           }
281         }
282 
283         delayedTransport.reprocess(new OobErrorPicker());
284         break;
285       default:
286         // Do nothing
287     }
288   }
289 
290   // must be run from channel executor
handleSubchannelTerminated()291   void handleSubchannelTerminated() {
292     channelz.removeSubchannel(this);
293     // When delayedTransport is terminated, it shuts down subchannel.  Therefore, at this point
294     // both delayedTransport and subchannel have terminated.
295     executorPool.returnObject(executor);
296     terminatedLatch.countDown();
297   }
298 
299   @VisibleForTesting
getSubchannel()300   Subchannel getSubchannel() {
301     return subchannelImpl;
302   }
303 
getInternalSubchannel()304   InternalSubchannel getInternalSubchannel() {
305     return subchannel;
306   }
307 
308   @Override
getStats()309   public ListenableFuture<ChannelStats> getStats() {
310     final SettableFuture<ChannelStats> ret = SettableFuture.create();
311     final ChannelStats.Builder builder = new ChannelStats.Builder();
312     channelCallsTracer.updateBuilder(builder);
313     channelTracer.updateBuilder(builder);
314     builder
315         .setTarget(authority)
316         .setState(subchannel.getState())
317         .setSubchannels(Collections.<InternalWithLogId>singletonList(subchannel));
318     ret.set(builder.build());
319     return ret;
320   }
321 
322   @Override
getLogId()323   public InternalLogId getLogId() {
324     return logId;
325   }
326 
327   @Override
toString()328   public String toString() {
329     return MoreObjects.toStringHelper(this)
330         .add("logId", logId.getId())
331         .add("authority", authority)
332         .toString();
333   }
334 
335   @Override
resetConnectBackoff()336   public void resetConnectBackoff() {
337     subchannel.resetConnectBackoff();
338   }
339 }
340