• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.truth.Truth.assertThat;
18 import static com.google.common.util.concurrent.Futures.allAsList;
19 import static com.google.common.util.concurrent.Futures.getDone;
20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21 import static java.util.concurrent.TimeUnit.SECONDS;
22 
23 import com.google.common.annotations.GwtIncompatible;
24 import com.google.common.annotations.J2ktIncompatible;
25 import com.google.common.base.Function;
26 import com.google.common.testing.GcFinalization;
27 import com.google.common.testing.TestLogHandler;
28 import com.google.j2objc.annotations.J2ObjCIncompatible;
29 import java.lang.ref.WeakReference;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.logging.Logger;
40 import junit.framework.TestCase;
41 import org.checkerframework.checker.nullness.qual.Nullable;
42 
43 /** Tests for {@link ExecutionSequencer} */
44 public class ExecutionSequencerTest extends TestCase {
45 
46   ExecutorService executor;
47 
48   private ExecutionSequencer serializer;
49   private SettableFuture<@Nullable Void> firstFuture;
50   private TestCallable firstCallable;
51 
52   @Override
setUp()53   public void setUp() throws Exception {
54     executor = Executors.newCachedThreadPool();
55     serializer = ExecutionSequencer.create();
56     firstFuture = SettableFuture.create();
57     firstCallable = new TestCallable(firstFuture);
58   }
59 
60   @Override
tearDown()61   public void tearDown() throws Exception {
62     executor.shutdown();
63   }
64 
testCallableStartsAfterFirstFutureCompletes()65   public void testCallableStartsAfterFirstFutureCompletes() {
66     @SuppressWarnings({"unused", "nullness"})
67     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
68     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
69     @SuppressWarnings({"unused", "nullness"})
70     Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
71     assertThat(firstCallable.called).isTrue();
72     assertThat(secondCallable.called).isFalse();
73     firstFuture.set(null);
74     assertThat(secondCallable.called).isTrue();
75   }
76 
testCancellationDoesNotViolateSerialization()77   public void testCancellationDoesNotViolateSerialization() {
78     @SuppressWarnings({"unused", "nullness"})
79     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
80     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
81     ListenableFuture<@Nullable Void> secondFuture =
82         serializer.submitAsync(secondCallable, directExecutor());
83     TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
84     @SuppressWarnings({"unused", "nullness"})
85     Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
86     secondFuture.cancel(true);
87     assertThat(secondCallable.called).isFalse();
88     assertThat(thirdCallable.called).isFalse();
89     firstFuture.set(null);
90     assertThat(secondCallable.called).isFalse();
91     assertThat(thirdCallable.called).isTrue();
92   }
93 
testCancellationMultipleThreads()94   public void testCancellationMultipleThreads() throws Exception {
95     final BlockingCallable blockingCallable = new BlockingCallable();
96     ListenableFuture<@Nullable Void> unused = serializer.submit(blockingCallable, executor);
97     ListenableFuture<Boolean> future2 =
98         serializer.submit(
99             new Callable<Boolean>() {
100               @Override
101               public Boolean call() {
102                 return blockingCallable.isRunning();
103               }
104             },
105             directExecutor());
106 
107     // Wait for the first task to be started in the background. It will block until we explicitly
108     // stop it.
109     blockingCallable.waitForStart();
110 
111     // Give the second task a chance to (incorrectly) start up while the first task is running.
112     assertThat(future2.isDone()).isFalse();
113 
114     // Stop the first task. The second task should then run.
115     blockingCallable.stop();
116     executor.shutdown();
117     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
118     assertThat(getDone(future2)).isFalse();
119   }
120 
testSecondTaskWaitsForFirstEvenIfCancelled()121   public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
122     final BlockingCallable blockingCallable = new BlockingCallable();
123     ListenableFuture<@Nullable Void> future1 = serializer.submit(blockingCallable, executor);
124     ListenableFuture<Boolean> future2 =
125         serializer.submit(
126             new Callable<Boolean>() {
127               @Override
128               public Boolean call() {
129                 return blockingCallable.isRunning();
130               }
131             },
132             directExecutor());
133 
134     // Wait for the first task to be started in the background. It will block until we explicitly
135     // stop it.
136     blockingCallable.waitForStart();
137 
138     // This time, cancel the future for the first task. The task remains running, only the future
139     // is cancelled.
140     future1.cancel(false);
141 
142     // Give the second task a chance to (incorrectly) start up while the first task is running.
143     // (This is the assertion that fails.)
144     assertThat(future2.isDone()).isFalse();
145 
146     // Stop the first task. The second task should then run.
147     blockingCallable.stop();
148     executor.shutdown();
149     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
150     assertThat(getDone(future2)).isFalse();
151   }
152 
153   @J2ktIncompatible
154   @GwtIncompatible
155   @J2ObjCIncompatible // gc
156   @AndroidIncompatible
testCancellationWithReferencedObject()157   public void testCancellationWithReferencedObject() throws Exception {
158     Object toBeGCed = new Object();
159     WeakReference<Object> ref = new WeakReference<>(toBeGCed);
160     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
161     ListenableFuture<?> ignored =
162         serializer.submitAsync(
163             new AsyncCallable<@Nullable Void>() {
164               @Override
165               public ListenableFuture<@Nullable Void> call() {
166                 return settableFuture;
167               }
168             },
169             directExecutor());
170     serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
171     toBeGCed = null;
172     GcFinalization.awaitClear(ref);
173   }
174 
toStringCallable(final Object object)175   private static Callable<String> toStringCallable(final Object object) {
176     return new Callable<String>() {
177       @Override
178       public String call() {
179         return object.toString();
180       }
181     };
182   }
183 
184   public void testCancellationDuringReentrancy() throws Exception {
185     TestLogHandler logHandler = new TestLogHandler();
186     Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
187 
188     List<Future<?>> results = new ArrayList<>();
189     final Runnable[] manualExecutorTask = new Runnable[1];
190     Executor manualExecutor =
191         new Executor() {
192           @Override
193           public void execute(Runnable task) {
194             manualExecutorTask[0] = task;
195           }
196         };
197 
198     results.add(serializer.submit(Callables.returning(null), manualExecutor));
199     final Future<?>[] thingToCancel = new Future<?>[1];
200     results.add(
201         serializer.submit(
202             new Callable<@Nullable Void>() {
203               @Override
204               public @Nullable Void call() {
205                 thingToCancel[0].cancel(false);
206                 return null;
207               }
208             },
209             directExecutor()));
210     thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
211     results.add(thingToCancel[0]);
212     // Enqueue more than enough tasks to force reentrancy.
213     for (int i = 0; i < 5; i++) {
214       results.add(serializer.submit(Callables.returning(null), directExecutor()));
215     }
216 
217     manualExecutorTask[0].run();
218 
219     for (Future<?> result : results) {
220       if (!result.isCancelled()) {
221         result.get(10, SECONDS);
222       }
223       // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
224     }
225 
226     assertThat(logHandler.getStoredLogRecords()).isEmpty();
227   }
228 
229   public void testAvoidsStackOverflow_manySubmitted() throws Exception {
230     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
231     ArrayList<ListenableFuture<@Nullable Void>> results = new ArrayList<>(50_001);
232     results.add(
233         serializer.submitAsync(
234             new AsyncCallable<@Nullable Void>() {
235               @Override
236               public ListenableFuture<@Nullable Void> call() {
237                 return settableFuture;
238               }
239             },
240             directExecutor()));
241     for (int i = 0; i < 50_000; i++) {
242       results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
243     }
244     settableFuture.set(null);
245     getDone(allAsList(results));
246   }
247 
248   public void testAvoidsStackOverflow_manyCancelled() throws Exception {
249     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
250     ListenableFuture<@Nullable Void> unused =
251         serializer.submitAsync(
252             new AsyncCallable<@Nullable Void>() {
253               @Override
254               public ListenableFuture<@Nullable Void> call() {
255                 return settableFuture;
256               }
257             },
258             directExecutor());
259     for (int i = 0; i < 50_000; i++) {
260       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
261     }
262     ListenableFuture<Integer> stackDepthCheck =
263         serializer.submit(
264             new Callable<Integer>() {
265               @Override
266               public Integer call() {
267                 return Thread.currentThread().getStackTrace().length;
268               }
269             },
270             directExecutor());
271     settableFuture.set(null);
272     assertThat(getDone(stackDepthCheck))
273         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
274   }
275 
276   public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
277     final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
278     ListenableFuture<@Nullable Void> unused =
279         serializer.submitAsync(
280             new AsyncCallable<@Nullable Void>() {
281               @Override
282               public ListenableFuture<@Nullable Void> call() {
283                 return settableFuture;
284               }
285             },
286             directExecutor());
287     for (int i = 0; i < 25_000; i++) {
288       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
289       unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
290     }
291     ListenableFuture<Integer> stackDepthCheck =
292         serializer.submit(
293             new Callable<Integer>() {
294               @Override
295               public Integer call() {
296                 return Thread.currentThread().getStackTrace().length;
297               }
298             },
299             directExecutor());
300     settableFuture.set(null);
301     assertThat(getDone(stackDepthCheck))
302         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
303   }
304 
305   private static Function<Integer, Integer> add(final int delta) {
306     return new Function<Integer, Integer>() {
307       @Override
308       public Integer apply(Integer input) {
309         return input + delta;
310       }
311     };
312   }
313 
314   private static AsyncCallable<Integer> asyncAdd(
315       final ListenableFuture<Integer> future, final int delta, final Executor executor) {
316     return new AsyncCallable<Integer>() {
317       @Override
318       public ListenableFuture<Integer> call() throws Exception {
319         return Futures.transform(future, add(delta), executor);
320       }
321     };
322   }
323 
324   private static final class LongHolder {
325     long count;
326   }
327 
328   private static final int ITERATION_COUNT = 50_000;
329   private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
330 
331   @J2ktIncompatible
332   @GwtIncompatible // threads
333   public void testAvoidsStackOverflow_multipleThreads() throws Exception {
334     final LongHolder holder = new LongHolder();
335     final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
336     final List<Integer> completeLengthChecks;
337     final int baseStackDepth;
338     ExecutorService service = Executors.newFixedThreadPool(5);
339     try {
340       // Avoid counting frames from the executor itself, or the ExecutionSequencer
341       baseStackDepth =
342           serializer
343               .submit(
344                   new Callable<Integer>() {
345                     @Override
346                     public Integer call() {
347                       return Thread.currentThread().getStackTrace().length;
348                     }
349                   },
350                   service)
351               .get();
352       final SettableFuture<@Nullable Void> settableFuture = SettableFuture.create();
353       ListenableFuture<?> unused =
354           serializer.submitAsync(
355               new AsyncCallable<@Nullable Void>() {
356                 @Override
357                 public ListenableFuture<@Nullable Void> call() {
358                   return settableFuture;
359                 }
360               },
361               directExecutor());
362       for (int i = 0; i < 50_000; i++) {
363         if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
364           // after some number of iterations, switch threads
365           unused =
366               serializer.submit(
367                   new Callable<@Nullable Void>() {
368                     @Override
369                     public @Nullable Void call() {
370                       holder.count++;
371                       return null;
372                     }
373                   },
374                   service);
375         } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
376           // When at max depth, record stack trace depth
377           lengthChecks.add(
378               serializer.submit(
379                   new Callable<Integer>() {
380                     @Override
381                     public Integer call() {
382                       holder.count++;
383                       return Thread.currentThread().getStackTrace().length;
384                     }
385                   },
386                   directExecutor()));
387         } else {
388           // Otherwise, schedule a task on directExecutor
389           unused =
390               serializer.submit(
391                   new Callable<@Nullable Void>() {
392                     @Override
393                     public @Nullable Void call() {
394                       holder.count++;
395                       return null;
396                     }
397                   },
398                   directExecutor());
399         }
400       }
401       settableFuture.set(null);
402       completeLengthChecks = allAsList(lengthChecks).get();
403     } finally {
404       service.shutdown();
405     }
406     assertThat(holder.count).isEqualTo(ITERATION_COUNT);
407     for (int length : completeLengthChecks) {
408       // Verify that at max depth, less than one stack frame per submitted task was consumed
409       assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
410     }
411   }
412 
413   @SuppressWarnings("ObjectToString") // Intended behavior
414   public void testToString() {
415     Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
416     TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
417     Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
418     assertThat(secondCallable.called).isFalse();
419     assertThat(second.toString()).contains(secondCallable.toString());
420     firstFuture.set(null);
421     assertThat(second.toString()).contains(secondCallable.future.toString());
422   }
423 
424   private static class BlockingCallable implements Callable<@Nullable Void> {
425     private final CountDownLatch startLatch = new CountDownLatch(1);
426     private final CountDownLatch stopLatch = new CountDownLatch(1);
427 
428     private volatile boolean running = false;
429 
430     @Override
431     public @Nullable Void call() throws InterruptedException {
432       running = true;
433       startLatch.countDown();
434       stopLatch.await();
435       running = false;
436       return null;
437     }
438 
439     public void waitForStart() throws InterruptedException {
440       startLatch.await();
441     }
442 
443     public void stop() {
444       stopLatch.countDown();
445     }
446 
447     public boolean isRunning() {
448       return running;
449     }
450   }
451 
452   private static final class TestCallable implements AsyncCallable<@Nullable Void> {
453 
454     private final ListenableFuture<@Nullable Void> future;
455     private boolean called = false;
456 
457     private TestCallable(ListenableFuture<@Nullable Void> future) {
458       this.future = future;
459     }
460 
461     @Override
462     public ListenableFuture<@Nullable Void> call() throws Exception {
463       called = true;
464       return future;
465     }
466   }
467 }
468