/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import io.grpc.CallOptions;
23 import io.grpc.Context;
24 import io.grpc.InternalChannelz.SocketStats;
25 import io.grpc.InternalLogId;
26 import io.grpc.LoadBalancer.PickResult;
27 import io.grpc.LoadBalancer.PickSubchannelArgs;
28 import io.grpc.LoadBalancer.SubchannelPicker;
29 import io.grpc.Metadata;
30 import io.grpc.MethodDescriptor;
31 import io.grpc.Status;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.LinkedHashSet;
36 import java.util.concurrent.Executor;
37 import javax.annotation.Nonnull;
38 import javax.annotation.Nullable;
39 import javax.annotation.concurrent.GuardedBy;
40 
41 /**
42  * A client transport that queues requests before a real transport is available. When {@link
43  * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
44  * transport for each pending stream.
45  *
46  * <p>This transport owns every stream that it has created until a real transport has been picked
47  * for that stream, at which point the ownership of the stream is transferred to the real transport,
48  * thus the delayed transport stops owning the stream.
49  */
50 final class DelayedClientTransport implements ManagedClientTransport {
51   private final InternalLogId lodId = InternalLogId.allocate(getClass().getName());
52 
53   private final Object lock = new Object();
54 
55   private final Executor defaultAppExecutor;
56   private final ChannelExecutor channelExecutor;
57 
58   private Runnable reportTransportInUse;
59   private Runnable reportTransportNotInUse;
60   private Runnable reportTransportTerminated;
61   private Listener listener;
62 
63   @Nonnull
64   @GuardedBy("lock")
65   private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();
66 
67   /**
68    * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
69    * terminated.
70    */
71   @GuardedBy("lock")
72   private Status shutdownStatus;
73 
74   /**
75    * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
76    * to idle.
77    */
78   @GuardedBy("lock")
79   @Nullable
80   private SubchannelPicker lastPicker;
81 
82   @GuardedBy("lock")
83   private long lastPickerVersion;
84 
85   /**
86    * Creates a new delayed transport.
87    *
88    * @param defaultAppExecutor pending streams will create real streams and run bufferred operations
89    *        in an application executor, which will be this executor, unless there is on provided in
90    *        {@link CallOptions}.
91    * @param channelExecutor all listener callbacks of the delayed transport will be run from this
92    *        ChannelExecutor.
93    */
DelayedClientTransport(Executor defaultAppExecutor, ChannelExecutor channelExecutor)94   DelayedClientTransport(Executor defaultAppExecutor, ChannelExecutor channelExecutor) {
95     this.defaultAppExecutor = defaultAppExecutor;
96     this.channelExecutor = channelExecutor;
97   }
98 
99   @Override
start(final Listener listener)100   public final Runnable start(final Listener listener) {
101     this.listener = listener;
102     reportTransportInUse = new Runnable() {
103         @Override
104         public void run() {
105           listener.transportInUse(true);
106         }
107       };
108     reportTransportNotInUse = new Runnable() {
109         @Override
110         public void run() {
111           listener.transportInUse(false);
112         }
113       };
114     reportTransportTerminated = new Runnable() {
115         @Override
116         public void run() {
117           listener.transportTerminated();
118         }
119       };
120     return null;
121   }
122 
123   /**
124    * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
125    * picker will be consulted.
126    *
127    * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
128    * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
129    */
130   @Override
newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions)131   public final ClientStream newStream(
132       MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
133     try {
134       SubchannelPicker picker;
135       PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
136       long pickerVersion = -1;
137       synchronized (lock) {
138         if (shutdownStatus == null) {
139           if (lastPicker == null) {
140             return createPendingStream(args);
141           }
142           picker = lastPicker;
143           pickerVersion = lastPickerVersion;
144         } else {
145           return new FailingClientStream(shutdownStatus);
146         }
147       }
148       while (true) {
149         PickResult pickResult = picker.pickSubchannel(args);
150         ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
151             callOptions.isWaitForReady());
152         if (transport != null) {
153           return transport.newStream(
154               args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
155         }
156         // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
157         // race with reprocess()), we will buffer it.  Otherwise, will try with the new picker.
158         synchronized (lock) {
159           if (shutdownStatus != null) {
160             return new FailingClientStream(shutdownStatus);
161           }
162           if (pickerVersion == lastPickerVersion) {
163             return createPendingStream(args);
164           }
165           picker = lastPicker;
166           pickerVersion = lastPickerVersion;
167         }
168       }
169     } finally {
170       channelExecutor.drain();
171     }
172   }
173 
174   /**
175    * Caller must call {@code channelExecutor.drain()} outside of lock because this method may
176    * schedule tasks on channelExecutor.
177    */
178   @GuardedBy("lock")
createPendingStream(PickSubchannelArgs args)179   private PendingStream createPendingStream(PickSubchannelArgs args) {
180     PendingStream pendingStream = new PendingStream(args);
181     pendingStreams.add(pendingStream);
182     if (getPendingStreamsCount() == 1) {
183       channelExecutor.executeLater(reportTransportInUse);
184     }
185     return pendingStream;
186   }
187 
188   @Override
ping(final PingCallback callback, Executor executor)189   public final void ping(final PingCallback callback, Executor executor) {
190     throw new UnsupportedOperationException("This method is not expected to be called");
191   }
192 
193   @Override
getStats()194   public ListenableFuture<SocketStats> getStats() {
195     SettableFuture<SocketStats> ret = SettableFuture.create();
196     ret.set(null);
197     return ret;
198   }
199 
200   /**
201    * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
202    * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
203    * more buffered streams.
204    */
205   @Override
shutdown(final Status status)206   public final void shutdown(final Status status) {
207     synchronized (lock) {
208       if (shutdownStatus != null) {
209         return;
210       }
211       shutdownStatus = status;
212       channelExecutor.executeLater(new Runnable() {
213           @Override
214           public void run() {
215             listener.transportShutdown(status);
216           }
217         });
218       if (!hasPendingStreams() && reportTransportTerminated != null) {
219         channelExecutor.executeLater(reportTransportTerminated);
220         reportTransportTerminated = null;
221       }
222     }
223     channelExecutor.drain();
224   }
225 
226   /**
227    * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
228    * this transport.
229    */
230   @Override
shutdownNow(Status status)231   public final void shutdownNow(Status status) {
232     shutdown(status);
233     Collection<PendingStream> savedPendingStreams;
234     Runnable savedReportTransportTerminated;
235     synchronized (lock) {
236       savedPendingStreams = pendingStreams;
237       savedReportTransportTerminated = reportTransportTerminated;
238       reportTransportTerminated = null;
239       if (!pendingStreams.isEmpty()) {
240         pendingStreams = Collections.<PendingStream>emptyList();
241       }
242     }
243     if (savedReportTransportTerminated != null) {
244       for (PendingStream stream : savedPendingStreams) {
245         stream.cancel(status);
246       }
247       channelExecutor.executeLater(savedReportTransportTerminated).drain();
248     }
249     // If savedReportTransportTerminated == null, transportTerminated() has already been called in
250     // shutdown().
251   }
252 
hasPendingStreams()253   public final boolean hasPendingStreams() {
254     synchronized (lock) {
255       return !pendingStreams.isEmpty();
256     }
257   }
258 
259   @VisibleForTesting
getPendingStreamsCount()260   final int getPendingStreamsCount() {
261     synchronized (lock) {
262       return pendingStreams.size();
263     }
264   }
265 
266   /**
267    * Use the picker to try picking a transport for every pending stream, proceed the stream if the
268    * pick is successful, otherwise keep it pending.
269    *
270    * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
271    * streams will be served by the latest picker (if a same picker is given more than once, they are
272    * considered different pickers) as soon as possible.
273    *
274    * <p>This method <strong>must not</strong> be called concurrently with itself.
275    */
reprocess(@ullable SubchannelPicker picker)276   final void reprocess(@Nullable SubchannelPicker picker) {
277     ArrayList<PendingStream> toProcess;
278     synchronized (lock) {
279       lastPicker = picker;
280       lastPickerVersion++;
281       if (picker == null || !hasPendingStreams()) {
282         return;
283       }
284       toProcess = new ArrayList<>(pendingStreams);
285     }
286     ArrayList<PendingStream> toRemove = new ArrayList<>();
287 
288     for (final PendingStream stream : toProcess) {
289       PickResult pickResult = picker.pickSubchannel(stream.args);
290       CallOptions callOptions = stream.args.getCallOptions();
291       final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
292           callOptions.isWaitForReady());
293       if (transport != null) {
294         Executor executor = defaultAppExecutor;
295         // createRealStream may be expensive. It will start real streams on the transport. If
296         // there are pending requests, they will be serialized too, which may be expensive. Since
297         // we are now on transport thread, we need to offload the work to an executor.
298         if (callOptions.getExecutor() != null) {
299           executor = callOptions.getExecutor();
300         }
301         executor.execute(new Runnable() {
302             @Override
303             public void run() {
304               stream.createRealStream(transport);
305             }
306           });
307         toRemove.add(stream);
308       }  // else: stay pending
309     }
310 
311     synchronized (lock) {
312       // Between this synchronized and the previous one:
313       //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
314       //   - shutdown() may be called, which may turn pendingStreams into null.
315       if (!hasPendingStreams()) {
316         return;
317       }
318       pendingStreams.removeAll(toRemove);
319       // Because delayed transport is long-lived, we take this opportunity to down-size the
320       // hashmap.
321       if (pendingStreams.isEmpty()) {
322         pendingStreams = new LinkedHashSet<PendingStream>();
323       }
324       if (!hasPendingStreams()) {
325         // There may be a brief gap between delayed transport clearing in-use state, and first real
326         // transport starting streams and setting in-use state.  During the gap the whole channel's
327         // in-use state may be false. However, it shouldn't cause spurious switching to idleness
328         // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
329         // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
330         channelExecutor.executeLater(reportTransportNotInUse);
331         if (shutdownStatus != null && reportTransportTerminated != null) {
332           channelExecutor.executeLater(reportTransportTerminated);
333           reportTransportTerminated = null;
334         }
335       }
336     }
337     channelExecutor.drain();
338   }
339 
340   // TODO(carl-mastrangelo): remove this once the Subchannel change is in.
341   @Override
getLogId()342   public InternalLogId getLogId() {
343     return lodId;
344   }
345 
346   private class PendingStream extends DelayedStream {
347     private final PickSubchannelArgs args;
348     private final Context context = Context.current();
349 
PendingStream(PickSubchannelArgs args)350     private PendingStream(PickSubchannelArgs args) {
351       this.args = args;
352     }
353 
createRealStream(ClientTransport transport)354     private void createRealStream(ClientTransport transport) {
355       ClientStream realStream;
356       Context origContext = context.attach();
357       try {
358         realStream = transport.newStream(
359             args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
360       } finally {
361         context.detach(origContext);
362       }
363       setStream(realStream);
364     }
365 
366     @Override
cancel(Status reason)367     public void cancel(Status reason) {
368       super.cancel(reason);
369       synchronized (lock) {
370         if (reportTransportTerminated != null) {
371           boolean justRemovedAnElement = pendingStreams.remove(this);
372           if (!hasPendingStreams() && justRemovedAnElement) {
373             channelExecutor.executeLater(reportTransportNotInUse);
374             if (shutdownStatus != null) {
375               channelExecutor.executeLater(reportTransportTerminated);
376               reportTransportTerminated = null;
377             }
378           }
379         }
380       }
381       channelExecutor.drain();
382     }
383   }
384 }
385