1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static java.util.concurrent.TimeUnit.SECONDS; 23 24 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; 25 import com.google.common.util.concurrent.Service.State; 26 import com.google.common.util.concurrent.testing.TestingExecutors; 27 import java.util.concurrent.BrokenBarrierException; 28 import java.util.concurrent.CancellationException; 29 import java.util.concurrent.CountDownLatch; 30 import java.util.concurrent.CyclicBarrier; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.Future; 33 import java.util.concurrent.ScheduledExecutorService; 34 import java.util.concurrent.ScheduledFuture; 35 import java.util.concurrent.ScheduledThreadPoolExecutor; 36 import java.util.concurrent.TimeUnit; 37 import java.util.concurrent.TimeoutException; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 import java.util.concurrent.atomic.AtomicInteger; 40 import java.util.concurrent.atomic.AtomicReference; 41 import junit.framework.TestCase; 42 43 /** 44 * Unit test for {@link AbstractScheduledService}. 45 * 46 * @author Luke Sandberg 47 */ 48 49 public class AbstractScheduledServiceTest extends TestCase { 50 51 volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); 52 volatile ScheduledFuture<?> future = null; 53 54 volatile boolean atFixedRateCalled = false; 55 volatile boolean withFixedDelayCalled = false; 56 volatile boolean scheduleCalled = false; 57 58 final ScheduledExecutorService executor = 59 new ScheduledThreadPoolExecutor(10) { 60 @Override 61 public ScheduledFuture<?> scheduleWithFixedDelay( 62 Runnable command, long initialDelay, long delay, TimeUnit unit) { 63 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); 64 } 65 }; 66 testServiceStartStop()67 public void testServiceStartStop() throws Exception { 68 NullService service = new NullService(); 69 service.startAsync().awaitRunning(); 70 assertFalse(future.isDone()); 71 service.stopAsync().awaitTerminated(); 72 assertTrue(future.isCancelled()); 73 } 74 75 private class NullService extends AbstractScheduledService { 76 @Override runOneIteration()77 protected void runOneIteration() throws Exception {} 78 79 @Override scheduler()80 protected Scheduler scheduler() { 81 return configuration; 82 } 83 84 @Override executor()85 protected ScheduledExecutorService executor() { 86 return executor; 87 } 88 } 89 testFailOnExceptionFromRun()90 public void testFailOnExceptionFromRun() throws Exception { 91 TestService service = new TestService(); 92 service.runException = new Exception(); 93 service.startAsync().awaitRunning(); 94 service.runFirstBarrier.await(); 95 service.runSecondBarrier.await(); 96 try { 97 future.get(); 98 fail(); 99 } catch (CancellationException expected) { 100 } 101 // An execution exception holds a runtime exception (from throwables.propogate) that holds our 102 // original exception. 103 assertEquals(service.runException, service.failureCause()); 104 assertEquals(Service.State.FAILED, service.state()); 105 } 106 testFailOnExceptionFromStartUp()107 public void testFailOnExceptionFromStartUp() { 108 TestService service = new TestService(); 109 service.startUpException = new Exception(); 110 try { 111 service.startAsync().awaitRunning(); 112 fail(); 113 } catch (IllegalStateException e) { 114 assertEquals(service.startUpException, e.getCause()); 115 } 116 assertEquals(0, service.numberOfTimesRunCalled.get()); 117 assertEquals(Service.State.FAILED, service.state()); 118 } 119 testFailOnErrorFromStartUpListener()120 public void testFailOnErrorFromStartUpListener() throws InterruptedException { 121 final Error error = new Error(); 122 final CountDownLatch latch = new CountDownLatch(1); 123 TestService service = new TestService(); 124 service.addListener( 125 new Service.Listener() { 126 @Override 127 public void running() { 128 throw error; 129 } 130 131 @Override 132 public void failed(State from, Throwable failure) { 133 assertEquals(State.RUNNING, from); 134 assertEquals(error, failure); 135 latch.countDown(); 136 } 137 }, 138 directExecutor()); 139 service.startAsync(); 140 latch.await(); 141 142 assertEquals(0, service.numberOfTimesRunCalled.get()); 143 assertEquals(Service.State.FAILED, service.state()); 144 } 145 testFailOnExceptionFromShutDown()146 public void testFailOnExceptionFromShutDown() throws Exception { 147 TestService service = new TestService(); 148 service.shutDownException = new Exception(); 149 service.startAsync().awaitRunning(); 150 service.runFirstBarrier.await(); 151 service.stopAsync(); 152 service.runSecondBarrier.await(); 153 try { 154 service.awaitTerminated(); 155 fail(); 156 } catch (IllegalStateException e) { 157 assertEquals(service.shutDownException, e.getCause()); 158 } 159 assertEquals(Service.State.FAILED, service.state()); 160 } 161 testRunOneIterationCalledMultipleTimes()162 public void testRunOneIterationCalledMultipleTimes() throws Exception { 163 TestService service = new TestService(); 164 service.startAsync().awaitRunning(); 165 for (int i = 1; i < 10; i++) { 166 service.runFirstBarrier.await(); 167 assertEquals(i, service.numberOfTimesRunCalled.get()); 168 service.runSecondBarrier.await(); 169 } 170 service.runFirstBarrier.await(); 171 service.stopAsync(); 172 service.runSecondBarrier.await(); 173 service.stopAsync().awaitTerminated(); 174 } 175 testExecutorOnlyCalledOnce()176 public void testExecutorOnlyCalledOnce() throws Exception { 177 TestService service = new TestService(); 178 service.startAsync().awaitRunning(); 179 // It should be called once during startup. 180 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 181 for (int i = 1; i < 10; i++) { 182 service.runFirstBarrier.await(); 183 assertEquals(i, service.numberOfTimesRunCalled.get()); 184 service.runSecondBarrier.await(); 185 } 186 service.runFirstBarrier.await(); 187 service.stopAsync(); 188 service.runSecondBarrier.await(); 189 service.stopAsync().awaitTerminated(); 190 // Only called once overall. 191 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 192 } 193 testDefaultExecutorIsShutdownWhenServiceIsStopped()194 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception { 195 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 196 AbstractScheduledService service = 197 new AbstractScheduledService() { 198 @Override 199 protected void runOneIteration() throws Exception {} 200 201 @Override 202 protected ScheduledExecutorService executor() { 203 executor.set(super.executor()); 204 return executor.get(); 205 } 206 207 @Override 208 protected Scheduler scheduler() { 209 return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 210 } 211 }; 212 213 service.startAsync(); 214 assertFalse(service.executor().isShutdown()); 215 service.awaitRunning(); 216 service.stopAsync(); 217 service.awaitTerminated(); 218 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 219 } 220 testDefaultExecutorIsShutdownWhenServiceFails()221 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception { 222 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 223 AbstractScheduledService service = 224 new AbstractScheduledService() { 225 @Override 226 protected void startUp() throws Exception { 227 throw new Exception("Failed"); 228 } 229 230 @Override 231 protected void runOneIteration() throws Exception {} 232 233 @Override 234 protected ScheduledExecutorService executor() { 235 executor.set(super.executor()); 236 return executor.get(); 237 } 238 239 @Override 240 protected Scheduler scheduler() { 241 return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 242 } 243 }; 244 245 try { 246 service.startAsync().awaitRunning(); 247 fail("Expected service to fail during startup"); 248 } catch (IllegalStateException expected) { 249 } 250 251 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 252 } 253 testSchedulerOnlyCalledOnce()254 public void testSchedulerOnlyCalledOnce() throws Exception { 255 TestService service = new TestService(); 256 service.startAsync().awaitRunning(); 257 // It should be called once during startup. 258 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 259 for (int i = 1; i < 10; i++) { 260 service.runFirstBarrier.await(); 261 assertEquals(i, service.numberOfTimesRunCalled.get()); 262 service.runSecondBarrier.await(); 263 } 264 service.runFirstBarrier.await(); 265 service.stopAsync(); 266 service.runSecondBarrier.await(); 267 service.awaitTerminated(); 268 // Only called once overall. 269 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 270 } 271 testTimeout()272 public void testTimeout() { 273 // Create a service whose executor will never run its commands 274 Service service = 275 new AbstractScheduledService() { 276 @Override 277 protected Scheduler scheduler() { 278 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS); 279 } 280 281 @Override 282 protected ScheduledExecutorService executor() { 283 return TestingExecutors.noOpScheduledExecutor(); 284 } 285 286 @Override 287 protected void runOneIteration() throws Exception {} 288 289 @Override 290 protected String serviceName() { 291 return "Foo"; 292 } 293 }; 294 try { 295 service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS); 296 fail("Expected timeout"); 297 } catch (TimeoutException e) { 298 assertThat(e) 299 .hasMessageThat() 300 .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state."); 301 } 302 } 303 304 private class TestService extends AbstractScheduledService { 305 CyclicBarrier runFirstBarrier = new CyclicBarrier(2); 306 CyclicBarrier runSecondBarrier = new CyclicBarrier(2); 307 308 volatile boolean startUpCalled = false; 309 volatile boolean shutDownCalled = false; 310 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); 311 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); 312 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); 313 volatile Exception runException = null; 314 volatile Exception startUpException = null; 315 volatile Exception shutDownException = null; 316 317 @Override runOneIteration()318 protected void runOneIteration() throws Exception { 319 assertTrue(startUpCalled); 320 assertFalse(shutDownCalled); 321 numberOfTimesRunCalled.incrementAndGet(); 322 assertEquals(State.RUNNING, state()); 323 runFirstBarrier.await(); 324 runSecondBarrier.await(); 325 if (runException != null) { 326 throw runException; 327 } 328 } 329 330 @Override startUp()331 protected void startUp() throws Exception { 332 assertFalse(startUpCalled); 333 assertFalse(shutDownCalled); 334 startUpCalled = true; 335 assertEquals(State.STARTING, state()); 336 if (startUpException != null) { 337 throw startUpException; 338 } 339 } 340 341 @Override shutDown()342 protected void shutDown() throws Exception { 343 assertTrue(startUpCalled); 344 assertFalse(shutDownCalled); 345 shutDownCalled = true; 346 if (shutDownException != null) { 347 throw shutDownException; 348 } 349 } 350 351 @Override executor()352 protected ScheduledExecutorService executor() { 353 numberOfTimesExecutorCalled.incrementAndGet(); 354 return executor; 355 } 356 357 @Override scheduler()358 protected Scheduler scheduler() { 359 numberOfTimesSchedulerCalled.incrementAndGet(); 360 return configuration; 361 } 362 } 363 364 public static class SchedulerTest extends TestCase { 365 // These constants are arbitrary and just used to make sure that the correct method is called 366 // with the correct parameters. 367 private static final int initialDelay = 10; 368 private static final int delay = 20; 369 private static final TimeUnit unit = TimeUnit.MILLISECONDS; 370 371 // Unique runnable object used for comparison. 372 final Runnable testRunnable = 373 new Runnable() { 374 @Override 375 public void run() {} 376 }; 377 boolean called = false; 378 assertSingleCallWithCorrectParameters( Runnable command, long initialDelay, long delay, TimeUnit unit)379 private void assertSingleCallWithCorrectParameters( 380 Runnable command, long initialDelay, long delay, TimeUnit unit) { 381 assertFalse(called); // only called once. 382 called = true; 383 assertEquals(SchedulerTest.initialDelay, initialDelay); 384 assertEquals(SchedulerTest.delay, delay); 385 assertEquals(SchedulerTest.unit, unit); 386 assertEquals(testRunnable, command); 387 } 388 testFixedRateSchedule()389 public void testFixedRateSchedule() { 390 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); 391 Future<?> unused = 392 schedule.schedule( 393 null, 394 new ScheduledThreadPoolExecutor(1) { 395 @Override 396 public ScheduledFuture<?> scheduleAtFixedRate( 397 Runnable command, long initialDelay, long period, TimeUnit unit) { 398 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 399 return null; 400 } 401 }, 402 testRunnable); 403 assertTrue(called); 404 } 405 testFixedDelaySchedule()406 public void testFixedDelaySchedule() { 407 Scheduler schedule = newFixedDelaySchedule(initialDelay, delay, unit); 408 Future<?> unused = 409 schedule.schedule( 410 null, 411 new ScheduledThreadPoolExecutor(10) { 412 @Override 413 public ScheduledFuture<?> scheduleWithFixedDelay( 414 Runnable command, long initialDelay, long delay, TimeUnit unit) { 415 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 416 return null; 417 } 418 }, 419 testRunnable); 420 assertTrue(called); 421 } 422 testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()423 public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached() 424 throws Exception { 425 TestAbstractScheduledCustomService service = 426 new TestAbstractScheduledCustomService() { 427 @Override 428 protected Scheduler scheduler() { 429 return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS); 430 } 431 }; 432 service.startAsync().awaitRunning(); 433 try { 434 service.firstBarrier.await(5, SECONDS); 435 fail(); 436 } catch (TimeoutException expected) { 437 } 438 assertEquals(0, service.numIterations.get()); 439 service.stopAsync(); 440 service.awaitTerminated(); 441 } 442 testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()443 public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached() 444 throws Exception { 445 TestAbstractScheduledCustomService service = 446 new TestAbstractScheduledCustomService() { 447 @Override 448 protected Scheduler scheduler() { 449 return new AbstractScheduledService.CustomScheduler() { 450 @Override 451 protected Schedule getNextSchedule() throws Exception { 452 return new Schedule(Long.MAX_VALUE, SECONDS); 453 } 454 }; 455 } 456 }; 457 service.startAsync().awaitRunning(); 458 try { 459 service.firstBarrier.await(5, SECONDS); 460 fail(); 461 } catch (TimeoutException expected) { 462 } 463 assertEquals(0, service.numIterations.get()); 464 service.stopAsync(); 465 service.awaitTerminated(); 466 } 467 468 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { 469 public AtomicInteger scheduleCounter = new AtomicInteger(0); 470 471 @Override getNextSchedule()472 protected Schedule getNextSchedule() throws Exception { 473 scheduleCounter.incrementAndGet(); 474 return new Schedule(0, TimeUnit.SECONDS); 475 } 476 } 477 testCustomSchedule_startStop()478 public void testCustomSchedule_startStop() throws Exception { 479 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 480 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 481 final AtomicBoolean shouldWait = new AtomicBoolean(true); 482 Runnable task = 483 new Runnable() { 484 @Override 485 public void run() { 486 try { 487 if (shouldWait.get()) { 488 firstBarrier.await(); 489 secondBarrier.await(); 490 } 491 } catch (Exception e) { 492 throw new RuntimeException(e); 493 } 494 } 495 }; 496 TestCustomScheduler scheduler = new TestCustomScheduler(); 497 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); 498 firstBarrier.await(); 499 assertEquals(1, scheduler.scheduleCounter.get()); 500 secondBarrier.await(); 501 firstBarrier.await(); 502 assertEquals(2, scheduler.scheduleCounter.get()); 503 shouldWait.set(false); 504 secondBarrier.await(); 505 future.cancel(false); 506 } 507 testCustomSchedulerServiceStop()508 public void testCustomSchedulerServiceStop() throws Exception { 509 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); 510 service.startAsync().awaitRunning(); 511 service.firstBarrier.await(); 512 assertEquals(1, service.numIterations.get()); 513 service.stopAsync(); 514 service.secondBarrier.await(); 515 service.awaitTerminated(); 516 // Sleep for a while just to ensure that our task wasn't called again. 517 Thread.sleep(unit.toMillis(3 * delay)); 518 assertEquals(1, service.numIterations.get()); 519 } 520 testCustomScheduler_deadlock()521 public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException { 522 final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2); 523 // This will flakily deadlock, so run it multiple times to increase the flake likelihood 524 for (int i = 0; i < 1000; i++) { 525 Service service = 526 new AbstractScheduledService() { 527 @Override 528 protected void runOneIteration() {} 529 530 @Override 531 protected Scheduler scheduler() { 532 return new CustomScheduler() { 533 @Override 534 protected Schedule getNextSchedule() throws Exception { 535 if (state() != State.STARTING) { 536 inGetNextSchedule.await(); 537 Thread.yield(); 538 throw new RuntimeException("boom"); 539 } 540 return new Schedule(0, TimeUnit.NANOSECONDS); 541 } 542 }; 543 } 544 }; 545 service.startAsync().awaitRunning(); 546 inGetNextSchedule.await(); 547 service.stopAsync(); 548 } 549 } 550 testBig()551 public void testBig() throws Exception { 552 TestAbstractScheduledCustomService service = 553 new TestAbstractScheduledCustomService() { 554 @Override 555 protected Scheduler scheduler() { 556 return new AbstractScheduledService.CustomScheduler() { 557 @Override 558 protected Schedule getNextSchedule() throws Exception { 559 // Explicitly yield to increase the probability of a pathological scheduling. 560 Thread.yield(); 561 return new Schedule(0, TimeUnit.SECONDS); 562 } 563 }; 564 } 565 }; 566 service.useBarriers = false; 567 service.startAsync().awaitRunning(); 568 Thread.sleep(50); 569 service.useBarriers = true; 570 service.firstBarrier.await(); 571 int numIterations = service.numIterations.get(); 572 service.stopAsync(); 573 service.secondBarrier.await(); 574 service.awaitTerminated(); 575 assertEquals(numIterations, service.numIterations.get()); 576 } 577 578 private static class TestAbstractScheduledCustomService extends AbstractScheduledService { 579 final AtomicInteger numIterations = new AtomicInteger(0); 580 volatile boolean useBarriers = true; 581 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 582 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 583 584 @Override runOneIteration()585 protected void runOneIteration() throws Exception { 586 numIterations.incrementAndGet(); 587 if (useBarriers) { 588 firstBarrier.await(); 589 secondBarrier.await(); 590 } 591 } 592 593 @Override executor()594 protected ScheduledExecutorService executor() { 595 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 596 return Executors.newScheduledThreadPool(10); 597 } 598 599 @Override scheduler()600 protected Scheduler scheduler() { 601 return new CustomScheduler() { 602 @Override 603 protected Schedule getNextSchedule() throws Exception { 604 return new Schedule(delay, unit); 605 } 606 }; 607 } 608 } 609 testCustomSchedulerFailure()610 public void testCustomSchedulerFailure() throws Exception { 611 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); 612 service.startAsync().awaitRunning(); 613 for (int i = 1; i < 4; i++) { 614 service.firstBarrier.await(); 615 assertEquals(i, service.numIterations.get()); 616 service.secondBarrier.await(); 617 } 618 Thread.sleep(1000); 619 try { 620 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS); 621 fail(); 622 } catch (IllegalStateException e) { 623 assertEquals(State.FAILED, service.state()); 624 } 625 } 626 627 private static class TestFailingCustomScheduledService extends AbstractScheduledService { 628 final AtomicInteger numIterations = new AtomicInteger(0); 629 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 630 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 631 632 @Override runOneIteration()633 protected void runOneIteration() throws Exception { 634 numIterations.incrementAndGet(); 635 firstBarrier.await(); 636 secondBarrier.await(); 637 } 638 639 @Override executor()640 protected ScheduledExecutorService executor() { 641 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 642 return Executors.newScheduledThreadPool(10); 643 } 644 645 @Override scheduler()646 protected Scheduler scheduler() { 647 return new CustomScheduler() { 648 @Override 649 protected Schedule getNextSchedule() throws Exception { 650 if (numIterations.get() > 2) { 651 throw new IllegalStateException("Failed"); 652 } 653 return new Schedule(delay, unit); 654 } 655 }; 656 } 657 } 658 } 659 } 660