1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.SettableFuture; 22 import io.grpc.CallOptions; 23 import io.grpc.Context; 24 import io.grpc.InternalChannelz.SocketStats; 25 import io.grpc.InternalLogId; 26 import io.grpc.LoadBalancer.PickResult; 27 import io.grpc.LoadBalancer.PickSubchannelArgs; 28 import io.grpc.LoadBalancer.SubchannelPicker; 29 import io.grpc.Metadata; 30 import io.grpc.MethodDescriptor; 31 import io.grpc.Status; 32 import java.util.ArrayList; 33 import java.util.Collection; 34 import java.util.Collections; 35 import java.util.LinkedHashSet; 36 import java.util.concurrent.Executor; 37 import javax.annotation.Nonnull; 38 import javax.annotation.Nullable; 39 import javax.annotation.concurrent.GuardedBy; 40 41 /** 42 * A client transport that queues requests before a real transport is available. When {@link 43 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a 44 * transport for each pending stream. 45 * 46 * <p>This transport owns every stream that it has created until a real transport has been picked 47 * for that stream, at which point the ownership of the stream is transferred to the real transport, 48 * thus the delayed transport stops owning the stream. 49 */ 50 final class DelayedClientTransport implements ManagedClientTransport { 51 private final InternalLogId lodId = InternalLogId.allocate(getClass().getName()); 52 53 private final Object lock = new Object(); 54 55 private final Executor defaultAppExecutor; 56 private final ChannelExecutor channelExecutor; 57 58 private Runnable reportTransportInUse; 59 private Runnable reportTransportNotInUse; 60 private Runnable reportTransportTerminated; 61 private Listener listener; 62 63 @Nonnull 64 @GuardedBy("lock") 65 private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>(); 66 67 /** 68 * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered 69 * terminated. 70 */ 71 @GuardedBy("lock") 72 private Status shutdownStatus; 73 74 /** 75 * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved 76 * to idle. 77 */ 78 @GuardedBy("lock") 79 @Nullable 80 private SubchannelPicker lastPicker; 81 82 @GuardedBy("lock") 83 private long lastPickerVersion; 84 85 /** 86 * Creates a new delayed transport. 87 * 88 * @param defaultAppExecutor pending streams will create real streams and run bufferred operations 89 * in an application executor, which will be this executor, unless there is on provided in 90 * {@link CallOptions}. 91 * @param channelExecutor all listener callbacks of the delayed transport will be run from this 92 * ChannelExecutor. 93 */ DelayedClientTransport(Executor defaultAppExecutor, ChannelExecutor channelExecutor)94 DelayedClientTransport(Executor defaultAppExecutor, ChannelExecutor channelExecutor) { 95 this.defaultAppExecutor = defaultAppExecutor; 96 this.channelExecutor = channelExecutor; 97 } 98 99 @Override start(final Listener listener)100 public final Runnable start(final Listener listener) { 101 this.listener = listener; 102 reportTransportInUse = new Runnable() { 103 @Override 104 public void run() { 105 listener.transportInUse(true); 106 } 107 }; 108 reportTransportNotInUse = new Runnable() { 109 @Override 110 public void run() { 111 listener.transportInUse(false); 112 } 113 }; 114 reportTransportTerminated = new Runnable() { 115 @Override 116 public void run() { 117 listener.transportTerminated(); 118 } 119 }; 120 return null; 121 } 122 123 /** 124 * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last 125 * picker will be consulted. 126 * 127 * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is 128 * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned. 129 */ 130 @Override newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions)131 public final ClientStream newStream( 132 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { 133 try { 134 SubchannelPicker picker; 135 PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions); 136 long pickerVersion = -1; 137 synchronized (lock) { 138 if (shutdownStatus == null) { 139 if (lastPicker == null) { 140 return createPendingStream(args); 141 } 142 picker = lastPicker; 143 pickerVersion = lastPickerVersion; 144 } else { 145 return new FailingClientStream(shutdownStatus); 146 } 147 } 148 while (true) { 149 PickResult pickResult = picker.pickSubchannel(args); 150 ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, 151 callOptions.isWaitForReady()); 152 if (transport != null) { 153 return transport.newStream( 154 args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions()); 155 } 156 // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible 157 // race with reprocess()), we will buffer it. Otherwise, will try with the new picker. 158 synchronized (lock) { 159 if (shutdownStatus != null) { 160 return new FailingClientStream(shutdownStatus); 161 } 162 if (pickerVersion == lastPickerVersion) { 163 return createPendingStream(args); 164 } 165 picker = lastPicker; 166 pickerVersion = lastPickerVersion; 167 } 168 } 169 } finally { 170 channelExecutor.drain(); 171 } 172 } 173 174 /** 175 * Caller must call {@code channelExecutor.drain()} outside of lock because this method may 176 * schedule tasks on channelExecutor. 177 */ 178 @GuardedBy("lock") createPendingStream(PickSubchannelArgs args)179 private PendingStream createPendingStream(PickSubchannelArgs args) { 180 PendingStream pendingStream = new PendingStream(args); 181 pendingStreams.add(pendingStream); 182 if (getPendingStreamsCount() == 1) { 183 channelExecutor.executeLater(reportTransportInUse); 184 } 185 return pendingStream; 186 } 187 188 @Override ping(final PingCallback callback, Executor executor)189 public final void ping(final PingCallback callback, Executor executor) { 190 throw new UnsupportedOperationException("This method is not expected to be called"); 191 } 192 193 @Override getStats()194 public ListenableFuture<SocketStats> getStats() { 195 SettableFuture<SocketStats> ret = SettableFuture.create(); 196 ret.set(null); 197 return ret; 198 } 199 200 /** 201 * Prevents creating any new streams. Buffered streams are not failed and may still proceed 202 * when {@link #reprocess} is called. The delayed transport will be terminated when there is no 203 * more buffered streams. 204 */ 205 @Override shutdown(final Status status)206 public final void shutdown(final Status status) { 207 synchronized (lock) { 208 if (shutdownStatus != null) { 209 return; 210 } 211 shutdownStatus = status; 212 channelExecutor.executeLater(new Runnable() { 213 @Override 214 public void run() { 215 listener.transportShutdown(status); 216 } 217 }); 218 if (!hasPendingStreams() && reportTransportTerminated != null) { 219 channelExecutor.executeLater(reportTransportTerminated); 220 reportTransportTerminated = null; 221 } 222 } 223 channelExecutor.drain(); 224 } 225 226 /** 227 * Shuts down this transport and cancels all streams that it owns, hence immediately terminates 228 * this transport. 229 */ 230 @Override shutdownNow(Status status)231 public final void shutdownNow(Status status) { 232 shutdown(status); 233 Collection<PendingStream> savedPendingStreams; 234 Runnable savedReportTransportTerminated; 235 synchronized (lock) { 236 savedPendingStreams = pendingStreams; 237 savedReportTransportTerminated = reportTransportTerminated; 238 reportTransportTerminated = null; 239 if (!pendingStreams.isEmpty()) { 240 pendingStreams = Collections.<PendingStream>emptyList(); 241 } 242 } 243 if (savedReportTransportTerminated != null) { 244 for (PendingStream stream : savedPendingStreams) { 245 stream.cancel(status); 246 } 247 channelExecutor.executeLater(savedReportTransportTerminated).drain(); 248 } 249 // If savedReportTransportTerminated == null, transportTerminated() has already been called in 250 // shutdown(). 251 } 252 hasPendingStreams()253 public final boolean hasPendingStreams() { 254 synchronized (lock) { 255 return !pendingStreams.isEmpty(); 256 } 257 } 258 259 @VisibleForTesting getPendingStreamsCount()260 final int getPendingStreamsCount() { 261 synchronized (lock) { 262 return pendingStreams.size(); 263 } 264 } 265 266 /** 267 * Use the picker to try picking a transport for every pending stream, proceed the stream if the 268 * pick is successful, otherwise keep it pending. 269 * 270 * <p>This method may be called concurrently with {@code newStream()}, and it's safe. All pending 271 * streams will be served by the latest picker (if a same picker is given more than once, they are 272 * considered different pickers) as soon as possible. 273 * 274 * <p>This method <strong>must not</strong> be called concurrently with itself. 275 */ reprocess(@ullable SubchannelPicker picker)276 final void reprocess(@Nullable SubchannelPicker picker) { 277 ArrayList<PendingStream> toProcess; 278 synchronized (lock) { 279 lastPicker = picker; 280 lastPickerVersion++; 281 if (picker == null || !hasPendingStreams()) { 282 return; 283 } 284 toProcess = new ArrayList<>(pendingStreams); 285 } 286 ArrayList<PendingStream> toRemove = new ArrayList<>(); 287 288 for (final PendingStream stream : toProcess) { 289 PickResult pickResult = picker.pickSubchannel(stream.args); 290 CallOptions callOptions = stream.args.getCallOptions(); 291 final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, 292 callOptions.isWaitForReady()); 293 if (transport != null) { 294 Executor executor = defaultAppExecutor; 295 // createRealStream may be expensive. It will start real streams on the transport. If 296 // there are pending requests, they will be serialized too, which may be expensive. Since 297 // we are now on transport thread, we need to offload the work to an executor. 298 if (callOptions.getExecutor() != null) { 299 executor = callOptions.getExecutor(); 300 } 301 executor.execute(new Runnable() { 302 @Override 303 public void run() { 304 stream.createRealStream(transport); 305 } 306 }); 307 toRemove.add(stream); 308 } // else: stay pending 309 } 310 311 synchronized (lock) { 312 // Between this synchronized and the previous one: 313 // - Streams may have been cancelled, which may turn pendingStreams into emptiness. 314 // - shutdown() may be called, which may turn pendingStreams into null. 315 if (!hasPendingStreams()) { 316 return; 317 } 318 pendingStreams.removeAll(toRemove); 319 // Because delayed transport is long-lived, we take this opportunity to down-size the 320 // hashmap. 321 if (pendingStreams.isEmpty()) { 322 pendingStreams = new LinkedHashSet<PendingStream>(); 323 } 324 if (!hasPendingStreams()) { 325 // There may be a brief gap between delayed transport clearing in-use state, and first real 326 // transport starting streams and setting in-use state. During the gap the whole channel's 327 // in-use state may be false. However, it shouldn't cause spurious switching to idleness 328 // (which would shutdown the transports and LoadBalancer) because the gap should be shorter 329 // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). 330 channelExecutor.executeLater(reportTransportNotInUse); 331 if (shutdownStatus != null && reportTransportTerminated != null) { 332 channelExecutor.executeLater(reportTransportTerminated); 333 reportTransportTerminated = null; 334 } 335 } 336 } 337 channelExecutor.drain(); 338 } 339 340 // TODO(carl-mastrangelo): remove this once the Subchannel change is in. 341 @Override getLogId()342 public InternalLogId getLogId() { 343 return lodId; 344 } 345 346 private class PendingStream extends DelayedStream { 347 private final PickSubchannelArgs args; 348 private final Context context = Context.current(); 349 PendingStream(PickSubchannelArgs args)350 private PendingStream(PickSubchannelArgs args) { 351 this.args = args; 352 } 353 createRealStream(ClientTransport transport)354 private void createRealStream(ClientTransport transport) { 355 ClientStream realStream; 356 Context origContext = context.attach(); 357 try { 358 realStream = transport.newStream( 359 args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions()); 360 } finally { 361 context.detach(origContext); 362 } 363 setStream(realStream); 364 } 365 366 @Override cancel(Status reason)367 public void cancel(Status reason) { 368 super.cancel(reason); 369 synchronized (lock) { 370 if (reportTransportTerminated != null) { 371 boolean justRemovedAnElement = pendingStreams.remove(this); 372 if (!hasPendingStreams() && justRemovedAnElement) { 373 channelExecutor.executeLater(reportTransportNotInUse); 374 if (shutdownStatus != null) { 375 channelExecutor.executeLater(reportTransportTerminated); 376 reportTransportTerminated = null; 377 } 378 } 379 } 380 } 381 channelExecutor.drain(); 382 } 383 } 384 } 385