1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static java.util.concurrent.TimeUnit.NANOSECONDS; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.MoreObjects; 25 import io.grpc.Attributes; 26 import io.grpc.ClientCall; 27 import io.grpc.Context; 28 import io.grpc.Deadline; 29 import io.grpc.Metadata; 30 import io.grpc.Status; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.Locale; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.ScheduledExecutorService; 36 import java.util.concurrent.ScheduledFuture; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Level; 39 import java.util.logging.Logger; 40 import javax.annotation.Nullable; 41 import javax.annotation.concurrent.GuardedBy; 42 43 /** 44 * A call that queues requests before a real call is ready to be delegated to. 45 * 46 * <p>{@code ClientCall} itself doesn't require thread-safety. However, the state of {@code 47 * DelayedCall} may be internally altered by different threads, thus internal synchronization is 48 * necessary. 49 */ 50 public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> { 51 private static final Logger logger = Logger.getLogger(DelayedClientCall.class.getName()); 52 /** 53 * A timer to monitor the initial deadline. The timer must be cancelled on transition to the real 54 * call. 55 */ 56 @Nullable 57 private final ScheduledFuture<?> initialDeadlineMonitor; 58 private final Executor callExecutor; 59 private final Context context; 60 /** {@code true} once realCall is valid and all pending calls have been drained. */ 61 private volatile boolean passThrough; 62 /** 63 * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate 64 * order, but also used if an error occurs before {@code realCall} is set. 65 */ 66 private Listener<RespT> listener; 67 // Must hold {@code this} lock when setting. 68 private ClientCall<ReqT, RespT> realCall; 69 @GuardedBy("this") 70 private Status error; 71 @GuardedBy("this") 72 private List<Runnable> pendingRunnables = new ArrayList<>(); 73 @GuardedBy("this") 74 private DelayedListener<RespT> delayedListener; 75 DelayedClientCall( Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline)76 protected DelayedClientCall( 77 Executor callExecutor, ScheduledExecutorService scheduler, @Nullable Deadline deadline) { 78 this.callExecutor = checkNotNull(callExecutor, "callExecutor"); 79 checkNotNull(scheduler, "scheduler"); 80 context = Context.current(); 81 initialDeadlineMonitor = scheduleDeadlineIfNeeded(scheduler, deadline); 82 } 83 84 // If one argument is null, consider the other the "Before" isAbeforeB(@ullable Deadline a, @Nullable Deadline b)85 private boolean isAbeforeB(@Nullable Deadline a, @Nullable Deadline b) { 86 if (b == null) { 87 return true; 88 } else if (a == null) { 89 return false; 90 } 91 92 return a.isBefore(b); 93 } 94 95 @Nullable scheduleDeadlineIfNeeded( ScheduledExecutorService scheduler, @Nullable Deadline deadline)96 private ScheduledFuture<?> scheduleDeadlineIfNeeded( 97 ScheduledExecutorService scheduler, @Nullable Deadline deadline) { 98 Deadline contextDeadline = context.getDeadline(); 99 if (deadline == null && contextDeadline == null) { 100 return null; 101 } 102 long remainingNanos = Long.MAX_VALUE; 103 if (deadline != null) { 104 remainingNanos = deadline.timeRemaining(NANOSECONDS); 105 } 106 107 if (contextDeadline != null && contextDeadline.timeRemaining(NANOSECONDS) < remainingNanos) { 108 remainingNanos = contextDeadline.timeRemaining(NANOSECONDS); 109 if (logger.isLoggable(Level.FINE)) { 110 StringBuilder builder = 111 new StringBuilder( 112 String.format( 113 Locale.US, 114 "Call timeout set to '%d' ns, due to context deadline.", remainingNanos)); 115 if (deadline == null) { 116 builder.append(" Explicit call timeout was not set."); 117 } else { 118 long callTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS); 119 builder.append(String.format( 120 Locale.US, " Explicit call timeout was '%d' ns.", callTimeout)); 121 } 122 logger.fine(builder.toString()); 123 } 124 } 125 126 long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); 127 long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); 128 final StringBuilder buf = new StringBuilder(); 129 String deadlineName = isAbeforeB(contextDeadline, deadline) ? "Context" : "CallOptions"; 130 if (remainingNanos < 0) { 131 buf.append("ClientCall started after "); 132 buf.append(deadlineName); 133 buf.append(" deadline was exceeded. Deadline has been exceeded for "); 134 } else { 135 buf.append("Deadline "); 136 buf.append(deadlineName); 137 buf.append(" will be exceeded in "); 138 } 139 buf.append(seconds); 140 buf.append(String.format(Locale.US, ".%09d", nanos)); 141 buf.append("s. "); 142 143 /** Cancels the call if deadline exceeded prior to the real call being set. */ 144 class DeadlineExceededRunnable implements Runnable { 145 @Override 146 public void run() { 147 cancel( 148 Status.DEADLINE_EXCEEDED.withDescription(buf.toString()), 149 // We should not cancel the call if the realCall is set because there could be a 150 // race between cancel() and realCall.start(). The realCall will handle deadline by 151 // itself. 152 /* onlyCancelPendingCall= */ true); 153 } 154 } 155 156 return scheduler.schedule(new DeadlineExceededRunnable(), remainingNanos, NANOSECONDS); 157 } 158 159 /** 160 * Transfers all pending and future requests and mutations to the given call. 161 * 162 * <p>No-op if either this method or {@link #cancel} have already been called. 163 */ 164 // When this method returns, passThrough is guaranteed to be true setCall(ClientCall<ReqT, RespT> call)165 public final Runnable setCall(ClientCall<ReqT, RespT> call) { 166 synchronized (this) { 167 // If realCall != null, then either setCall() or cancel() has been called. 168 if (realCall != null) { 169 return null; 170 } 171 setRealCall(checkNotNull(call, "call")); 172 } 173 return new ContextRunnable(context) { 174 @Override 175 public void runInContext() { 176 drainPendingCalls(); 177 } 178 }; 179 } 180 181 @Override 182 public final void start(Listener<RespT> listener, final Metadata headers) { 183 checkState(this.listener == null, "already started"); 184 Status savedError; 185 boolean savedPassThrough; 186 synchronized (this) { 187 this.listener = checkNotNull(listener, "listener"); 188 // If error != null, then cancel() has been called and was unable to close the listener 189 savedError = error; 190 savedPassThrough = passThrough; 191 if (!savedPassThrough) { 192 listener = delayedListener = new DelayedListener<>(listener); 193 } 194 } 195 if (savedError != null) { 196 callExecutor.execute(new CloseListenerRunnable(listener, savedError)); 197 return; 198 } 199 if (savedPassThrough) { 200 realCall.start(listener, headers); 201 } else { 202 final Listener<RespT> finalListener = listener; 203 delayOrExecute(new Runnable() { 204 @Override 205 public void run() { 206 realCall.start(finalListener, headers); 207 } 208 }); 209 } 210 } 211 212 // When this method returns, passThrough is guaranteed to be true 213 @Override 214 public final void cancel(@Nullable final String message, @Nullable final Throwable cause) { 215 Status status = Status.CANCELLED; 216 if (message != null) { 217 status = status.withDescription(message); 218 } else { 219 status = status.withDescription("Call cancelled without message"); 220 } 221 if (cause != null) { 222 status = status.withCause(cause); 223 } 224 cancel(status, false); 225 } 226 227 /** 228 * Cancels the call unless {@code realCall} is set and {@code onlyCancelPendingCall} is true. 229 */ 230 private void cancel(final Status status, boolean onlyCancelPendingCall) { 231 boolean delegateToRealCall = true; 232 Listener<RespT> listenerToClose = null; 233 synchronized (this) { 234 // If realCall != null, then either setCall() or cancel() has been called 235 if (realCall == null) { 236 @SuppressWarnings("unchecked") 237 ClientCall<ReqT, RespT> noopCall = (ClientCall<ReqT, RespT>) NOOP_CALL; 238 setRealCall(noopCall); 239 delegateToRealCall = false; 240 // If listener == null, then start() will later call listener with 'error' 241 listenerToClose = listener; 242 error = status; 243 } else if (onlyCancelPendingCall) { 244 return; 245 } 246 } 247 if (delegateToRealCall) { 248 delayOrExecute(new Runnable() { 249 @Override 250 public void run() { 251 realCall.cancel(status.getDescription(), status.getCause()); 252 } 253 }); 254 } else { 255 if (listenerToClose != null) { 256 callExecutor.execute(new CloseListenerRunnable(listenerToClose, status)); 257 } 258 drainPendingCalls(); 259 } 260 callCancelled(); 261 } 262 263 protected void callCancelled() { 264 } 265 266 private void delayOrExecute(Runnable runnable) { 267 synchronized (this) { 268 if (!passThrough) { 269 pendingRunnables.add(runnable); 270 return; 271 } 272 } 273 runnable.run(); 274 } 275 276 /** 277 * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called 278 * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock 279 * should not be held when calling this method. 280 */ 281 private void drainPendingCalls() { 282 assert realCall != null; 283 assert !passThrough; 284 List<Runnable> toRun = new ArrayList<>(); 285 DelayedListener<RespT> delayedListener ; 286 while (true) { 287 synchronized (this) { 288 if (pendingRunnables.isEmpty()) { 289 pendingRunnables = null; 290 passThrough = true; 291 delayedListener = this.delayedListener; 292 break; 293 } 294 // Since there were pendingCalls, we need to process them. To maintain ordering we can't set 295 // passThrough=true until we run all pendingCalls, but new Runnables may be added after we 296 // drop the lock. So we will have to re-check pendingCalls. 297 List<Runnable> tmp = toRun; 298 toRun = pendingRunnables; 299 pendingRunnables = tmp; 300 } 301 for (Runnable runnable : toRun) { 302 // Must not call transport while lock is held to prevent deadlocks. 303 // TODO(ejona): exception handling 304 runnable.run(); 305 } 306 toRun.clear(); 307 } 308 if (delayedListener != null) { 309 final DelayedListener<RespT> listener = delayedListener; 310 class DrainListenerRunnable extends ContextRunnable { 311 DrainListenerRunnable() { 312 super(context); 313 } 314 315 @Override 316 public void runInContext() { 317 listener.drainPendingCallbacks(); 318 } 319 } 320 321 callExecutor.execute(new DrainListenerRunnable()); 322 } 323 } 324 325 @GuardedBy("this") 326 private void setRealCall(ClientCall<ReqT, RespT> realCall) { 327 checkState(this.realCall == null, "realCall already set to %s", this.realCall); 328 if (initialDeadlineMonitor != null) { 329 initialDeadlineMonitor.cancel(false); 330 } 331 this.realCall = realCall; 332 } 333 334 @VisibleForTesting 335 final ClientCall<ReqT, RespT> getRealCall() { 336 return realCall; 337 } 338 339 @Override 340 public final void sendMessage(final ReqT message) { 341 if (passThrough) { 342 realCall.sendMessage(message); 343 } else { 344 delayOrExecute(new Runnable() { 345 @Override 346 public void run() { 347 realCall.sendMessage(message); 348 } 349 }); 350 } 351 } 352 353 @Override 354 public final void setMessageCompression(final boolean enable) { 355 if (passThrough) { 356 realCall.setMessageCompression(enable); 357 } else { 358 delayOrExecute(new Runnable() { 359 @Override 360 public void run() { 361 realCall.setMessageCompression(enable); 362 } 363 }); 364 } 365 } 366 367 @Override 368 public final void request(final int numMessages) { 369 if (passThrough) { 370 realCall.request(numMessages); 371 } else { 372 delayOrExecute(new Runnable() { 373 @Override 374 public void run() { 375 realCall.request(numMessages); 376 } 377 }); 378 } 379 } 380 381 @Override 382 public final void halfClose() { 383 delayOrExecute(new Runnable() { 384 @Override 385 public void run() { 386 realCall.halfClose(); 387 } 388 }); 389 } 390 391 @Override 392 public final boolean isReady() { 393 if (passThrough) { 394 return realCall.isReady(); 395 } else { 396 return false; 397 } 398 } 399 400 @Override 401 public final Attributes getAttributes() { 402 ClientCall<ReqT, RespT> savedRealCall; 403 synchronized (this) { 404 savedRealCall = realCall; 405 } 406 if (savedRealCall != null) { 407 return savedRealCall.getAttributes(); 408 } else { 409 return Attributes.EMPTY; 410 } 411 } 412 413 @Override 414 public String toString() { 415 return MoreObjects.toStringHelper(this) 416 .add("realCall", realCall) 417 .toString(); 418 } 419 420 private final class CloseListenerRunnable extends ContextRunnable { 421 final Listener<RespT> listener; 422 final Status status; 423 424 CloseListenerRunnable(Listener<RespT> listener, Status status) { 425 super(context); 426 this.listener = listener; 427 this.status = status; 428 } 429 430 @Override 431 public void runInContext() { 432 listener.onClose(status, new Metadata()); 433 } 434 } 435 436 private static final class DelayedListener<RespT> extends Listener<RespT> { 437 private final Listener<RespT> realListener; 438 private volatile boolean passThrough; 439 @GuardedBy("this") 440 private List<Runnable> pendingCallbacks = new ArrayList<>(); 441 442 public DelayedListener(Listener<RespT> listener) { 443 this.realListener = listener; 444 } 445 446 private void delayOrExecute(Runnable runnable) { 447 synchronized (this) { 448 if (!passThrough) { 449 pendingCallbacks.add(runnable); 450 return; 451 } 452 } 453 runnable.run(); 454 } 455 456 @Override 457 public void onHeaders(final Metadata headers) { 458 if (passThrough) { 459 realListener.onHeaders(headers); 460 } else { 461 delayOrExecute(new Runnable() { 462 @Override 463 public void run() { 464 realListener.onHeaders(headers); 465 } 466 }); 467 } 468 } 469 470 @Override 471 public void onMessage(final RespT message) { 472 if (passThrough) { 473 realListener.onMessage(message); 474 } else { 475 delayOrExecute(new Runnable() { 476 @Override 477 public void run() { 478 realListener.onMessage(message); 479 } 480 }); 481 } 482 } 483 484 @Override 485 public void onClose(final Status status, final Metadata trailers) { 486 delayOrExecute(new Runnable() { 487 @Override 488 public void run() { 489 realListener.onClose(status, trailers); 490 } 491 }); 492 } 493 494 @Override 495 public void onReady() { 496 if (passThrough) { 497 realListener.onReady(); 498 } else { 499 delayOrExecute(new Runnable() { 500 @Override 501 public void run() { 502 realListener.onReady(); 503 } 504 }); 505 } 506 } 507 508 void drainPendingCallbacks() { 509 assert !passThrough; 510 List<Runnable> toRun = new ArrayList<>(); 511 while (true) { 512 synchronized (this) { 513 if (pendingCallbacks.isEmpty()) { 514 pendingCallbacks = null; 515 passThrough = true; 516 break; 517 } 518 // Since there were pendingCallbacks, we need to process them. To maintain ordering we 519 // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be 520 // added after we drop the lock. So we will have to re-check pendingCallbacks. 521 List<Runnable> tmp = toRun; 522 toRun = pendingCallbacks; 523 pendingCallbacks = tmp; 524 } 525 for (Runnable runnable : toRun) { 526 // Avoid calling listener while lock is held to prevent deadlocks. 527 // TODO(ejona): exception handling 528 runnable.run(); 529 } 530 toRun.clear(); 531 } 532 } 533 } 534 535 private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() { 536 @Override 537 public void start(Listener<Object> responseListener, Metadata headers) {} 538 539 @Override 540 public void request(int numMessages) {} 541 542 @Override 543 public void cancel(String message, Throwable cause) {} 544 545 @Override 546 public void halfClose() {} 547 548 @Override 549 public void sendMessage(Object message) {} 550 551 // Always returns {@code false}, since this is only used when the startup of the call fails. 552 @Override 553 public boolean isReady() { 554 return false; 555 } 556 }; 557 } 558