• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22 import static java.util.concurrent.TimeUnit.MILLISECONDS;
23 import static java.util.concurrent.TimeUnit.NANOSECONDS;
24 import static java.util.concurrent.TimeUnit.SECONDS;
25 import static org.junit.Assert.assertThrows;
26 
27 import com.google.common.util.concurrent.AbstractScheduledService.Cancellable;
28 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
29 import com.google.common.util.concurrent.Service.State;
30 import com.google.common.util.concurrent.testing.TestingExecutors;
31 import java.util.concurrent.BrokenBarrierException;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.CyclicBarrier;
35 import java.util.concurrent.Delayed;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledFuture;
40 import java.util.concurrent.ScheduledThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.concurrent.atomic.AtomicReference;
46 import junit.framework.TestCase;
47 import org.checkerframework.checker.nullness.qual.Nullable;
48 
49 /**
50  * Unit test for {@link AbstractScheduledService}.
51  *
52  * @author Luke Sandberg
53  */
54 public class AbstractScheduledServiceTest extends TestCase {
55 
56   volatile Scheduler configuration = newFixedDelaySchedule(0, 10, MILLISECONDS);
57   volatile @Nullable ScheduledFuture<?> future = null;
58 
59   volatile boolean atFixedRateCalled = false;
60   volatile boolean withFixedDelayCalled = false;
61   volatile boolean scheduleCalled = false;
62 
63   final ScheduledExecutorService executor =
64       new ScheduledThreadPoolExecutor(10) {
65         @Override
66         public ScheduledFuture<?> scheduleWithFixedDelay(
67             Runnable command, long initialDelay, long delay, TimeUnit unit) {
68           return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
69         }
70       };
71 
testServiceStartStop()72   public void testServiceStartStop() throws Exception {
73     NullService service = new NullService();
74     service.startAsync().awaitRunning();
75     assertFalse(future.isDone());
76     service.stopAsync().awaitTerminated();
77     assertTrue(future.isCancelled());
78   }
79 
80   private class NullService extends AbstractScheduledService {
81     @Override
runOneIteration()82     protected void runOneIteration() throws Exception {}
83 
84     @Override
scheduler()85     protected Scheduler scheduler() {
86       return configuration;
87     }
88 
89     @Override
executor()90     protected ScheduledExecutorService executor() {
91       return executor;
92     }
93   }
94 
testFailOnExceptionFromRun()95   public void testFailOnExceptionFromRun() throws Exception {
96     TestService service = new TestService();
97     service.runException = new Exception();
98     service.startAsync().awaitRunning();
99     service.runFirstBarrier.await();
100     service.runSecondBarrier.await();
101     assertThrows(CancellationException.class, () -> future.get());
102     // An execution exception holds a runtime exception (from throwables.propagate) that holds our
103     // original exception.
104     assertEquals(service.runException, service.failureCause());
105     assertEquals(Service.State.FAILED, service.state());
106   }
107 
testFailOnExceptionFromStartUp()108   public void testFailOnExceptionFromStartUp() {
109     TestService service = new TestService();
110     service.startUpException = new Exception();
111     IllegalStateException e =
112         assertThrows(IllegalStateException.class, () -> service.startAsync().awaitRunning());
113     assertThat(e).hasCauseThat().isEqualTo(service.startUpException);
114     assertEquals(0, service.numberOfTimesRunCalled.get());
115     assertEquals(Service.State.FAILED, service.state());
116   }
117 
testFailOnErrorFromStartUpListener()118   public void testFailOnErrorFromStartUpListener() throws InterruptedException {
119     final Error error = new Error();
120     final CountDownLatch latch = new CountDownLatch(1);
121     TestService service = new TestService();
122     service.addListener(
123         new Service.Listener() {
124           @Override
125           public void running() {
126             throw error;
127           }
128 
129           @Override
130           public void failed(State from, Throwable failure) {
131             assertEquals(State.RUNNING, from);
132             assertEquals(error, failure);
133             latch.countDown();
134           }
135         },
136         directExecutor());
137     service.startAsync();
138     latch.await();
139 
140     assertEquals(0, service.numberOfTimesRunCalled.get());
141     assertEquals(Service.State.FAILED, service.state());
142   }
143 
testFailOnExceptionFromShutDown()144   public void testFailOnExceptionFromShutDown() throws Exception {
145     TestService service = new TestService();
146     service.shutDownException = new Exception();
147     service.startAsync().awaitRunning();
148     service.runFirstBarrier.await();
149     service.stopAsync();
150     service.runSecondBarrier.await();
151     IllegalStateException e =
152         assertThrows(IllegalStateException.class, () -> service.awaitTerminated());
153     assertThat(e).hasCauseThat().isEqualTo(service.shutDownException);
154     assertEquals(Service.State.FAILED, service.state());
155   }
156 
testRunOneIterationCalledMultipleTimes()157   public void testRunOneIterationCalledMultipleTimes() throws Exception {
158     TestService service = new TestService();
159     service.startAsync().awaitRunning();
160     for (int i = 1; i < 10; i++) {
161       service.runFirstBarrier.await();
162       assertEquals(i, service.numberOfTimesRunCalled.get());
163       service.runSecondBarrier.await();
164     }
165     service.runFirstBarrier.await();
166     service.stopAsync();
167     service.runSecondBarrier.await();
168     service.stopAsync().awaitTerminated();
169   }
170 
testExecutorOnlyCalledOnce()171   public void testExecutorOnlyCalledOnce() throws Exception {
172     TestService service = new TestService();
173     service.startAsync().awaitRunning();
174     // It should be called once during startup.
175     assertEquals(1, service.numberOfTimesExecutorCalled.get());
176     for (int i = 1; i < 10; i++) {
177       service.runFirstBarrier.await();
178       assertEquals(i, service.numberOfTimesRunCalled.get());
179       service.runSecondBarrier.await();
180     }
181     service.runFirstBarrier.await();
182     service.stopAsync();
183     service.runSecondBarrier.await();
184     service.stopAsync().awaitTerminated();
185     // Only called once overall.
186     assertEquals(1, service.numberOfTimesExecutorCalled.get());
187   }
188 
testDefaultExecutorIsShutdownWhenServiceIsStopped()189   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
190     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
191     AbstractScheduledService service =
192         new AbstractScheduledService() {
193           @Override
194           protected void runOneIteration() throws Exception {}
195 
196           @Override
197           protected ScheduledExecutorService executor() {
198             executor.set(super.executor());
199             return executor.get();
200           }
201 
202           @Override
203           protected Scheduler scheduler() {
204             return newFixedDelaySchedule(0, 1, MILLISECONDS);
205           }
206         };
207 
208     service.startAsync();
209     assertFalse(service.executor().isShutdown());
210     service.awaitRunning();
211     service.stopAsync();
212     service.awaitTerminated();
213     assertTrue(executor.get().awaitTermination(100, MILLISECONDS));
214   }
215 
testDefaultExecutorIsShutdownWhenServiceFails()216   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
217     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
218     AbstractScheduledService service =
219         new AbstractScheduledService() {
220           @Override
221           protected void startUp() throws Exception {
222             throw new Exception("Failed");
223           }
224 
225           @Override
226           protected void runOneIteration() throws Exception {}
227 
228           @Override
229           protected ScheduledExecutorService executor() {
230             executor.set(super.executor());
231             return executor.get();
232           }
233 
234           @Override
235           protected Scheduler scheduler() {
236             return newFixedDelaySchedule(0, 1, MILLISECONDS);
237           }
238         };
239 
240     assertThrows(IllegalStateException.class, () -> service.startAsync().awaitRunning());
241 
242     assertTrue(executor.get().awaitTermination(100, MILLISECONDS));
243   }
244 
testSchedulerOnlyCalledOnce()245   public void testSchedulerOnlyCalledOnce() throws Exception {
246     TestService service = new TestService();
247     service.startAsync().awaitRunning();
248     // It should be called once during startup.
249     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
250     for (int i = 1; i < 10; i++) {
251       service.runFirstBarrier.await();
252       assertEquals(i, service.numberOfTimesRunCalled.get());
253       service.runSecondBarrier.await();
254     }
255     service.runFirstBarrier.await();
256     service.stopAsync();
257     service.runSecondBarrier.await();
258     service.awaitTerminated();
259     // Only called once overall.
260     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
261   }
262 
testTimeout()263   public void testTimeout() {
264     // Create a service whose executor will never run its commands
265     Service service =
266         new AbstractScheduledService() {
267           @Override
268           protected Scheduler scheduler() {
269             return Scheduler.newFixedDelaySchedule(0, 1, NANOSECONDS);
270           }
271 
272           @Override
273           protected ScheduledExecutorService executor() {
274             return TestingExecutors.noOpScheduledExecutor();
275           }
276 
277           @Override
278           protected void runOneIteration() throws Exception {}
279 
280           @Override
281           protected String serviceName() {
282             return "Foo";
283           }
284         };
285     TimeoutException e =
286         assertThrows(
287             TimeoutException.class, () -> service.startAsync().awaitRunning(1, MILLISECONDS));
288     assertThat(e)
289         .hasMessageThat()
290         .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
291   }
292 
293   private class TestService extends AbstractScheduledService {
294     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
295     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
296 
297     volatile boolean startUpCalled = false;
298     volatile boolean shutDownCalled = false;
299     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
300     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
301     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
302     volatile @Nullable Exception runException = null;
303     volatile @Nullable Exception startUpException = null;
304     volatile @Nullable Exception shutDownException = null;
305 
306     @Override
runOneIteration()307     protected void runOneIteration() throws Exception {
308       assertTrue(startUpCalled);
309       assertFalse(shutDownCalled);
310       numberOfTimesRunCalled.incrementAndGet();
311       assertEquals(State.RUNNING, state());
312       runFirstBarrier.await();
313       runSecondBarrier.await();
314       if (runException != null) {
315         throw runException;
316       }
317     }
318 
319     @Override
startUp()320     protected void startUp() throws Exception {
321       assertFalse(startUpCalled);
322       assertFalse(shutDownCalled);
323       startUpCalled = true;
324       assertEquals(State.STARTING, state());
325       if (startUpException != null) {
326         throw startUpException;
327       }
328     }
329 
330     @Override
shutDown()331     protected void shutDown() throws Exception {
332       assertTrue(startUpCalled);
333       assertFalse(shutDownCalled);
334       shutDownCalled = true;
335       if (shutDownException != null) {
336         throw shutDownException;
337       }
338     }
339 
340     @Override
executor()341     protected ScheduledExecutorService executor() {
342       numberOfTimesExecutorCalled.incrementAndGet();
343       return executor;
344     }
345 
346     @Override
scheduler()347     protected Scheduler scheduler() {
348       numberOfTimesSchedulerCalled.incrementAndGet();
349       return configuration;
350     }
351   }
352 
353   // Tests for Scheduler:
354 
355   // These constants are arbitrary and just used to make sure that the correct method is called
356   // with the correct parameters.
357   private static final int INITIAL_DELAY = 10;
358   private static final int DELAY = 20;
359   private static final TimeUnit UNIT = MILLISECONDS;
360 
361   // Unique runnable object used for comparison.
362   final Runnable testRunnable =
363       new Runnable() {
364         @Override
365         public void run() {}
366       };
367   boolean called = false;
368 
assertSingleCallWithCorrectParameters( Runnable command, long initialDelay, long delay, TimeUnit unit)369   private void assertSingleCallWithCorrectParameters(
370       Runnable command, long initialDelay, long delay, TimeUnit unit) {
371     assertFalse(called); // only called once.
372     called = true;
373     assertEquals(INITIAL_DELAY, initialDelay);
374     assertEquals(DELAY, delay);
375     assertEquals(UNIT, unit);
376     assertEquals(testRunnable, command);
377   }
378 
testFixedRateSchedule()379   public void testFixedRateSchedule() {
380     Scheduler schedule = Scheduler.newFixedRateSchedule(INITIAL_DELAY, DELAY, UNIT);
381     Cancellable unused =
382         schedule.schedule(
383             null,
384             new ScheduledThreadPoolExecutor(1) {
385               @Override
386               public ScheduledFuture<?> scheduleAtFixedRate(
387                   Runnable command, long initialDelay, long period, TimeUnit unit) {
388                 assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
389                 return new ThrowingScheduledFuture<>();
390               }
391             },
392             testRunnable);
393     assertTrue(called);
394   }
395 
testFixedDelaySchedule()396   public void testFixedDelaySchedule() {
397     Scheduler schedule = newFixedDelaySchedule(INITIAL_DELAY, DELAY, UNIT);
398     Cancellable unused =
399         schedule.schedule(
400             null,
401             new ScheduledThreadPoolExecutor(10) {
402               @Override
403               public ScheduledFuture<?> scheduleWithFixedDelay(
404                   Runnable command, long initialDelay, long delay, TimeUnit unit) {
405                 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
406                 return new ThrowingScheduledFuture<>();
407               }
408             },
409             testRunnable);
410     assertTrue(called);
411   }
412 
413   private static final class ThrowingScheduledFuture<V> extends ForwardingFuture<V>
414       implements ScheduledFuture<V> {
415     @Override
delegate()416     protected Future<V> delegate() {
417       throw new UnsupportedOperationException("test should not care about this");
418     }
419 
420     @Override
getDelay(TimeUnit unit)421     public long getDelay(TimeUnit unit) {
422       throw new UnsupportedOperationException("test should not care about this");
423     }
424 
425     @Override
compareTo(Delayed other)426     public int compareTo(Delayed other) {
427       throw new UnsupportedOperationException("test should not care about this");
428     }
429   }
430 
testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()431   public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
432       throws Exception {
433     TestAbstractScheduledCustomService service =
434         new TestAbstractScheduledCustomService() {
435           @Override
436           protected Scheduler scheduler() {
437             return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
438           }
439         };
440     service.startAsync().awaitRunning();
441     assertThrows(TimeoutException.class, () -> service.firstBarrier.await(5, SECONDS));
442     assertEquals(0, service.numIterations.get());
443     service.stopAsync();
444     service.awaitTerminated();
445   }
446 
testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()447   public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
448       throws Exception {
449     TestAbstractScheduledCustomService service =
450         new TestAbstractScheduledCustomService() {
451           @Override
452           protected Scheduler scheduler() {
453             return new AbstractScheduledService.CustomScheduler() {
454               @Override
455               protected Schedule getNextSchedule() throws Exception {
456                 return new Schedule(Long.MAX_VALUE, SECONDS);
457               }
458             };
459           }
460         };
461     service.startAsync().awaitRunning();
462     assertThrows(TimeoutException.class, () -> service.firstBarrier.await(5, SECONDS));
463     assertEquals(0, service.numIterations.get());
464     service.stopAsync();
465     service.awaitTerminated();
466   }
467 
468   private static class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
469     public AtomicInteger scheduleCounter = new AtomicInteger(0);
470 
471     @Override
getNextSchedule()472     protected Schedule getNextSchedule() throws Exception {
473       scheduleCounter.incrementAndGet();
474       return new Schedule(0, SECONDS);
475     }
476   }
477 
testCustomSchedule_startStop()478   public void testCustomSchedule_startStop() throws Exception {
479     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
480     final CyclicBarrier secondBarrier = new CyclicBarrier(2);
481     final AtomicBoolean shouldWait = new AtomicBoolean(true);
482     Runnable task =
483         new Runnable() {
484           @Override
485           public void run() {
486             try {
487               if (shouldWait.get()) {
488                 firstBarrier.await();
489                 secondBarrier.await();
490               }
491             } catch (Exception e) {
492               throw new RuntimeException(e);
493             }
494           }
495         };
496     TestCustomScheduler scheduler = new TestCustomScheduler();
497     Cancellable future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
498     firstBarrier.await();
499     assertEquals(1, scheduler.scheduleCounter.get());
500     secondBarrier.await();
501     firstBarrier.await();
502     assertEquals(2, scheduler.scheduleCounter.get());
503     shouldWait.set(false);
504     secondBarrier.await();
505     future.cancel(false);
506   }
507 
testCustomSchedulerServiceStop()508   public void testCustomSchedulerServiceStop() throws Exception {
509     TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
510     service.startAsync().awaitRunning();
511     service.firstBarrier.await();
512     assertEquals(1, service.numIterations.get());
513     service.stopAsync();
514     service.secondBarrier.await();
515     service.awaitTerminated();
516     // Sleep for a while just to ensure that our task wasn't called again.
517     Thread.sleep(UNIT.toMillis(3 * DELAY));
518     assertEquals(1, service.numIterations.get());
519   }
520 
testCustomScheduler_deadlock()521   public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
522     final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
523     // This will flakily deadlock, so run it multiple times to increase the flake likelihood
524     for (int i = 0; i < 1000; i++) {
525       Service service =
526           new AbstractScheduledService() {
527             @Override
528             protected void runOneIteration() {}
529 
530             @Override
531             protected Scheduler scheduler() {
532               return new CustomScheduler() {
533                 @Override
534                 protected Schedule getNextSchedule() throws Exception {
535                   if (state() != State.STARTING) {
536                     inGetNextSchedule.await();
537                     Thread.yield();
538                     throw new RuntimeException("boom");
539                   }
540                   return new Schedule(0, NANOSECONDS);
541                 }
542               };
543             }
544           };
545       service.startAsync().awaitRunning();
546       inGetNextSchedule.await();
547       service.stopAsync();
548     }
549   }
550 
testBig()551   public void testBig() throws Exception {
552     TestAbstractScheduledCustomService service =
553         new TestAbstractScheduledCustomService() {
554           @Override
555           protected Scheduler scheduler() {
556             return new AbstractScheduledService.CustomScheduler() {
557               @Override
558               protected Schedule getNextSchedule() throws Exception {
559                 // Explicitly yield to increase the probability of a pathological scheduling.
560                 Thread.yield();
561                 return new Schedule(0, SECONDS);
562               }
563             };
564           }
565         };
566     service.useBarriers = false;
567     service.startAsync().awaitRunning();
568     Thread.sleep(50);
569     service.useBarriers = true;
570     service.firstBarrier.await();
571     int numIterations = service.numIterations.get();
572     service.stopAsync();
573     service.secondBarrier.await();
574     service.awaitTerminated();
575     assertEquals(numIterations, service.numIterations.get());
576   }
577 
578   private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
579     final AtomicInteger numIterations = new AtomicInteger(0);
580     volatile boolean useBarriers = true;
581     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
582     final CyclicBarrier secondBarrier = new CyclicBarrier(2);
583 
584     @Override
runOneIteration()585     protected void runOneIteration() throws Exception {
586       numIterations.incrementAndGet();
587       if (useBarriers) {
588         firstBarrier.await();
589         secondBarrier.await();
590       }
591     }
592 
593     @Override
executor()594     protected ScheduledExecutorService executor() {
595       // use a bunch of threads so that weird overlapping schedules are more likely to happen.
596       return Executors.newScheduledThreadPool(10);
597     }
598 
599     @Override
scheduler()600     protected Scheduler scheduler() {
601       return new CustomScheduler() {
602         @Override
603         protected Schedule getNextSchedule() throws Exception {
604           return new Schedule(DELAY, UNIT);
605         }
606       };
607     }
608   }
609 
610   public void testCustomSchedulerFailure() throws Exception {
611     TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
612     service.startAsync().awaitRunning();
613     for (int i = 1; i < 4; i++) {
614       service.firstBarrier.await();
615       assertEquals(i, service.numIterations.get());
616       service.secondBarrier.await();
617     }
618     Thread.sleep(1000);
619     assertThrows(
620         IllegalStateException.class, () -> service.stopAsync().awaitTerminated(100, SECONDS));
621     assertEquals(State.FAILED, service.state());
622   }
623 
624   private static class TestFailingCustomScheduledService extends AbstractScheduledService {
625     final AtomicInteger numIterations = new AtomicInteger(0);
626     final CyclicBarrier firstBarrier = new CyclicBarrier(2);
627     final CyclicBarrier secondBarrier = new CyclicBarrier(2);
628 
629     @Override
630     protected void runOneIteration() throws Exception {
631       numIterations.incrementAndGet();
632       firstBarrier.await();
633       secondBarrier.await();
634     }
635 
636     @Override
637     protected ScheduledExecutorService executor() {
638       // use a bunch of threads so that weird overlapping schedules are more likely to happen.
639       return Executors.newScheduledThreadPool(10);
640     }
641 
642     @Override
643     protected Scheduler scheduler() {
644       return new CustomScheduler() {
645         @Override
646         protected Schedule getNextSchedule() throws Exception {
647           if (numIterations.get() > 2) {
648             throw new IllegalStateException("Failed");
649           }
650           return new Schedule(DELAY, UNIT);
651         }
652       };
653     }
654   }
655 }
656