/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
 * A client transport that queues requests before a real transport is available. When {@link
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
 * transport for each pending stream.
 *
 * <p>This transport owns every stream that it has created until a real transport has been picked
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
 * thus the delayed transport stops owning the stream.
 */
final class DelayedClientTransport implements ManagedClientTransport {
  // Allocated when the transport is constructed; returned as-is by getLogId().
  private final InternalLogId logId =
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);

  private final Object lock = new Object();

  private final Executor defaultAppExecutor;
  private final SynchronizationContext syncContext;

  // The three report* runnables and the listener are assigned in start().
  // reportTransportTerminated doubles as a "not yet reported" flag: it is reset to null (under
  // lock) by whichever path schedules the termination callback, so it is scheduled at most once.
  private Runnable reportTransportInUse;
  private Runnable reportTransportNotInUse;
  private Runnable reportTransportTerminated;
  private Listener listener;

  @Nonnull
  @GuardedBy("lock")
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();

  /**
   * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
   * terminated.
   */
  @GuardedBy("lock")
  private Status shutdownStatus;

  /**
   * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
   * to idle.
   */
  @GuardedBy("lock")
  @Nullable
  private SubchannelPicker lastPicker;

  /** Incremented on every {@link #reprocess} call; lets newStream() detect a picker change. */
  @GuardedBy("lock")
  private long lastPickerVersion;

  /**
   * Creates a new delayed transport.
   *
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
   *     in an application executor, which will be this executor, unless there is one provided in
   *     {@link CallOptions}.
   * @param syncContext all listener callbacks of the delayed transport will be run from this
   *     SynchronizationContext.
   */
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
    this.defaultAppExecutor = defaultAppExecutor;
    this.syncContext = syncContext;
  }

  /**
   * Records the listener and prepares the runnables used to report in-use and terminated state
   * to it.
   *
   * @param listener receives the transport callbacks issued by this class
   * @return always {@code null}; this transport has no start-up work to hand back
   */
  @Override
  public final Runnable start(final Listener listener) {
    this.listener = listener;
    reportTransportInUse = new Runnable() {
        @Override
        public void run() {
          listener.transportInUse(true);
        }
      };
    reportTransportNotInUse = new Runnable() {
        @Override
        public void run() {
          listener.transportInUse(false);
        }
      };
    reportTransportTerminated = new Runnable() {
        @Override
        public void run() {
          listener.transportTerminated();
        }
      };
    return null;
  }

  /**
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
   * picker will be consulted.
   *
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
   */
  @Override
  public final ClientStream newStream(
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
      ClientStreamTracer[] tracers) {
    try {
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
      SubchannelPicker picker = null;
      long pickerVersion = -1;
      while (true) {
        synchronized (lock) {
          if (shutdownStatus != null) {
            return new FailingClientStream(shutdownStatus, tracers);
          }
          if (lastPicker == null) {
            return createPendingStream(args, tracers);
          }
          // Check for second time through the loop, and whether anything changed
          if (picker != null && pickerVersion == lastPickerVersion) {
            return createPendingStream(args, tracers);
          }
          picker = lastPicker;
          pickerVersion = lastPickerVersion;
        }
        // The pick itself runs outside the lock.
        PickResult pickResult = picker.pickSubchannel(args);
        ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
            callOptions.isWaitForReady());
        if (transport != null) {
          return transport.newStream(
              args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
              tracers);
        }
        // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
        // race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
      }
    } finally {
      // createPendingStream() may have queued reportTransportInUse; run it outside the lock.
      syncContext.drain();
    }
  }

  /**
   * Creates a {@link PendingStream}, adds it to {@code pendingStreams}, and notifies every tracer
   * via {@code createPendingStream()}. Queues {@code reportTransportInUse} when this is the only
   * pending stream.
   *
   * <p>Caller must call {@code syncContext.drain()} outside of lock because this method may
   * schedule tasks on syncContext.
   */
  @GuardedBy("lock")
  private PendingStream createPendingStream(
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
    PendingStream pendingStream = new PendingStream(args, tracers);
    pendingStreams.add(pendingStream);
    if (getPendingStreamsCount() == 1) {
      syncContext.executeLater(reportTransportInUse);
    }
    for (ClientStreamTracer streamTracer : tracers) {
      streamTracer.createPendingStream();
    }
    return pendingStream;
  }

  /**
   * Not supported by the delayed transport.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public final void ping(final PingCallback callback, Executor executor) {
    throw new UnsupportedOperationException("This method is not expected to be called");
  }

  /** Returns a future that is already completed with {@code null}. */
  @Override
  public ListenableFuture<SocketStats> getStats() {
    SettableFuture<SocketStats> ret = SettableFuture.create();
    ret.set(null);
    return ret;
  }

  /**
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
   * more buffered streams.
   *
   * <p>Only the first call has any effect; later calls return immediately.
   */
  @Override
  public final void shutdown(final Status status) {
    synchronized (lock) {
      if (shutdownStatus != null) {
        return;
      }
      shutdownStatus = status;
      syncContext.executeLater(new Runnable() {
          @Override
          public void run() {
            listener.transportShutdown(status);
          }
        });
      if (!hasPendingStreams() && reportTransportTerminated != null) {
        syncContext.executeLater(reportTransportTerminated);
        reportTransportTerminated = null;
      }
    }
    syncContext.drain();
  }

  /**
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
   * this transport.
   */
  @Override
  public final void shutdownNow(Status status) {
    shutdown(status);
    Collection<PendingStream> savedPendingStreams;
    Runnable savedReportTransportTerminated;
    synchronized (lock) {
      // Take ownership of the pending streams and of the termination callback under the lock;
      // the streams themselves are failed after the lock is released.
      savedPendingStreams = pendingStreams;
      savedReportTransportTerminated = reportTransportTerminated;
      reportTransportTerminated = null;
      if (!pendingStreams.isEmpty()) {
        pendingStreams = Collections.emptyList();
      }
    }
    if (savedReportTransportTerminated != null) {
      for (PendingStream stream : savedPendingStreams) {
        Runnable runnable = stream.setStream(
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
        if (runnable != null) {
          // Drain in-line instead of using an executor as failing stream just throws everything
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
          // before stream.start().
          runnable.run();
        }
      }
      syncContext.execute(savedReportTransportTerminated);
    }
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
    // shutdown().
  }

  /** Returns {@code true} if at least one stream is currently buffered in this transport. */
  public final boolean hasPendingStreams() {
    synchronized (lock) {
      return !pendingStreams.isEmpty();
    }
  }

  /** Returns the number of streams currently buffered in this transport. */
  @VisibleForTesting
  final int getPendingStreamsCount() {
    synchronized (lock) {
      return pendingStreams.size();
    }
  }

  /**
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
   * pick is successful, otherwise keep it pending.
   *
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
   * streams will be served by the latest picker (if a same picker is given more than once, they are
   * considered different pickers) as soon as possible.
   *
   * <p>This method <strong>must not</strong> be called concurrently with itself.
   *
   * @param picker the picker to remember and apply; {@code null} only clears the remembered
   *     picker and leaves pending streams untouched
   */
  final void reprocess(@Nullable SubchannelPicker picker) {
    ArrayList<PendingStream> toProcess;
    synchronized (lock) {
      lastPicker = picker;
      lastPickerVersion++;
      if (picker == null || !hasPendingStreams()) {
        return;
      }
      // Snapshot so that picking can run without holding the lock.
      toProcess = new ArrayList<>(pendingStreams);
    }
    ArrayList<PendingStream> toRemove = new ArrayList<>();

    for (final PendingStream stream : toProcess) {
      PickResult pickResult = picker.pickSubchannel(stream.args);
      CallOptions callOptions = stream.args.getCallOptions();
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
          callOptions.isWaitForReady());
      if (transport != null) {
        Executor executor = defaultAppExecutor;
        // createRealStream may be expensive. It will start real streams on the transport. If
        // there are pending requests, they will be serialized too, which may be expensive. Since
        // we are now on transport thread, we need to offload the work to an executor.
        if (callOptions.getExecutor() != null) {
          executor = callOptions.getExecutor();
        }
        Runnable runnable = stream.createRealStream(transport);
        if (runnable != null) {
          executor.execute(runnable);
        }
        toRemove.add(stream);
      }  // else: stay pending
    }

    synchronized (lock) {
      // Between this synchronized and the previous one:
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
      //   - shutdownNow() may have been called, which replaces pendingStreams with an empty
      //     collection.
      if (!hasPendingStreams()) {
        return;
      }
      // Remove one element at a time instead of pendingStreams.removeAll(toRemove):
      // AbstractSet.removeAll() iterates the set and calls toRemove.contains() -- a linear scan
      // of the ArrayList -- whenever the set is not larger than the argument, which is exactly
      // the case when every pending stream was handed to a transport.
      for (PendingStream stream : toRemove) {
        pendingStreams.remove(stream);
      }
      // Because delayed transport is long-lived, we take this opportunity to down-size the
      // hash set.
      if (pendingStreams.isEmpty()) {
        pendingStreams = new LinkedHashSet<>();
      }
      if (!hasPendingStreams()) {
        // There may be a brief gap between delayed transport clearing in-use state, and first real
        // transport starting streams and setting in-use state.  During the gap the whole channel's
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
        syncContext.executeLater(reportTransportNotInUse);
        if (shutdownStatus != null && reportTransportTerminated != null) {
          syncContext.executeLater(reportTransportTerminated);
          reportTransportTerminated = null;
        }
      }
    }
    syncContext.drain();
  }

  @Override
  public InternalLogId getLogId() {
    return logId;
  }

  /**
   * A stream buffered by the enclosing transport until {@link #createRealStream} hands it to a
   * real transport, or until it is cancelled or failed by {@code shutdownNow()}.
   */
  private class PendingStream extends DelayedStream {
    private final PickSubchannelArgs args;
    // Captured when the pending stream is constructed and re-attached around the creation of the
    // real stream in createRealStream().
    private final Context context = Context.current();
    private final ClientStreamTracer[] tracers;

    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
      this.args = args;
      this.tracers = tracers;
    }

    /**
     * Creates the real stream on {@code transport} with the captured {@link Context} attached,
     * then installs it via {@code setStream()}.
     *
     * @return whatever {@code setStream()} returns. Runnable may be null.
     */
    private Runnable createRealStream(ClientTransport transport) {
      ClientStream realStream;
      Context origContext = context.attach();
      try {
        realStream = transport.newStream(
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
            tracers);
      } finally {
        context.detach(origContext);
      }
      return setStream(realStream);
    }

    /**
     * Cancels the stream and, unless termination has already been reported, removes it from the
     * enclosing transport's pending set, reporting not-in-use (and termination, if the transport
     * has been shut down) when that removal empties the set.
     */
    @Override
    public void cancel(Status reason) {
      super.cancel(reason);
      synchronized (lock) {
        // A null reportTransportTerminated means termination was already scheduled elsewhere.
        if (reportTransportTerminated != null) {
          boolean justRemovedAnElement = pendingStreams.remove(this);
          if (!hasPendingStreams() && justRemovedAnElement) {
            syncContext.executeLater(reportTransportNotInUse);
            if (shutdownStatus != null) {
              syncContext.executeLater(reportTransportTerminated);
              reportTransportTerminated = null;
            }
          }
        }
      }
      syncContext.drain();
    }

    /** Reports {@code reason} to every tracer through {@code streamClosed()}. */
    @Override
    protected void onEarlyCancellation(Status reason) {
      for (ClientStreamTracer tracer : tracers) {
        tracer.streamClosed(reason);
      }
    }

    /** Adds {@code "wait_for_ready"} for wait-for-ready calls, then delegates to the parent. */
    @Override
    public void appendTimeoutInsight(InsightBuilder insight) {
      if (args.getCallOptions().isWaitForReady()) {
        insight.append("wait_for_ready");
      }
      super.appendTimeoutInsight(insight);
    }
  }
}