1 /* 2 * Copyright (C) 2018 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.truth.Truth.assertThat; 18 import static com.google.common.util.concurrent.Futures.allAsList; 19 import static com.google.common.util.concurrent.Futures.getDone; 20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 21 import static java.util.concurrent.TimeUnit.SECONDS; 22 23 import com.google.common.annotations.GwtIncompatible; 24 import com.google.common.base.Function; 25 import com.google.common.testing.GcFinalization; 26 import com.google.common.testing.TestLogHandler; 27 import com.google.j2objc.annotations.J2ObjCIncompatible; 28 import java.lang.ref.WeakReference; 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.ExecutorService; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Logger; 39 import junit.framework.TestCase; 40 41 /** Tests for {@link ExecutionSequencer} */ 42 public class ExecutionSequencerTest extends TestCase { 43 44 ExecutorService executor; 45 46 private ExecutionSequencer serializer; 47 private SettableFuture<Void> firstFuture; 48 private TestCallable firstCallable; 49 50 @Override setUp()51 public void setUp() throws Exception { 52 executor = Executors.newCachedThreadPool(); 53 serializer = ExecutionSequencer.create(); 54 firstFuture = SettableFuture.create(); 55 firstCallable = new TestCallable(firstFuture); 56 } 57 58 @Override tearDown()59 public void tearDown() throws Exception { 60 executor.shutdown(); 61 } 62 testCallableStartsAfterFirstFutureCompletes()63 public void testCallableStartsAfterFirstFutureCompletes() { 64 @SuppressWarnings({"unused", "nullness"}) 65 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 66 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 67 @SuppressWarnings({"unused", "nullness"}) 68 Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); 69 assertThat(firstCallable.called).isTrue(); 70 assertThat(secondCallable.called).isFalse(); 71 firstFuture.set(null); 72 assertThat(secondCallable.called).isTrue(); 73 } 74 testCancellationDoesNotViolateSerialization()75 public void testCancellationDoesNotViolateSerialization() { 76 @SuppressWarnings({"unused", "nullness"}) 77 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 78 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 79 ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor()); 80 TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 81 @SuppressWarnings({"unused", "nullness"}) 82 Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); 83 secondFuture.cancel(true); 84 assertThat(secondCallable.called).isFalse(); 85 assertThat(thirdCallable.called).isFalse(); 86 firstFuture.set(null); 87 assertThat(secondCallable.called).isFalse(); 88 assertThat(thirdCallable.called).isTrue(); 89 } 90 testCancellationMultipleThreads()91 public void testCancellationMultipleThreads() throws Exception { 92 final BlockingCallable blockingCallable = new BlockingCallable(); 93 ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor); 94 ListenableFuture<Boolean> future2 = 95 serializer.submit( 96 new Callable<Boolean>() { 97 @Override 98 public Boolean call() { 99 return blockingCallable.isRunning(); 100 } 101 }, 102 directExecutor()); 103 104 // Wait for the first task to be started in the background. It will block until we explicitly 105 // stop it. 106 blockingCallable.waitForStart(); 107 108 // Give the second task a chance to (incorrectly) start up while the first task is running. 109 assertThat(future2.isDone()).isFalse(); 110 111 // Stop the first task. The second task should then run. 112 blockingCallable.stop(); 113 executor.shutdown(); 114 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 115 assertThat(getDone(future2)).isFalse(); 116 } 117 testSecondTaskWaitsForFirstEvenIfCancelled()118 public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception { 119 final BlockingCallable blockingCallable = new BlockingCallable(); 120 ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor); 121 ListenableFuture<Boolean> future2 = 122 serializer.submit( 123 new Callable<Boolean>() { 124 @Override 125 public Boolean call() { 126 return blockingCallable.isRunning(); 127 } 128 }, 129 directExecutor()); 130 131 // Wait for the first task to be started in the background. It will block until we explicitly 132 // stop it. 133 blockingCallable.waitForStart(); 134 135 // This time, cancel the future for the first task. The task remains running, only the future 136 // is cancelled. 137 future1.cancel(false); 138 139 // Give the second task a chance to (incorrectly) start up while the first task is running. 140 // (This is the assertion that fails.) 141 assertThat(future2.isDone()).isFalse(); 142 143 // Stop the first task. The second task should then run. 144 blockingCallable.stop(); 145 executor.shutdown(); 146 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 147 assertThat(getDone(future2)).isFalse(); 148 } 149 150 @GwtIncompatible 151 @J2ObjCIncompatible // gc 152 @AndroidIncompatible testCancellationWithReferencedObject()153 public void testCancellationWithReferencedObject() throws Exception { 154 Object toBeGCed = new Object(); 155 WeakReference<Object> ref = new WeakReference<>(toBeGCed); 156 final SettableFuture<Void> settableFuture = SettableFuture.create(); 157 ListenableFuture<?> ignored = 158 serializer.submitAsync( 159 new AsyncCallable<Void>() { 160 @Override 161 public ListenableFuture<Void> call() { 162 return settableFuture; 163 } 164 }, 165 directExecutor()); 166 serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); 167 toBeGCed = null; 168 GcFinalization.awaitClear(ref); 169 } 170 toStringCallable(final Object object)171 private static Callable<String> toStringCallable(final Object object) { 172 return new Callable<String>() { 173 @Override 174 public String call() { 175 return object.toString(); 176 } 177 }; 178 } 179 180 public void testCancellationDuringReentrancy() throws Exception { 181 TestLogHandler logHandler = new TestLogHandler(); 182 Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); 183 184 List<Future<?>> results = new ArrayList<>(); 185 final Runnable[] manualExecutorTask = new Runnable[1]; 186 Executor manualExecutor = 187 new Executor() { 188 @Override 189 public void execute(Runnable task) { 190 manualExecutorTask[0] = task; 191 } 192 }; 193 194 results.add(serializer.submit(Callables.returning(null), manualExecutor)); 195 final Future<?>[] thingToCancel = new Future<?>[1]; 196 results.add( 197 serializer.submit( 198 new Callable<Void>() { 199 @Override 200 public Void call() { 201 thingToCancel[0].cancel(false); 202 return null; 203 } 204 }, 205 directExecutor())); 206 thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor()); 207 results.add(thingToCancel[0]); 208 // Enqueue more than enough tasks to force reentrancy. 209 for (int i = 0; i < 5; i++) { 210 results.add(serializer.submit(Callables.returning(null), directExecutor())); 211 } 212 213 manualExecutorTask[0].run(); 214 215 for (Future<?> result : results) { 216 if (!result.isCancelled()) { 217 result.get(10, SECONDS); 218 } 219 // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect. 220 } 221 222 assertThat(logHandler.getStoredLogRecords()).isEmpty(); 223 } 224 225 public void testAvoidsStackOverflow_manySubmitted() throws Exception { 226 final SettableFuture<Void> settableFuture = SettableFuture.create(); 227 ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001); 228 results.add( 229 serializer.submitAsync( 230 new AsyncCallable<Void>() { 231 @Override 232 public ListenableFuture<Void> call() { 233 return settableFuture; 234 } 235 }, 236 directExecutor())); 237 for (int i = 0; i < 50_000; i++) { 238 results.add(serializer.submit(Callables.<Void>returning(null), directExecutor())); 239 } 240 settableFuture.set(null); 241 getDone(allAsList(results)); 242 } 243 244 public void testAvoidsStackOverflow_manyCancelled() throws Exception { 245 final SettableFuture<Void> settableFuture = SettableFuture.create(); 246 ListenableFuture<Void> unused = 247 serializer.submitAsync( 248 new AsyncCallable<Void>() { 249 @Override 250 public ListenableFuture<Void> call() { 251 return settableFuture; 252 } 253 }, 254 directExecutor()); 255 for (int i = 0; i < 50_000; i++) { 256 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 257 } 258 ListenableFuture<Integer> stackDepthCheck = 259 serializer.submit( 260 new Callable<Integer>() { 261 @Override 262 public Integer call() { 263 return Thread.currentThread().getStackTrace().length; 264 } 265 }, 266 directExecutor()); 267 settableFuture.set(null); 268 assertThat(getDone(stackDepthCheck)) 269 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 270 } 271 272 public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { 273 final SettableFuture<Void> settableFuture = SettableFuture.create(); 274 ListenableFuture<Void> unused = 275 serializer.submitAsync( 276 new AsyncCallable<Void>() { 277 @Override 278 public ListenableFuture<Void> call() { 279 return settableFuture; 280 } 281 }, 282 directExecutor()); 283 for (int i = 0; i < 25_000; i++) { 284 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 285 unused = serializer.submit(Callables.<Void>returning(null), directExecutor()); 286 } 287 ListenableFuture<Integer> stackDepthCheck = 288 serializer.submit( 289 new Callable<Integer>() { 290 @Override 291 public Integer call() { 292 return Thread.currentThread().getStackTrace().length; 293 } 294 }, 295 directExecutor()); 296 settableFuture.set(null); 297 assertThat(getDone(stackDepthCheck)) 298 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 299 } 300 301 private static Function<Integer, Integer> add(final int delta) { 302 return new Function<Integer, Integer>() { 303 @Override 304 public Integer apply(Integer input) { 305 return input + delta; 306 } 307 }; 308 } 309 310 private static AsyncCallable<Integer> asyncAdd( 311 final ListenableFuture<Integer> future, final int delta, final Executor executor) { 312 return new AsyncCallable<Integer>() { 313 @Override 314 public ListenableFuture<Integer> call() throws Exception { 315 return Futures.transform(future, add(delta), executor); 316 } 317 }; 318 } 319 320 private static final class LongHolder { 321 long count; 322 } 323 324 private static final int ITERATION_COUNT = 50_000; 325 private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; 326 327 @GwtIncompatible // threads 328 329 public void testAvoidsStackOverflow_multipleThreads() throws Exception { 330 final LongHolder holder = new LongHolder(); 331 final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>(); 332 final List<Integer> completeLengthChecks; 333 final int baseStackDepth; 334 ExecutorService service = Executors.newFixedThreadPool(5); 335 try { 336 // Avoid counting frames from the executor itself, or the ExecutionSequencer 337 baseStackDepth = 338 serializer 339 .submit( 340 new Callable<Integer>() { 341 @Override 342 public Integer call() { 343 return Thread.currentThread().getStackTrace().length; 344 } 345 }, 346 service) 347 .get(); 348 final SettableFuture<Void> settableFuture = SettableFuture.create(); 349 ListenableFuture<?> unused = 350 serializer.submitAsync( 351 new AsyncCallable<Void>() { 352 @Override 353 public ListenableFuture<Void> call() { 354 return settableFuture; 355 } 356 }, 357 directExecutor()); 358 for (int i = 0; i < 50_000; i++) { 359 if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { 360 // after some number of iterations, switch threads 361 unused = 362 serializer.submit( 363 new Callable<Void>() { 364 @Override 365 public Void call() { 366 holder.count++; 367 return null; 368 } 369 }, 370 service); 371 } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { 372 // When at max depth, record stack trace depth 373 lengthChecks.add( 374 serializer.submit( 375 new Callable<Integer>() { 376 @Override 377 public Integer call() { 378 holder.count++; 379 return Thread.currentThread().getStackTrace().length; 380 } 381 }, 382 directExecutor())); 383 } else { 384 // Otherwise, schedule a task on directExecutor 385 unused = 386 serializer.submit( 387 new Callable<Void>() { 388 @Override 389 public Void call() { 390 holder.count++; 391 return null; 392 } 393 }, 394 directExecutor()); 395 } 396 } 397 settableFuture.set(null); 398 completeLengthChecks = allAsList(lengthChecks).get(); 399 } finally { 400 service.shutdown(); 401 } 402 assertThat(holder.count).isEqualTo(ITERATION_COUNT); 403 for (int length : completeLengthChecks) { 404 // Verify that at max depth, less than one stack frame per submitted task was consumed 405 assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); 406 } 407 } 408 409 @SuppressWarnings("ObjectToString") // Intended behavior 410 public void testToString() { 411 Future<?> unused = serializer.submitAsync(firstCallable, directExecutor()); 412 TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create()); 413 Future<?> second = serializer.submitAsync(secondCallable, directExecutor()); 414 assertThat(secondCallable.called).isFalse(); 415 assertThat(second.toString()).contains(secondCallable.toString()); 416 firstFuture.set(null); 417 assertThat(second.toString()).contains(secondCallable.future.toString()); 418 } 419 420 private static class BlockingCallable implements Callable<Void> { 421 private final CountDownLatch startLatch = new CountDownLatch(1); 422 private final CountDownLatch stopLatch = new CountDownLatch(1); 423 424 private volatile boolean running = false; 425 426 @Override 427 public Void call() throws InterruptedException { 428 running = true; 429 startLatch.countDown(); 430 stopLatch.await(); 431 running = false; 432 return null; 433 } 434 435 public void waitForStart() throws InterruptedException { 436 startLatch.await(); 437 } 438 439 public void stop() { 440 stopLatch.countDown(); 441 } 442 443 public boolean isRunning() { 444 return running; 445 } 446 } 447 448 private static final class TestCallable implements AsyncCallable<Void> { 449 450 private final ListenableFuture<Void> future; 451 private boolean called = false; 452 453 private TestCallable(ListenableFuture<Void> future) { 454 this.future = future; 455 } 456 457 @Override 458 public ListenableFuture<Void> call() throws Exception { 459 called = true; 460 return future; 461 } 462 } 463 } 464