• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.MoreExecutors.newSequentialExecutor;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.Lists;
25 import com.google.common.collect.Queues;
26 import java.util.List;
27 import java.util.Queue;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.CyclicBarrier;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import junit.framework.TestCase;
40 
41 /**
42  * Tests {@link SequentialExecutor}.
43  *
44  * @author JJ Furman
45  */
46 public class SequentialExecutorTest extends TestCase {
47 
48   private static class FakeExecutor implements Executor {
49     Queue<Runnable> tasks = Queues.newArrayDeque();
50 
51     @Override
execute(Runnable command)52     public void execute(Runnable command) {
53       tasks.add(command);
54     }
55 
hasNext()56     boolean hasNext() {
57       return !tasks.isEmpty();
58     }
59 
runNext()60     void runNext() {
61       assertTrue("expected at least one task to run", hasNext());
62       tasks.remove().run();
63     }
64 
runAll()65     void runAll() {
66       while (hasNext()) {
67         runNext();
68       }
69     }
70   }
71 
72   private FakeExecutor fakePool;
73   private SequentialExecutor e;
74 
75   @Override
setUp()76   public void setUp() {
77     fakePool = new FakeExecutor();
78     e = new SequentialExecutor(fakePool);
79   }
80 
testConstructingWithNullExecutor_fails()81   public void testConstructingWithNullExecutor_fails() {
82     try {
83       new SequentialExecutor(null);
84       fail("Should have failed with NullPointerException.");
85     } catch (NullPointerException expected) {
86     }
87   }
88 
testBasics()89   public void testBasics() {
90     final AtomicInteger totalCalls = new AtomicInteger();
91     Runnable intCounter =
92         new Runnable() {
93           @Override
94           public void run() {
95             totalCalls.incrementAndGet();
96             // Make sure that no other tasks are scheduled to run while this is running.
97             assertFalse(fakePool.hasNext());
98           }
99         };
100 
101     assertFalse(fakePool.hasNext());
102     e.execute(intCounter);
103     // A task should have been scheduled
104     assertTrue(fakePool.hasNext());
105     e.execute(intCounter);
106     // Our executor hasn't run any tasks yet.
107     assertEquals(0, totalCalls.get());
108     fakePool.runAll();
109     assertEquals(2, totalCalls.get());
110     // Queue is empty so no runner should be scheduled.
111     assertFalse(fakePool.hasNext());
112 
113     // Check that execute can be safely repeated
114     e.execute(intCounter);
115     e.execute(intCounter);
116     e.execute(intCounter);
117     // No change yet.
118     assertEquals(2, totalCalls.get());
119     fakePool.runAll();
120     assertEquals(5, totalCalls.get());
121     assertFalse(fakePool.hasNext());
122   }
123 
testOrdering()124   public void testOrdering() {
125     final List<Integer> callOrder = Lists.newArrayList();
126 
127     class FakeOp implements Runnable {
128       final int op;
129 
130       FakeOp(int op) {
131         this.op = op;
132       }
133 
134       @Override
135       public void run() {
136         callOrder.add(op);
137       }
138     }
139 
140     e.execute(new FakeOp(0));
141     e.execute(new FakeOp(1));
142     e.execute(new FakeOp(2));
143     fakePool.runAll();
144 
145     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
146   }
147 
testRuntimeException_doesNotStopExecution()148   public void testRuntimeException_doesNotStopExecution() {
149 
150     final AtomicInteger numCalls = new AtomicInteger();
151 
152     Runnable runMe =
153         new Runnable() {
154           @Override
155           public void run() {
156             numCalls.incrementAndGet();
157             throw new RuntimeException("FAKE EXCEPTION!");
158           }
159         };
160 
161     e.execute(runMe);
162     e.execute(runMe);
163     fakePool.runAll();
164 
165     assertEquals(2, numCalls.get());
166   }
167 
testInterrupt_beforeRunRestoresInterruption()168   public void testInterrupt_beforeRunRestoresInterruption() throws Exception {
169     // Run a task on the composed Executor that interrupts its thread (i.e. this thread).
170     fakePool.execute(
171         new Runnable() {
172           @Override
173           public void run() {
174             Thread.currentThread().interrupt();
175           }
176         });
177     // Run a task that expects that it is not interrupted while it is running.
178     e.execute(
179         new Runnable() {
180           @Override
181           public void run() {
182             assertThat(Thread.currentThread().isInterrupted()).isFalse();
183           }
184         });
185 
186     // Run these together.
187     fakePool.runAll();
188 
189     // Check that this thread has been marked as interrupted again now that the thread has been
190     // returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose
191     // JUnit or some other test case.
192     assertThat(Thread.interrupted()).isTrue();
193   }
194 
testInterrupt_doesNotInterruptSubsequentTask()195   public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception {
196     // Run a task that interrupts its thread (i.e. this thread).
197     e.execute(
198         new Runnable() {
199           @Override
200           public void run() {
201             Thread.currentThread().interrupt();
202           }
203         });
204     // Run a task that expects that it is not interrupted while it is running.
205     e.execute(
206         new Runnable() {
207           @Override
208           public void run() {
209             assertThat(Thread.currentThread().isInterrupted()).isFalse();
210           }
211         });
212 
213     // Run those tasks together.
214     fakePool.runAll();
215 
216     // Check that the interruption of a SequentialExecutor's task is restored to the thread once
217     // it is yielded. Clear the bit while checking so that the test doesn't hose JUnit or some other
218     // test case.
219     assertThat(Thread.interrupted()).isTrue();
220   }
221 
testInterrupt_doesNotStopExecution()222   public void testInterrupt_doesNotStopExecution() {
223 
224     final AtomicInteger numCalls = new AtomicInteger();
225 
226     Runnable runMe =
227         new Runnable() {
228           @Override
229           public void run() {
230             numCalls.incrementAndGet();
231           }
232         };
233 
234     Thread.currentThread().interrupt();
235 
236     e.execute(runMe);
237     e.execute(runMe);
238     fakePool.runAll();
239 
240     assertEquals(2, numCalls.get());
241 
242     assertTrue(Thread.interrupted());
243   }
244 
testDelegateRejection()245   public void testDelegateRejection() {
246     final AtomicInteger numCalls = new AtomicInteger();
247     final AtomicBoolean reject = new AtomicBoolean(true);
248     final SequentialExecutor executor =
249         new SequentialExecutor(
250             new Executor() {
251               @Override
252               public void execute(Runnable r) {
253                 if (reject.get()) {
254                   throw new RejectedExecutionException();
255                 }
256                 r.run();
257               }
258             });
259     Runnable task =
260         new Runnable() {
261           @Override
262           public void run() {
263             numCalls.incrementAndGet();
264           }
265         };
266     try {
267       executor.execute(task);
268       fail();
269     } catch (RejectedExecutionException expected) {
270     }
271     assertEquals(0, numCalls.get());
272     reject.set(false);
273     executor.execute(task);
274     assertEquals(1, numCalls.get());
275   }
276 
277 
testTaskThrowsError()278   public void testTaskThrowsError() throws Exception {
279     class MyError extends Error {}
280     final CyclicBarrier barrier = new CyclicBarrier(2);
281     // we need to make sure the error gets thrown on a different thread.
282     ExecutorService service = Executors.newSingleThreadExecutor();
283     try {
284       final SequentialExecutor executor = new SequentialExecutor(service);
285       Runnable errorTask =
286           new Runnable() {
287             @Override
288             public void run() {
289               throw new MyError();
290             }
291           };
292       Runnable barrierTask =
293           new Runnable() {
294             @Override
295             public void run() {
296               try {
297                 barrier.await();
298               } catch (Exception e) {
299                 throw new RuntimeException(e);
300               }
301             }
302           };
303       executor.execute(errorTask);
304       service.execute(barrierTask); // submit directly to the service
305       // the barrier task runs after the error task so we know that the error has been observed by
306       // SequentialExecutor by the time the barrier is satified
307       barrier.await(1, TimeUnit.SECONDS);
308       executor.execute(barrierTask);
309       // timeout means the second task wasn't even tried
310       barrier.await(1, TimeUnit.SECONDS);
311     } finally {
312       service.shutdown();
313     }
314   }
315 
316 
testRejectedExecutionThrownWithMultipleCalls()317   public void testRejectedExecutionThrownWithMultipleCalls() throws Exception {
318     final CountDownLatch latch = new CountDownLatch(1);
319     final SettableFuture<?> future = SettableFuture.create();
320     final Executor delegate =
321         new Executor() {
322           @Override
323           public void execute(Runnable task) {
324             if (future.set(null)) {
325               awaitUninterruptibly(latch);
326             }
327             throw new RejectedExecutionException();
328           }
329         };
330     final SequentialExecutor executor = new SequentialExecutor(delegate);
331     final ExecutorService blocked = Executors.newCachedThreadPool();
332     Future<?> first =
333         blocked.submit(
334             new Runnable() {
335               @Override
336               public void run() {
337                 executor.execute(Runnables.doNothing());
338               }
339             });
340     future.get(10, TimeUnit.SECONDS);
341     try {
342       executor.execute(Runnables.doNothing());
343       fail();
344     } catch (RejectedExecutionException expected) {
345     }
346     latch.countDown();
347     try {
348       first.get(10, TimeUnit.SECONDS);
349       fail();
350     } catch (ExecutionException expected) {
351       assertThat(expected).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
352     }
353   }
354 
testToString()355   public void testToString() {
356     final Runnable[] currentTask = new Runnable[1];
357     final Executor delegate =
358         new Executor() {
359           @Override
360           public void execute(Runnable task) {
361             currentTask[0] = task;
362             task.run();
363             currentTask[0] = null;
364           }
365 
366           @Override
367           public String toString() {
368             return "theDelegate";
369           }
370         };
371     Executor sequential1 = newSequentialExecutor(delegate);
372     Executor sequential2 = newSequentialExecutor(delegate);
373     assertThat(sequential1.toString()).contains("theDelegate");
374     assertThat(sequential1.toString()).isNotEqualTo(sequential2.toString());
375     final String[] whileRunningToString = new String[1];
376     sequential1.execute(
377         new Runnable() {
378           @Override
379           public void run() {
380             whileRunningToString[0] = "" + currentTask[0];
381           }
382 
383           @Override
384           public String toString() {
385             return "my runnable's toString";
386           }
387         });
388     assertThat(whileRunningToString[0]).contains("my runnable's toString");
389   }
390 }
391