• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static java.util.concurrent.TimeUnit.SECONDS;
23 
24 import com.google.common.util.concurrent.AbstractScheduledService.Cancellable;
25 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
26 import com.google.common.util.concurrent.Service.State;
27 import com.google.common.util.concurrent.testing.TestingExecutors;
28 import java.util.concurrent.BrokenBarrierException;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.CyclicBarrier;
32 import java.util.concurrent.Delayed;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicReference;
43 import junit.framework.TestCase;
44 
45 /**
46  * Unit test for {@link AbstractScheduledService}.
47  *
48  * @author Luke Sandberg
49  */
50 
51 public class AbstractScheduledServiceTest extends TestCase {
52 
53   volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
54   volatile ScheduledFuture<?> future = null;
55 
56   volatile boolean atFixedRateCalled = false;
57   volatile boolean withFixedDelayCalled = false;
58   volatile boolean scheduleCalled = false;
59 
60   final ScheduledExecutorService executor =
61       new ScheduledThreadPoolExecutor(10) {
62         @Override
63         public ScheduledFuture<?> scheduleWithFixedDelay(
64             Runnable command, long initialDelay, long delay, TimeUnit unit) {
65           return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
66         }
67       };
68 
testServiceStartStop()69   public void testServiceStartStop() throws Exception {
70     NullService service = new NullService();
71     service.startAsync().awaitRunning();
72     assertFalse(future.isDone());
73     service.stopAsync().awaitTerminated();
74     assertTrue(future.isCancelled());
75   }
76 
77   private class NullService extends AbstractScheduledService {
78     @Override
runOneIteration()79     protected void runOneIteration() throws Exception {}
80 
81     @Override
scheduler()82     protected Scheduler scheduler() {
83       return configuration;
84     }
85 
86     @Override
executor()87     protected ScheduledExecutorService executor() {
88       return executor;
89     }
90   }
91 
testFailOnExceptionFromRun()92   public void testFailOnExceptionFromRun() throws Exception {
93     TestService service = new TestService();
94     service.runException = new Exception();
95     service.startAsync().awaitRunning();
96     service.runFirstBarrier.await();
97     service.runSecondBarrier.await();
98     try {
99       future.get();
100       fail();
101     } catch (CancellationException expected) {
102     }
103     // An execution exception holds a runtime exception (from throwables.propagate) that holds our
104     // original exception.
105     assertEquals(service.runException, service.failureCause());
106     assertEquals(Service.State.FAILED, service.state());
107   }
108 
testFailOnExceptionFromStartUp()109   public void testFailOnExceptionFromStartUp() {
110     TestService service = new TestService();
111     service.startUpException = new Exception();
112     try {
113       service.startAsync().awaitRunning();
114       fail();
115     } catch (IllegalStateException e) {
116       assertEquals(service.startUpException, e.getCause());
117     }
118     assertEquals(0, service.numberOfTimesRunCalled.get());
119     assertEquals(Service.State.FAILED, service.state());
120   }
121 
testFailOnErrorFromStartUpListener()122   public void testFailOnErrorFromStartUpListener() throws InterruptedException {
123     final Error error = new Error();
124     final CountDownLatch latch = new CountDownLatch(1);
125     TestService service = new TestService();
126     service.addListener(
127         new Service.Listener() {
128           @Override
129           public void running() {
130             throw error;
131           }
132 
133           @Override
134           public void failed(State from, Throwable failure) {
135             assertEquals(State.RUNNING, from);
136             assertEquals(error, failure);
137             latch.countDown();
138           }
139         },
140         directExecutor());
141     service.startAsync();
142     latch.await();
143 
144     assertEquals(0, service.numberOfTimesRunCalled.get());
145     assertEquals(Service.State.FAILED, service.state());
146   }
147 
testFailOnExceptionFromShutDown()148   public void testFailOnExceptionFromShutDown() throws Exception {
149     TestService service = new TestService();
150     service.shutDownException = new Exception();
151     service.startAsync().awaitRunning();
152     service.runFirstBarrier.await();
153     service.stopAsync();
154     service.runSecondBarrier.await();
155     try {
156       service.awaitTerminated();
157       fail();
158     } catch (IllegalStateException e) {
159       assertEquals(service.shutDownException, e.getCause());
160     }
161     assertEquals(Service.State.FAILED, service.state());
162   }
163 
testRunOneIterationCalledMultipleTimes()164   public void testRunOneIterationCalledMultipleTimes() throws Exception {
165     TestService service = new TestService();
166     service.startAsync().awaitRunning();
167     for (int i = 1; i < 10; i++) {
168       service.runFirstBarrier.await();
169       assertEquals(i, service.numberOfTimesRunCalled.get());
170       service.runSecondBarrier.await();
171     }
172     service.runFirstBarrier.await();
173     service.stopAsync();
174     service.runSecondBarrier.await();
175     service.stopAsync().awaitTerminated();
176   }
177 
testExecutorOnlyCalledOnce()178   public void testExecutorOnlyCalledOnce() throws Exception {
179     TestService service = new TestService();
180     service.startAsync().awaitRunning();
181     // It should be called once during startup.
182     assertEquals(1, service.numberOfTimesExecutorCalled.get());
183     for (int i = 1; i < 10; i++) {
184       service.runFirstBarrier.await();
185       assertEquals(i, service.numberOfTimesRunCalled.get());
186       service.runSecondBarrier.await();
187     }
188     service.runFirstBarrier.await();
189     service.stopAsync();
190     service.runSecondBarrier.await();
191     service.stopAsync().awaitTerminated();
192     // Only called once overall.
193     assertEquals(1, service.numberOfTimesExecutorCalled.get());
194   }
195 
testDefaultExecutorIsShutdownWhenServiceIsStopped()196   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
197     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
198     AbstractScheduledService service =
199         new AbstractScheduledService() {
200           @Override
201           protected void runOneIteration() throws Exception {}
202 
203           @Override
204           protected ScheduledExecutorService executor() {
205             executor.set(super.executor());
206             return executor.get();
207           }
208 
209           @Override
210           protected Scheduler scheduler() {
211             return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
212           }
213         };
214 
215     service.startAsync();
216     assertFalse(service.executor().isShutdown());
217     service.awaitRunning();
218     service.stopAsync();
219     service.awaitTerminated();
220     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
221   }
222 
testDefaultExecutorIsShutdownWhenServiceFails()223   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
224     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
225     AbstractScheduledService service =
226         new AbstractScheduledService() {
227           @Override
228           protected void startUp() throws Exception {
229             throw new Exception("Failed");
230           }
231 
232           @Override
233           protected void runOneIteration() throws Exception {}
234 
235           @Override
236           protected ScheduledExecutorService executor() {
237             executor.set(super.executor());
238             return executor.get();
239           }
240 
241           @Override
242           protected Scheduler scheduler() {
243             return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
244           }
245         };
246 
247     try {
248       service.startAsync().awaitRunning();
249       fail("Expected service to fail during startup");
250     } catch (IllegalStateException expected) {
251     }
252 
253     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
254   }
255 
testSchedulerOnlyCalledOnce()256   public void testSchedulerOnlyCalledOnce() throws Exception {
257     TestService service = new TestService();
258     service.startAsync().awaitRunning();
259     // It should be called once during startup.
260     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
261     for (int i = 1; i < 10; i++) {
262       service.runFirstBarrier.await();
263       assertEquals(i, service.numberOfTimesRunCalled.get());
264       service.runSecondBarrier.await();
265     }
266     service.runFirstBarrier.await();
267     service.stopAsync();
268     service.runSecondBarrier.await();
269     service.awaitTerminated();
270     // Only called once overall.
271     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
272   }
273 
testTimeout()274   public void testTimeout() {
275     // Create a service whose executor will never run its commands
276     Service service =
277         new AbstractScheduledService() {
278           @Override
279           protected Scheduler scheduler() {
280             return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS);
281           }
282 
283           @Override
284           protected ScheduledExecutorService executor() {
285             return TestingExecutors.noOpScheduledExecutor();
286           }
287 
288           @Override
289           protected void runOneIteration() throws Exception {}
290 
291           @Override
292           protected String serviceName() {
293             return "Foo";
294           }
295         };
296     try {
297       service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
298       fail("Expected timeout");
299     } catch (TimeoutException e) {
300       assertThat(e)
301           .hasMessageThat()
302           .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
303     }
304   }
305 
306   private class TestService extends AbstractScheduledService {
307     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
308     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
309 
310     volatile boolean startUpCalled = false;
311     volatile boolean shutDownCalled = false;
312     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
313     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
314     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
315     volatile Exception runException = null;
316     volatile Exception startUpException = null;
317     volatile Exception shutDownException = null;
318 
319     @Override
runOneIteration()320     protected void runOneIteration() throws Exception {
321       assertTrue(startUpCalled);
322       assertFalse(shutDownCalled);
323       numberOfTimesRunCalled.incrementAndGet();
324       assertEquals(State.RUNNING, state());
325       runFirstBarrier.await();
326       runSecondBarrier.await();
327       if (runException != null) {
328         throw runException;
329       }
330     }
331 
332     @Override
startUp()333     protected void startUp() throws Exception {
334       assertFalse(startUpCalled);
335       assertFalse(shutDownCalled);
336       startUpCalled = true;
337       assertEquals(State.STARTING, state());
338       if (startUpException != null) {
339         throw startUpException;
340       }
341     }
342 
343     @Override
shutDown()344     protected void shutDown() throws Exception {
345       assertTrue(startUpCalled);
346       assertFalse(shutDownCalled);
347       shutDownCalled = true;
348       if (shutDownException != null) {
349         throw shutDownException;
350       }
351     }
352 
353     @Override
executor()354     protected ScheduledExecutorService executor() {
355       numberOfTimesExecutorCalled.incrementAndGet();
356       return executor;
357     }
358 
359     @Override
scheduler()360     protected Scheduler scheduler() {
361       numberOfTimesSchedulerCalled.incrementAndGet();
362       return configuration;
363     }
364   }
365 
366   public static class SchedulerTest extends TestCase {
367     // These constants are arbitrary and just used to make sure that the correct method is called
368     // with the correct parameters.
369     private static final int initialDelay = 10;
370     private static final int delay = 20;
371     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
372 
373     // Unique runnable object used for comparison.
374     final Runnable testRunnable =
375         new Runnable() {
376           @Override
377           public void run() {}
378         };
379     boolean called = false;
380 
assertSingleCallWithCorrectParameters( Runnable command, long initialDelay, long delay, TimeUnit unit)381     private void assertSingleCallWithCorrectParameters(
382         Runnable command, long initialDelay, long delay, TimeUnit unit) {
383       assertFalse(called); // only called once.
384       called = true;
385       assertEquals(SchedulerTest.initialDelay, initialDelay);
386       assertEquals(SchedulerTest.delay, delay);
387       assertEquals(SchedulerTest.unit, unit);
388       assertEquals(testRunnable, command);
389     }
390 
testFixedRateSchedule()391     public void testFixedRateSchedule() {
392       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
393       Cancellable unused =
394           schedule.schedule(
395               null,
396               new ScheduledThreadPoolExecutor(1) {
397                 @Override
398                 public ScheduledFuture<?> scheduleAtFixedRate(
399                     Runnable command, long initialDelay, long period, TimeUnit unit) {
400                   assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
401                   return new ThrowingScheduledFuture<>();
402                 }
403               },
404               testRunnable);
405       assertTrue(called);
406     }
407 
testFixedDelaySchedule()408     public void testFixedDelaySchedule() {
409       Scheduler schedule = newFixedDelaySchedule(initialDelay, delay, unit);
410       Cancellable unused =
411           schedule.schedule(
412               null,
413               new ScheduledThreadPoolExecutor(10) {
414                 @Override
415                 public ScheduledFuture<?> scheduleWithFixedDelay(
416                     Runnable command, long initialDelay, long delay, TimeUnit unit) {
417                   assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
418                   return new ThrowingScheduledFuture<>();
419                 }
420               },
421               testRunnable);
422       assertTrue(called);
423     }
424 
425     private static final class ThrowingScheduledFuture<V> extends ForwardingFuture<V>
426         implements ScheduledFuture<V> {
427       @Override
delegate()428       protected Future<V> delegate() {
429         throw new UnsupportedOperationException("test should not care about this");
430       }
431 
432       @Override
getDelay(TimeUnit unit)433       public long getDelay(TimeUnit unit) {
434         throw new UnsupportedOperationException("test should not care about this");
435       }
436 
437       @Override
compareTo(Delayed other)438       public int compareTo(Delayed other) {
439         throw new UnsupportedOperationException("test should not care about this");
440       }
441     }
442 
443 
testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()444     public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
445         throws Exception {
446       TestAbstractScheduledCustomService service =
447           new TestAbstractScheduledCustomService() {
448             @Override
449             protected Scheduler scheduler() {
450               return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
451             }
452           };
453       service.startAsync().awaitRunning();
454       try {
455         service.firstBarrier.await(5, SECONDS);
456         fail();
457       } catch (TimeoutException expected) {
458       }
459       assertEquals(0, service.numIterations.get());
460       service.stopAsync();
461       service.awaitTerminated();
462     }
463 
464 
testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()465     public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
466         throws Exception {
467       TestAbstractScheduledCustomService service =
468           new TestAbstractScheduledCustomService() {
469             @Override
470             protected Scheduler scheduler() {
471               return new AbstractScheduledService.CustomScheduler() {
472                 @Override
473                 protected Schedule getNextSchedule() throws Exception {
474                   return new Schedule(Long.MAX_VALUE, SECONDS);
475                 }
476               };
477             }
478           };
479       service.startAsync().awaitRunning();
480       try {
481         service.firstBarrier.await(5, SECONDS);
482         fail();
483       } catch (TimeoutException expected) {
484       }
485       assertEquals(0, service.numIterations.get());
486       service.stopAsync();
487       service.awaitTerminated();
488     }
489 
490     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
491       public AtomicInteger scheduleCounter = new AtomicInteger(0);
492 
493       @Override
getNextSchedule()494       protected Schedule getNextSchedule() throws Exception {
495         scheduleCounter.incrementAndGet();
496         return new Schedule(0, TimeUnit.SECONDS);
497       }
498     }
499 
500 
testCustomSchedule_startStop()501     public void testCustomSchedule_startStop() throws Exception {
502       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
503       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
504       final AtomicBoolean shouldWait = new AtomicBoolean(true);
505       Runnable task =
506           new Runnable() {
507             @Override
508             public void run() {
509               try {
510                 if (shouldWait.get()) {
511                   firstBarrier.await();
512                   secondBarrier.await();
513                 }
514               } catch (Exception e) {
515                 throw new RuntimeException(e);
516               }
517             }
518           };
519       TestCustomScheduler scheduler = new TestCustomScheduler();
520       Cancellable future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
521       firstBarrier.await();
522       assertEquals(1, scheduler.scheduleCounter.get());
523       secondBarrier.await();
524       firstBarrier.await();
525       assertEquals(2, scheduler.scheduleCounter.get());
526       shouldWait.set(false);
527       secondBarrier.await();
528       future.cancel(false);
529     }
530 
531 
testCustomSchedulerServiceStop()532     public void testCustomSchedulerServiceStop() throws Exception {
533       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
534       service.startAsync().awaitRunning();
535       service.firstBarrier.await();
536       assertEquals(1, service.numIterations.get());
537       service.stopAsync();
538       service.secondBarrier.await();
539       service.awaitTerminated();
540       // Sleep for a while just to ensure that our task wasn't called again.
541       Thread.sleep(unit.toMillis(3 * delay));
542       assertEquals(1, service.numIterations.get());
543     }
544 
545 
testCustomScheduler_deadlock()546     public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
547       final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
548       // This will flakily deadlock, so run it multiple times to increase the flake likelihood
549       for (int i = 0; i < 1000; i++) {
550         Service service =
551             new AbstractScheduledService() {
552               @Override
553               protected void runOneIteration() {}
554 
555               @Override
556               protected Scheduler scheduler() {
557                 return new CustomScheduler() {
558                   @Override
559                   protected Schedule getNextSchedule() throws Exception {
560                     if (state() != State.STARTING) {
561                       inGetNextSchedule.await();
562                       Thread.yield();
563                       throw new RuntimeException("boom");
564                     }
565                     return new Schedule(0, TimeUnit.NANOSECONDS);
566                   }
567                 };
568               }
569             };
570         service.startAsync().awaitRunning();
571         inGetNextSchedule.await();
572         service.stopAsync();
573       }
574     }
575 
576 
testBig()577     public void testBig() throws Exception {
578       TestAbstractScheduledCustomService service =
579           new TestAbstractScheduledCustomService() {
580             @Override
581             protected Scheduler scheduler() {
582               return new AbstractScheduledService.CustomScheduler() {
583                 @Override
584                 protected Schedule getNextSchedule() throws Exception {
585                   // Explicitly yield to increase the probability of a pathological scheduling.
586                   Thread.yield();
587                   return new Schedule(0, TimeUnit.SECONDS);
588                 }
589               };
590             }
591           };
592       service.useBarriers = false;
593       service.startAsync().awaitRunning();
594       Thread.sleep(50);
595       service.useBarriers = true;
596       service.firstBarrier.await();
597       int numIterations = service.numIterations.get();
598       service.stopAsync();
599       service.secondBarrier.await();
600       service.awaitTerminated();
601       assertEquals(numIterations, service.numIterations.get());
602     }
603 
604     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
605       final AtomicInteger numIterations = new AtomicInteger(0);
606       volatile boolean useBarriers = true;
607       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
608       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
609 
610       @Override
runOneIteration()611       protected void runOneIteration() throws Exception {
612         numIterations.incrementAndGet();
613         if (useBarriers) {
614           firstBarrier.await();
615           secondBarrier.await();
616         }
617       }
618 
619       @Override
executor()620       protected ScheduledExecutorService executor() {
621         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
622         return Executors.newScheduledThreadPool(10);
623       }
624 
625       @Override
scheduler()626       protected Scheduler scheduler() {
627         return new CustomScheduler() {
628           @Override
629           protected Schedule getNextSchedule() throws Exception {
630             return new Schedule(delay, unit);
631           }
632         };
633       }
634     }
635 
636 
testCustomSchedulerFailure()637     public void testCustomSchedulerFailure() throws Exception {
638       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
639       service.startAsync().awaitRunning();
640       for (int i = 1; i < 4; i++) {
641         service.firstBarrier.await();
642         assertEquals(i, service.numIterations.get());
643         service.secondBarrier.await();
644       }
645       Thread.sleep(1000);
646       try {
647         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
648         fail();
649       } catch (IllegalStateException e) {
650         assertEquals(State.FAILED, service.state());
651       }
652     }
653 
654     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
655       final AtomicInteger numIterations = new AtomicInteger(0);
656       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
657       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
658 
659       @Override
runOneIteration()660       protected void runOneIteration() throws Exception {
661         numIterations.incrementAndGet();
662         firstBarrier.await();
663         secondBarrier.await();
664       }
665 
666       @Override
executor()667       protected ScheduledExecutorService executor() {
668         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
669         return Executors.newScheduledThreadPool(10);
670       }
671 
672       @Override
scheduler()673       protected Scheduler scheduler() {
674         return new CustomScheduler() {
675           @Override
676           protected Schedule getNextSchedule() throws Exception {
677             if (numIterations.get() > 2) {
678               throw new IllegalStateException("Failed");
679             }
680             return new Schedule(delay, unit);
681           }
682         };
683       }
684     }
685   }
686 }
687