1 /* 2 * Copyright (C) 2018 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.truth.Truth.assertThat; 18 import static com.google.common.util.concurrent.Futures.allAsList; 19 import static com.google.common.util.concurrent.Futures.getDone; 20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 21 import static java.util.concurrent.TimeUnit.SECONDS; 22 23 import com.google.common.annotations.GwtIncompatible; 24 import com.google.common.base.Function; 25 import com.google.common.testing.GcFinalization; 26 import com.google.common.testing.TestLogHandler; 27 import com.google.j2objc.annotations.J2ObjCIncompatible; 28 import java.lang.ref.WeakReference; 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.ExecutorService; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Logger; 39 import junit.framework.TestCase; 40 import org.checkerframework.checker.nullness.qual.Nullable; 41 42 /** Tests for {@link ExecutionSequencer} */ 43 public class ExecutionSequencerTest extends TestCase { 44 45 ExecutorService executor; 46 47 private ExecutionSequencer serializer; 48 private SettableFuture<@Nullable Void> firstFuture; 49 private TestCallable firstCallable; 50 51 @Override setUp()52 public void setUp() throws Exception { 53 executor = Executors.newCachedThreadPool(); 54 serializer = ExecutionSequencer.create(); 55 firstFuture = SettableFuture.create(); 56 firstCallable = new TestCallable(firstFuture); 57 } 58 59 @Override tearDown()60 public void tearDown() throws Exception { 61 executor.shutdown(); 62 } 63 testCallableStartsAfterFirstFutureCompletes()64 public void testCallableStartsAfterFirstFutureCompletes() { 65 @SuppressWarnings({"unused", "nullness"}) 66 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 67 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 68 @SuppressWarnings({"unused", "nullness"}) 69 Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); 70 assertThat(firstCallable.called).isTrue(); 71 assertThat(secondCallable.called).isFalse(); 72 firstFuture.set(null); 73 assertThat(secondCallable.called).isTrue(); 74 } 75 testCancellationDoesNotViolateSerialization()76 public void testCancellationDoesNotViolateSerialization() { 77 @SuppressWarnings({"unused", "nullness"}) 78 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 79 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 80 ListenableFuture<@Nullable Void> secondFuture = 81 serializer.submitAsync(secondCallable, directExecutor()); 82 TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 83 @SuppressWarnings({"unused", "nullness"}) 84 Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); 85 secondFuture.cancel(true); 86 assertThat(secondCallable.called).isFalse(); 87 assertThat(thirdCallable.called).isFalse(); 88 firstFuture.set(null); 89 assertThat(secondCallable.called).isFalse(); 90 assertThat(thirdCallable.called).isTrue(); 91 } 92 testCancellationMultipleThreads()93 public void testCancellationMultipleThreads() throws Exception { 94 final BlockingCallable blockingCallable = new BlockingCallable(); 95 ListenableFuture<@Nullable Void> unused = serializer.submit(blockingCallable, executor); 96 ListenableFuture<Boolean> future2 = 97 serializer.submit( 98 new Callable<Boolean>() { 99 @Override 100 public Boolean call() { 101 return blockingCallable.isRunning(); 102 } 103 }, 104 directExecutor()); 105 106 // Wait for the first task to be started in the background. It will block until we explicitly 107 // stop it. 108 blockingCallable.waitForStart(); 109 110 // Give the second task a chance to (incorrectly) start up while the first task is running. 111 assertThat(future2.isDone()).isFalse(); 112 113 // Stop the first task. The second task should then run. 114 blockingCallable.stop(); 115 executor.shutdown(); 116 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 117 assertThat(getDone(future2)).isFalse(); 118 } 119 testSecondTaskWaitsForFirstEvenIfCancelled()120 public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception { 121 final BlockingCallable blockingCallable = new BlockingCallable(); 122 ListenableFuture<@Nullable Void> future1 = serializer.submit(blockingCallable, executor); 123 ListenableFuture<Boolean> future2 = 124 serializer.submit( 125 new Callable<Boolean>() { 126 @Override 127 public Boolean call() { 128 return blockingCallable.isRunning(); 129 } 130 }, 131 directExecutor()); 132 133 // Wait for the first task to be started in the background. It will block until we explicitly 134 // stop it. 135 blockingCallable.waitForStart(); 136 137 // This time, cancel the future for the first task. The task remains running, only the future 138 // is cancelled. 139 future1.cancel(false); 140 141 // Give the second task a chance to (incorrectly) start up while the first task is running. 142 // (This is the assertion that fails.) 143 assertThat(future2.isDone()).isFalse(); 144 145 // Stop the first task. The second task should then run. 146 blockingCallable.stop(); 147 executor.shutdown(); 148 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 149 assertThat(getDone(future2)).isFalse(); 150 } 151 152 @GwtIncompatible 153 @J2ObjCIncompatible // gc 154 @AndroidIncompatible testCancellationWithReferencedObject()155 public void testCancellationWithReferencedObject() throws Exception { 156 Object toBeGCed = new Object(); 157 WeakReference<Object> ref = new WeakReference<>(toBeGCed); 158 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 159 ListenableFuture<?> ignored = 160 serializer.submitAsync( 161 new AsyncCallable<@Nullable Void>() { 162 @Override 163 public ListenableFuture<@Nullable Void> call() { 164 return settableFuture; 165 } 166 }, 167 directExecutor()); 168 serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); 169 toBeGCed = null; 170 GcFinalization.awaitClear(ref); 171 } 172 toStringCallable(final Object object)173 private static Callable<String> toStringCallable(final Object object) { 174 return new Callable<String>() { 175 @Override 176 public String call() { 177 return object.toString(); 178 } 179 }; 180 } 181 182 public void testCancellationDuringReentrancy() throws Exception { 183 TestLogHandler logHandler = new TestLogHandler(); 184 Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); 185 186 List<Future<?>> results = new ArrayList<>(); 187 final Runnable[] manualExecutorTask = new Runnable[1]; 188 Executor manualExecutor = 189 new Executor() { 190 @Override 191 public void execute(Runnable task) { 192 manualExecutorTask[0] = task; 193 } 194 }; 195 196 results.add(serializer.submit(Callables.returning(null), manualExecutor)); 197 final Future<?>[] thingToCancel = new Future<?>[1]; 198 results.add( 199 serializer.submit( 200 new Callable<@Nullable Void>() { 201 @Override 202 public @Nullable Void call() { 203 thingToCancel[0].cancel(false); 204 return null; 205 } 206 }, 207 directExecutor())); 208 thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor()); 209 results.add(thingToCancel[0]); 210 // Enqueue more than enough tasks to force reentrancy. 211 for (int i = 0; i < 5; i++) { 212 results.add(serializer.submit(Callables.returning(null), directExecutor())); 213 } 214 215 manualExecutorTask[0].run(); 216 217 for (Future<?> result : results) { 218 if (!result.isCancelled()) { 219 result.get(10, SECONDS); 220 } 221 // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect. 222 } 223 224 assertThat(logHandler.getStoredLogRecords()).isEmpty(); 225 } 226 227 public void testAvoidsStackOverflow_manySubmitted() throws Exception { 228 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 229 ArrayList<ListenableFuture<@Nullable Void>> results = new ArrayList<>(50_001); 230 results.add( 231 serializer.submitAsync( 232 new AsyncCallable<@Nullable Void>() { 233 @Override 234 public ListenableFuture<@Nullable Void> call() { 235 return settableFuture; 236 } 237 }, 238 directExecutor())); 239 for (int i = 0; i < 50_000; i++) { 240 results.add(serializer.submit(Callables.<Void>returning(null), directExecutor())); 241 } 242 settableFuture.set(null); 243 getDone(allAsList(results)); 244 } 245 246 public void testAvoidsStackOverflow_manyCancelled() throws Exception { 247 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 248 ListenableFuture<@Nullable Void> unused = 249 serializer.submitAsync( 250 new AsyncCallable<@Nullable Void>() { 251 @Override 252 public ListenableFuture<@Nullable Void> call() { 253 return settableFuture; 254 } 255 }, 256 directExecutor()); 257 for (int i = 0; i < 50_000; i++) { 258 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 259 } 260 ListenableFuture<Integer> stackDepthCheck = 261 serializer.submit( 262 new Callable<Integer>() { 263 @Override 264 public Integer call() { 265 return Thread.currentThread().getStackTrace().length; 266 } 267 }, 268 directExecutor()); 269 settableFuture.set(null); 270 assertThat(getDone(stackDepthCheck)) 271 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 272 } 273 274 public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { 275 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 276 ListenableFuture<@Nullable Void> unused = 277 serializer.submitAsync( 278 new AsyncCallable<@Nullable Void>() { 279 @Override 280 public ListenableFuture<@Nullable Void> call() { 281 return settableFuture; 282 } 283 }, 284 directExecutor()); 285 for (int i = 0; i < 25_000; i++) { 286 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 287 unused = serializer.submit(Callables.<Void>returning(null), directExecutor()); 288 } 289 ListenableFuture<Integer> stackDepthCheck = 290 serializer.submit( 291 new Callable<Integer>() { 292 @Override 293 public Integer call() { 294 return Thread.currentThread().getStackTrace().length; 295 } 296 }, 297 directExecutor()); 298 settableFuture.set(null); 299 assertThat(getDone(stackDepthCheck)) 300 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 301 } 302 303 private static Function<Integer, Integer> add(final int delta) { 304 return new Function<Integer, Integer>() { 305 @Override 306 public Integer apply(Integer input) { 307 return input + delta; 308 } 309 }; 310 } 311 312 private static AsyncCallable<Integer> asyncAdd( 313 final ListenableFuture<Integer> future, final int delta, final Executor executor) { 314 return new AsyncCallable<Integer>() { 315 @Override 316 public ListenableFuture<Integer> call() throws Exception { 317 return Futures.transform(future, add(delta), executor); 318 } 319 }; 320 } 321 322 private static final class LongHolder { 323 long count; 324 } 325 326 private static final int ITERATION_COUNT = 50_000; 327 private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; 328 329 @GwtIncompatible // threads 330 public void testAvoidsStackOverflow_multipleThreads() throws Exception { 331 final LongHolder holder = new LongHolder(); 332 final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>(); 333 final List<Integer> completeLengthChecks; 334 final int baseStackDepth; 335 ExecutorService service = Executors.newFixedThreadPool(5); 336 try { 337 // Avoid counting frames from the executor itself, or the ExecutionSequencer 338 baseStackDepth = 339 serializer 340 .submit( 341 new Callable<Integer>() { 342 @Override 343 public Integer call() { 344 return Thread.currentThread().getStackTrace().length; 345 } 346 }, 347 service) 348 .get(); 349 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 350 ListenableFuture<?> unused = 351 serializer.submitAsync( 352 new AsyncCallable<@Nullable Void>() { 353 @Override 354 public ListenableFuture<@Nullable Void> call() { 355 return settableFuture; 356 } 357 }, 358 directExecutor()); 359 for (int i = 0; i < 50_000; i++) { 360 if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { 361 // after some number of iterations, switch threads 362 unused = 363 serializer.submit( 364 new Callable<@Nullable Void>() { 365 @Override 366 public @Nullable Void call() { 367 holder.count++; 368 return null; 369 } 370 }, 371 service); 372 } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { 373 // When at max depth, record stack trace depth 374 lengthChecks.add( 375 serializer.submit( 376 new Callable<Integer>() { 377 @Override 378 public Integer call() { 379 holder.count++; 380 return Thread.currentThread().getStackTrace().length; 381 } 382 }, 383 directExecutor())); 384 } else { 385 // Otherwise, schedule a task on directExecutor 386 unused = 387 serializer.submit( 388 new Callable<@Nullable Void>() { 389 @Override 390 public @Nullable Void call() { 391 holder.count++; 392 return null; 393 } 394 }, 395 directExecutor()); 396 } 397 } 398 settableFuture.set(null); 399 completeLengthChecks = allAsList(lengthChecks).get(); 400 } finally { 401 service.shutdown(); 402 } 403 assertThat(holder.count).isEqualTo(ITERATION_COUNT); 404 for (int length : completeLengthChecks) { 405 // Verify that at max depth, less than one stack frame per submitted task was consumed 406 assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); 407 } 408 } 409 410 @SuppressWarnings("ObjectToString") // Intended behavior 411 public void testToString() { 412 Future<?> unused = serializer.submitAsync(firstCallable, directExecutor()); 413 TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create()); 414 Future<?> second = serializer.submitAsync(secondCallable, directExecutor()); 415 assertThat(secondCallable.called).isFalse(); 416 assertThat(second.toString()).contains(secondCallable.toString()); 417 firstFuture.set(null); 418 assertThat(second.toString()).contains(secondCallable.future.toString()); 419 } 420 421 private static class BlockingCallable implements Callable<@Nullable Void> { 422 private final CountDownLatch startLatch = new CountDownLatch(1); 423 private final CountDownLatch stopLatch = new CountDownLatch(1); 424 425 private volatile boolean running = false; 426 427 @Override 428 public @Nullable Void call() throws InterruptedException { 429 running = true; 430 startLatch.countDown(); 431 stopLatch.await(); 432 running = false; 433 return null; 434 } 435 436 public void waitForStart() throws InterruptedException { 437 startLatch.await(); 438 } 439 440 public void stop() { 441 stopLatch.countDown(); 442 } 443 444 public boolean isRunning() { 445 return running; 446 } 447 } 448 449 private static final class TestCallable implements AsyncCallable<@Nullable Void> { 450 451 private final ListenableFuture<@Nullable Void> future; 452 private boolean called = false; 453 454 private TestCallable(ListenableFuture<@Nullable Void> future) { 455 this.future = future; 456 } 457 458 @Override 459 public ListenableFuture<@Nullable Void> call() throws Exception { 460 called = true; 461 return future; 462 } 463 } 464 } 465