/*
 * Copyright (C) 2018 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.Futures.allAsList;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Function;
import com.google.common.testing.GcFinalization;
import com.google.common.testing.TestLogHandler;
import com.google.j2objc.annotations.J2ObjCIncompatible;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import junit.framework.TestCase;

/** Tests for {@link ExecutionSequencer} */
public class ExecutionSequencerTest extends TestCase {

  ExecutorService executor;

  private ExecutionSequencer serializer;
  private SettableFuture<Void> firstFuture;
  private TestCallable firstCallable;

  /** Creates a fresh thread pool, sequencer, and a first callable backed by an unset future. */
  @Override
  public void setUp() throws Exception {
    executor = Executors.newCachedThreadPool();
    serializer = ExecutionSequencer.create();
    firstFuture = SettableFuture.create();
    firstCallable = new TestCallable(firstFuture);
  }

  /** Shuts down the thread pool created in {@link #setUp}. */
  @Override
  public void tearDown() throws Exception {
    executor.shutdown();
  }

  /** The second callable must not be invoked until the first callable's future is set. */
  public void testCallableStartsAfterFirstFutureCompletes() {
    @SuppressWarnings({"unused", "nullness"})
    Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
    TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
    @SuppressWarnings({"unused", "nullness"})
    Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
    assertThat(firstCallable.called).isTrue();
    assertThat(secondCallable.called).isFalse();
    firstFuture.set(null);
    assertThat(secondCallable.called).isTrue();
  }

  /**
   * Cancelling the second of three submissions must neither run the cancelled callable nor let the
   * third callable run before the first future is set.
   */
  public void testCancellationDoesNotViolateSerialization() {
    @SuppressWarnings({"unused", "nullness"})
    Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
    TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
    ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
    TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
    @SuppressWarnings({"unused", "nullness"})
    Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
    secondFuture.cancel(true);
    assertThat(secondCallable.called).isFalse();
    assertThat(thirdCallable.called).isFalse();
    firstFuture.set(null);
    assertThat(secondCallable.called).isFalse();
    assertThat(thirdCallable.called).isTrue();
  }

  /**
   * A task submitted behind a blocking task running on the thread pool must not complete until the
   * blocking task has been stopped, and must then observe that the blocking task is not running.
   */
  public void testCancellationMultipleThreads() throws Exception {
    final BlockingCallable blockingCallable = new BlockingCallable();
    ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
    ListenableFuture<Boolean> future2 =
        serializer.submit(
            new Callable<Boolean>() {
              @Override
              public Boolean call() {
                return blockingCallable.isRunning();
              }
            },
            directExecutor());

    // Wait for the first task to be started in the background. It will block until we explicitly
    // stop it.
    blockingCallable.waitForStart();

    // Give the second task a chance to (incorrectly) start up while the first task is running.
    assertThat(future2.isDone()).isFalse();

    // Stop the first task. The second task should then run.
    blockingCallable.stop();
    executor.shutdown();
    assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
    assertThat(getDone(future2)).isFalse();
  }

  /**
   * Same scenario as {@link #testCancellationMultipleThreads}, except that the first task's future
   * is cancelled (without interruption) while the task itself is still blocked.
   */
  public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
    final BlockingCallable blockingCallable = new BlockingCallable();
    ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
    ListenableFuture<Boolean> future2 =
        serializer.submit(
            new Callable<Boolean>() {
              @Override
              public Boolean call() {
                return blockingCallable.isRunning();
              }
            },
            directExecutor());

    // Wait for the first task to be started in the background. It will block until we explicitly
    // stop it.
    blockingCallable.waitForStart();

    // This time, cancel the future for the first task. The task remains running, only the future
    // is cancelled.
    future1.cancel(false);

    // Give the second task a chance to (incorrectly) start up while the first task is running.
    // (This is the assertion that fails.)
    assertThat(future2.isDone()).isFalse();

