1 /* 2 * Copyright (C) 2018 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.truth.Truth.assertThat; 18 import static com.google.common.util.concurrent.Futures.allAsList; 19 import static com.google.common.util.concurrent.Futures.getDone; 20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 21 import static java.util.concurrent.TimeUnit.SECONDS; 22 23 import com.google.common.annotations.GwtIncompatible; 24 import com.google.common.base.Function; 25 import com.google.common.testing.GcFinalization; 26 import com.google.common.testing.TestLogHandler; 27 import com.google.j2objc.annotations.J2ObjCIncompatible; 28 import java.lang.ref.WeakReference; 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CountDownLatch; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.ExecutorService; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Logger; 39 import junit.framework.TestCase; 40 41 /** Tests for {@link ExecutionSequencer} */ 42 public class ExecutionSequencerTest extends TestCase { 43 44 ExecutorService executor; 45 46 private ExecutionSequencer serializer; 47 private SettableFuture<Void> firstFuture; 48 private TestCallable firstCallable; 49 50 @Override setUp()51 public void setUp() throws Exception { 52 executor = Executors.newCachedThreadPool(); 53 serializer = ExecutionSequencer.create(); 54 firstFuture = SettableFuture.create(); 55 firstCallable = new TestCallable(firstFuture); 56 } 57 58 @Override tearDown()59 public void tearDown() throws Exception { 60 executor.shutdown(); 61 } 62 testCallableStartsAfterFirstFutureCompletes()63 public void testCallableStartsAfterFirstFutureCompletes() { 64 @SuppressWarnings({"unused", "nullness"}) 65 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 66 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 67 @SuppressWarnings({"unused", "nullness"}) 68 Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor()); 69 assertThat(firstCallable.called).isTrue(); 70 assertThat(secondCallable.called).isFalse(); 71 firstFuture.set(null); 72 assertThat(secondCallable.called).isTrue(); 73 } 74 testCancellationDoesNotViolateSerialization()75 public void testCancellationDoesNotViolateSerialization() { 76 @SuppressWarnings({"unused", "nullness"}) 77 Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor()); 78 TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 79 ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor()); 80 TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null)); 81 @SuppressWarnings({"unused", "nullness"}) 82 Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor()); 83 secondFuture.cancel(true); 84 assertThat(secondCallable.called).isFalse(); 85 assertThat(thirdCallable.called).isFalse(); 86 firstFuture.set(null); 87 assertThat(secondCallable.called).isFalse(); 88 assertThat(thirdCallable.called).isTrue(); 89 } 90 testCancellationMultipleThreads()91 public void testCancellationMultipleThreads() throws Exception { 92 final BlockingCallable blockingCallable = new BlockingCallable(); 93 ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor); 94 ListenableFuture<Boolean> future2 = 95 serializer.submit( 96 new Callable<Boolean>() { 97 @Override 98 public Boolean call() { 99 return blockingCallable.isRunning(); 100 } 101 }, 102 directExecutor()); 103 104 // Wait for the first task to be started in the background. It will block until we explicitly 105 // stop it. 106 blockingCallable.waitForStart(); 107 108 // Give the second task a chance to (incorrectly) start up while the first task is running. 109 assertThat(future2.isDone()).isFalse(); 110 111 // Stop the first task. The second task should then run. 112 blockingCallable.stop(); 113 executor.shutdown(); 114 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 115 assertThat(getDone(future2)).isFalse(); 116 } 117 testSecondTaskWaitsForFirstEvenIfCancelled()118 public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception { 119 final BlockingCallable blockingCallable = new BlockingCallable(); 120 ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor); 121 ListenableFuture<Boolean> future2 = 122 serializer.submit( 123 new Callable<Boolean>() { 124 @Override 125 public Boolean call() { 126 return blockingCallable.isRunning(); 127 } 128 }, 129 directExecutor()); 130 131 // Wait for the first task to be started in the background. It will block until we explicitly 132 // stop it. 133 blockingCallable.waitForStart(); 134 135 // This time, cancel the future for the first task. The task remains running, only the future 136 // is cancelled. 137 future1.cancel(false); 138 139 // Give the second task a chance to (incorrectly) start up while the first task is running. 140 // (This is the assertion that fails.) 141 assertThat(future2.isDone()).isFalse(); 142 143 // Stop the first task. The second task should then run. 144 blockingCallable.stop(); 145 executor.shutdown(); 146 assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue(); 147 assertThat(getDone(future2)).isFalse(); 148 } 149 150 @GwtIncompatible 151 @J2ObjCIncompatible // gc 152 @AndroidIncompatible testCancellationWithReferencedObject()153 public void testCancellationWithReferencedObject() throws Exception { 154 Object toBeGCed = new Object(); 155 WeakReference<Object> ref = new WeakReference<>(toBeGCed); 156 final SettableFuture<Void> settableFuture = SettableFuture.create(); 157 ListenableFuture<?> ignored = 158 serializer.submitAsync( 159 new AsyncCallable<Void>() { 160 @Override 161 public ListenableFuture<Void> call() { 162 return settableFuture; 163 } 164 }, 165 directExecutor()); 166 serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true); 167 toBeGCed = null; 168 GcFinalization.awaitClear(ref); 169 } 170 toStringCallable(final Object object)171 private static Callable<String> toStringCallable(final Object object) { 172 return new Callable<String>() { 173 @Override 174 public String call() { 175 return object.toString(); 176 } 177 }; 178 } 179 180 public void testCancellationDuringReentrancy() throws Exception { 181 TestLogHandler logHandler = new TestLogHandler(); 182 Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler); 183 184 List<Future<?>> results = new ArrayList<>(); 185 final Runnable[] manualExecutorTask = new Runnable[1]; 186 Executor manualExecutor = 187 new Executor() { 188 @Override 189 public void execute(Runnable task) { 190 manualExecutorTask[0] = task; 191 } 192 }; 193 194 results.add(serializer.submit(Callables.returning(null), manualExecutor)); 195 final Future<?>[] thingToCancel = new Future<?>[1]; 196 results.add( 197 serializer.submit( 198 new Callable<Void>() { 199 @Override 200 public Void call() { 201 thingToCancel[0].cancel(false); 202 return null; 203 } 204 }, 205 directExecutor())); 206 thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor()); 207 results.add(thingToCancel[0]); 208 // Enqueue more than enough tasks to force reentrancy. 209 for (int i = 0; i < 5; i++) { 210 results.add(serializer.submit(Callables.returning(null), directExecutor())); 211 } 212 213 manualExecutorTask[0].run(); 214 215 for (Future<?> result : results) { 216 if (!result.isCancelled()) { 217 result.get(10, SECONDS); 218 } 219 // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect. 220 } 221 222 assertThat(logHandler.getStoredLogRecords()).isEmpty(); 223 } 224 225 public void testAvoidsStackOverflow_manySubmitted() throws Exception { 226 final SettableFuture<Void> settableFuture = SettableFuture.create(); 227 ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001); 228 results.add( 229 serializer.submitAsync( 230 new AsyncCallable<Void>() { 231 @Override 232 public ListenableFuture<Void> call() { 233 return settableFuture; 234 } 235 }, 236 directExecutor())); 237 for (int i = 0; i < 50_000; i++) { 238 results.add(serializer.submit(Callables.<Void>returning(null), directExecutor())); 239 } 240 settableFuture.set(null); 241 getDone(allAsList(results)); 242 } 243 244 public void testAvoidsStackOverflow_manyCancelled() throws Exception { 245 final SettableFuture<Void> settableFuture = SettableFuture.create(); 246 ListenableFuture<Void> unused = 247 serializer.submitAsync( 248 new AsyncCallable<Void>() { 249 @Override 250 public ListenableFuture<Void> call() { 251 return settableFuture; 252 } 253 }, 254 directExecutor()); 255 for (int i = 0; i < 50_000; i++) { 256 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 257 } 258 ListenableFuture<Integer> stackDepthCheck = 259 serializer.submit( 260 new Callable<Integer>() { 261 @Override 262 public Integer call() { 263 return Thread.currentThread().getStackTrace().length; 264 } 265 }, 266 directExecutor()); 267 settableFuture.set(null); 268 assertThat(getDone(stackDepthCheck)) 269 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 270 } 271 272 public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception { 273 final SettableFuture<Void> settableFuture = SettableFuture.create(); 274 ListenableFuture<Void> unused = 275 serializer.submitAsync( 276 new AsyncCallable<Void>() { 277 @Override 278 public ListenableFuture<Void> call() { 279 return settableFuture; 280 } 281 }, 282 directExecutor()); 283 for (int i = 0; i < 25_000; i++) { 284 serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true); 285 unused = serializer.submit(Callables.<Void>returning(null), directExecutor()); 286 } 287 ListenableFuture<Integer> stackDepthCheck = 288 serializer.submit( 289 new Callable<Integer>() { 290 @Override 291 public Integer call() { 292 return Thread.currentThread().getStackTrace().length; 293 } 294 }, 295 directExecutor()); 296 settableFuture.set(null); 297 assertThat(getDone(stackDepthCheck)) 298 .isLessThan(Thread.currentThread().getStackTrace().length + 100); 299 } 300 301 private static Function<Integer, Integer> add(final int delta) { 302 return new Function<Integer, Integer>() { 303 @Override 304 public Integer apply(Integer input) { 305 return input + delta; 306 } 307 }; 308 } 309 310 private static AsyncCallable<Integer> asyncAdd( 311 final ListenableFuture<Integer> future, final int delta, final Executor executor) { 312 return new AsyncCallable<Integer>() { 313 @Override 314 public ListenableFuture<Integer> call() throws Exception { 315 return Futures.transform(future, add(delta), executor); 316 } 317 }; 318 } 319 320 private static final class LongHolder { 321 long count; 322 } 323 324 private static final int ITERATION_COUNT = 50_000; 325 private static final int DIRECT_EXECUTIONS_PER_THREAD = 100; 326 327 @GwtIncompatible // threads 328 public void testAvoidsStackOverflow_multipleThreads() throws Exception { 329 final LongHolder holder = new LongHolder(); 330 final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>(); 331 final List<Integer> completeLengthChecks; 332 final int baseStackDepth; 333 ExecutorService service = Executors.newFixedThreadPool(5); 334 try { 335 // Avoid counting frames from the executor itself, or the ExecutionSequencer 336 baseStackDepth = 337 serializer 338 .submit( 339 new Callable<Integer>() { 340 @Override 341 public Integer call() { 342 return Thread.currentThread().getStackTrace().length; 343 } 344 }, 345 service) 346 .get(); 347 final SettableFuture<Void> settableFuture = SettableFuture.create(); 348 ListenableFuture<?> unused = 349 serializer.submitAsync( 350 new AsyncCallable<Void>() { 351 @Override 352 public ListenableFuture<Void> call() { 353 return settableFuture; 354 } 355 }, 356 directExecutor()); 357 for (int i = 0; i < 50_000; i++) { 358 if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) { 359 // after some number of iterations, switch threads 360 unused = 361 serializer.submit( 362 new Callable<Void>() { 363 @Override 364 public Void call() { 365 holder.count++; 366 return null; 367 } 368 }, 369 service); 370 } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) { 371 // When at max depth, record stack trace depth 372 lengthChecks.add( 373 serializer.submit( 374 new Callable<Integer>() { 375 @Override 376 public Integer call() { 377 holder.count++; 378 return Thread.currentThread().getStackTrace().length; 379 } 380 }, 381 directExecutor())); 382 } else { 383 // Otherwise, schedule a task on directExecutor 384 unused = 385 serializer.submit( 386 new Callable<Void>() { 387 @Override 388 public Void call() { 389 holder.count++; 390 return null; 391 } 392 }, 393 directExecutor()); 394 } 395 } 396 settableFuture.set(null); 397 completeLengthChecks = allAsList(lengthChecks).get(); 398 } finally { 399 service.shutdown(); 400 } 401 assertThat(holder.count).isEqualTo(ITERATION_COUNT); 402 for (int length : completeLengthChecks) { 403 // Verify that at max depth, less than one stack frame per submitted task was consumed 404 assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2); 405 } 406 } 407 408 @SuppressWarnings("ObjectToString") // Intended behavior 409 public void testToString() { 410 Future<?> unused = serializer.submitAsync(firstCallable, directExecutor()); 411 TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create()); 412 Future<?> second = serializer.submitAsync(secondCallable, directExecutor()); 413 assertThat(secondCallable.called).isFalse(); 414 assertThat(second.toString()).contains(secondCallable.toString()); 415 firstFuture.set(null); 416 assertThat(second.toString()).contains(secondCallable.future.toString()); 417 } 418 419 private static class BlockingCallable implements Callable<Void> { 420 private final CountDownLatch startLatch = new CountDownLatch(1); 421 private final CountDownLatch stopLatch = new CountDownLatch(1); 422 423 private volatile boolean running = false; 424 425 @Override 426 public Void call() throws InterruptedException { 427 running = true; 428 startLatch.countDown(); 429 stopLatch.await(); 430 running = false; 431 return null; 432 } 433 434 public void waitForStart() throws InterruptedException { 435 startLatch.await(); 436 } 437 438 public void stop() { 439 stopLatch.countDown(); 440 } 441 442 public boolean isRunning() { 443 return running; 444 } 445 } 446 447 private static final class TestCallable implements AsyncCallable<Void> { 448 449 private final ListenableFuture<Void> future; 450 private boolean called = false; 451 452 private TestCallable(ListenableFuture<Void> future) { 453 this.future = future; 454 } 455 456 @Override 457 public ListenableFuture<Void> call() throws Exception { 458 called = true; 459 return future; 460 } 461 } 462 } 463