• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static java.util.concurrent.TimeUnit.SECONDS;
23 
24 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
25 import com.google.common.util.concurrent.Service.State;
26 import com.google.common.util.concurrent.testing.TestingExecutors;
27 import java.util.concurrent.BrokenBarrierException;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.CyclicBarrier;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.ScheduledThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicReference;
41 import junit.framework.TestCase;
42 
43 /**
44  * Unit test for {@link AbstractScheduledService}.
45  *
46  * @author Luke Sandberg
47  */
48 
49 public class AbstractScheduledServiceTest extends TestCase {
50 
51   volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
52   volatile ScheduledFuture<?> future = null;
53 
54   volatile boolean atFixedRateCalled = false;
55   volatile boolean withFixedDelayCalled = false;
56   volatile boolean scheduleCalled = false;
57 
58   final ScheduledExecutorService executor =
59       new ScheduledThreadPoolExecutor(10) {
60         @Override
61         public ScheduledFuture<?> scheduleWithFixedDelay(
62             Runnable command, long initialDelay, long delay, TimeUnit unit) {
63           return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
64         }
65       };
66 
testServiceStartStop()67   public void testServiceStartStop() throws Exception {
68     NullService service = new NullService();
69     service.startAsync().awaitRunning();
70     assertFalse(future.isDone());
71     service.stopAsync().awaitTerminated();
72     assertTrue(future.isCancelled());
73   }
74 
75   private class NullService extends AbstractScheduledService {
76     @Override
runOneIteration()77     protected void runOneIteration() throws Exception {}
78 
79     @Override
scheduler()80     protected Scheduler scheduler() {
81       return configuration;
82     }
83 
84     @Override
executor()85     protected ScheduledExecutorService executor() {
86       return executor;
87     }
88   }
89 
testFailOnExceptionFromRun()90   public void testFailOnExceptionFromRun() throws Exception {
91     TestService service = new TestService();
92     service.runException = new Exception();
93     service.startAsync().awaitRunning();
94     service.runFirstBarrier.await();
95     service.runSecondBarrier.await();
96     try {
97       future.get();
98       fail();
99     } catch (CancellationException expected) {
100     }
101     // An execution exception holds a runtime exception (from throwables.propogate) that holds our
102     // original exception.
103     assertEquals(service.runException, service.failureCause());
104     assertEquals(Service.State.FAILED, service.state());
105   }
106 
testFailOnExceptionFromStartUp()107   public void testFailOnExceptionFromStartUp() {
108     TestService service = new TestService();
109     service.startUpException = new Exception();
110     try {
111       service.startAsync().awaitRunning();
112       fail();
113     } catch (IllegalStateException e) {
114       assertEquals(service.startUpException, e.getCause());
115     }
116     assertEquals(0, service.numberOfTimesRunCalled.get());
117     assertEquals(Service.State.FAILED, service.state());
118   }
119 
testFailOnErrorFromStartUpListener()120   public void testFailOnErrorFromStartUpListener() throws InterruptedException {
121     final Error error = new Error();
122     final CountDownLatch latch = new CountDownLatch(1);
123     TestService service = new TestService();
124     service.addListener(
125         new Service.Listener() {
126           @Override
127           public void running() {
128             throw error;
129           }
130 
131           @Override
132           public void failed(State from, Throwable failure) {
133             assertEquals(State.RUNNING, from);
134             assertEquals(error, failure);
135             latch.countDown();
136           }
137         },
138         directExecutor());
139     service.startAsync();
140     latch.await();
141 
142     assertEquals(0, service.numberOfTimesRunCalled.get());
143     assertEquals(Service.State.FAILED, service.state());
144   }
145 
testFailOnExceptionFromShutDown()146   public void testFailOnExceptionFromShutDown() throws Exception {
147     TestService service = new TestService();
148     service.shutDownException = new Exception();
149     service.startAsync().awaitRunning();
150     service.runFirstBarrier.await();
151     service.stopAsync();
152     service.runSecondBarrier.await();
153     try {
154       service.awaitTerminated();
155       fail();
156     } catch (IllegalStateException e) {
157       assertEquals(service.shutDownException, e.getCause());
158     }
159     assertEquals(Service.State.FAILED, service.state());
160   }
161 
testRunOneIterationCalledMultipleTimes()162   public void testRunOneIterationCalledMultipleTimes() throws Exception {
163     TestService service = new TestService();
164     service.startAsync().awaitRunning();
165     for (int i = 1; i < 10; i++) {
166       service.runFirstBarrier.await();
167       assertEquals(i, service.numberOfTimesRunCalled.get());
168       service.runSecondBarrier.await();
169     }
170     service.runFirstBarrier.await();
171     service.stopAsync();
172     service.runSecondBarrier.await();
173     service.stopAsync().awaitTerminated();
174   }
175 
testExecutorOnlyCalledOnce()176   public void testExecutorOnlyCalledOnce() throws Exception {
177     TestService service = new TestService();
178     service.startAsync().awaitRunning();
179     // It should be called once during startup.
180     assertEquals(1, service.numberOfTimesExecutorCalled.get());
181     for (int i = 1; i < 10; i++) {
182       service.runFirstBarrier.await();
183       assertEquals(i, service.numberOfTimesRunCalled.get());
184       service.runSecondBarrier.await();
185     }
186     service.runFirstBarrier.await();
187     service.stopAsync();
188     service.runSecondBarrier.await();
189     service.stopAsync().awaitTerminated();
190     // Only called once overall.
191     assertEquals(1, service.numberOfTimesExecutorCalled.get());
192   }
193 
testDefaultExecutorIsShutdownWhenServiceIsStopped()194   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
195     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
196     AbstractScheduledService service =
197         new AbstractScheduledService() {
198           @Override
199           protected void runOneIteration() throws Exception {}
200 
201           @Override
202           protected ScheduledExecutorService executor() {
203             executor.set(super.executor());
204             return executor.get();
205           }
206 
207           @Override
208           protected Scheduler scheduler() {
209             return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
210           }
211         };
212 
213     service.startAsync();
214     assertFalse(service.executor().isShutdown());
215     service.awaitRunning();
216     service.stopAsync();
217     service.awaitTerminated();
218     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
219   }
220 
testDefaultExecutorIsShutdownWhenServiceFails()221   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
222     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
223     AbstractScheduledService service =
224         new AbstractScheduledService() {
225           @Override
226           protected void startUp() throws Exception {
227             throw new Exception("Failed");
228           }
229 
230           @Override
231           protected void runOneIteration() throws Exception {}
232 
233           @Override
234           protected ScheduledExecutorService executor() {
235             executor.set(super.executor());
236             return executor.get();
237           }
238 
239           @Override
240           protected Scheduler scheduler() {
241             return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
242           }
243         };
244 
245     try {
246       service.startAsync().awaitRunning();
247       fail("Expected service to fail during startup");
248     } catch (IllegalStateException expected) {
249     }
250 
251     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
252   }
253 
testSchedulerOnlyCalledOnce()254   public void testSchedulerOnlyCalledOnce() throws Exception {
255     TestService service = new TestService();
256     service.startAsync().awaitRunning();
257     // It should be called once during startup.
258     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
259     for (int i = 1; i < 10; i++) {
260       service.runFirstBarrier.await();
261       assertEquals(i, service.numberOfTimesRunCalled.get());
262       service.runSecondBarrier.await();
263     }
264     service.runFirstBarrier.await();
265     service.stopAsync();
266     service.runSecondBarrier.await();
267     service.awaitTerminated();
268     // Only called once overall.
269     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
270   }
271 
testTimeout()272   public void testTimeout() {
273     // Create a service whose executor will never run its commands
274     Service service =
275         new AbstractScheduledService() {
276           @Override
277           protected Scheduler scheduler() {
278             return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS);
279           }
280 
281           @Override
282           protected ScheduledExecutorService executor() {
283             return TestingExecutors.noOpScheduledExecutor();
284           }
285 
286           @Override
287           protected void runOneIteration() throws Exception {}
288 
289           @Override
290           protected String serviceName() {
291             return "Foo";
292           }
293         };
294     try {
295       service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
296       fail("Expected timeout");
297     } catch (TimeoutException e) {
298       assertThat(e)
299           .hasMessageThat()
300           .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
301     }
302   }
303 
304   private class TestService extends AbstractScheduledService {
305     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
306     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
307 
308     volatile boolean startUpCalled = false;
309     volatile boolean shutDownCalled = false;
310     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
311     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
312     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
313     volatile Exception runException = null;
314     volatile Exception startUpException = null;
315     volatile Exception shutDownException = null;
316 
317     @Override
runOneIteration()318     protected void runOneIteration() throws Exception {
319       assertTrue(startUpCalled);
320       assertFalse(shutDownCalled);
321       numberOfTimesRunCalled.incrementAndGet();
322       assertEquals(State.RUNNING, state());
323       runFirstBarrier.await();
324       runSecondBarrier.await();
325       if (runException != null) {
326         throw runException;
327       }
328     }
329 
330     @Override
startUp()331     protected void startUp() throws Exception {
332       assertFalse(startUpCalled);
333       assertFalse(shutDownCalled);
334       startUpCalled = true;
335       assertEquals(State.STARTING, state());
336       if (startUpException != null) {
337         throw startUpException;
338       }
339     }
340 
341     @Override
shutDown()342     protected void shutDown() throws Exception {
343       assertTrue(startUpCalled);
344       assertFalse(shutDownCalled);
345       shutDownCalled = true;
346       if (shutDownException != null) {
347         throw shutDownException;
348       }
349     }
350 
351     @Override
executor()352     protected ScheduledExecutorService executor() {
353       numberOfTimesExecutorCalled.incrementAndGet();
354       return executor;
355     }
356 
357     @Override
scheduler()358     protected Scheduler scheduler() {
359       numberOfTimesSchedulerCalled.incrementAndGet();
360       return configuration;
361     }
362   }
363 
364   public static class SchedulerTest extends TestCase {
365     // These constants are arbitrary and just used to make sure that the correct method is called
366     // with the correct parameters.
367     private static final int initialDelay = 10;
368     private static final int delay = 20;
369     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
370 
371     // Unique runnable object used for comparison.
372     final Runnable testRunnable =
373         new Runnable() {
374           @Override
375           public void run() {}
376         };
377     boolean called = false;
378 
assertSingleCallWithCorrectParameters( Runnable command, long initialDelay, long delay, TimeUnit unit)379     private void assertSingleCallWithCorrectParameters(
380         Runnable command, long initialDelay, long delay, TimeUnit unit) {
381       assertFalse(called); // only called once.
382       called = true;
383       assertEquals(SchedulerTest.initialDelay, initialDelay);
384       assertEquals(SchedulerTest.delay, delay);
385       assertEquals(SchedulerTest.unit, unit);
386       assertEquals(testRunnable, command);
387     }
388 
testFixedRateSchedule()389     public void testFixedRateSchedule() {
390       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
391       Future<?> unused =
392           schedule.schedule(
393               null,
394               new ScheduledThreadPoolExecutor(1) {
395                 @Override
396                 public ScheduledFuture<?> scheduleAtFixedRate(
397                     Runnable command, long initialDelay, long period, TimeUnit unit) {
398                   assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
399                   return null;
400                 }
401               },
402               testRunnable);
403       assertTrue(called);
404     }
405 
testFixedDelaySchedule()406     public void testFixedDelaySchedule() {
407       Scheduler schedule = newFixedDelaySchedule(initialDelay, delay, unit);
408       Future<?> unused =
409           schedule.schedule(
410               null,
411               new ScheduledThreadPoolExecutor(10) {
412                 @Override
413                 public ScheduledFuture<?> scheduleWithFixedDelay(
414                     Runnable command, long initialDelay, long delay, TimeUnit unit) {
415                   assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
416                   return null;
417                 }
418               },
419               testRunnable);
420       assertTrue(called);
421     }
422 
testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()423     public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
424         throws Exception {
425       TestAbstractScheduledCustomService service =
426           new TestAbstractScheduledCustomService() {
427             @Override
428             protected Scheduler scheduler() {
429               return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
430             }
431           };
432       service.startAsync().awaitRunning();
433       try {
434         service.firstBarrier.await(5, SECONDS);
435         fail();
436       } catch (TimeoutException expected) {
437       }
438       assertEquals(0, service.numIterations.get());
439       service.stopAsync();
440       service.awaitTerminated();
441     }
442 
testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()443     public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
444         throws Exception {
445       TestAbstractScheduledCustomService service =
446           new TestAbstractScheduledCustomService() {
447             @Override
448             protected Scheduler scheduler() {
449               return new AbstractScheduledService.CustomScheduler() {
450                 @Override
451                 protected Schedule getNextSchedule() throws Exception {
452                   return new Schedule(Long.MAX_VALUE, SECONDS);
453                 }
454               };
455             }
456           };
457       service.startAsync().awaitRunning();
458       try {
459         service.firstBarrier.await(5, SECONDS);
460         fail();
461       } catch (TimeoutException expected) {
462       }
463       assertEquals(0, service.numIterations.get());
464       service.stopAsync();
465       service.awaitTerminated();
466     }
467 
468     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
469       public AtomicInteger scheduleCounter = new AtomicInteger(0);
470 
471       @Override
getNextSchedule()472       protected Schedule getNextSchedule() throws Exception {
473         scheduleCounter.incrementAndGet();
474         return new Schedule(0, TimeUnit.SECONDS);
475       }
476     }
477 
testCustomSchedule_startStop()478     public void testCustomSchedule_startStop() throws Exception {
479       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
480       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
481       final AtomicBoolean shouldWait = new AtomicBoolean(true);
482       Runnable task =
483           new Runnable() {
484             @Override
485             public void run() {
486               try {
487                 if (shouldWait.get()) {
488                   firstBarrier.await();
489                   secondBarrier.await();
490                 }
491               } catch (Exception e) {
492                 throw new RuntimeException(e);
493               }
494             }
495           };
496       TestCustomScheduler scheduler = new TestCustomScheduler();
497       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
498       firstBarrier.await();
499       assertEquals(1, scheduler.scheduleCounter.get());
500       secondBarrier.await();
501       firstBarrier.await();
502       assertEquals(2, scheduler.scheduleCounter.get());
503       shouldWait.set(false);
504       secondBarrier.await();
505       future.cancel(false);
506     }
507 
testCustomSchedulerServiceStop()508     public void testCustomSchedulerServiceStop() throws Exception {
509       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
510       service.startAsync().awaitRunning();
511       service.firstBarrier.await();
512       assertEquals(1, service.numIterations.get());
513       service.stopAsync();
514       service.secondBarrier.await();
515       service.awaitTerminated();
516       // Sleep for a while just to ensure that our task wasn't called again.
517       Thread.sleep(unit.toMillis(3 * delay));
518       assertEquals(1, service.numIterations.get());
519     }
520 
testCustomScheduler_deadlock()521     public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
522       final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
523       // This will flakily deadlock, so run it multiple times to increase the flake likelihood
524       for (int i = 0; i < 1000; i++) {
525         Service service =
526             new AbstractScheduledService() {
527               @Override
528               protected void runOneIteration() {}
529 
530               @Override
531               protected Scheduler scheduler() {
532                 return new CustomScheduler() {
533                   @Override
534                   protected Schedule getNextSchedule() throws Exception {
535                     if (state() != State.STARTING) {
536                       inGetNextSchedule.await();
537                       Thread.yield();
538                       throw new RuntimeException("boom");
539                     }
540                     return new Schedule(0, TimeUnit.NANOSECONDS);
541                   }
542                 };
543               }
544             };
545         service.startAsync().awaitRunning();
546         inGetNextSchedule.await();
547         service.stopAsync();
548       }
549     }
550 
testBig()551     public void testBig() throws Exception {
552       TestAbstractScheduledCustomService service =
553           new TestAbstractScheduledCustomService() {
554             @Override
555             protected Scheduler scheduler() {
556               return new AbstractScheduledService.CustomScheduler() {
557                 @Override
558                 protected Schedule getNextSchedule() throws Exception {
559                   // Explicitly yield to increase the probability of a pathological scheduling.
560                   Thread.yield();
561                   return new Schedule(0, TimeUnit.SECONDS);
562                 }
563               };
564             }
565           };
566       service.useBarriers = false;
567       service.startAsync().awaitRunning();
568       Thread.sleep(50);
569       service.useBarriers = true;
570       service.firstBarrier.await();
571       int numIterations = service.numIterations.get();
572       service.stopAsync();
573       service.secondBarrier.await();
574       service.awaitTerminated();
575       assertEquals(numIterations, service.numIterations.get());
576     }
577 
578     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
579       final AtomicInteger numIterations = new AtomicInteger(0);
580       volatile boolean useBarriers = true;
581       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
582       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
583 
584       @Override
runOneIteration()585       protected void runOneIteration() throws Exception {
586         numIterations.incrementAndGet();
587         if (useBarriers) {
588           firstBarrier.await();
589           secondBarrier.await();
590         }
591       }
592 
593       @Override
executor()594       protected ScheduledExecutorService executor() {
595         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
596         return Executors.newScheduledThreadPool(10);
597       }
598 
599       @Override
scheduler()600       protected Scheduler scheduler() {
601         return new CustomScheduler() {
602           @Override
603           protected Schedule getNextSchedule() throws Exception {
604             return new Schedule(delay, unit);
605           }
606         };
607       }
608     }
609 
testCustomSchedulerFailure()610     public void testCustomSchedulerFailure() throws Exception {
611       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
612       service.startAsync().awaitRunning();
613       for (int i = 1; i < 4; i++) {
614         service.firstBarrier.await();
615         assertEquals(i, service.numIterations.get());
616         service.secondBarrier.await();
617       }
618       Thread.sleep(1000);
619       try {
620         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
621         fail();
622       } catch (IllegalStateException e) {
623         assertEquals(State.FAILED, service.state());
624       }
625     }
626 
627     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
628       final AtomicInteger numIterations = new AtomicInteger(0);
629       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
630       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
631 
632       @Override
runOneIteration()633       protected void runOneIteration() throws Exception {
634         numIterations.incrementAndGet();
635         firstBarrier.await();
636         secondBarrier.await();
637       }
638 
639       @Override
executor()640       protected ScheduledExecutorService executor() {
641         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
642         return Executors.newScheduledThreadPool(10);
643       }
644 
645       @Override
scheduler()646       protected Scheduler scheduler() {
647         return new CustomScheduler() {
648           @Override
649           protected Schedule getNextSchedule() throws Exception {
650             if (numIterations.get() > 2) {
651               throw new IllegalStateException("Failed");
652             }
653             return new Schedule(delay, unit);
654           }
655         };
656       }
657     }
658   }
659 }
660