• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.MoreExecutors.newSequentialExecutor;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.Lists;
25 import com.google.common.collect.Queues;
26 import java.util.List;
27 import java.util.Queue;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.CyclicBarrier;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import junit.framework.TestCase;
40 
41 /**
42  * Tests {@link SequentialExecutor}.
43  *
44  * @author JJ Furman
45  */
46 public class SequentialExecutorTest extends TestCase {
47 
48   private static class FakeExecutor implements Executor {
49     Queue<Runnable> tasks = Queues.newArrayDeque();
50 
51     @Override
execute(Runnable command)52     public void execute(Runnable command) {
53       tasks.add(command);
54     }
55 
hasNext()56     boolean hasNext() {
57       return !tasks.isEmpty();
58     }
59 
runNext()60     void runNext() {
61       assertTrue("expected at least one task to run", hasNext());
62       tasks.remove().run();
63     }
64 
runAll()65     void runAll() {
66       while (hasNext()) {
67         runNext();
68       }
69     }
70   }
71 
72   private FakeExecutor fakePool;
73   private SequentialExecutor e;
74 
75   @Override
setUp()76   public void setUp() {
77     fakePool = new FakeExecutor();
78     e = new SequentialExecutor(fakePool);
79   }
80 
testConstructingWithNullExecutor_fails()81   public void testConstructingWithNullExecutor_fails() {
82     try {
83       new SequentialExecutor(null);
84       fail("Should have failed with NullPointerException.");
85     } catch (NullPointerException expected) {
86     }
87   }
88 
testBasics()89   public void testBasics() {
90     final AtomicInteger totalCalls = new AtomicInteger();
91     Runnable intCounter =
92         new Runnable() {
93           @Override
94           public void run() {
95             totalCalls.incrementAndGet();
96             // Make sure that no other tasks are scheduled to run while this is running.
97             assertFalse(fakePool.hasNext());
98           }
99         };
100 
101     assertFalse(fakePool.hasNext());
102     e.execute(intCounter);
103     // A task should have been scheduled
104     assertTrue(fakePool.hasNext());
105     e.execute(intCounter);
106     // Our executor hasn't run any tasks yet.
107     assertEquals(0, totalCalls.get());
108     fakePool.runAll();
109     assertEquals(2, totalCalls.get());
110     // Queue is empty so no runner should be scheduled.
111     assertFalse(fakePool.hasNext());
112 
113     // Check that execute can be safely repeated
114     e.execute(intCounter);
115     e.execute(intCounter);
116     e.execute(intCounter);
117     // No change yet.
118     assertEquals(2, totalCalls.get());
119     fakePool.runAll();
120     assertEquals(5, totalCalls.get());
121     assertFalse(fakePool.hasNext());
122   }
123 
testOrdering()124   public void testOrdering() {
125     final List<Integer> callOrder = Lists.newArrayList();
126 
127     class FakeOp implements Runnable {
128       final int op;
129 
130       FakeOp(int op) {
131         this.op = op;
132       }
133 
134       @Override
135       public void run() {
136         callOrder.add(op);
137       }
138     }
139 
140     e.execute(new FakeOp(0));
141     e.execute(new FakeOp(1));
142     e.execute(new FakeOp(2));
143     fakePool.runAll();
144 
145     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
146   }
147 
testRuntimeException_doesNotStopExecution()148   public void testRuntimeException_doesNotStopExecution() {
149 
150     final AtomicInteger numCalls = new AtomicInteger();
151 
152     Runnable runMe =
153         new Runnable() {
154           @Override
155           public void run() {
156             numCalls.incrementAndGet();
157             throw new RuntimeException("FAKE EXCEPTION!");
158           }
159         };
160 
161     e.execute(runMe);
162     e.execute(runMe);
163     fakePool.runAll();
164 
165     assertEquals(2, numCalls.get());
166   }
167 
testInterrupt_beforeRunRestoresInterruption()168   public void testInterrupt_beforeRunRestoresInterruption() throws Exception {
169     // Run a task on the composed Executor that interrupts its thread (i.e. this thread).
170     fakePool.execute(
171         new Runnable() {
172           @Override
173           public void run() {
174             Thread.currentThread().interrupt();
175           }
176         });
177     // Run a task that expects that it is not interrupted while it is running.
178     e.execute(
179         new Runnable() {
180           @Override
181           public void run() {
182             assertThat(Thread.currentThread().isInterrupted()).isFalse();
183           }
184         });
185 
186     // Run these together.
187     fakePool.runAll();
188 
189     // Check that this thread has been marked as interrupted again now that the thread has been
190     // returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose
191     // JUnit or some other test case.
192     assertThat(Thread.interrupted()).isTrue();
193   }
194 
testInterrupt_doesNotInterruptSubsequentTask()195   public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception {
196     // Run a task that interrupts its thread (i.e. this thread).
197     e.execute(
198         new Runnable() {
199           @Override
200           public void run() {
201             Thread.currentThread().interrupt();
202           }
203         });
204     // Run a task that expects that it is not interrupted while it is running.
205     e.execute(
206         new Runnable() {
207           @Override
208           public void run() {
209             assertThat(Thread.currentThread().isInterrupted()).isFalse();
210           }
211         });
212 
213     // Run those tasks together.
214     fakePool.runAll();
215 
216     // Check that the interruption of a SequentialExecutor's task is restored to the thread once
217     // it is yielded. Clear the bit while checking so that the test doesn't hose JUnit or some other
218     // test case.
219     assertThat(Thread.interrupted()).isTrue();
220   }
221 
testInterrupt_doesNotStopExecution()222   public void testInterrupt_doesNotStopExecution() {
223 
224     final AtomicInteger numCalls = new AtomicInteger();
225 
226     Runnable runMe =
227         new Runnable() {
228           @Override
229           public void run() {
230             numCalls.incrementAndGet();
231           }
232         };
233 
234     Thread.currentThread().interrupt();
235 
236     e.execute(runMe);
237     e.execute(runMe);
238     fakePool.runAll();
239 
240     assertEquals(2, numCalls.get());
241 
242     assertTrue(Thread.interrupted());
243   }
244 
testDelegateRejection()245   public void testDelegateRejection() {
246     final AtomicInteger numCalls = new AtomicInteger();
247     final AtomicBoolean reject = new AtomicBoolean(true);
248     final SequentialExecutor executor =
249         new SequentialExecutor(
250             new Executor() {
251               @Override
252               public void execute(Runnable r) {
253                 if (reject.get()) {
254                   throw new RejectedExecutionException();
255                 }
256                 r.run();
257               }
258             });
259     Runnable task =
260         new Runnable() {
261           @Override
262           public void run() {
263             numCalls.incrementAndGet();
264           }
265         };
266     try {
267       executor.execute(task);
268       fail();
269     } catch (RejectedExecutionException expected) {
270     }
271     assertEquals(0, numCalls.get());
272     reject.set(false);
273     executor.execute(task);
274     assertEquals(1, numCalls.get());
275   }
276 
277   /*
278    * Under Android, MyError propagates up and fails the test?
279    *
280    * TODO(b/218700094): Does this matter to prod users, or is it just a feature of our testing
281    * environment? If the latter, maybe write a custom Executor that avoids failing the test when it
282    * sees an Error?
283    */
284   @AndroidIncompatible
testTaskThrowsError()285   public void testTaskThrowsError() throws Exception {
286     class MyError extends Error {}
287     final CyclicBarrier barrier = new CyclicBarrier(2);
288     // we need to make sure the error gets thrown on a different thread.
289     ExecutorService service = Executors.newSingleThreadExecutor();
290     try {
291       final SequentialExecutor executor = new SequentialExecutor(service);
292       Runnable errorTask =
293           new Runnable() {
294             @Override
295             public void run() {
296               throw new MyError();
297             }
298           };
299       Runnable barrierTask =
300           new Runnable() {
301             @Override
302             public void run() {
303               try {
304                 barrier.await();
305               } catch (Exception e) {
306                 throw new RuntimeException(e);
307               }
308             }
309           };
310       executor.execute(errorTask);
311       service.execute(barrierTask); // submit directly to the service
312       // the barrier task runs after the error task so we know that the error has been observed by
313       // SequentialExecutor by the time the barrier is satified
314       barrier.await(1, TimeUnit.SECONDS);
315       executor.execute(barrierTask);
316       // timeout means the second task wasn't even tried
317       barrier.await(1, TimeUnit.SECONDS);
318     } finally {
319       service.shutdown();
320     }
321   }
322 
testRejectedExecutionThrownWithMultipleCalls()323   public void testRejectedExecutionThrownWithMultipleCalls() throws Exception {
324     final CountDownLatch latch = new CountDownLatch(1);
325     final SettableFuture<?> future = SettableFuture.create();
326     final Executor delegate =
327         new Executor() {
328           @Override
329           public void execute(Runnable task) {
330             if (future.set(null)) {
331               awaitUninterruptibly(latch);
332             }
333             throw new RejectedExecutionException();
334           }
335         };
336     final SequentialExecutor executor = new SequentialExecutor(delegate);
337     final ExecutorService blocked = Executors.newCachedThreadPool();
338     Future<?> first =
339         blocked.submit(
340             new Runnable() {
341               @Override
342               public void run() {
343                 executor.execute(Runnables.doNothing());
344               }
345             });
346     future.get(10, TimeUnit.SECONDS);
347     try {
348       executor.execute(Runnables.doNothing());
349       fail();
350     } catch (RejectedExecutionException expected) {
351     }
352     latch.countDown();
353     try {
354       first.get(10, TimeUnit.SECONDS);
355       fail();
356     } catch (ExecutionException expected) {
357       assertThat(expected).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
358     }
359   }
360 
testToString()361   public void testToString() {
362     final Runnable[] currentTask = new Runnable[1];
363     final Executor delegate =
364         new Executor() {
365           @Override
366           public void execute(Runnable task) {
367             currentTask[0] = task;
368             task.run();
369             currentTask[0] = null;
370           }
371 
372           @Override
373           public String toString() {
374             return "theDelegate";
375           }
376         };
377     Executor sequential1 = newSequentialExecutor(delegate);
378     Executor sequential2 = newSequentialExecutor(delegate);
379     assertThat(sequential1.toString()).contains("theDelegate");
380     assertThat(sequential1.toString()).isNotEqualTo(sequential2.toString());
381     final String[] whileRunningToString = new String[1];
382     sequential1.execute(
383         new Runnable() {
384           @Override
385           public void run() {
386             whileRunningToString[0] = "" + currentTask[0];
387           }
388 
389           @Override
390           public String toString() {
391             return "my runnable's toString";
392           }
393         });
394     assertThat(whileRunningToString[0]).contains("my runnable's toString");
395   }
396 }
397