• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.base.StandardSystemProperty.JAVA_SPECIFICATION_VERSION;
20 import static com.google.common.base.StandardSystemProperty.OS_NAME;
21 import static com.google.common.truth.Truth.assertThat;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static java.util.Arrays.asList;
24 import static java.util.concurrent.TimeUnit.SECONDS;
25 import static org.junit.Assert.assertThrows;
26 
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.collect.Lists;
30 import com.google.common.collect.Sets;
31 import com.google.common.testing.NullPointerTester;
32 import com.google.common.testing.TestLogHandler;
33 import com.google.common.util.concurrent.Service.State;
34 import com.google.common.util.concurrent.ServiceManager.Listener;
35 import java.util.Arrays;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.Set;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.Executor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.logging.Formatter;
44 import java.util.logging.Level;
45 import java.util.logging.LogRecord;
46 import java.util.logging.Logger;
47 import junit.framework.TestCase;
48 
49 /**
50  * Tests for {@link ServiceManager}.
51  *
52  * @author Luke Sandberg
53  * @author Chris Nokleberg
54  */
55 public class ServiceManagerTest extends TestCase {
56 
57   private static class NoOpService extends AbstractService {
58     @Override
doStart()59     protected void doStart() {
60       notifyStarted();
61     }
62 
63     @Override
doStop()64     protected void doStop() {
65       notifyStopped();
66     }
67   }
68 
69   /*
70    * A NoOp service that will delay the startup and shutdown notification for a configurable amount
71    * of time.
72    */
73   private static class NoOpDelayedService extends NoOpService {
74     private long delay;
75 
NoOpDelayedService(long delay)76     public NoOpDelayedService(long delay) {
77       this.delay = delay;
78     }
79 
80     @Override
doStart()81     protected void doStart() {
82       new Thread() {
83         @Override
84         public void run() {
85           Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
86           notifyStarted();
87         }
88       }.start();
89     }
90 
91     @Override
doStop()92     protected void doStop() {
93       new Thread() {
94         @Override
95         public void run() {
96           Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
97           notifyStopped();
98         }
99       }.start();
100     }
101   }
102 
103   private static class FailStartService extends NoOpService {
104     @Override
doStart()105     protected void doStart() {
106       notifyFailed(new IllegalStateException("start failure"));
107     }
108   }
109 
110   private static class FailRunService extends NoOpService {
111     @Override
doStart()112     protected void doStart() {
113       super.doStart();
114       notifyFailed(new IllegalStateException("run failure"));
115     }
116   }
117 
118   private static class FailStopService extends NoOpService {
119     @Override
doStop()120     protected void doStop() {
121       notifyFailed(new IllegalStateException("stop failure"));
122     }
123   }
124 
testServiceStartupTimes()125   public void testServiceStartupTimes() {
126     if (isWindows() && isJava8()) {
127       // Flaky there: https://github.com/google/guava/pull/6731#issuecomment-1736298607
128       return;
129     }
130     Service a = new NoOpDelayedService(150);
131     Service b = new NoOpDelayedService(353);
132     ServiceManager serviceManager = new ServiceManager(asList(a, b));
133     serviceManager.startAsync().awaitHealthy();
134     ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
135     assertThat(startupTimes).hasSize(2);
136     assertThat(startupTimes.get(a)).isAtLeast(150);
137     assertThat(startupTimes.get(b)).isAtLeast(353);
138   }
139 
testServiceStartupTimes_selfStartingServices()140   public void testServiceStartupTimes_selfStartingServices() {
141     // This tests to ensure that:
142     // 1. service times are accurate when the service is started by the manager
143     // 2. service times are recorded when the service is not started by the manager (but they may
144     // not be accurate).
145     final Service b =
146         new NoOpDelayedService(353) {
147           @Override
148           protected void doStart() {
149             super.doStart();
150             // This will delay service listener execution at least 150 milliseconds
151             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
152           }
153         };
154     Service a =
155         new NoOpDelayedService(150) {
156           @Override
157           protected void doStart() {
158             b.startAsync();
159             super.doStart();
160           }
161         };
162     ServiceManager serviceManager = new ServiceManager(asList(a, b));
163     serviceManager.startAsync().awaitHealthy();
164     ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
165     assertThat(startupTimes).hasSize(2);
166     assertThat(startupTimes.get(a)).isAtLeast(150);
167     // Service b startup takes at least 353 millis, but starting the timer is delayed by at least
168     // 150 milliseconds. so in a perfect world the timing would be 353-150=203ms, but since either
169     // of our sleep calls can be arbitrarily delayed we should just assert that there is a time
170     // recorded.
171     assertThat(startupTimes.get(b)).isNotNull();
172   }
173 
testServiceStartStop()174   public void testServiceStartStop() {
175     Service a = new NoOpService();
176     Service b = new NoOpService();
177     ServiceManager manager = new ServiceManager(asList(a, b));
178     RecordingListener listener = new RecordingListener();
179     manager.addListener(listener, directExecutor());
180     assertState(manager, Service.State.NEW, a, b);
181     assertFalse(manager.isHealthy());
182     manager.startAsync().awaitHealthy();
183     assertState(manager, Service.State.RUNNING, a, b);
184     assertTrue(manager.isHealthy());
185     assertTrue(listener.healthyCalled);
186     assertFalse(listener.stoppedCalled);
187     assertTrue(listener.failedServices.isEmpty());
188     manager.stopAsync().awaitStopped();
189     assertState(manager, Service.State.TERMINATED, a, b);
190     assertFalse(manager.isHealthy());
191     assertTrue(listener.stoppedCalled);
192     assertTrue(listener.failedServices.isEmpty());
193   }
194 
testFailStart()195   public void testFailStart() throws Exception {
196     Service a = new NoOpService();
197     Service b = new FailStartService();
198     Service c = new NoOpService();
199     Service d = new FailStartService();
200     Service e = new NoOpService();
201     ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
202     RecordingListener listener = new RecordingListener();
203     manager.addListener(listener, directExecutor());
204     assertState(manager, Service.State.NEW, a, b, c, d, e);
205     assertThrows(IllegalStateException.class, () -> manager.startAsync().awaitHealthy());
206     assertFalse(listener.healthyCalled);
207     assertState(manager, Service.State.RUNNING, a, c, e);
208     assertEquals(ImmutableSet.of(b, d), listener.failedServices);
209     assertState(manager, Service.State.FAILED, b, d);
210     assertFalse(manager.isHealthy());
211 
212     manager.stopAsync().awaitStopped();
213     assertFalse(manager.isHealthy());
214     assertFalse(listener.healthyCalled);
215     assertTrue(listener.stoppedCalled);
216   }
217 
testFailRun()218   public void testFailRun() throws Exception {
219     Service a = new NoOpService();
220     Service b = new FailRunService();
221     ServiceManager manager = new ServiceManager(asList(a, b));
222     RecordingListener listener = new RecordingListener();
223     manager.addListener(listener, directExecutor());
224     assertState(manager, Service.State.NEW, a, b);
225     assertThrows(IllegalStateException.class, () -> manager.startAsync().awaitHealthy());
226     assertTrue(listener.healthyCalled);
227     assertEquals(ImmutableSet.of(b), listener.failedServices);
228 
229     manager.stopAsync().awaitStopped();
230     assertState(manager, Service.State.FAILED, b);
231     assertState(manager, Service.State.TERMINATED, a);
232 
233     assertTrue(listener.stoppedCalled);
234   }
235 
testFailStop()236   public void testFailStop() throws Exception {
237     Service a = new NoOpService();
238     Service b = new FailStopService();
239     Service c = new NoOpService();
240     ServiceManager manager = new ServiceManager(asList(a, b, c));
241     RecordingListener listener = new RecordingListener();
242     manager.addListener(listener, directExecutor());
243 
244     manager.startAsync().awaitHealthy();
245     assertTrue(listener.healthyCalled);
246     assertFalse(listener.stoppedCalled);
247     manager.stopAsync().awaitStopped();
248 
249     assertTrue(listener.stoppedCalled);
250     assertEquals(ImmutableSet.of(b), listener.failedServices);
251     assertState(manager, Service.State.FAILED, b);
252     assertState(manager, Service.State.TERMINATED, a, c);
253   }
254 
testToString()255   public void testToString() throws Exception {
256     Service a = new NoOpService();
257     Service b = new FailStartService();
258     ServiceManager manager = new ServiceManager(asList(a, b));
259     String toString = manager.toString();
260     assertThat(toString).contains("NoOpService");
261     assertThat(toString).contains("FailStartService");
262   }
263 
testTimeouts()264   public void testTimeouts() throws Exception {
265     Service a = new NoOpDelayedService(50);
266     ServiceManager manager = new ServiceManager(asList(a));
267     manager.startAsync();
268     assertThrows(TimeoutException.class, () -> manager.awaitHealthy(1, TimeUnit.MILLISECONDS));
269     manager.awaitHealthy(5, SECONDS); // no exception thrown
270 
271     manager.stopAsync();
272     assertThrows(TimeoutException.class, () -> manager.awaitStopped(1, TimeUnit.MILLISECONDS));
273     manager.awaitStopped(5, SECONDS); // no exception thrown
274   }
275 
276   /**
277    * This covers a case where if the last service to stop failed then the stopped callback would
278    * never be called.
279    */
testSingleFailedServiceCallsStopped()280   public void testSingleFailedServiceCallsStopped() {
281     Service a = new FailStartService();
282     ServiceManager manager = new ServiceManager(asList(a));
283     RecordingListener listener = new RecordingListener();
284     manager.addListener(listener, directExecutor());
285     assertThrows(IllegalStateException.class, () -> manager.startAsync().awaitHealthy());
286     assertTrue(listener.stoppedCalled);
287   }
288 
289   /**
290    * This covers a bug where listener.healthy would get called when a single service failed during
291    * startup (it occurred in more complicated cases also).
292    */
testFailStart_singleServiceCallsHealthy()293   public void testFailStart_singleServiceCallsHealthy() {
294     Service a = new FailStartService();
295     ServiceManager manager = new ServiceManager(asList(a));
296     RecordingListener listener = new RecordingListener();
297     manager.addListener(listener, directExecutor());
298     assertThrows(IllegalStateException.class, () -> manager.startAsync().awaitHealthy());
299     assertFalse(listener.healthyCalled);
300   }
301 
302   /**
303    * This covers a bug where if a listener was installed that would stop the manager if any service
304    * fails and something failed during startup before service.start was called on all the services,
305    * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
306    * stop the timer(!).
307    */
testFailStart_stopOthers()308   public void testFailStart_stopOthers() throws TimeoutException {
309     Service a = new FailStartService();
310     Service b = new NoOpService();
311     final ServiceManager manager = new ServiceManager(asList(a, b));
312     manager.addListener(
313         new Listener() {
314           @Override
315           public void failure(Service service) {
316             manager.stopAsync();
317           }
318         },
319         directExecutor());
320     manager.startAsync();
321     manager.awaitStopped(10, TimeUnit.MILLISECONDS);
322   }
323 
testDoCancelStart()324   public void testDoCancelStart() throws TimeoutException {
325     Service a =
326         new AbstractService() {
327           @Override
328           protected void doStart() {
329             // Never starts!
330           }
331 
332           @Override
333           protected void doCancelStart() {
334             assertThat(state()).isEqualTo(Service.State.STOPPING);
335             notifyStopped();
336           }
337 
338           @Override
339           protected void doStop() {
340             throw new AssertionError(); // Should not be called.
341           }
342         };
343 
344     final ServiceManager manager = new ServiceManager(asList(a));
345     manager.startAsync();
346     manager.stopAsync();
347     manager.awaitStopped(10, TimeUnit.MILLISECONDS);
348     assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.TERMINATED);
349   }
350 
testNotifyStoppedAfterFailure()351   public void testNotifyStoppedAfterFailure() throws TimeoutException {
352     Service a =
353         new AbstractService() {
354           @Override
355           protected void doStart() {
356             notifyFailed(new IllegalStateException("start failure"));
357             notifyStopped(); // This will be a no-op.
358           }
359 
360           @Override
361           protected void doStop() {
362             notifyStopped();
363           }
364         };
365     final ServiceManager manager = new ServiceManager(asList(a));
366     manager.startAsync();
367     manager.awaitStopped(10, TimeUnit.MILLISECONDS);
368     assertThat(manager.servicesByState().keySet()).containsExactly(Service.State.FAILED);
369   }
370 
assertState( ServiceManager manager, Service.State state, Service... services)371   private static void assertState(
372       ServiceManager manager, Service.State state, Service... services) {
373     Collection<Service> managerServices = manager.servicesByState().get(state);
374     for (Service service : services) {
375       assertEquals(service.toString(), state, service.state());
376       assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
377       assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
378     }
379   }
380 
381   /**
382    * This is for covering a case where the ServiceManager would behave strangely if constructed with
383    * no service under management. Listeners would never fire because the ServiceManager was healthy
384    * and stopped at the same time. This test ensures that listeners fire and isHealthy makes sense.
385    */
testEmptyServiceManager()386   public void testEmptyServiceManager() {
387     Logger logger = Logger.getLogger(ServiceManager.class.getName());
388     logger.setLevel(Level.FINEST);
389     TestLogHandler logHandler = new TestLogHandler();
390     logger.addHandler(logHandler);
391     ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
392     RecordingListener listener = new RecordingListener();
393     manager.addListener(listener, directExecutor());
394     manager.startAsync().awaitHealthy();
395     assertTrue(manager.isHealthy());
396     assertTrue(listener.healthyCalled);
397     assertFalse(listener.stoppedCalled);
398     assertTrue(listener.failedServices.isEmpty());
399     manager.stopAsync().awaitStopped();
400     assertFalse(manager.isHealthy());
401     assertTrue(listener.stoppedCalled);
402     assertTrue(listener.failedServices.isEmpty());
403     // check that our NoOpService is not directly observable via any of the inspection methods or
404     // via logging.
405     assertEquals("ServiceManager{services=[]}", manager.toString());
406     assertTrue(manager.servicesByState().isEmpty());
407     assertTrue(manager.startupTimes().isEmpty());
408     Formatter logFormatter =
409         new Formatter() {
410           @Override
411           public String format(LogRecord record) {
412             return formatMessage(record);
413           }
414         };
415     for (LogRecord record : logHandler.getStoredLogRecords()) {
416       assertThat(logFormatter.format(record)).doesNotContain("NoOpService");
417     }
418   }
419 
420   /**
421    * Tests that a ServiceManager can be fully shut down if one of its failure listeners is slow or
422    * even permanently blocked.
423    */
testListenerDeadlock()424   public void testListenerDeadlock() throws InterruptedException {
425     final CountDownLatch failEnter = new CountDownLatch(1);
426     final CountDownLatch failLeave = new CountDownLatch(1);
427     final CountDownLatch afterStarted = new CountDownLatch(1);
428     Service failRunService =
429         new AbstractService() {
430           @Override
431           protected void doStart() {
432             new Thread() {
433               @Override
434               public void run() {
435                 notifyStarted();
436                 // We need to wait for the main thread to leave the ServiceManager.startAsync call
437                 // to
438                 // ensure that the thread running the failure callbacks is not the main thread.
439                 Uninterruptibles.awaitUninterruptibly(afterStarted);
440                 notifyFailed(new Exception("boom"));
441               }
442             }.start();
443           }
444 
445           @Override
446           protected void doStop() {
447             notifyStopped();
448           }
449         };
450     final ServiceManager manager =
451         new ServiceManager(Arrays.asList(failRunService, new NoOpService()));
452     manager.addListener(
453         new ServiceManager.Listener() {
454           @Override
455           public void failure(Service service) {
456             failEnter.countDown();
457             // block until after the service manager is shutdown
458             Uninterruptibles.awaitUninterruptibly(failLeave);
459           }
460         },
461         directExecutor());
462     manager.startAsync();
463     afterStarted.countDown();
464     // We do not call awaitHealthy because, due to races, that method may throw an exception.  But
465     // we really just want to wait for the thread to be in the failure callback so we wait for that
466     // explicitly instead.
467     failEnter.await();
468     assertFalse("State should be updated before calling listeners", manager.isHealthy());
469     // now we want to stop the services.
470     Thread stoppingThread =
471         new Thread() {
472           @Override
473           public void run() {
474             manager.stopAsync().awaitStopped();
475           }
476         };
477     stoppingThread.start();
478     // this should be super fast since the only non-stopped service is a NoOpService
479     stoppingThread.join(1000);
480     assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
481     failLeave.countDown(); // release the background thread
482   }
483 
484   /**
485    * Catches a bug where when constructing a service manager failed, later interactions with the
486    * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
487    * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
488    * the original error (which was not constructing ServiceManager correctly).
489    */
testPartiallyConstructedManager()490   public void testPartiallyConstructedManager() {
491     Logger logger = Logger.getLogger("global");
492     logger.setLevel(Level.FINEST);
493     TestLogHandler logHandler = new TestLogHandler();
494     logger.addHandler(logHandler);
495     NoOpService service = new NoOpService();
496     service.startAsync();
497     assertThrows(IllegalArgumentException.class, () -> new ServiceManager(Arrays.asList(service)));
498     service.stopAsync();
499     // Nothing was logged!
500     assertEquals(0, logHandler.getStoredLogRecords().size());
501   }
502 
testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady()503   public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
504     // The implementation of this test is pretty sensitive to the implementation :( but we want to
505     // ensure that if weird things happen during construction then we get exceptions.
506     final NoOpService service1 = new NoOpService();
507     // This service will start service1 when addListener is called.  This simulates service1 being
508     // started asynchronously.
509     Service service2 =
510         new Service() {
511           final NoOpService delegate = new NoOpService();
512 
513           @Override
514           public final void addListener(Listener listener, Executor executor) {
515             service1.startAsync();
516             delegate.addListener(listener, executor);
517           }
518 
519           // Delegates from here on down
520           @Override
521           public final Service startAsync() {
522             return delegate.startAsync();
523           }
524 
525           @Override
526           public final Service stopAsync() {
527             return delegate.stopAsync();
528           }
529 
530           @Override
531           public final void awaitRunning() {
532             delegate.awaitRunning();
533           }
534 
535           @Override
536           public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
537             delegate.awaitRunning(timeout, unit);
538           }
539 
540           @Override
541           public final void awaitTerminated() {
542             delegate.awaitTerminated();
543           }
544 
545           @Override
546           public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
547             delegate.awaitTerminated(timeout, unit);
548           }
549 
550           @Override
551           public final boolean isRunning() {
552             return delegate.isRunning();
553           }
554 
555           @Override
556           public final State state() {
557             return delegate.state();
558           }
559 
560           @Override
561           public final Throwable failureCause() {
562             return delegate.failureCause();
563           }
564         };
565     IllegalArgumentException expected =
566         assertThrows(
567             IllegalArgumentException.class,
568             () -> new ServiceManager(Arrays.asList(service1, service2)));
569     assertThat(expected.getMessage()).contains("started transitioning asynchronously");
570   }
571 
572   /**
573    * This test is for a case where two Service.Listener callbacks for the same service would call
574    * transitionService in the wrong order due to a race. Due to the fact that it is a race this test
575    * isn't guaranteed to expose the issue, but it is at least likely to become flaky if the race
576    * sneaks back in, and in this case flaky means something is definitely wrong.
577    *
578    * <p>Before the bug was fixed this test would fail at least 30% of the time.
579    */
testTransitionRace()580   public void testTransitionRace() throws TimeoutException {
581     for (int k = 0; k < 1000; k++) {
582       List<Service> services = Lists.newArrayList();
583       for (int i = 0; i < 5; i++) {
584         services.add(new SnappyShutdownService(i));
585       }
586       ServiceManager manager = new ServiceManager(services);
587       manager.startAsync().awaitHealthy();
588       manager.stopAsync().awaitStopped(10, TimeUnit.SECONDS);
589     }
590   }
591 
592   /**
593    * This service will shut down very quickly after stopAsync is called and uses a background thread
594    * so that we know that the stopping() listeners will execute on a different thread than the
595    * terminated() listeners.
596    */
597   private static class SnappyShutdownService extends AbstractExecutionThreadService {
598     final int index;
599     final CountDownLatch latch = new CountDownLatch(1);
600 
SnappyShutdownService(int index)601     SnappyShutdownService(int index) {
602       this.index = index;
603     }
604 
605     @Override
run()606     protected void run() throws Exception {
607       latch.await();
608     }
609 
610     @Override
triggerShutdown()611     protected void triggerShutdown() {
612       latch.countDown();
613     }
614 
615     @Override
serviceName()616     protected String serviceName() {
617       return this.getClass().getSimpleName() + "[" + index + "]";
618     }
619   }
620 
testNulls()621   public void testNulls() {
622     ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
623     new NullPointerTester()
624         .setDefault(ServiceManager.Listener.class, new RecordingListener())
625         .testAllPublicInstanceMethods(manager);
626   }
627 
628   private static final class RecordingListener extends ServiceManager.Listener {
629     volatile boolean healthyCalled;
630     volatile boolean stoppedCalled;
631     final Set<Service> failedServices = Sets.newConcurrentHashSet();
632 
633     @Override
healthy()634     public void healthy() {
635       healthyCalled = true;
636     }
637 
638     @Override
stopped()639     public void stopped() {
640       stoppedCalled = true;
641     }
642 
643     @Override
failure(Service service)644     public void failure(Service service) {
645       failedServices.add(service);
646     }
647   }
648 
isWindows()649   private static boolean isWindows() {
650     return OS_NAME.value().startsWith("Windows");
651   }
652 
isJava8()653   private static boolean isJava8() {
654     return JAVA_SPECIFICATION_VERSION.value().equals("1.8");
655   }
656 }
657