1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import io.grpc.Attributes; 24 import io.grpc.Compressor; 25 import io.grpc.Deadline; 26 import io.grpc.DecompressorRegistry; 27 import io.grpc.Metadata; 28 import io.grpc.Status; 29 import java.io.InputStream; 30 import java.util.ArrayList; 31 import java.util.List; 32 import javax.annotation.concurrent.GuardedBy; 33 34 /** 35 * A stream that queues requests before the transport is available, and delegates to a real stream 36 * implementation when the transport is available. 37 * 38 * <p>{@code ClientStream} itself doesn't require thread-safety. However, the state of {@code 39 * DelayedStream} may be internally altered by different threads, thus internal synchronization is 40 * necessary. 41 */ 42 class DelayedStream implements ClientStream { 43 /** {@code true} once realStream is valid and all pending calls have been drained. */ 44 private volatile boolean passThrough; 45 /** 46 * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate 47 * order, but also used if an error occurrs before {@code realStream} is set. 48 */ 49 private ClientStreamListener listener; 50 /** Must hold {@code this} lock when setting. */ 51 private ClientStream realStream; 52 @GuardedBy("this") 53 private Status error; 54 @GuardedBy("this") 55 private List<Runnable> pendingCalls = new ArrayList<>(); 56 @GuardedBy("this") 57 private DelayedStreamListener delayedListener; 58 59 @Override setMaxInboundMessageSize(final int maxSize)60 public void setMaxInboundMessageSize(final int maxSize) { 61 if (passThrough) { 62 realStream.setMaxInboundMessageSize(maxSize); 63 } else { 64 delayOrExecute(new Runnable() { 65 @Override 66 public void run() { 67 realStream.setMaxInboundMessageSize(maxSize); 68 } 69 }); 70 } 71 } 72 73 @Override setMaxOutboundMessageSize(final int maxSize)74 public void setMaxOutboundMessageSize(final int maxSize) { 75 if (passThrough) { 76 realStream.setMaxOutboundMessageSize(maxSize); 77 } else { 78 delayOrExecute(new Runnable() { 79 @Override 80 public void run() { 81 realStream.setMaxOutboundMessageSize(maxSize); 82 } 83 }); 84 } 85 } 86 87 @Override setDeadline(final Deadline deadline)88 public void setDeadline(final Deadline deadline) { 89 delayOrExecute(new Runnable() { 90 @Override 91 public void run() { 92 realStream.setDeadline(deadline); 93 } 94 }); 95 } 96 97 /** 98 * Transfers all pending and future requests and mutations to the given stream. 99 * 100 * <p>No-op if either this method or {@link #cancel} have already been called. 101 */ 102 // When this method returns, passThrough is guaranteed to be true setStream(ClientStream stream)103 final void setStream(ClientStream stream) { 104 synchronized (this) { 105 // If realStream != null, then either setStream() or cancel() has been called. 106 if (realStream != null) { 107 return; 108 } 109 realStream = checkNotNull(stream, "stream"); 110 } 111 112 drainPendingCalls(); 113 } 114 115 /** 116 * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called 117 * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock 118 * should not be held when calling this method. 119 */ drainPendingCalls()120 private void drainPendingCalls() { 121 assert realStream != null; 122 assert !passThrough; 123 List<Runnable> toRun = new ArrayList<>(); 124 DelayedStreamListener delayedListener = null; 125 while (true) { 126 synchronized (this) { 127 if (pendingCalls.isEmpty()) { 128 pendingCalls = null; 129 passThrough = true; 130 delayedListener = this.delayedListener; 131 break; 132 } 133 // Since there were pendingCalls, we need to process them. To maintain ordering we can't set 134 // passThrough=true until we run all pendingCalls, but new Runnables may be added after we 135 // drop the lock. So we will have to re-check pendingCalls. 136 List<Runnable> tmp = toRun; 137 toRun = pendingCalls; 138 pendingCalls = tmp; 139 } 140 for (Runnable runnable : toRun) { 141 // Must not call transport while lock is held to prevent deadlocks. 142 // TODO(ejona): exception handling 143 runnable.run(); 144 } 145 toRun.clear(); 146 } 147 if (delayedListener != null) { 148 delayedListener.drainPendingCallbacks(); 149 } 150 } 151 152 /** 153 * Enqueue the runnable or execute it now. Call sites that may be called many times may want avoid 154 * this method if {@code passThrough == true}. 155 * 156 * <p>Note that this method is no more thread-safe than {@code runnable}. It is thread-safe if and 157 * only if {@code runnable} is thread-safe. 158 */ delayOrExecute(Runnable runnable)159 private void delayOrExecute(Runnable runnable) { 160 synchronized (this) { 161 if (!passThrough) { 162 pendingCalls.add(runnable); 163 return; 164 } 165 } 166 runnable.run(); 167 } 168 169 @Override setAuthority(final String authority)170 public void setAuthority(final String authority) { 171 checkState(listener == null, "May only be called before start"); 172 checkNotNull(authority, "authority"); 173 delayOrExecute(new Runnable() { 174 @Override 175 public void run() { 176 realStream.setAuthority(authority); 177 } 178 }); 179 } 180 181 @Override start(ClientStreamListener listener)182 public void start(ClientStreamListener listener) { 183 checkState(this.listener == null, "already started"); 184 185 Status savedError; 186 boolean savedPassThrough; 187 synchronized (this) { 188 this.listener = checkNotNull(listener, "listener"); 189 // If error != null, then cancel() has been called and was unable to close the listener 190 savedError = error; 191 savedPassThrough = passThrough; 192 if (!savedPassThrough) { 193 listener = delayedListener = new DelayedStreamListener(listener); 194 } 195 } 196 if (savedError != null) { 197 listener.closed(savedError, new Metadata()); 198 return; 199 } 200 201 if (savedPassThrough) { 202 realStream.start(listener); 203 } else { 204 final ClientStreamListener finalListener = listener; 205 delayOrExecute(new Runnable() { 206 @Override 207 public void run() { 208 realStream.start(finalListener); 209 } 210 }); 211 } 212 } 213 214 @Override getAttributes()215 public Attributes getAttributes() { 216 checkState(passThrough, "Called getAttributes before attributes are ready"); 217 return realStream.getAttributes(); 218 } 219 220 @Override writeMessage(final InputStream message)221 public void writeMessage(final InputStream message) { 222 checkNotNull(message, "message"); 223 if (passThrough) { 224 realStream.writeMessage(message); 225 } else { 226 delayOrExecute(new Runnable() { 227 @Override 228 public void run() { 229 realStream.writeMessage(message); 230 } 231 }); 232 } 233 } 234 235 @Override flush()236 public void flush() { 237 if (passThrough) { 238 realStream.flush(); 239 } else { 240 delayOrExecute(new Runnable() { 241 @Override 242 public void run() { 243 realStream.flush(); 244 } 245 }); 246 } 247 } 248 249 // When this method returns, passThrough is guaranteed to be true 250 @Override cancel(final Status reason)251 public void cancel(final Status reason) { 252 checkNotNull(reason, "reason"); 253 boolean delegateToRealStream = true; 254 ClientStreamListener listenerToClose = null; 255 synchronized (this) { 256 // If realStream != null, then either setStream() or cancel() has been called 257 if (realStream == null) { 258 realStream = NoopClientStream.INSTANCE; 259 delegateToRealStream = false; 260 261 // If listener == null, then start() will later call listener with 'error' 262 listenerToClose = listener; 263 error = reason; 264 } 265 } 266 if (delegateToRealStream) { 267 delayOrExecute(new Runnable() { 268 @Override 269 public void run() { 270 realStream.cancel(reason); 271 } 272 }); 273 } else { 274 if (listenerToClose != null) { 275 listenerToClose.closed(reason, new Metadata()); 276 } 277 drainPendingCalls(); 278 } 279 } 280 281 @Override halfClose()282 public void halfClose() { 283 delayOrExecute(new Runnable() { 284 @Override 285 public void run() { 286 realStream.halfClose(); 287 } 288 }); 289 } 290 291 @Override request(final int numMessages)292 public void request(final int numMessages) { 293 if (passThrough) { 294 realStream.request(numMessages); 295 } else { 296 delayOrExecute(new Runnable() { 297 @Override 298 public void run() { 299 realStream.request(numMessages); 300 } 301 }); 302 } 303 } 304 305 @Override setCompressor(final Compressor compressor)306 public void setCompressor(final Compressor compressor) { 307 checkNotNull(compressor, "compressor"); 308 delayOrExecute(new Runnable() { 309 @Override 310 public void run() { 311 realStream.setCompressor(compressor); 312 } 313 }); 314 } 315 316 @Override setFullStreamDecompression(final boolean fullStreamDecompression)317 public void setFullStreamDecompression(final boolean fullStreamDecompression) { 318 delayOrExecute( 319 new Runnable() { 320 @Override 321 public void run() { 322 realStream.setFullStreamDecompression(fullStreamDecompression); 323 } 324 }); 325 } 326 327 @Override setDecompressorRegistry(final DecompressorRegistry decompressorRegistry)328 public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) { 329 checkNotNull(decompressorRegistry, "decompressorRegistry"); 330 delayOrExecute(new Runnable() { 331 @Override 332 public void run() { 333 realStream.setDecompressorRegistry(decompressorRegistry); 334 } 335 }); 336 } 337 338 @Override isReady()339 public boolean isReady() { 340 if (passThrough) { 341 return realStream.isReady(); 342 } else { 343 return false; 344 } 345 } 346 347 @Override setMessageCompression(final boolean enable)348 public void setMessageCompression(final boolean enable) { 349 if (passThrough) { 350 realStream.setMessageCompression(enable); 351 } else { 352 delayOrExecute(new Runnable() { 353 @Override 354 public void run() { 355 realStream.setMessageCompression(enable); 356 } 357 }); 358 } 359 } 360 361 @VisibleForTesting getRealStream()362 ClientStream getRealStream() { 363 return realStream; 364 } 365 366 private static class DelayedStreamListener implements ClientStreamListener { 367 private final ClientStreamListener realListener; 368 private volatile boolean passThrough; 369 @GuardedBy("this") 370 private List<Runnable> pendingCallbacks = new ArrayList<>(); 371 DelayedStreamListener(ClientStreamListener listener)372 public DelayedStreamListener(ClientStreamListener listener) { 373 this.realListener = listener; 374 } 375 delayOrExecute(Runnable runnable)376 private void delayOrExecute(Runnable runnable) { 377 synchronized (this) { 378 if (!passThrough) { 379 pendingCallbacks.add(runnable); 380 return; 381 } 382 } 383 runnable.run(); 384 } 385 386 @Override messagesAvailable(final MessageProducer producer)387 public void messagesAvailable(final MessageProducer producer) { 388 if (passThrough) { 389 realListener.messagesAvailable(producer); 390 } else { 391 delayOrExecute(new Runnable() { 392 @Override 393 public void run() { 394 realListener.messagesAvailable(producer); 395 } 396 }); 397 } 398 } 399 400 @Override onReady()401 public void onReady() { 402 if (passThrough) { 403 realListener.onReady(); 404 } else { 405 delayOrExecute(new Runnable() { 406 @Override 407 public void run() { 408 realListener.onReady(); 409 } 410 }); 411 } 412 } 413 414 @Override headersRead(final Metadata headers)415 public void headersRead(final Metadata headers) { 416 delayOrExecute(new Runnable() { 417 @Override 418 public void run() { 419 realListener.headersRead(headers); 420 } 421 }); 422 } 423 424 @Override closed(final Status status, final Metadata trailers)425 public void closed(final Status status, final Metadata trailers) { 426 delayOrExecute(new Runnable() { 427 @Override 428 public void run() { 429 realListener.closed(status, trailers); 430 } 431 }); 432 } 433 434 @Override closed( final Status status, final RpcProgress rpcProgress, final Metadata trailers)435 public void closed( 436 final Status status, final RpcProgress rpcProgress, 437 final Metadata trailers) { 438 delayOrExecute(new Runnable() { 439 @Override 440 public void run() { 441 realListener.closed(status, rpcProgress, trailers); 442 } 443 }); 444 } 445 drainPendingCallbacks()446 public void drainPendingCallbacks() { 447 assert !passThrough; 448 List<Runnable> toRun = new ArrayList<>(); 449 while (true) { 450 synchronized (this) { 451 if (pendingCallbacks.isEmpty()) { 452 pendingCallbacks = null; 453 passThrough = true; 454 break; 455 } 456 // Since there were pendingCallbacks, we need to process them. To maintain ordering we 457 // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be 458 // added after we drop the lock. So we will have to re-check pendingCallbacks. 459 List<Runnable> tmp = toRun; 460 toRun = pendingCallbacks; 461 pendingCallbacks = tmp; 462 } 463 for (Runnable runnable : toRun) { 464 // Avoid calling listener while lock is held to prevent deadlocks. 465 // TODO(ejona): exception handling 466 runnable.run(); 467 } 468 toRun.clear(); 469 } 470 } 471 } 472 } 473