• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.truth.Truth.assertThat;
18 import static com.google.common.util.concurrent.Futures.allAsList;
19 import static com.google.common.util.concurrent.Futures.getDone;
20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21 import static java.util.concurrent.TimeUnit.SECONDS;
22 
23 import com.google.common.annotations.GwtIncompatible;
24 import com.google.common.base.Function;
25 import com.google.common.testing.GcFinalization;
26 import com.google.common.testing.TestLogHandler;
27 import com.google.j2objc.annotations.J2ObjCIncompatible;
28 import java.lang.ref.WeakReference;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Logger;
39 import junit.framework.TestCase;
40 
41 /** Tests for {@link ExecutionSequencer} */
42 public class ExecutionSequencerTest extends TestCase {
43 
44   ExecutorService executor;
45 
46   private ExecutionSequencer serializer;
47   private SettableFuture<Void> firstFuture;
48   private TestCallable firstCallable;
49 
50   @Override
setUp()51   public void setUp() throws Exception {
52     executor = Executors.newCachedThreadPool();
53     serializer = ExecutionSequencer.create();
54     firstFuture = SettableFuture.create();
55     firstCallable = new TestCallable(firstFuture);
56   }
57 
58   @Override
tearDown()59   public void tearDown() throws Exception {
60     executor.shutdown();
61   }
62 
testCallableStartsAfterFirstFutureCompletes()63   public void testCallableStartsAfterFirstFutureCompletes() {
64     @SuppressWarnings({"unused", "nullness"})
65     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
66     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
67     @SuppressWarnings({"unused", "nullness"})
68     Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
69     assertThat(firstCallable.called).isTrue();
70     assertThat(secondCallable.called).isFalse();
71     firstFuture.set(null);
72     assertThat(secondCallable.called).isTrue();
73   }
74 
testCancellationDoesNotViolateSerialization()75   public void testCancellationDoesNotViolateSerialization() {
76     @SuppressWarnings({"unused", "nullness"})
77     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
78     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
79     ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
80     TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
81     @SuppressWarnings({"unused", "nullness"})
82     Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
83     secondFuture.cancel(true);
84     assertThat(secondCallable.called).isFalse();
85     assertThat(thirdCallable.called).isFalse();
86     firstFuture.set(null);
87     assertThat(secondCallable.called).isFalse();
88     assertThat(thirdCallable.called).isTrue();
89   }
90 
testCancellationMultipleThreads()91   public void testCancellationMultipleThreads() throws Exception {
92     final BlockingCallable blockingCallable = new BlockingCallable();
93     ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
94     ListenableFuture<Boolean> future2 =
95         serializer.submit(
96             new Callable<Boolean>() {
97               @Override
98               public Boolean call() {
99                 return blockingCallable.isRunning();
100               }
101             },
102             directExecutor());
103 
104     // Wait for the first task to be started in the background. It will block until we explicitly
105     // stop it.
106     blockingCallable.waitForStart();
107 
108     // Give the second task a chance to (incorrectly) start up while the first task is running.
109     assertThat(future2.isDone()).isFalse();
110 
111     // Stop the first task. The second task should then run.
112     blockingCallable.stop();
113     executor.shutdown();
114     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
115     assertThat(getDone(future2)).isFalse();
116   }
117 
testSecondTaskWaitsForFirstEvenIfCancelled()118   public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
119     final BlockingCallable blockingCallable = new BlockingCallable();
120     ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
121     ListenableFuture<Boolean> future2 =
122         serializer.submit(
123             new Callable<Boolean>() {
124               @Override
125               public Boolean call() {
126                 return blockingCallable.isRunning();
127               }
128             },
129             directExecutor());
130 
131     // Wait for the first task to be started in the background. It will block until we explicitly
132     // stop it.
133     blockingCallable.waitForStart();
134 
135     // This time, cancel the future for the first task. The task remains running, only the future
136     // is cancelled.
137     future1.cancel(false);
138 
139     // Give the second task a chance to (incorrectly) start up while the first task is running.
140     // (This is the assertion that fails.)
141     assertThat(future2.isDone()).isFalse();
142 
143     // Stop the first task. The second task should then run.
144     blockingCallable.stop();
145     executor.shutdown();
146     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
147     assertThat(getDone(future2)).isFalse();
148   }
149 
150   @GwtIncompatible
151   @J2ObjCIncompatible // gc
152   @AndroidIncompatible
testCancellationWithReferencedObject()153   public void testCancellationWithReferencedObject() throws Exception {
154     Object toBeGCed = new Object();
155     WeakReference<Object> ref = new WeakReference<>(toBeGCed);
156     final SettableFuture<Void> settableFuture = SettableFuture.create();
157     ListenableFuture<?> ignored =
158         serializer.submitAsync(
159             new AsyncCallable<Void>() {
160               @Override
161               public ListenableFuture<Void> call() {
162                 return settableFuture;
163               }
164             },
165             directExecutor());
166     serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
167     toBeGCed = null;
168     GcFinalization.awaitClear(ref);
169   }
170 
toStringCallable(final Object object)171   private static Callable<String> toStringCallable(final Object object) {
172     return new Callable<String>() {
173       @Override
174       public String call() {
175         return object.toString();
176       }
177     };
178   }
179 
180   public void testCancellationDuringReentrancy() throws Exception {
181     TestLogHandler logHandler = new TestLogHandler();
182     Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
183 
184     List<Future<?>> results = new ArrayList<>();
185     final Runnable[] manualExecutorTask = new Runnable[1];
186     Executor manualExecutor =
187         new Executor() {
188           @Override
189           public void execute(Runnable task) {
190             manualExecutorTask[0] = task;
191           }
192         };
193 
194     results.add(serializer.submit(Callables.returning(null), manualExecutor));
195     final Future<?>[] thingToCancel = new Future<?>[1];
196     results.add(
197         serializer.submit(
198             new Callable<Void>() {
199               @Override
200               public Void call() {
201                 thingToCancel[0].cancel(false);
202                 return null;
203               }
204             },
205             directExecutor()));
206     thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
207     results.add(thingToCancel[0]);
208     // Enqueue more than enough tasks to force reentrancy.
209     for (int i = 0; i < 5; i++) {
210       results.add(serializer.submit(Callables.returning(null), directExecutor()));
211     }
212 
213     manualExecutorTask[0].run();
214 
215     for (Future<?> result : results) {
216       if (!result.isCancelled()) {
217         result.get(10, SECONDS);
218       }
219       // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
220     }
221 
222     assertThat(logHandler.getStoredLogRecords()).isEmpty();
223   }
224 
225   public void testAvoidsStackOverflow_manySubmitted() throws Exception {
226     final SettableFuture<Void> settableFuture = SettableFuture.create();
227     ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
228     results.add(
229         serializer.submitAsync(
230             new AsyncCallable<Void>() {
231               @Override
232               public ListenableFuture<Void> call() {
233                 return settableFuture;
234               }
235             },
236             directExecutor()));
237     for (int i = 0; i < 50_000; i++) {
238       results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
239     }
240     settableFuture.set(null);
241     getDone(allAsList(results));
242   }
243 
244   public void testAvoidsStackOverflow_manyCancelled() throws Exception {
245     final SettableFuture<Void> settableFuture = SettableFuture.create();
246     ListenableFuture<Void> unused =
247         serializer.submitAsync(
248             new AsyncCallable<Void>() {
249               @Override
250               public ListenableFuture<Void> call() {
251                 return settableFuture;
252               }
253             },
254             directExecutor());
255     for (int i = 0; i < 50_000; i++) {
256       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
257     }
258     ListenableFuture<Integer> stackDepthCheck =
259         serializer.submit(
260             new Callable<Integer>() {
261               @Override
262               public Integer call() {
263                 return Thread.currentThread().getStackTrace().length;
264               }
265             },
266             directExecutor());
267     settableFuture.set(null);
268     assertThat(getDone(stackDepthCheck))
269         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
270   }
271 
272   public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
273     final SettableFuture<Void> settableFuture = SettableFuture.create();
274     ListenableFuture<Void> unused =
275         serializer.submitAsync(
276             new AsyncCallable<Void>() {
277               @Override
278               public ListenableFuture<Void> call() {
279                 return settableFuture;
280               }
281             },
282             directExecutor());
283     for (int i = 0; i < 25_000; i++) {
284       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
285       unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
286     }
287     ListenableFuture<Integer> stackDepthCheck =
288         serializer.submit(
289             new Callable<Integer>() {
290               @Override
291               public Integer call() {
292                 return Thread.currentThread().getStackTrace().length;
293               }
294             },
295             directExecutor());
296     settableFuture.set(null);
297     assertThat(getDone(stackDepthCheck))
298         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
299   }
300 
301   private static Function<Integer, Integer> add(final int delta) {
302     return new Function<Integer, Integer>() {
303       @Override
304       public Integer apply(Integer input) {
305         return input + delta;
306       }
307     };
308   }
309 
310   private static AsyncCallable<Integer> asyncAdd(
311       final ListenableFuture<Integer> future, final int delta, final Executor executor) {
312     return new AsyncCallable<Integer>() {
313       @Override
314       public ListenableFuture<Integer> call() throws Exception {
315         return Futures.transform(future, add(delta), executor);
316       }
317     };
318   }
319 
320   private static final class LongHolder {
321     long count;
322   }
323 
324   private static final int ITERATION_COUNT = 50_000;
325   private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
326 
327   @GwtIncompatible // threads
328   public void testAvoidsStackOverflow_multipleThreads() throws Exception {
329     final LongHolder holder = new LongHolder();
330     final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
331     final List<Integer> completeLengthChecks;
332     final int baseStackDepth;
333     ExecutorService service = Executors.newFixedThreadPool(5);
334     try {
335       // Avoid counting frames from the executor itself, or the ExecutionSequencer
336       baseStackDepth =
337           serializer
338               .submit(
339                   new Callable<Integer>() {
340                     @Override
341                     public Integer call() {
342                       return Thread.currentThread().getStackTrace().length;
343                     }
344                   },
345                   service)
346               .get();
347       final SettableFuture<Void> settableFuture = SettableFuture.create();
348       ListenableFuture<?> unused =
349           serializer.submitAsync(
350               new AsyncCallable<Void>() {
351                 @Override
352                 public ListenableFuture<Void> call() {
353                   return settableFuture;
354                 }
355               },
356               directExecutor());
357       for (int i = 0; i < 50_000; i++) {
358         if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
359           // after some number of iterations, switch threads
360           unused =
361               serializer.submit(
362                   new Callable<Void>() {
363                     @Override
364                     public Void call() {
365                       holder.count++;
366                       return null;
367                     }
368                   },
369                   service);
370         } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
371           // When at max depth, record stack trace depth
372           lengthChecks.add(
373               serializer.submit(
374                   new Callable<Integer>() {
375                     @Override
376                     public Integer call() {
377                       holder.count++;
378                       return Thread.currentThread().getStackTrace().length;
379                     }
380                   },
381                   directExecutor()));
382         } else {
383           // Otherwise, schedule a task on directExecutor
384           unused =
385               serializer.submit(
386                   new Callable<Void>() {
387                     @Override
388                     public Void call() {
389                       holder.count++;
390                       return null;
391                     }
392                   },
393                   directExecutor());
394         }
395       }
396       settableFuture.set(null);
397       completeLengthChecks = allAsList(lengthChecks).get();
398     } finally {
399       service.shutdown();
400     }
401     assertThat(holder.count).isEqualTo(ITERATION_COUNT);
402     for (int length : completeLengthChecks) {
403       // Verify that at max depth, less than one stack frame per submitted task was consumed
404       assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
405     }
406   }
407 
408   @SuppressWarnings("ObjectToString") // Intended behavior
409   public void testToString() {
410     Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
411     TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
412     Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
413     assertThat(secondCallable.called).isFalse();
414     assertThat(second.toString()).contains(secondCallable.toString());
415     firstFuture.set(null);
416     assertThat(second.toString()).contains(secondCallable.future.toString());
417   }
418 
419   private static class BlockingCallable implements Callable<Void> {
420     private final CountDownLatch startLatch = new CountDownLatch(1);
421     private final CountDownLatch stopLatch = new CountDownLatch(1);
422 
423     private volatile boolean running = false;
424 
425     @Override
426     public Void call() throws InterruptedException {
427       running = true;
428       startLatch.countDown();
429       stopLatch.await();
430       running = false;
431       return null;
432     }
433 
434     public void waitForStart() throws InterruptedException {
435       startLatch.await();
436     }
437 
438     public void stop() {
439       stopLatch.countDown();
440     }
441 
442     public boolean isRunning() {
443       return running;
444     }
445   }
446 
447   private static final class TestCallable implements AsyncCallable<Void> {
448 
449     private final ListenableFuture<Void> future;
450     private boolean called = false;
451 
452     private TestCallable(ListenableFuture<Void> future) {
453       this.future = future;
454     }
455 
456     @Override
457     public ListenableFuture<Void> call() throws Exception {
458       called = true;
459       return future;
460     }
461   }
462 }
463