• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import io.grpc.CallOptions;
23 import io.grpc.ClientStreamTracer;
24 import io.grpc.Context;
25 import io.grpc.InternalChannelz.SocketStats;
26 import io.grpc.InternalLogId;
27 import io.grpc.LoadBalancer.PickResult;
28 import io.grpc.LoadBalancer.PickSubchannelArgs;
29 import io.grpc.LoadBalancer.SubchannelPicker;
30 import io.grpc.Metadata;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.Status;
33 import io.grpc.SynchronizationContext;
34 import io.grpc.internal.ClientStreamListener.RpcProgress;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.LinkedHashSet;
39 import java.util.concurrent.Executor;
40 import javax.annotation.Nonnull;
41 import javax.annotation.Nullable;
42 import javax.annotation.concurrent.GuardedBy;
43 
44 /**
45  * A client transport that queues requests before a real transport is available. When {@link
46  * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47  * transport for each pending stream.
48  *
49  * <p>This transport owns every stream that it has created until a real transport has been picked
50  * for that stream, at which point the ownership of the stream is transferred to the real transport,
51  * thus the delayed transport stops owning the stream.
52  */
53 final class DelayedClientTransport implements ManagedClientTransport {
54   // lazily allocated, since it is infrequently used.
55   private final InternalLogId logId =
56       InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
57 
58   private final Object lock = new Object();
59 
60   private final Executor defaultAppExecutor;
61   private final SynchronizationContext syncContext;
62 
63   private Runnable reportTransportInUse;
64   private Runnable reportTransportNotInUse;
65   private Runnable reportTransportTerminated;
66   private Listener listener;
67 
68   @Nonnull
69   @GuardedBy("lock")
70   private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71 
72   /**
73    * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74    * terminated.
75    */
76   @GuardedBy("lock")
77   private Status shutdownStatus;
78 
79   /**
80    * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81    * to idle.
82    */
83   @GuardedBy("lock")
84   @Nullable
85   private SubchannelPicker lastPicker;
86 
87   @GuardedBy("lock")
88   private long lastPickerVersion;
89 
90   /**
91    * Creates a new delayed transport.
92    *
93    * @param defaultAppExecutor pending streams will create real streams and run bufferred operations
94    *        in an application executor, which will be this executor, unless there is on provided in
95    *        {@link CallOptions}.
96    * @param syncContext all listener callbacks of the delayed transport will be run from this
97    *        SynchronizationContext.
98    */
DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext)99   DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
100     this.defaultAppExecutor = defaultAppExecutor;
101     this.syncContext = syncContext;
102   }
103 
104   @Override
start(final Listener listener)105   public final Runnable start(final Listener listener) {
106     this.listener = listener;
107     reportTransportInUse = new Runnable() {
108         @Override
109         public void run() {
110           listener.transportInUse(true);
111         }
112       };
113     reportTransportNotInUse = new Runnable() {
114         @Override
115         public void run() {
116           listener.transportInUse(false);
117         }
118       };
119     reportTransportTerminated = new Runnable() {
120         @Override
121         public void run() {
122           listener.transportTerminated();
123         }
124       };
125     return null;
126   }
127 
128   /**
129    * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
130    * picker will be consulted.
131    *
132    * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
133    * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
134    */
135   @Override
newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers)136   public final ClientStream newStream(
137       MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
138       ClientStreamTracer[] tracers) {
139     try {
140       PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
141       SubchannelPicker picker = null;
142       long pickerVersion = -1;
143       while (true) {
144         synchronized (lock) {
145           if (shutdownStatus != null) {
146             return new FailingClientStream(shutdownStatus, tracers);
147           }
148           if (lastPicker == null) {
149             return createPendingStream(args, tracers);
150           }
151           // Check for second time through the loop, and whether anything changed
152           if (picker != null && pickerVersion == lastPickerVersion) {
153             return createPendingStream(args, tracers);
154           }
155           picker = lastPicker;
156           pickerVersion = lastPickerVersion;
157         }
158         PickResult pickResult = picker.pickSubchannel(args);
159         ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
160             callOptions.isWaitForReady());
161         if (transport != null) {
162           return transport.newStream(
163               args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
164               tracers);
165         }
166         // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
167         // race with reprocess()), we will buffer it.  Otherwise, will try with the new picker.
168       }
169     } finally {
170       syncContext.drain();
171     }
172   }
173 
174   /**
175    * Caller must call {@code syncContext.drain()} outside of lock because this method may
176    * schedule tasks on syncContext.
177    */
178   @GuardedBy("lock")
createPendingStream( PickSubchannelArgs args, ClientStreamTracer[] tracers)179   private PendingStream createPendingStream(
180       PickSubchannelArgs args, ClientStreamTracer[] tracers) {
181     PendingStream pendingStream = new PendingStream(args, tracers);
182     pendingStreams.add(pendingStream);
183     if (getPendingStreamsCount() == 1) {
184       syncContext.executeLater(reportTransportInUse);
185     }
186     for (ClientStreamTracer streamTracer : tracers) {
187       streamTracer.createPendingStream();
188     }
189     return pendingStream;
190   }
191 
192   @Override
ping(final PingCallback callback, Executor executor)193   public final void ping(final PingCallback callback, Executor executor) {
194     throw new UnsupportedOperationException("This method is not expected to be called");
195   }
196 
197   @Override
getStats()198   public ListenableFuture<SocketStats> getStats() {
199     SettableFuture<SocketStats> ret = SettableFuture.create();
200     ret.set(null);
201     return ret;
202   }
203 
204   /**
205    * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
206    * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
207    * more buffered streams.
208    */
209   @Override
shutdown(final Status status)210   public final void shutdown(final Status status) {
211     synchronized (lock) {
212       if (shutdownStatus != null) {
213         return;
214       }
215       shutdownStatus = status;
216       syncContext.executeLater(new Runnable() {
217           @Override
218           public void run() {
219             listener.transportShutdown(status);
220           }
221         });
222       if (!hasPendingStreams() && reportTransportTerminated != null) {
223         syncContext.executeLater(reportTransportTerminated);
224         reportTransportTerminated = null;
225       }
226     }
227     syncContext.drain();
228   }
229 
230   /**
231    * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
232    * this transport.
233    */
234   @Override
shutdownNow(Status status)235   public final void shutdownNow(Status status) {
236     shutdown(status);
237     Collection<PendingStream> savedPendingStreams;
238     Runnable savedReportTransportTerminated;
239     synchronized (lock) {
240       savedPendingStreams = pendingStreams;
241       savedReportTransportTerminated = reportTransportTerminated;
242       reportTransportTerminated = null;
243       if (!pendingStreams.isEmpty()) {
244         pendingStreams = Collections.emptyList();
245       }
246     }
247     if (savedReportTransportTerminated != null) {
248       for (PendingStream stream : savedPendingStreams) {
249         Runnable runnable = stream.setStream(
250             new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
251         if (runnable != null) {
252           // Drain in-line instead of using an executor as failing stream just throws everything
253           // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
254           // before stream.start().
255           runnable.run();
256         }
257       }
258       syncContext.execute(savedReportTransportTerminated);
259     }
260     // If savedReportTransportTerminated == null, transportTerminated() has already been called in
261     // shutdown().
262   }
263 
hasPendingStreams()264   public final boolean hasPendingStreams() {
265     synchronized (lock) {
266       return !pendingStreams.isEmpty();
267     }
268   }
269 
270   @VisibleForTesting
getPendingStreamsCount()271   final int getPendingStreamsCount() {
272     synchronized (lock) {
273       return pendingStreams.size();
274     }
275   }
276 
277   /**
278    * Use the picker to try picking a transport for every pending stream, proceed the stream if the
279    * pick is successful, otherwise keep it pending.
280    *
281    * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
282    * streams will be served by the latest picker (if a same picker is given more than once, they are
283    * considered different pickers) as soon as possible.
284    *
285    * <p>This method <strong>must not</strong> be called concurrently with itself.
286    */
reprocess(@ullable SubchannelPicker picker)287   final void reprocess(@Nullable SubchannelPicker picker) {
288     ArrayList<PendingStream> toProcess;
289     synchronized (lock) {
290       lastPicker = picker;
291       lastPickerVersion++;
292       if (picker == null || !hasPendingStreams()) {
293         return;
294       }
295       toProcess = new ArrayList<>(pendingStreams);
296     }
297     ArrayList<PendingStream> toRemove = new ArrayList<>();
298 
299     for (final PendingStream stream : toProcess) {
300       PickResult pickResult = picker.pickSubchannel(stream.args);
301       CallOptions callOptions = stream.args.getCallOptions();
302       final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
303           callOptions.isWaitForReady());
304       if (transport != null) {
305         Executor executor = defaultAppExecutor;
306         // createRealStream may be expensive. It will start real streams on the transport. If
307         // there are pending requests, they will be serialized too, which may be expensive. Since
308         // we are now on transport thread, we need to offload the work to an executor.
309         if (callOptions.getExecutor() != null) {
310           executor = callOptions.getExecutor();
311         }
312         Runnable runnable = stream.createRealStream(transport);
313         if (runnable != null) {
314           executor.execute(runnable);
315         }
316         toRemove.add(stream);
317       }  // else: stay pending
318     }
319 
320     synchronized (lock) {
321       // Between this synchronized and the previous one:
322       //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
323       //   - shutdown() may be called, which may turn pendingStreams into null.
324       if (!hasPendingStreams()) {
325         return;
326       }
327       pendingStreams.removeAll(toRemove);
328       // Because delayed transport is long-lived, we take this opportunity to down-size the
329       // hashmap.
330       if (pendingStreams.isEmpty()) {
331         pendingStreams = new LinkedHashSet<>();
332       }
333       if (!hasPendingStreams()) {
334         // There may be a brief gap between delayed transport clearing in-use state, and first real
335         // transport starting streams and setting in-use state.  During the gap the whole channel's
336         // in-use state may be false. However, it shouldn't cause spurious switching to idleness
337         // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
338         // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
339         syncContext.executeLater(reportTransportNotInUse);
340         if (shutdownStatus != null && reportTransportTerminated != null) {
341           syncContext.executeLater(reportTransportTerminated);
342           reportTransportTerminated = null;
343         }
344       }
345     }
346     syncContext.drain();
347   }
348 
349   @Override
getLogId()350   public InternalLogId getLogId() {
351     return logId;
352   }
353 
354   private class PendingStream extends DelayedStream {
355     private final PickSubchannelArgs args;
356     private final Context context = Context.current();
357     private final ClientStreamTracer[] tracers;
358 
PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers)359     private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
360       this.args = args;
361       this.tracers = tracers;
362     }
363 
364     /** Runnable may be null. */
createRealStream(ClientTransport transport)365     private Runnable createRealStream(ClientTransport transport) {
366       ClientStream realStream;
367       Context origContext = context.attach();
368       try {
369         realStream = transport.newStream(
370             args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
371             tracers);
372       } finally {
373         context.detach(origContext);
374       }
375       return setStream(realStream);
376     }
377 
378     @Override
cancel(Status reason)379     public void cancel(Status reason) {
380       super.cancel(reason);
381       synchronized (lock) {
382         if (reportTransportTerminated != null) {
383           boolean justRemovedAnElement = pendingStreams.remove(this);
384           if (!hasPendingStreams() && justRemovedAnElement) {
385             syncContext.executeLater(reportTransportNotInUse);
386             if (shutdownStatus != null) {
387               syncContext.executeLater(reportTransportTerminated);
388               reportTransportTerminated = null;
389             }
390           }
391         }
392       }
393       syncContext.drain();
394     }
395 
396     @Override
onEarlyCancellation(Status reason)397     protected void onEarlyCancellation(Status reason) {
398       for (ClientStreamTracer tracer : tracers) {
399         tracer.streamClosed(reason);
400       }
401     }
402 
403     @Override
appendTimeoutInsight(InsightBuilder insight)404     public void appendTimeoutInsight(InsightBuilder insight) {
405       if (args.getCallOptions().isWaitForReady()) {
406         insight.append("wait_for_ready");
407       }
408       super.appendTimeoutInsight(insight);
409     }
410   }
411 }
412