• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.truth.Truth.assertThat;
18 import static com.google.common.util.concurrent.Futures.allAsList;
19 import static com.google.common.util.concurrent.Futures.getDone;
20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21 import static java.util.concurrent.TimeUnit.SECONDS;
22 
23 import com.google.common.annotations.GwtIncompatible;
24 import com.google.common.base.Function;
25 import com.google.common.testing.GcFinalization;
26 import com.google.common.testing.TestLogHandler;
27 import com.google.j2objc.annotations.J2ObjCIncompatible;
28 import java.lang.ref.WeakReference;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Logger;
39 import junit.framework.TestCase;
40 import org.checkerframework.checker.nullness.qual.Nullable;
41 
42 /** Tests for {@link ExecutionSequencer} */
43 public class ExecutionSequencerTest extends TestCase {
44 
45   ExecutorService executor;
46 
47   private ExecutionSequencer serializer;
48   private SettableFuture<@Nullable Void> firstFuture;
49   private TestCallable firstCallable;
50 
51   @Override
setUp()52   public void setUp() throws Exception {
53     executor = Executors.newCachedThreadPool();
54     serializer = ExecutionSequencer.create();
55     firstFuture = SettableFuture.create();
56     firstCallable = new TestCallable(firstFuture);
57   }
58 
59   @Override
tearDown()60   public void tearDown() throws Exception {
61     executor.shutdown();
62   }
63 
testCallableStartsAfterFirstFutureCompletes()64   public void testCallableStartsAfterFirstFutureCompletes() {
65     @SuppressWarnings({"unused", "nullness"})
66     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
67     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
68     @SuppressWarnings({"unused", "nullness"})
69     Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
70     assertThat(firstCallable.called).isTrue();
71     assertThat(secondCallable.called).isFalse();
72     firstFuture.set(null);
73     assertThat(secondCallable.called).isTrue();
74   }
75 
testCancellationDoesNotViolateSerialization()76   public void testCancellationDoesNotViolateSerialization() {
77     @SuppressWarnings({"unused", "nullness"})
78     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
79     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
80     ListenableFuture<@Nullable Void> secondFuture =
81         serializer.submitAsync(secondCallable, directExecutor());
82     TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
83     @SuppressWarnings({"unused", "nullness"})
84     Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
85     secondFuture.cancel(true);
86     assertThat(secondCallable.called).isFalse();
87     assertThat(thirdCallable.called).isFalse();
88     firstFuture.set(null);
89     assertThat(secondCallable.called).isFalse();
90     assertThat(thirdCallable.called).isTrue();
91   }
92 
testCancellationMultipleThreads()93   public void testCancellationMultipleThreads() throws Exception {
94     final BlockingCallable blockingCallable = new BlockingCallable();
95     ListenableFuture<@Nullable Void> unused = serializer.submit(blockingCallable, executor);
96     ListenableFuture<Boolean> future2 =
97         serializer.submit(
98             new Callable<Boolean>() {
99               @Override
100               public Boolean call() {
101                 return blockingCallable.isRunning();
102               }
103             },
104             directExecutor());
105 
106     // Wait for the first task to be started in the background. It will block until we explicitly
107     // stop it.
108     blockingCallable.waitForStart();
109 
110     // Give the second task a chance to (incorrectly) start up while the first task is running.
111     assertThat(future2.isDone()).isFalse();
112 
113     // Stop the first task. The second task should then run.
114     blockingCallable.stop();
115     executor.shutdown();
116     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
117     assertThat(getDone(future2)).isFalse();
118   }
119 
testSecondTaskWaitsForFirstEvenIfCancelled()120   public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
121     final BlockingCallable blockingCallable = new BlockingCallable();
122     ListenableFuture<@Nullable Void> future1 = serializer.submit(blockingCallable, executor);
123     ListenableFuture<Boolean> future2 =
124         serializer.submit(
125             new Callable<Boolean>() {
126               @Override
127               public Boolean call() {
128                 return blockingCallable.isRunning();
129               }
130             },
131             directExecutor());
132 
133     // Wait for the first task to be started in the background. It will block until we explicitly
134     // stop it.
135     blockingCallable.waitForStart();
136 
137     // This time, cancel the future for the first task. The task remains running, only the future
138     // is cancelled.
139     future1.cancel(false);
140 
141     // Give the second task a chance to (incorrectly) start up while the first task is running.
142     // (This is the assertion that fails.)
143     assertThat(future2.isDone()).isFalse();
144 
145     // Stop the first task. The second task should then run.
146     blockingCallable.stop();
147     executor.shutdown();
148     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
149     assertThat(getDone(future2)).isFalse();
150   }
151 
152   @GwtIncompatible
153   @J2ObjCIncompatible // gc
154   @AndroidIncompatible
testCancellationWithReferencedObject()155   public void testCancellationWithReferencedObject() throws Exception {
156     Object toBeGCed = new Object();
157     WeakReference<Object> ref = new WeakReference<>(toBeGCed);
158     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
159     ListenableFuture<?> ignored =
160         serializer.submitAsync(
161             new AsyncCallable<@Nullable Void>() {
162               @Override
163               public ListenableFuture<@Nullable Void> call() {
164                 return settableFuture;
165               }
166             },
167             directExecutor());
168     serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
169     toBeGCed = null;
170     GcFinalization.awaitClear(ref);
171   }
172 
toStringCallable(final Object object)173   private static Callable<String> toStringCallable(final Object object) {
174     return new Callable<String>() {
175       @Override
176       public String call() {
177         return object.toString();
178       }
179     };
180   }
181 
182   public void testCancellationDuringReentrancy() throws Exception {
183     TestLogHandler logHandler = new TestLogHandler();
184     Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
185 
186     List<Future<?>> results = new ArrayList<>();
187     final Runnable[] manualExecutorTask = new Runnable[1];
188     Executor manualExecutor =
189         new Executor() {
190           @Override
191           public void execute(Runnable task) {
192             manualExecutorTask[0] = task;
193           }
194         };
195 
196     results.add(serializer.submit(Callables.returning(null), manualExecutor));
197     final Future<?>[] thingToCancel = new Future<?>[1];
198     results.add(
199         serializer.submit(
200             new Callable<@Nullable Void>() {
201               @Override
202               public @Nullable Void call() {
203                 thingToCancel[0].cancel(false);
204                 return null;
205               }
206             },
207             directExecutor()));
208     thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
209     results.add(thingToCancel[0]);
210     // Enqueue more than enough tasks to force reentrancy.
211     for (int i = 0; i < 5; i++) {
212       results.add(serializer.submit(Callables.returning(null), directExecutor()));
213     }
214 
215     manualExecutorTask[0].run();
216 
217     for (Future<?> result : results) {
218       if (!result.isCancelled()) {
219         result.get(10, SECONDS);
220       }
221       // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
222     }
223 
224     assertThat(logHandler.getStoredLogRecords()).isEmpty();
225   }
226 
227   public void testAvoidsStackOverflow_manySubmitted() throws Exception {
228     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
229     ArrayList<ListenableFuture<@Nullable Void>> results = new ArrayList<>(50_001);
230     results.add(
231         serializer.submitAsync(
232             new AsyncCallable<@Nullable Void>() {
233               @Override
234               public ListenableFuture<@Nullable Void> call() {
235                 return settableFuture;
236               }
237             },
238             directExecutor()));
239     for (int i = 0; i < 50_000; i++) {
240       results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
241     }
242     settableFuture.set(null);
243     getDone(allAsList(results));
244   }
245 
246   public void testAvoidsStackOverflow_manyCancelled() throws Exception {
247     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
248     ListenableFuture<@Nullable Void> unused =
249         serializer.submitAsync(
250             new AsyncCallable<@Nullable Void>() {
251               @Override
252               public ListenableFuture<@Nullable Void> call() {
253                 return settableFuture;
254               }
255             },
256             directExecutor());
257     for (int i = 0; i < 50_000; i++) {
258       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
259     }
260     ListenableFuture<Integer> stackDepthCheck =
261         serializer.submit(
262             new Callable<Integer>() {
263               @Override
264               public Integer call() {
265                 return Thread.currentThread().getStackTrace().length;
266               }
267             },
268             directExecutor());
269     settableFuture.set(null);
270     assertThat(getDone(stackDepthCheck))
271         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
272   }
273 
274   public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
275     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
276     ListenableFuture<@Nullable Void> unused =
277         serializer.submitAsync(
278             new AsyncCallable<@Nullable Void>() {
279               @Override
280               public ListenableFuture<@Nullable Void> call() {
281                 return settableFuture;
282               }
283             },
284             directExecutor());
285     for (int i = 0; i < 25_000; i++) {
286       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
287       unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
288     }
289     ListenableFuture<Integer> stackDepthCheck =
290         serializer.submit(
291             new Callable<Integer>() {
292               @Override
293               public Integer call() {
294                 return Thread.currentThread().getStackTrace().length;
295               }
296             },
297             directExecutor());
298     settableFuture.set(null);
299     assertThat(getDone(stackDepthCheck))
300         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
301   }
302 
303   private static Function<Integer, Integer> add(final int delta) {
304     return new Function<Integer, Integer>() {
305       @Override
306       public Integer apply(Integer input) {
307         return input + delta;
308       }
309     };
310   }
311 
312   private static AsyncCallable<Integer> asyncAdd(
313       final ListenableFuture<Integer> future, final int delta, final Executor executor) {
314     return new AsyncCallable<Integer>() {
315       @Override
316       public ListenableFuture<Integer> call() throws Exception {
317         return Futures.transform(future, add(delta), executor);
318       }
319     };
320   }
321 
322   private static final class LongHolder {
323     long count;
324   }
325 
326   private static final int ITERATION_COUNT = 50_000;
327   private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
328 
329   @GwtIncompatible // threads
330   public void testAvoidsStackOverflow_multipleThreads() throws Exception {
331     final LongHolder holder = new LongHolder();
332     final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
333     final List<Integer> completeLengthChecks;
334     final int baseStackDepth;
335     ExecutorService service = Executors.newFixedThreadPool(5);
336     try {
337       // Avoid counting frames from the executor itself, or the ExecutionSequencer
338       baseStackDepth =
339           serializer
340               .submit(
341                   new Callable<Integer>() {
342                     @Override
343                     public Integer call() {
344                       return Thread.currentThread().getStackTrace().length;
345                     }
346                   },
347                   service)
348               .get();
349       final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
350       ListenableFuture<?> unused =
351           serializer.submitAsync(
352               new AsyncCallable<@Nullable Void>() {
353                 @Override
354                 public ListenableFuture<@Nullable Void> call() {
355                   return settableFuture;
356                 }
357               },
358               directExecutor());
359       for (int i = 0; i < 50_000; i++) {
360         if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
361           // after some number of iterations, switch threads
362           unused =
363               serializer.submit(
364                   new Callable<@Nullable Void>() {
365                     @Override
366                     public @Nullable Void call() {
367                       holder.count++;
368                       return null;
369                     }
370                   },
371                   service);
372         } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
373           // When at max depth, record stack trace depth
374           lengthChecks.add(
375               serializer.submit(
376                   new Callable<Integer>() {
377                     @Override
378                     public Integer call() {
379                       holder.count++;
380                       return Thread.currentThread().getStackTrace().length;
381                     }
382                   },
383                   directExecutor()));
384         } else {
385           // Otherwise, schedule a task on directExecutor
386           unused =
387               serializer.submit(
388                   new Callable<@Nullable Void>() {
389                     @Override
390                     public @Nullable Void call() {
391                       holder.count++;
392                       return null;
393                     }
394                   },
395                   directExecutor());
396         }
397       }
398       settableFuture.set(null);
399       completeLengthChecks = allAsList(lengthChecks).get();
400     } finally {
401       service.shutdown();
402     }
403     assertThat(holder.count).isEqualTo(ITERATION_COUNT);
404     for (int length : completeLengthChecks) {
405       // Verify that at max depth, less than one stack frame per submitted task was consumed
406       assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
407     }
408   }
409 
410   @SuppressWarnings("ObjectToString") // Intended behavior
411   public void testToString() {
412     Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
413     TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
414     Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
415     assertThat(secondCallable.called).isFalse();
416     assertThat(second.toString()).contains(secondCallable.toString());
417     firstFuture.set(null);
418     assertThat(second.toString()).contains(secondCallable.future.toString());
419   }
420 
421   private static class BlockingCallable implements Callable<@Nullable Void> {
422     private final CountDownLatch startLatch = new CountDownLatch(1);
423     private final CountDownLatch stopLatch = new CountDownLatch(1);
424 
425     private volatile boolean running = false;
426 
427     @Override
428     public @Nullable Void call() throws InterruptedException {
429       running = true;
430       startLatch.countDown();
431       stopLatch.await();
432       running = false;
433       return null;
434     }
435 
436     public void waitForStart() throws InterruptedException {
437       startLatch.await();
438     }
439 
440     public void stop() {
441       stopLatch.countDown();
442     }
443 
444     public boolean isRunning() {
445       return running;
446     }
447   }
448 
449   private static final class TestCallable implements AsyncCallable<@Nullable Void> {
450 
451     private final ListenableFuture<@Nullable Void> future;
452     private boolean called = false;
453 
454     private TestCallable(ListenableFuture<@Nullable Void> future) {
455       this.future = future;
456     }
457 
458     @Override
459     public ListenableFuture<@Nullable Void> call() throws Exception {
460       called = true;
461       return future;
462     }
463   }
464 }
465