1 /* 2 * Copyright (C) 2018 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.truth.Truth.assertThat; 18 import static com.google.common.util.concurrent.Futures.allAsList; 19 import static com.google.common.util.concurrent.Futures.getDone; 20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 21 import static java.util.concurrent.TimeUnit.SECONDS; 22 23 import com.google.common.annotations.GwtIncompatible; 24 import com.google.common.annotations.J2ktIncompatible; 25 import com.google.common.base.Function; 26 import com.google.common.testing.GcFinalization; 27 import com.google.common.testing.TestLogHandler; 28 import com.google.j2objc.annotations.J2ObjCIncompatible; 29 import java.lang.ref.WeakReference; 30 import java.util.ArrayList; 31 import java.util.List; 32 import java.util.concurrent.Callable; 33 import java.util.concurrent.CountDownLatch; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.ExecutorService; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.Future; 38 import java.util.concurrent.TimeUnit; 39 import java.util.logging.Logger; 40 import junit.framework.TestCase; 41 import org.checkerframework.checker.nullness.qual.Nullable; 42 43 /** Tests for {@link ExecutionSequencer} */ 44 public class ExecutionSequencerTest extends TestCase { 45 46 ExecutorService executor; 47 48 private ExecutionSequencer serializer; 49 private SettableFuture<@Nullable Void> firstFuture; 50 private TestCallable firstCallable; 51 52 @Override setUp()53 public void setUp() throws Exception { 54 executor = Executors.newCachedThreadPool(); 55 serializer = ExecutionSequencer.create(); 56 firstFuture = SettableFuture.create(); 57 firstCallable = new TestCallable(firstFuture); 58 } 59 60 @Override tearDown()61 public void tearDown() throws Exception { 62 executor.shutdown(); 63 } 64 testCallableStartsAfterFirstFutureCompletes()65 public void testCallableStartsAfterFirstFutureCompletes() { 66 @SuppressWarnings({"unused", "nullness"}) 67 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 68 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 69 @SuppressWarnings({"unused", "nullness"}) 70 Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); 71 assertThat(firstCallable.called).isTrue(); 72 assertThat(secondCallable.called).isFalse(); 73 firstFuture.set(null); 74 assertThat(secondCallable.called).isTrue(); 75 } 76 testCancellationDoesNotViolateSerialization()77 public void testCancellationDoesNotViolateSerialization() { 78 @SuppressWarnings({"unused", "nullness"}) 79 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 80 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 81 ListenableFuture<@Nullable Void> secondFuture = 82 serializer.submitAsync(secondCallable, directExecutor()); 83 TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 84 @SuppressWarnings({"unused", "nullness"}) 85 Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); 86 secondFuture.cancel(true); 87 assertThat(secondCallable.called).isFalse(); 88 assertThat(thirdCallable.called).isFalse(); 89 firstFuture.set(null); 90 assertThat(secondCallable.called).isFalse(); 91 assertThat(thirdCallable.called).isTrue(); 92 } 93 testCancellationMultipleThreads()94 public void testCancellationMultipleThreads() throws Exception { 95 final BlockingCallable blockingCallable = new BlockingCallable(); 96 ListenableFuture<@Nullable Void> unused = serializer.submit(blockingCallable, executor); 97 ListenableFuture<Boolean> future2 = 98 serializer.submit( 99 new Callable<Boolean>() { 100 @Override 101 public Boolean call() { 102 return blockingCallable.isRunning(); 103 } 104 }, 105 directExecutor()); 106 107 // Wait for the first task to be started in the background. It will block until we explicitly 108 // stop it. 109 blockingCallable.waitForStart(); 110 111 // Give the second task a chance to (incorrectly) start up while the first task is running. 112 assertThat(future2.isDone()).isFalse(); 113 114 // Stop the first task. The second task should then run. 115 blockingCallable.stop(); 116 executor.shutdown(); 117 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 118 assertThat(getDone(future2)).isFalse(); 119 } 120 testSecondTaskWaitsForFirstEvenIfCancelled()121 public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception { 122 final BlockingCallable blockingCallable = new BlockingCallable(); 123 ListenableFuture<@Nullable Void> future1 = serializer.submit(blockingCallable, executor); 124 ListenableFuture<Boolean> future2 = 125 serializer.submit( 126 new Callable<Boolean>() { 127 @Override 128 public Boolean call() { 129 return blockingCallable.isRunning(); 130 } 131 }, 132 directExecutor()); 133 134 // Wait for the first task to be started in the background. It will block until we explicitly 135 // stop it. 136 blockingCallable.waitForStart(); 137 138 // This time, cancel the future for the first task. The task remains running, only the future 139 // is cancelled. 140 future1.cancel(false); 141 142 // Give the second task a chance to (incorrectly) start up while the first task is running. 143 // (This is the assertion that fails.) 144 assertThat(future2.isDone()).isFalse(); 145 146 // Stop the first task. The second task should then run. 147 blockingCallable.stop(); 148 executor.shutdown(); 149 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 150 assertThat(getDone(future2)).isFalse(); 151 } 152 153 @J2ktIncompatible 154 @GwtIncompatible 155 @J2ObjCIncompatible // gc 156 @AndroidIncompatible testCancellationWithReferencedObject()157 public void testCancellationWithReferencedObject() throws Exception { 158 Object toBeGCed = new Object(); 159 WeakReference<Object> ref = new WeakReference<>(toBeGCed); 160 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 161 ListenableFuture<?> ignored = 162 serializer.submitAsync( 163 new AsyncCallable<@Nullable Void>() { 164 @Override 165 public ListenableFuture<@Nullable Void> call() { 166 return settableFuture; 167 } 168 }, 169 directExecutor()); 170 serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); 171 toBeGCed = null; 172 GcFinalization.awaitClear(ref); 173 } 174 toStringCallable(final Object object)175 private static Callable<String> toStringCallable(final Object object) { 176 return new Callable<String>() { 177 @Override 178 public String call() { 179 return object.toString(); 180 } 181 }; 182 } 183 184 public void testCancellationDuringReentrancy() throws Exception { 185 TestLogHandler logHandler = new TestLogHandler(); 186 Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); 187 188 List<Future<?>> results = new ArrayList<>(); 189 final Runnable[] manualExecutorTask = new Runnable[1]; 190 Executor manualExecutor = 191 new Executor() { 192 @Override 193 public void execute(Runnable task) { 194 manualExecutorTask[0] = task; 195 } 196 }; 197 198 results.add(serializer.submit(Callables.returning(null), manualExecutor)); 199 final Future<?>[] thingToCancel = new Future<?>[1]; 200 results.add( 201 serializer.submit( 202 new Callable<@Nullable Void>() { 203 @Override 204 public @Nullable Void call() { 205 thingToCancel[0].cancel(false); 206 return null; 207 } 208 }, 209 directExecutor())); 210 thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor()); 211 results.add(thingToCancel[0]); 212 // Enqueue more than enough tasks to force reentrancy. 213 for (int i = 0; i < 5; i++) { 214 results.add(serializer.submit(Callables.returning(null), directExecutor())); 215 } 216 217 manualExecutorTask[0].run(); 218 219 for (Future<?> result : results) { 220 if (!result.isCancelled()) { 221 result.get(10, SECONDS); 222 } 223 // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect. 224 } 225 226 assertThat(logHandler.getStoredLogRecords()).isEmpty(); 227 } 228 229 public void testAvoidsStackOverflow_manySubmitted() throws Exception { 230 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 231 ArrayList<ListenableFuture<@Nullable Void>> results = new ArrayList<>(50_001); 232 results.add( 233 serializer.submitAsync( 234 new AsyncCallable<@Nullable Void>() { 235 @Override 236 public ListenableFuture<@Nullable Void> call() { 237 return settableFuture; 238 } 239 }, 240 directExecutor())); 241 for (int i = 0; i < 50_000; i++) { 242 results.add(serializer.submit(Callables.<Void>returning(null), directExecutor())); 243 } 244 settableFuture.set(null); 245 getDone(allAsList(results)); 246 } 247 248 public void testAvoidsStackOverflow_manyCancelled() throws Exception { 249 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 250 ListenableFuture<@Nullable Void> unused = 251 serializer.submitAsync( 252 new AsyncCallable<@Nullable Void>() { 253 @Override 254 public ListenableFuture<@Nullable Void> call() { 255 return settableFuture; 256 } 257 }, 258 directExecutor()); 259 for (int i = 0; i < 50_000; i++) { 260 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 261 } 262 ListenableFuture<Integer> stackDepthCheck = 263 serializer.submit( 264 new Callable<Integer>() { 265 @Override 266 public Integer call() { 267 return Thread.currentThread().getStackTrace().length; 268 } 269 }, 270 directExecutor()); 271 settableFuture.set(null); 272 assertThat(getDone(stackDepthCheck)) 273 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 274 } 275 276 public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { 277 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 278 ListenableFuture<@Nullable Void> unused = 279 serializer.submitAsync( 280 new AsyncCallable<@Nullable Void>() { 281 @Override 282 public ListenableFuture<@Nullable Void> call() { 283 return settableFuture; 284 } 285 }, 286 directExecutor()); 287 for (int i = 0; i < 25_000; i++) { 288 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 289 unused = serializer.submit(Callables.<Void>returning(null), directExecutor()); 290 } 291 ListenableFuture<Integer> stackDepthCheck = 292 serializer.submit( 293 new Callable<Integer>() { 294 @Override 295 public Integer call() { 296 return Thread.currentThread().getStackTrace().length; 297 } 298 }, 299 directExecutor()); 300 settableFuture.set(null); 301 assertThat(getDone(stackDepthCheck)) 302 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 303 } 304 305 private static Function<Integer, Integer> add(final int delta) { 306 return new Function<Integer, Integer>() { 307 @Override 308 public Integer apply(Integer input) { 309 return input + delta; 310 } 311 }; 312 } 313 314 private static AsyncCallable<Integer> asyncAdd( 315 final ListenableFuture<Integer> future, final int delta, final Executor executor) { 316 return new AsyncCallable<Integer>() { 317 @Override 318 public ListenableFuture<Integer> call() throws Exception { 319 return Futures.transform(future, add(delta), executor); 320 } 321 }; 322 } 323 324 private static final class LongHolder { 325 long count; 326 } 327 328 private static final int ITERATION_COUNT = 50_000; 329 private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; 330 331 @J2ktIncompatible 332 @GwtIncompatible // threads 333 public void testAvoidsStackOverflow_multipleThreads() throws Exception { 334 final LongHolder holder = new LongHolder(); 335 final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>(); 336 final List<Integer> completeLengthChecks; 337 final int baseStackDepth; 338 ExecutorService service = Executors.newFixedThreadPool(5); 339 try { 340 // Avoid counting frames from the executor itself, or the ExecutionSequencer 341 baseStackDepth = 342 serializer 343 .submit( 344 new Callable<Integer>() { 345 @Override 346 public Integer call() { 347 return Thread.currentThread().getStackTrace().length; 348 } 349 }, 350 service) 351 .get(); 352 final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create(); 353 ListenableFuture<?> unused = 354 serializer.submitAsync( 355 new AsyncCallable<@Nullable Void>() { 356 @Override 357 public ListenableFuture<@Nullable Void> call() { 358 return settableFuture; 359 } 360 }, 361 directExecutor()); 362 for (int i = 0; i < 50_000; i++) { 363 if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { 364 // after some number of iterations, switch threads 365 unused = 366 serializer.submit( 367 new Callable<@Nullable Void>() { 368 @Override 369 public @Nullable Void call() { 370 holder.count++; 371 return null; 372 } 373 }, 374 service); 375 } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { 376 // When at max depth, record stack trace depth 377 lengthChecks.add( 378 serializer.submit( 379 new Callable<Integer>() { 380 @Override 381 public Integer call() { 382 holder.count++; 383 return Thread.currentThread().getStackTrace().length; 384 } 385 }, 386 directExecutor())); 387 } else { 388 // Otherwise, schedule a task on directExecutor 389 unused = 390 serializer.submit( 391 new Callable<@Nullable Void>() { 392 @Override 393 public @Nullable Void call() { 394 holder.count++; 395 return null; 396 } 397 }, 398 directExecutor()); 399 } 400 } 401 settableFuture.set(null); 402 completeLengthChecks = allAsList(lengthChecks).get(); 403 } finally { 404 service.shutdown(); 405 } 406 assertThat(holder.count).isEqualTo(ITERATION_COUNT); 407 for (int length : completeLengthChecks) { 408 // Verify that at max depth, less than one stack frame per submitted task was consumed 409 assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); 410 } 411 } 412 413 @SuppressWarnings("ObjectToString") // Intended behavior 414 public void testToString() { 415 Future<?> unused = serializer.submitAsync(firstCallable, directExecutor()); 416 TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create()); 417 Future<?> second = serializer.submitAsync(secondCallable, directExecutor()); 418 assertThat(secondCallable.called).isFalse(); 419 assertThat(second.toString()).contains(secondCallable.toString()); 420 firstFuture.set(null); 421 assertThat(second.toString()).contains(secondCallable.future.toString()); 422 } 423 424 private static class BlockingCallable implements Callable<@Nullable Void> { 425 private final CountDownLatch startLatch = new CountDownLatch(1); 426 private final CountDownLatch stopLatch = new CountDownLatch(1); 427 428 private volatile boolean running = false; 429 430 @Override 431 public @Nullable Void call() throws InterruptedException { 432 running = true; 433 startLatch.countDown(); 434 stopLatch.await(); 435 running = false; 436 return null; 437 } 438 439 public void waitForStart() throws InterruptedException { 440 startLatch.await(); 441 } 442 443 public void stop() { 444 stopLatch.countDown(); 445 } 446 447 public boolean isRunning() { 448 return running; 449 } 450 } 451 452 private static final class TestCallable implements AsyncCallable<@Nullable Void> { 453 454 private final ListenableFuture<@Nullable Void> future; 455 private boolean called = false; 456 457 private TestCallable(ListenableFuture<@Nullable Void> future) { 458 this.future = future; 459 } 460 461 @Override 462 public ListenableFuture<@Nullable Void> call() throws Exception { 463 called = true; 464 return future; 465 } 466 } 467 } 468