    // Stop the first task. The second task should then run.
    blockingCallable.stop();
    executor.shutdown();
    assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
    assertThat(getDone(future2)).isFalse();
  }

  /**
   * After a queued task is cancelled, the object captured by its callable must become collectable
   * even though the task ahead of it in the sequence never completes.
   */
  @GwtIncompatible
  @J2ObjCIncompatible // gc
  @AndroidIncompatible
  public void testCancellationWithReferencedObject() throws Exception {
    Object toBeGCed = new Object();
    WeakReference<Object> ref = new WeakReference<>(toBeGCed);
    final SettableFuture<Void> settableFuture = SettableFuture.create();
    ListenableFuture<?> ignored =
        serializer.submitAsync(
            new AsyncCallable<Void>() {
              @Override
              public ListenableFuture<Void> call() {
                return settableFuture;
              }
            },
            directExecutor());
    serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
    // Drop the only strong reference held by this test before waiting for the weak ref to clear.
    toBeGCed = null;
    GcFinalization.awaitClear(ref);
  }

  /** Returns a callable that captures {@code object} and returns its {@code toString()}. */
  private static Callable<String> toStringCallable(final Object object) {
    return new Callable<String>() {
      @Override
      public String call() {
        return object.toString();
      }
    };
  }

  /**
   * Cancels a queued task from inside another queued task while the sequencer is draining its
   * queue, then checks that every non-cancelled result completes and that nothing was logged to
   * the {@link AbstractFuture} logger.
   */
  public void testCancellationDuringReentrancy() throws Exception {
    TestLogHandler logHandler = new TestLogHandler();
    // Keep a strong reference to the logger for the duration of the test and detach the handler
    // afterwards so that it does not leak into (and collect records from) later tests.
    Logger logger = Logger.getLogger(AbstractFuture.class.getName());
    logger.addHandler(logHandler);
    try {
      List<Future<?>> results = new ArrayList<>();
      final Runnable[] manualExecutorTask = new Runnable[1];
      // Captures the submitted task instead of running it, so the test decides when it runs.
      Executor manualExecutor =
          new Executor() {
            @Override
            public void execute(Runnable task) {
              manualExecutorTask[0] = task;
            }
          };

      results.add(serializer.submit(Callables.returning(null), manualExecutor));
      final Future<?>[] thingToCancel = new Future<?>[1];
      results.add(
          serializer.submit(
              new Callable<Void>() {
                @Override
                public Void call() {
                  thingToCancel[0].cancel(false);
                  return null;
                }
              },
              directExecutor()));
      thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
      results.add(thingToCancel[0]);
      // Enqueue more than enough tasks to force reentrancy.
      for (int i = 0; i < 5; i++) {
        results.add(serializer.submit(Callables.returning(null), directExecutor()));
      }

      manualExecutorTask[0].run();

      for (Future<?> result : results) {
        if (!result.isCancelled()) {
          result.get(10, SECONDS);
        }
        // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
      }

      assertThat(logHandler.getStoredLogRecords()).isEmpty();
    } finally {
      logger.removeHandler(logHandler);
    }
  }

  /**
   * Queues 50,000 direct-executor tasks behind one pending task and checks that all of them are
   * done once the pending task's future is set.
   */
  public void testAvoidsStackOverflow_manySubmitted() throws Exception {
    final SettableFuture<Void> settableFuture = SettableFuture.create();
    ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
    results.add(
        serializer.submitAsync(
            new AsyncCallable<Void>() {
              @Override
              public ListenableFuture<Void> call() {
                return settableFuture;
              }
            },
            directExecutor()));
    for (int i = 0; i < 50_000; i++) {
      results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
    }
    settableFuture.set(null);
    getDone(allAsList(results));
  }

  /**
   * Queues 50,000 immediately-cancelled tasks behind one pending task, then checks that a task
   * queued after them runs at a stack depth less than 100 frames deeper than the test method.
   */
  public void testAvoidsStackOverflow_manyCancelled() throws Exception {
    final SettableFuture<Void> settableFuture = SettableFuture.create();
    ListenableFuture<Void> unused =
        serializer.submitAsync(
            new AsyncCallable<Void>() {
              @Override
              public ListenableFuture<Void> call() {
                return settableFuture;
              }
            },
            directExecutor());
    for (int i = 0; i < 50_000; i++) {
      serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
    }
    ListenableFuture<Integer> stackDepthCheck =
        serializer.submit(
            new Callable<Integer>() {
              @Override
              public Integer call() {
                return Thread.currentThread().getStackTrace().length;
              }
            },
            directExecutor());
    settableFuture.set(null);
    assertThat(getDone(stackDepthCheck))
        .isLessThan(Thread.currentThread().getStackTrace().length + 100);
  }

  /**
   * Like {@link #testAvoidsStackOverflow_manyCancelled}, but interleaves 25,000 cancelled tasks
   * with 25,000 tasks that are left to run.
   */
  public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
    final SettableFuture<Void> settableFuture = SettableFuture.create();
    ListenableFuture<Void> unused =
        serializer.submitAsync(
            new AsyncCallable<Void>() {
              @Override
              public ListenableFuture<Void> call() {
                return settableFuture;
              }
            },
            directExecutor());
    for (int i = 0; i < 25_000; i++) {
      serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
      unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
    }
    ListenableFuture<Integer> stackDepthCheck =
        serializer.submit(
            new Callable<Integer>() {
              @Override
              public Integer call() {
                return Thread.currentThread().getStackTrace().length;
              }
            },
            directExecutor());
    settableFuture.set(null);
    assertThat(getDone(stackDepthCheck))
        .isLessThan(Thread.currentThread().getStackTrace().length + 100);
  }

  /** Returns a function that adds {@code delta} to its input. */
  private static Function<Integer, Integer> add(final int delta) {
    return new Function<Integer, Integer>() {
      @Override
      public Integer apply(Integer input) {
        return input + delta;
      }
    };
  }

  /**
   * Returns an async callable that, when called, transforms {@code future} by adding {@code delta}
   * using {@code executor}.
   */
  private static AsyncCallable<Integer> asyncAdd(
      final ListenableFuture<Integer> future, final int delta, final Executor executor) {
    return new AsyncCallable<Integer>() {
      @Override
      public ListenableFuture<Integer> call() throws Exception {
        return Futures.transform(future, add(delta), executor);
      }
    };
  }

  /** Mutable counter that anonymous callables can increment through a final reference. */
  private static final class LongHolder {
    long count;
  }

  private static final int ITERATION_COUNT = 50_000;
  private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;

  /**
   * Queues {@link #ITERATION_COUNT} counting tasks behind one pending task, handing off to a thread
   * pool every {@link #DIRECT_EXECUTIONS_PER_THREAD} tasks, and checks both that every task ran and
   * that the stack depth recorded just before each hand-off stays bounded.
   */
  @GwtIncompatible // threads
  public void testAvoidsStackOverflow_multipleThreads() throws Exception {
    final LongHolder holder = new LongHolder();
    final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
    final List<Integer> completeLengthChecks;
    final int baseStackDepth;
    ExecutorService service = Executors.newFixedThreadPool(5);
    try {
      // Avoid counting frames from the executor itself, or the ExecutionSequencer
      baseStackDepth =
          serializer
              .submit(
                  new Callable<Integer>() {
                    @Override
                    public Integer call() {
                      return Thread.currentThread().getStackTrace().length;
                    }
                  },
                  service)
              .get();
      final SettableFuture<Void> settableFuture = SettableFuture.create();
      ListenableFuture<?> unused =
          serializer.submitAsync(
              new AsyncCallable<Void>() {
                @Override
                public ListenableFuture<Void> call() {
                  return settableFuture;
                }
              },
              directExecutor());
      // The bound must be ITERATION_COUNT: the assertion after the loop expects exactly that many
      // increments of holder.count.
      for (int i = 0; i < ITERATION_COUNT; i++) {
        if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
          // after some number of iterations, switch threads
          unused =
              serializer.submit(
                  new Callable<Void>() {
                    @Override
                    public Void call() {
                      holder.count++;
                      return null;
                    }
                  },
                  service);
        } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
          // When at max depth, record stack trace depth
          lengthChecks.add(
              serializer.submit(
                  new Callable<Integer>() {
                    @Override
                    public Integer call() {
                      holder.count++;
                      return Thread.currentThread().getStackTrace().length;
                    }
                  },
                  directExecutor()));
        } else {
          // Otherwise, schedule a task on directExecutor
          unused =
              serializer.submit(
                  new Callable<Void>() {
                    @Override
                    public Void call() {
                      holder.count++;
                      return null;
                    }
                  },
                  directExecutor());
        }
      }
      settableFuture.set(null);
      completeLengthChecks = allAsList(lengthChecks).get();
    } finally {
      service.shutdown();
    }
    assertThat(holder.count).isEqualTo(ITERATION_COUNT);
    for (int length : completeLengthChecks) {
      // Verify that at max depth, less than one stack frame per submitted task was consumed
      assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
    }
  }

  /**
   * The returned future's {@code toString()} must mention the callable while it has not been
   * called, and the callable's own future once it has been called.
   */
  @SuppressWarnings("ObjectToString") // Intended behavior
  public void testToString() {
    Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
    TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
    Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
    assertThat(secondCallable.called).isFalse();
    assertThat(second.toString()).contains(secondCallable.toString());
    firstFuture.set(null);
    assertThat(second.toString()).contains(secondCallable.future.toString());
  }

  /**
   * Callable that signals when it has started and then blocks until {@link #stop} is called,
   * exposing whether it is currently inside {@link #call}.
   */
  private static class BlockingCallable implements Callable<Void> {
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch stopLatch = new CountDownLatch(1);

    private volatile boolean running = false;

    @Override
    public Void call() throws InterruptedException {
      running = true;
      startLatch.countDown();
      stopLatch.await();
      running = false;
      return null;
    }

    /** Blocks until {@link #call} has begun executing. */
    public void waitForStart() throws InterruptedException {
      startLatch.await();
    }

    /** Releases a {@link #call} that is blocked waiting to be stopped. */
    public void stop() {
      stopLatch.countDown();
    }

    /** Returns whether {@link #call} has started and not yet finished. */
    public boolean isRunning() {
      return running;
    }
  }

  /** Async callable that records that it was called and returns a pre-supplied future. */
  private static final class TestCallable implements AsyncCallable<Void> {

    private final ListenableFuture<Void> future;
    private boolean called = false;

    private TestCallable(ListenableFuture<Void> future) {
      this.future = future;
    }

    @Override
    public ListenableFuture<Void> call() throws Exception {
      called = true;
      return future;
    }
  }
}