• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.truth.Truth.assertThat;
18 import static com.google.common.util.concurrent.Futures.allAsList;
19 import static com.google.common.util.concurrent.Futures.getDone;
20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21 import static java.util.concurrent.TimeUnit.SECONDS;
22 
23 import com.google.common.annotations.GwtIncompatible;
24 import com.google.common.base.Function;
25 import com.google.common.testing.GcFinalization;
26 import com.google.common.testing.TestLogHandler;
27 import com.google.j2objc.annotations.J2ObjCIncompatible;
28 import java.lang.ref.WeakReference;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Logger;
39 import junit.framework.TestCase;
40 
41 /** Tests for {@link ExecutionSequencer} */
42 public class ExecutionSequencerTest extends TestCase {
43 
44   ExecutorService executor;
45 
46   private ExecutionSequencer serializer;
47   private SettableFuture<Void> firstFuture;
48   private TestCallable firstCallable;
49 
50   @Override
setUp()51   public void setUp() throws Exception {
52     executor = Executors.newCachedThreadPool();
53     serializer = ExecutionSequencer.create();
54     firstFuture = SettableFuture.create();
55     firstCallable = new TestCallable(firstFuture);
56   }
57 
58   @Override
tearDown()59   public void tearDown() throws Exception {
60     executor.shutdown();
61   }
62 
testCallableStartsAfterFirstFutureCompletes()63   public void testCallableStartsAfterFirstFutureCompletes() {
64     @SuppressWarnings({"unused", "nullness"})
65     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
66     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
67     @SuppressWarnings({"unused", "nullness"})
68     Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
69     assertThat(firstCallable.called).isTrue();
70     assertThat(secondCallable.called).isFalse();
71     firstFuture.set(null);
72     assertThat(secondCallable.called).isTrue();
73   }
74 
testCancellationDoesNotViolateSerialization()75   public void testCancellationDoesNotViolateSerialization() {
76     @SuppressWarnings({"unused", "nullness"})
77     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
78     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
79     ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
80     TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
81     @SuppressWarnings({"unused", "nullness"})
82     Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
83     secondFuture.cancel(true);
84     assertThat(secondCallable.called).isFalse();
85     assertThat(thirdCallable.called).isFalse();
86     firstFuture.set(null);
87     assertThat(secondCallable.called).isFalse();
88     assertThat(thirdCallable.called).isTrue();
89   }
90 
91 
testCancellationMultipleThreads()92   public void testCancellationMultipleThreads() throws Exception {
93     final BlockingCallable blockingCallable = new BlockingCallable();
94     ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
95     ListenableFuture<Boolean> future2 =
96         serializer.submit(
97             new Callable<Boolean>() {
98               @Override
99               public Boolean call() {
100                 return blockingCallable.isRunning();
101               }
102             },
103             directExecutor());
104 
105     // Wait for the first task to be started in the background. It will block until we explicitly
106     // stop it.
107     blockingCallable.waitForStart();
108 
109     // Give the second task a chance to (incorrectly) start up while the first task is running.
110     assertThat(future2.isDone()).isFalse();
111 
112     // Stop the first task. The second task should then run.
113     blockingCallable.stop();
114     executor.shutdown();
115     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
116     assertThat(getDone(future2)).isFalse();
117   }
118 
119 
testSecondTaskWaitsForFirstEvenIfCancelled()120   public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
121     final BlockingCallable blockingCallable = new BlockingCallable();
122     ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
123     ListenableFuture<Boolean> future2 =
124         serializer.submit(
125             new Callable<Boolean>() {
126               @Override
127               public Boolean call() {
128                 return blockingCallable.isRunning();
129               }
130             },
131             directExecutor());
132 
133     // Wait for the first task to be started in the background. It will block until we explicitly
134     // stop it.
135     blockingCallable.waitForStart();
136 
137     // This time, cancel the future for the first task. The task remains running, only the future
138     // is cancelled.
139     future1.cancel(false);
140 
141     // Give the second task a chance to (incorrectly) start up while the first task is running.
142     // (This is the assertion that fails.)
143     assertThat(future2.isDone()).isFalse();
144 
145     // Stop the first task. The second task should then run.
146     blockingCallable.stop();
147     executor.shutdown();
148     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
149     assertThat(getDone(future2)).isFalse();
150   }
151 
152   @GwtIncompatible
153   @J2ObjCIncompatible // gc
154   @AndroidIncompatible
testCancellationWithReferencedObject()155   public void testCancellationWithReferencedObject() throws Exception {
156     Object toBeGCed = new Object();
157     WeakReference<Object> ref = new WeakReference<>(toBeGCed);
158     final SettableFuture<Void> settableFuture = SettableFuture.create();
159     ListenableFuture<?> ignored =
160         serializer.submitAsync(
161             new AsyncCallable<Void>() {
162               @Override
163               public ListenableFuture<Void> call() {
164                 return settableFuture;
165               }
166             },
167             directExecutor());
168     serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
169     toBeGCed = null;
170     GcFinalization.awaitClear(ref);
171   }
172 
toStringCallable(final Object object)173   private static Callable<String> toStringCallable(final Object object) {
174     return new Callable<String>() {
175       @Override
176       public String call() {
177         return object.toString();
178       }
179     };
180   }
181 
182   public void testCancellationDuringReentrancy() throws Exception {
183     TestLogHandler logHandler = new TestLogHandler();
184     Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
185 
186     List<Future<?>> results = new ArrayList<>();
187     final Runnable[] manualExecutorTask = new Runnable[1];
188     Executor manualExecutor =
189         new Executor() {
190           @Override
191           public void execute(Runnable task) {
192             manualExecutorTask[0] = task;
193           }
194         };
195 
196     results.add(serializer.submit(Callables.returning(null), manualExecutor));
197     final Future<?>[] thingToCancel = new Future<?>[1];
198     results.add(
199         serializer.submit(
200             new Callable<Void>() {
201               @Override
202               public Void call() {
203                 thingToCancel[0].cancel(false);
204                 return null;
205               }
206             },
207             directExecutor()));
208     thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
209     results.add(thingToCancel[0]);
210     // Enqueue more than enough tasks to force reentrancy.
211     for (int i = 0; i < 5; i++) {
212       results.add(serializer.submit(Callables.returning(null), directExecutor()));
213     }
214 
215     manualExecutorTask[0].run();
216 
217     for (Future<?> result : results) {
218       if (!result.isCancelled()) {
219         result.get(10, SECONDS);
220       }
221       // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
222     }
223 
224     assertThat(logHandler.getStoredLogRecords()).isEmpty();
225   }
226 
227   public void testAvoidsStackOverflow_manySubmitted() throws Exception {
228     final SettableFuture<Void> settableFuture = SettableFuture.create();
229     ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
230     results.add(
231         serializer.submitAsync(
232             new AsyncCallable<Void>() {
233               @Override
234               public ListenableFuture<Void> call() {
235                 return settableFuture;
236               }
237             },
238             directExecutor()));
239     for (int i = 0; i < 50_000; i++) {
240       results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
241     }
242     settableFuture.set(null);
243     getDone(allAsList(results));
244   }
245 
246   public void testAvoidsStackOverflow_manyCancelled() throws Exception {
247     final SettableFuture<Void> settableFuture = SettableFuture.create();
248     ListenableFuture<Void> unused =
249         serializer.submitAsync(
250             new AsyncCallable<Void>() {
251               @Override
252               public ListenableFuture<Void> call() {
253                 return settableFuture;
254               }
255             },
256             directExecutor());
257     for (int i = 0; i < 50_000; i++) {
258       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
259     }
260     ListenableFuture<Integer> stackDepthCheck =
261         serializer.submit(
262             new Callable<Integer>() {
263               @Override
264               public Integer call() {
265                 return Thread.currentThread().getStackTrace().length;
266               }
267             },
268             directExecutor());
269     settableFuture.set(null);
270     assertThat(getDone(stackDepthCheck))
271         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
272   }
273 
274   public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
275     final SettableFuture<Void> settableFuture = SettableFuture.create();
276     ListenableFuture<Void> unused =
277         serializer.submitAsync(
278             new AsyncCallable<Void>() {
279               @Override
280               public ListenableFuture<Void> call() {
281                 return settableFuture;
282               }
283             },
284             directExecutor());
285     for (int i = 0; i < 25_000; i++) {
286       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
287       unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
288     }
289     ListenableFuture<Integer> stackDepthCheck =
290         serializer.submit(
291             new Callable<Integer>() {
292               @Override
293               public Integer call() {
294                 return Thread.currentThread().getStackTrace().length;
295               }
296             },
297             directExecutor());
298     settableFuture.set(null);
299     assertThat(getDone(stackDepthCheck))
300         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
301   }
302 
303   private static Function<Integer, Integer> add(final int delta) {
304     return new Function<Integer, Integer>() {
305       @Override
306       public Integer apply(Integer input) {
307         return input + delta;
308       }
309     };
310   }
311 
312   private static AsyncCallable<Integer> asyncAdd(
313       final ListenableFuture<Integer> future, final int delta, final Executor executor) {
314     return new AsyncCallable<Integer>() {
315       @Override
316       public ListenableFuture<Integer> call() throws Exception {
317         return Futures.transform(future, add(delta), executor);
318       }
319     };
320   }
321 
322   private static final class LongHolder {
323     long count;
324   }
325 
326   private static final int ITERATION_COUNT = 50_000;
327   private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
328 
329   @GwtIncompatible // threads
330 
331   public void testAvoidsStackOverflow_multipleThreads() throws Exception {
332     final LongHolder holder = new LongHolder();
333     final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
334     final List<Integer> completeLengthChecks;
335     final int baseStackDepth;
336     ExecutorService service = Executors.newFixedThreadPool(5);
337     try {
338       // Avoid counting frames from the executor itself, or the ExecutionSequencer
339       baseStackDepth =
340           serializer
341               .submit(
342                   new Callable<Integer>() {
343                     @Override
344                     public Integer call() {
345                       return Thread.currentThread().getStackTrace().length;
346                     }
347                   },
348                   service)
349               .get();
350       final SettableFuture<Void> settableFuture = SettableFuture.create();
351       ListenableFuture<?> unused =
352           serializer.submitAsync(
353               new AsyncCallable<Void>() {
354                 @Override
355                 public ListenableFuture<Void> call() {
356                   return settableFuture;
357                 }
358               },
359               directExecutor());
360       for (int i = 0; i < 50_000; i++) {
361         if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
362           // after some number of iterations, switch threads
363           unused =
364               serializer.submit(
365                   new Callable<Void>() {
366                     @Override
367                     public Void call() {
368                       holder.count++;
369                       return null;
370                     }
371                   },
372                   service);
373         } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
374           // When at max depth, record stack trace depth
375           lengthChecks.add(
376               serializer.submit(
377                   new Callable<Integer>() {
378                     @Override
379                     public Integer call() {
380                       holder.count++;
381                       return Thread.currentThread().getStackTrace().length;
382                     }
383                   },
384                   directExecutor()));
385         } else {
386           // Otherwise, schedule a task on directExecutor
387           unused =
388               serializer.submit(
389                   new Callable<Void>() {
390                     @Override
391                     public Void call() {
392                       holder.count++;
393                       return null;
394                     }
395                   },
396                   directExecutor());
397         }
398       }
399       settableFuture.set(null);
400       completeLengthChecks = allAsList(lengthChecks).get();
401     } finally {
402       service.shutdown();
403     }
404     assertThat(holder.count).isEqualTo(ITERATION_COUNT);
405     for (int length : completeLengthChecks) {
406       // Verify that at max depth, less than one stack frame per submitted task was consumed
407       assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
408     }
409   }
410 
411   @SuppressWarnings("ObjectToString") // Intended behavior
412   public void testToString() {
413     Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
414     TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
415     Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
416     assertThat(secondCallable.called).isFalse();
417     assertThat(second.toString()).contains(secondCallable.toString());
418     firstFuture.set(null);
419     assertThat(second.toString()).contains(secondCallable.future.toString());
420   }
421 
422   private static class BlockingCallable implements Callable<Void> {
423     private final CountDownLatch startLatch = new CountDownLatch(1);
424     private final CountDownLatch stopLatch = new CountDownLatch(1);
425 
426     private volatile boolean running = false;
427 
428     @Override
429     public Void call() throws InterruptedException {
430       running = true;
431       startLatch.countDown();
432       stopLatch.await();
433       running = false;
434       return null;
435     }
436 
437     public void waitForStart() throws InterruptedException {
438       startLatch.await();
439     }
440 
441     public void stop() {
442       stopLatch.countDown();
443     }
444 
445     public boolean isRunning() {
446       return running;
447     }
448   }
449 
450   private static final class TestCallable implements AsyncCallable<Void> {
451 
452     private final ListenableFuture<Void> future;
453     private boolean called = false;
454 
455     private TestCallable(ListenableFuture<Void> future) {
456       this.future = future;
457     }
458 
459     @Override
460     public ListenableFuture<Void> call() throws Exception {
461       called = true;
462       return future;
463     }
464   }
465 }
466