• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
20 import com.google.common.util.concurrent.Service.State;
21 
22 import junit.framework.TestCase;
23 
24 import java.util.concurrent.CyclicBarrier;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 
36 /**
37  * Unit test for {@link AbstractScheduledService}.
38  *
39  * @author Luke Sandberg
40  */
41 
42 public class AbstractScheduledServiceTest extends TestCase {
43 
44   volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
45   volatile ScheduledFuture<?> future = null;
46 
47   volatile boolean atFixedRateCalled = false;
48   volatile boolean withFixedDelayCalled = false;
49   volatile boolean scheduleCalled = false;
50 
51   final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
52     @Override
53     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
54         long delay, TimeUnit unit) {
55       return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
56     }
57   };
58 
testServiceStartStop()59   public void testServiceStartStop() throws Exception {
60     NullService service = new NullService();
61     service.startAndWait();
62     assertFalse(future.isDone());
63     service.stopAndWait();
64     assertTrue(future.isCancelled());
65   }
66 
67   private class NullService extends AbstractScheduledService {
runOneIteration()68     @Override protected void runOneIteration() throws Exception { }
startUp()69     @Override protected void startUp() throws Exception { }
shutDown()70     @Override protected void shutDown() throws Exception { }
scheduler()71     @Override protected Scheduler scheduler() { return configuration; }
executor()72     @Override protected ScheduledExecutorService executor() { return executor; }
73   }
74 
testFailOnExceptionFromRun()75   public void testFailOnExceptionFromRun() throws Exception {
76     TestService service = new TestService();
77     service.runException = new Exception();
78     service.startAndWait();
79     service.runFirstBarrier.await();
80     service.runSecondBarrier.await();
81     try {
82       future.get();
83       fail();
84     } catch (ExecutionException e) {
85       // An execution exception holds a runtime exception (from throwables.propogate) that holds our
86       // original exception.
87       assertEquals(service.runException, e.getCause().getCause());
88     }
89     assertEquals(service.state(), Service.State.FAILED);
90   }
91 
testFailOnExceptionFromStartUp()92   public void testFailOnExceptionFromStartUp() {
93     TestService service = new TestService();
94     service.startUpException = new Exception();
95     try {
96       service.startAndWait();
97       fail();
98     } catch (UncheckedExecutionException e) {
99       assertEquals(service.startUpException, e.getCause());
100     }
101     assertEquals(0, service.numberOfTimesRunCalled.get());
102     assertEquals(Service.State.FAILED, service.state());
103   }
104 
testFailOnExceptionFromShutDown()105   public void testFailOnExceptionFromShutDown() throws Exception {
106     TestService service = new TestService();
107     service.shutDownException = new Exception();
108     service.startAndWait();
109     service.runFirstBarrier.await();
110     ListenableFuture<Service.State> stopHandle = service.stop();
111     service.runSecondBarrier.await();
112     try {
113       stopHandle.get();
114       fail();
115     } catch (ExecutionException e) {
116       assertEquals(service.shutDownException, e.getCause());
117     }
118     assertEquals(Service.State.FAILED, service.state());
119   }
120 
testRunOneIterationCalledMultipleTimes()121   public void testRunOneIterationCalledMultipleTimes() throws Exception {
122     TestService service = new TestService();
123     service.startAndWait();
124     for (int i = 1; i < 10; i++) {
125       service.runFirstBarrier.await();
126       assertEquals(i, service.numberOfTimesRunCalled.get());
127       service.runSecondBarrier.await();
128     }
129     service.runFirstBarrier.await();
130     service.stop();
131     service.runSecondBarrier.await();
132     service.stopAndWait();
133   }
134 
testExecutorOnlyCalledOnce()135   public void testExecutorOnlyCalledOnce() throws Exception {
136     TestService service = new TestService();
137     service.startAndWait();
138     // It should be called once during startup.
139     assertEquals(1, service.numberOfTimesExecutorCalled.get());
140     for (int i = 1; i < 10; i++) {
141       service.runFirstBarrier.await();
142       assertEquals(i, service.numberOfTimesRunCalled.get());
143       service.runSecondBarrier.await();
144     }
145     service.runFirstBarrier.await();
146     service.stop();
147     service.runSecondBarrier.await();
148     service.stopAndWait();
149     // Only called once overall.
150     assertEquals(1, service.numberOfTimesExecutorCalled.get());
151   }
152 
testSchedulerOnlyCalledOnce()153   public void testSchedulerOnlyCalledOnce() throws Exception {
154     TestService service = new TestService();
155     service.startAndWait();
156     // It should be called once during startup.
157     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
158     for (int i = 1; i < 10; i++) {
159       service.runFirstBarrier.await();
160       assertEquals(i, service.numberOfTimesRunCalled.get());
161       service.runSecondBarrier.await();
162     }
163     service.runFirstBarrier.await();
164     service.stop();
165     service.runSecondBarrier.await();
166     service.stopAndWait();
167     // Only called once overall.
168     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
169   }
170 
171   private class TestService extends AbstractScheduledService {
172     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
173     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
174 
175     volatile boolean startUpCalled = false;
176     volatile boolean shutDownCalled = false;
177     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
178     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
179     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
180     volatile Exception runException = null;
181     volatile Exception startUpException = null;
182     volatile Exception shutDownException = null;
183 
184     @Override
runOneIteration()185     protected void runOneIteration() throws Exception {
186       assertTrue(startUpCalled);
187       assertFalse(shutDownCalled);
188       numberOfTimesRunCalled.incrementAndGet();
189       assertEquals(State.RUNNING, state());
190       runFirstBarrier.await();
191       runSecondBarrier.await();
192       if (runException != null) {
193         throw runException;
194       }
195     }
196 
197     @Override
startUp()198     protected void startUp() throws Exception {
199       assertFalse(startUpCalled);
200       assertFalse(shutDownCalled);
201       startUpCalled = true;
202       assertEquals(State.STARTING, state());
203       if (startUpException != null) {
204         throw startUpException;
205       }
206     }
207 
208     @Override
shutDown()209     protected void shutDown() throws Exception {
210       assertTrue(startUpCalled);
211       assertFalse(shutDownCalled);
212       shutDownCalled = true;
213       if (shutDownException != null) {
214         throw shutDownException;
215       }
216     }
217 
218     @Override
executor()219     protected ScheduledExecutorService executor() {
220       numberOfTimesExecutorCalled.incrementAndGet();
221       return executor;
222     }
223 
224     @Override
scheduler()225     protected Scheduler scheduler() {
226       numberOfTimesSchedulerCalled.incrementAndGet();
227       return configuration;
228     }
229   }
230 
231   public static class SchedulerTest extends TestCase {
232     // These constants are arbitrary and just used to make sure that the correct method is called
233     // with the correct parameters.
234     private static final int initialDelay = 10;
235     private static final int delay = 20;
236     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
237 
238     // Unique runnable object used for comparison.
239     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
240     boolean called = false;
241 
assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, long delay, TimeUnit unit)242     private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
243         long delay, TimeUnit unit) {
244       assertFalse(called);  // only called once.
245       called = true;
246       assertEquals(SchedulerTest.initialDelay, initialDelay);
247       assertEquals(SchedulerTest.delay, delay);
248       assertEquals(SchedulerTest.unit, unit);
249       assertEquals(testRunnable, command);
250     }
251 
testFixedRateSchedule()252     public void testFixedRateSchedule() {
253       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
254       schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
255         @Override
256         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
257             long period, TimeUnit unit) {
258           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
259           return null;
260         }
261       }, testRunnable);
262       assertTrue(called);
263     }
264 
testFixedDelaySchedule()265     public void testFixedDelaySchedule() {
266       Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
267       schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
268         @Override
269         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
270             long delay, TimeUnit unit) {
271           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
272           return null;
273         }
274       }, testRunnable);
275       assertTrue(called);
276     }
277 
278     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
279       public AtomicInteger scheduleCounter = new AtomicInteger(0);
280       @Override
getNextSchedule()281       protected Schedule getNextSchedule() throws Exception {
282         scheduleCounter.incrementAndGet();
283         return new Schedule(0, TimeUnit.SECONDS);
284       }
285     }
286 
testCustomSchedule_startStop()287     public void testCustomSchedule_startStop() throws Exception {
288       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
289       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
290       final AtomicBoolean shouldWait = new AtomicBoolean(true);
291       Runnable task = new Runnable() {
292         @Override public void run() {
293           try {
294             if (shouldWait.get()) {
295               firstBarrier.await();
296               secondBarrier.await();
297             }
298           } catch (Exception e) {
299             throw new RuntimeException(e);
300           }
301         }
302       };
303       TestCustomScheduler scheduler = new TestCustomScheduler();
304       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
305       firstBarrier.await();
306       assertEquals(1, scheduler.scheduleCounter.get());
307       secondBarrier.await();
308       firstBarrier.await();
309       assertEquals(2, scheduler.scheduleCounter.get());
310       shouldWait.set(false);
311       secondBarrier.await();
312       future.cancel(false);
313     }
314 
testCustomSchedulerServiceStop()315     public void testCustomSchedulerServiceStop() throws Exception {
316       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
317       service.startAndWait();
318       service.firstBarrier.await();
319       assertEquals(1, service.numIterations.get());
320       service.stop();
321       service.secondBarrier.await();
322       service.stopAndWait();
323       // Sleep for a while just to ensure that our task wasn't called again.
324       Thread.sleep(unit.toMillis(3 * delay));
325       assertEquals(1, service.numIterations.get());
326     }
327 
testBig()328     public void testBig() throws Exception {
329       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
330         @Override protected Scheduler scheduler() {
331           return new AbstractScheduledService.CustomScheduler(){
332             @Override
333             protected Schedule getNextSchedule() throws Exception {
334               // Explicitly yield to increase the probability of a pathological scheduling.
335               Thread.yield();
336               return new Schedule(0, TimeUnit.SECONDS);
337             }
338           };
339         }
340       };
341       service.useBarriers = false;
342       service.startAndWait();
343       Thread.sleep(50);
344       service.useBarriers = true;
345       service.firstBarrier.await();
346       int numIterations = service.numIterations.get();
347       service.stop();
348       service.secondBarrier.await();
349       service.stopAndWait();
350       assertEquals(numIterations, service.numIterations.get());
351     }
352 
353     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
354       final AtomicInteger numIterations = new AtomicInteger(0);
355       volatile boolean useBarriers = true;
356       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
357       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
358 
runOneIteration()359       @Override protected void runOneIteration() throws Exception {
360         numIterations.incrementAndGet();
361         if (useBarriers) {
362           firstBarrier.await();
363           secondBarrier.await();
364         }
365       }
366 
executor()367       @Override protected ScheduledExecutorService executor() {
368         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
369         return Executors.newScheduledThreadPool(10);
370       }
371 
startUp()372       @Override protected void startUp() throws Exception { }
373 
shutDown()374       @Override protected void shutDown() throws Exception { }
375 
scheduler()376       @Override protected Scheduler scheduler() {
377         return new CustomScheduler() {
378           @Override
379           protected Schedule getNextSchedule() throws Exception {
380             return new Schedule(delay, unit);
381           }};
382       }
383     }
384 
testCustomSchedulerFailure()385     public void testCustomSchedulerFailure() throws Exception {
386       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
387       service.startAndWait();
388       for (int i = 1; i < 4; i++) {
389         service.firstBarrier.await();
390         assertEquals(i, service.numIterations.get());
391         service.secondBarrier.await();
392       }
393       Thread.sleep(1000);
394       try {
395         service.stop().get(100, TimeUnit.SECONDS);
396         fail();
397       } catch (ExecutionException e) {
398         assertEquals(State.FAILED, service.state());
399       }
400     }
401 
402     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
403       final AtomicInteger numIterations = new AtomicInteger(0);
404       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
405       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
406 
runOneIteration()407       @Override protected void runOneIteration() throws Exception {
408         numIterations.incrementAndGet();
409         firstBarrier.await();
410         secondBarrier.await();
411       }
412 
executor()413       @Override protected ScheduledExecutorService executor() {
414         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
415         return Executors.newScheduledThreadPool(10);
416       }
417 
startUp()418       @Override protected void startUp() throws Exception { }
419 
shutDown()420       @Override protected void shutDown() throws Exception { }
421 
scheduler()422       @Override protected Scheduler scheduler() {
423         return new CustomScheduler() {
424           @Override
425           protected Schedule getNextSchedule() throws Exception {
426             if (numIterations.get() > 2) {
427               throw new IllegalStateException("Failed");
428             }
429             return new Schedule(delay, unit);
430           }};
431       }
432     }
433   }
434 }
435