1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule; 21 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 22 import static java.util.concurrent.TimeUnit.SECONDS; 23 24 import com.google.common.util.concurrent.AbstractScheduledService.Cancellable; 25 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; 26 import com.google.common.util.concurrent.Service.State; 27 import com.google.common.util.concurrent.testing.TestingExecutors; 28 import java.util.concurrent.BrokenBarrierException; 29 import java.util.concurrent.CancellationException; 30 import java.util.concurrent.CountDownLatch; 31 import java.util.concurrent.CyclicBarrier; 32 import java.util.concurrent.Delayed; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.Future; 35 import java.util.concurrent.ScheduledExecutorService; 36 import java.util.concurrent.ScheduledFuture; 37 import java.util.concurrent.ScheduledThreadPoolExecutor; 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeoutException; 40 import java.util.concurrent.atomic.AtomicBoolean; 41 import java.util.concurrent.atomic.AtomicInteger; 42 import java.util.concurrent.atomic.AtomicReference; 43 import junit.framework.TestCase; 44 45 /** 46 * Unit test for {@link AbstractScheduledService}. 47 * 48 * @author Luke Sandberg 49 */ 50 51 public class AbstractScheduledServiceTest extends TestCase { 52 53 volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); 54 volatile ScheduledFuture<?> future = null; 55 56 volatile boolean atFixedRateCalled = false; 57 volatile boolean withFixedDelayCalled = false; 58 volatile boolean scheduleCalled = false; 59 60 final ScheduledExecutorService executor = 61 new ScheduledThreadPoolExecutor(10) { 62 @Override 63 public ScheduledFuture<?> scheduleWithFixedDelay( 64 Runnable command, long initialDelay, long delay, TimeUnit unit) { 65 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); 66 } 67 }; 68 testServiceStartStop()69 public void testServiceStartStop() throws Exception { 70 NullService service = new NullService(); 71 service.startAsync().awaitRunning(); 72 assertFalse(future.isDone()); 73 service.stopAsync().awaitTerminated(); 74 assertTrue(future.isCancelled()); 75 } 76 77 private class NullService extends AbstractScheduledService { 78 @Override runOneIteration()79 protected void runOneIteration() throws Exception {} 80 81 @Override scheduler()82 protected Scheduler scheduler() { 83 return configuration; 84 } 85 86 @Override executor()87 protected ScheduledExecutorService executor() { 88 return executor; 89 } 90 } 91 testFailOnExceptionFromRun()92 public void testFailOnExceptionFromRun() throws Exception { 93 TestService service = new TestService(); 94 service.runException = new Exception(); 95 service.startAsync().awaitRunning(); 96 service.runFirstBarrier.await(); 97 service.runSecondBarrier.await(); 98 try { 99 future.get(); 100 fail(); 101 } catch (CancellationException expected) { 102 } 103 // An execution exception holds a runtime exception (from throwables.propagate) that holds our 104 // original exception. 105 assertEquals(service.runException, service.failureCause()); 106 assertEquals(Service.State.FAILED, service.state()); 107 } 108 testFailOnExceptionFromStartUp()109 public void testFailOnExceptionFromStartUp() { 110 TestService service = new TestService(); 111 service.startUpException = new Exception(); 112 try { 113 service.startAsync().awaitRunning(); 114 fail(); 115 } catch (IllegalStateException e) { 116 assertEquals(service.startUpException, e.getCause()); 117 } 118 assertEquals(0, service.numberOfTimesRunCalled.get()); 119 assertEquals(Service.State.FAILED, service.state()); 120 } 121 testFailOnErrorFromStartUpListener()122 public void testFailOnErrorFromStartUpListener() throws InterruptedException { 123 final Error error = new Error(); 124 final CountDownLatch latch = new CountDownLatch(1); 125 TestService service = new TestService(); 126 service.addListener( 127 new Service.Listener() { 128 @Override 129 public void running() { 130 throw error; 131 } 132 133 @Override 134 public void failed(State from, Throwable failure) { 135 assertEquals(State.RUNNING, from); 136 assertEquals(error, failure); 137 latch.countDown(); 138 } 139 }, 140 directExecutor()); 141 service.startAsync(); 142 latch.await(); 143 144 assertEquals(0, service.numberOfTimesRunCalled.get()); 145 assertEquals(Service.State.FAILED, service.state()); 146 } 147 testFailOnExceptionFromShutDown()148 public void testFailOnExceptionFromShutDown() throws Exception { 149 TestService service = new TestService(); 150 service.shutDownException = new Exception(); 151 service.startAsync().awaitRunning(); 152 service.runFirstBarrier.await(); 153 service.stopAsync(); 154 service.runSecondBarrier.await(); 155 try { 156 service.awaitTerminated(); 157 fail(); 158 } catch (IllegalStateException e) { 159 assertEquals(service.shutDownException, e.getCause()); 160 } 161 assertEquals(Service.State.FAILED, service.state()); 162 } 163 testRunOneIterationCalledMultipleTimes()164 public void testRunOneIterationCalledMultipleTimes() throws Exception { 165 TestService service = new TestService(); 166 service.startAsync().awaitRunning(); 167 for (int i = 1; i < 10; i++) { 168 service.runFirstBarrier.await(); 169 assertEquals(i, service.numberOfTimesRunCalled.get()); 170 service.runSecondBarrier.await(); 171 } 172 service.runFirstBarrier.await(); 173 service.stopAsync(); 174 service.runSecondBarrier.await(); 175 service.stopAsync().awaitTerminated(); 176 } 177 testExecutorOnlyCalledOnce()178 public void testExecutorOnlyCalledOnce() throws Exception { 179 TestService service = new TestService(); 180 service.startAsync().awaitRunning(); 181 // It should be called once during startup. 182 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 183 for (int i = 1; i < 10; i++) { 184 service.runFirstBarrier.await(); 185 assertEquals(i, service.numberOfTimesRunCalled.get()); 186 service.runSecondBarrier.await(); 187 } 188 service.runFirstBarrier.await(); 189 service.stopAsync(); 190 service.runSecondBarrier.await(); 191 service.stopAsync().awaitTerminated(); 192 // Only called once overall. 193 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 194 } 195 testDefaultExecutorIsShutdownWhenServiceIsStopped()196 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception { 197 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 198 AbstractScheduledService service = 199 new AbstractScheduledService() { 200 @Override 201 protected void runOneIteration() throws Exception {} 202 203 @Override 204 protected ScheduledExecutorService executor() { 205 executor.set(super.executor()); 206 return executor.get(); 207 } 208 209 @Override 210 protected Scheduler scheduler() { 211 return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 212 } 213 }; 214 215 service.startAsync(); 216 assertFalse(service.executor().isShutdown()); 217 service.awaitRunning(); 218 service.stopAsync(); 219 service.awaitTerminated(); 220 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 221 } 222 testDefaultExecutorIsShutdownWhenServiceFails()223 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception { 224 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference(); 225 AbstractScheduledService service = 226 new AbstractScheduledService() { 227 @Override 228 protected void startUp() throws Exception { 229 throw new Exception("Failed"); 230 } 231 232 @Override 233 protected void runOneIteration() throws Exception {} 234 235 @Override 236 protected ScheduledExecutorService executor() { 237 executor.set(super.executor()); 238 return executor.get(); 239 } 240 241 @Override 242 protected Scheduler scheduler() { 243 return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 244 } 245 }; 246 247 try { 248 service.startAsync().awaitRunning(); 249 fail("Expected service to fail during startup"); 250 } catch (IllegalStateException expected) { 251 } 252 253 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS)); 254 } 255 testSchedulerOnlyCalledOnce()256 public void testSchedulerOnlyCalledOnce() throws Exception { 257 TestService service = new TestService(); 258 service.startAsync().awaitRunning(); 259 // It should be called once during startup. 260 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 261 for (int i = 1; i < 10; i++) { 262 service.runFirstBarrier.await(); 263 assertEquals(i, service.numberOfTimesRunCalled.get()); 264 service.runSecondBarrier.await(); 265 } 266 service.runFirstBarrier.await(); 267 service.stopAsync(); 268 service.runSecondBarrier.await(); 269 service.awaitTerminated(); 270 // Only called once overall. 271 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 272 } 273 testTimeout()274 public void testTimeout() { 275 // Create a service whose executor will never run its commands 276 Service service = 277 new AbstractScheduledService() { 278 @Override 279 protected Scheduler scheduler() { 280 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS); 281 } 282 283 @Override 284 protected ScheduledExecutorService executor() { 285 return TestingExecutors.noOpScheduledExecutor(); 286 } 287 288 @Override 289 protected void runOneIteration() throws Exception {} 290 291 @Override 292 protected String serviceName() { 293 return "Foo"; 294 } 295 }; 296 try { 297 service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS); 298 fail("Expected timeout"); 299 } catch (TimeoutException e) { 300 assertThat(e) 301 .hasMessageThat() 302 .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state."); 303 } 304 } 305 306 private class TestService extends AbstractScheduledService { 307 CyclicBarrier runFirstBarrier = new CyclicBarrier(2); 308 CyclicBarrier runSecondBarrier = new CyclicBarrier(2); 309 310 volatile boolean startUpCalled = false; 311 volatile boolean shutDownCalled = false; 312 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); 313 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); 314 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); 315 volatile Exception runException = null; 316 volatile Exception startUpException = null; 317 volatile Exception shutDownException = null; 318 319 @Override runOneIteration()320 protected void runOneIteration() throws Exception { 321 assertTrue(startUpCalled); 322 assertFalse(shutDownCalled); 323 numberOfTimesRunCalled.incrementAndGet(); 324 assertEquals(State.RUNNING, state()); 325 runFirstBarrier.await(); 326 runSecondBarrier.await(); 327 if (runException != null) { 328 throw runException; 329 } 330 } 331 332 @Override startUp()333 protected void startUp() throws Exception { 334 assertFalse(startUpCalled); 335 assertFalse(shutDownCalled); 336 startUpCalled = true; 337 assertEquals(State.STARTING, state()); 338 if (startUpException != null) { 339 throw startUpException; 340 } 341 } 342 343 @Override shutDown()344 protected void shutDown() throws Exception { 345 assertTrue(startUpCalled); 346 assertFalse(shutDownCalled); 347 shutDownCalled = true; 348 if (shutDownException != null) { 349 throw shutDownException; 350 } 351 } 352 353 @Override executor()354 protected ScheduledExecutorService executor() { 355 numberOfTimesExecutorCalled.incrementAndGet(); 356 return executor; 357 } 358 359 @Override scheduler()360 protected Scheduler scheduler() { 361 numberOfTimesSchedulerCalled.incrementAndGet(); 362 return configuration; 363 } 364 } 365 366 public static class SchedulerTest extends TestCase { 367 // These constants are arbitrary and just used to make sure that the correct method is called 368 // with the correct parameters. 369 private static final int initialDelay = 10; 370 private static final int delay = 20; 371 private static final TimeUnit unit = TimeUnit.MILLISECONDS; 372 373 // Unique runnable object used for comparison. 374 final Runnable testRunnable = 375 new Runnable() { 376 @Override 377 public void run() {} 378 }; 379 boolean called = false; 380 assertSingleCallWithCorrectParameters( Runnable command, long initialDelay, long delay, TimeUnit unit)381 private void assertSingleCallWithCorrectParameters( 382 Runnable command, long initialDelay, long delay, TimeUnit unit) { 383 assertFalse(called); // only called once. 384 called = true; 385 assertEquals(SchedulerTest.initialDelay, initialDelay); 386 assertEquals(SchedulerTest.delay, delay); 387 assertEquals(SchedulerTest.unit, unit); 388 assertEquals(testRunnable, command); 389 } 390 testFixedRateSchedule()391 public void testFixedRateSchedule() { 392 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); 393 Cancellable unused = 394 schedule.schedule( 395 null, 396 new ScheduledThreadPoolExecutor(1) { 397 @Override 398 public ScheduledFuture<?> scheduleAtFixedRate( 399 Runnable command, long initialDelay, long period, TimeUnit unit) { 400 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 401 return new ThrowingScheduledFuture<>(); 402 } 403 }, 404 testRunnable); 405 assertTrue(called); 406 } 407 testFixedDelaySchedule()408 public void testFixedDelaySchedule() { 409 Scheduler schedule = newFixedDelaySchedule(initialDelay, delay, unit); 410 Cancellable unused = 411 schedule.schedule( 412 null, 413 new ScheduledThreadPoolExecutor(10) { 414 @Override 415 public ScheduledFuture<?> scheduleWithFixedDelay( 416 Runnable command, long initialDelay, long delay, TimeUnit unit) { 417 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 418 return new ThrowingScheduledFuture<>(); 419 } 420 }, 421 testRunnable); 422 assertTrue(called); 423 } 424 425 private static final class ThrowingScheduledFuture<V> extends ForwardingFuture<V> 426 implements ScheduledFuture<V> { 427 @Override delegate()428 protected Future<V> delegate() { 429 throw new UnsupportedOperationException("test should not care about this"); 430 } 431 432 @Override getDelay(TimeUnit unit)433 public long getDelay(TimeUnit unit) { 434 throw new UnsupportedOperationException("test should not care about this"); 435 } 436 437 @Override compareTo(Delayed other)438 public int compareTo(Delayed other) { 439 throw new UnsupportedOperationException("test should not care about this"); 440 } 441 } 442 443 testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()444 public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached() 445 throws Exception { 446 TestAbstractScheduledCustomService service = 447 new TestAbstractScheduledCustomService() { 448 @Override 449 protected Scheduler scheduler() { 450 return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS); 451 } 452 }; 453 service.startAsync().awaitRunning(); 454 try { 455 service.firstBarrier.await(5, SECONDS); 456 fail(); 457 } catch (TimeoutException expected) { 458 } 459 assertEquals(0, service.numIterations.get()); 460 service.stopAsync(); 461 service.awaitTerminated(); 462 } 463 464 testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()465 public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached() 466 throws Exception { 467 TestAbstractScheduledCustomService service = 468 new TestAbstractScheduledCustomService() { 469 @Override 470 protected Scheduler scheduler() { 471 return new AbstractScheduledService.CustomScheduler() { 472 @Override 473 protected Schedule getNextSchedule() throws Exception { 474 return new Schedule(Long.MAX_VALUE, SECONDS); 475 } 476 }; 477 } 478 }; 479 service.startAsync().awaitRunning(); 480 try { 481 service.firstBarrier.await(5, SECONDS); 482 fail(); 483 } catch (TimeoutException expected) { 484 } 485 assertEquals(0, service.numIterations.get()); 486 service.stopAsync(); 487 service.awaitTerminated(); 488 } 489 490 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { 491 public AtomicInteger scheduleCounter = new AtomicInteger(0); 492 493 @Override getNextSchedule()494 protected Schedule getNextSchedule() throws Exception { 495 scheduleCounter.incrementAndGet(); 496 return new Schedule(0, TimeUnit.SECONDS); 497 } 498 } 499 500 testCustomSchedule_startStop()501 public void testCustomSchedule_startStop() throws Exception { 502 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 503 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 504 final AtomicBoolean shouldWait = new AtomicBoolean(true); 505 Runnable task = 506 new Runnable() { 507 @Override 508 public void run() { 509 try { 510 if (shouldWait.get()) { 511 firstBarrier.await(); 512 secondBarrier.await(); 513 } 514 } catch (Exception e) { 515 throw new RuntimeException(e); 516 } 517 } 518 }; 519 TestCustomScheduler scheduler = new TestCustomScheduler(); 520 Cancellable future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); 521 firstBarrier.await(); 522 assertEquals(1, scheduler.scheduleCounter.get()); 523 secondBarrier.await(); 524 firstBarrier.await(); 525 assertEquals(2, scheduler.scheduleCounter.get()); 526 shouldWait.set(false); 527 secondBarrier.await(); 528 future.cancel(false); 529 } 530 531 testCustomSchedulerServiceStop()532 public void testCustomSchedulerServiceStop() throws Exception { 533 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); 534 service.startAsync().awaitRunning(); 535 service.firstBarrier.await(); 536 assertEquals(1, service.numIterations.get()); 537 service.stopAsync(); 538 service.secondBarrier.await(); 539 service.awaitTerminated(); 540 // Sleep for a while just to ensure that our task wasn't called again. 541 Thread.sleep(unit.toMillis(3 * delay)); 542 assertEquals(1, service.numIterations.get()); 543 } 544 545 testCustomScheduler_deadlock()546 public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException { 547 final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2); 548 // This will flakily deadlock, so run it multiple times to increase the flake likelihood 549 for (int i = 0; i < 1000; i++) { 550 Service service = 551 new AbstractScheduledService() { 552 @Override 553 protected void runOneIteration() {} 554 555 @Override 556 protected Scheduler scheduler() { 557 return new CustomScheduler() { 558 @Override 559 protected Schedule getNextSchedule() throws Exception { 560 if (state() != State.STARTING) { 561 inGetNextSchedule.await(); 562 Thread.yield(); 563 throw new RuntimeException("boom"); 564 } 565 return new Schedule(0, TimeUnit.NANOSECONDS); 566 } 567 }; 568 } 569 }; 570 service.startAsync().awaitRunning(); 571 inGetNextSchedule.await(); 572 service.stopAsync(); 573 } 574 } 575 576 testBig()577 public void testBig() throws Exception { 578 TestAbstractScheduledCustomService service = 579 new TestAbstractScheduledCustomService() { 580 @Override 581 protected Scheduler scheduler() { 582 return new AbstractScheduledService.CustomScheduler() { 583 @Override 584 protected Schedule getNextSchedule() throws Exception { 585 // Explicitly yield to increase the probability of a pathological scheduling. 586 Thread.yield(); 587 return new Schedule(0, TimeUnit.SECONDS); 588 } 589 }; 590 } 591 }; 592 service.useBarriers = false; 593 service.startAsync().awaitRunning(); 594 Thread.sleep(50); 595 service.useBarriers = true; 596 service.firstBarrier.await(); 597 int numIterations = service.numIterations.get(); 598 service.stopAsync(); 599 service.secondBarrier.await(); 600 service.awaitTerminated(); 601 assertEquals(numIterations, service.numIterations.get()); 602 } 603 604 private static class TestAbstractScheduledCustomService extends AbstractScheduledService { 605 final AtomicInteger numIterations = new AtomicInteger(0); 606 volatile boolean useBarriers = true; 607 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 608 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 609 610 @Override runOneIteration()611 protected void runOneIteration() throws Exception { 612 numIterations.incrementAndGet(); 613 if (useBarriers) { 614 firstBarrier.await(); 615 secondBarrier.await(); 616 } 617 } 618 619 @Override executor()620 protected ScheduledExecutorService executor() { 621 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 622 return Executors.newScheduledThreadPool(10); 623 } 624 625 @Override scheduler()626 protected Scheduler scheduler() { 627 return new CustomScheduler() { 628 @Override 629 protected Schedule getNextSchedule() throws Exception { 630 return new Schedule(delay, unit); 631 } 632 }; 633 } 634 } 635 636 testCustomSchedulerFailure()637 public void testCustomSchedulerFailure() throws Exception { 638 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); 639 service.startAsync().awaitRunning(); 640 for (int i = 1; i < 4; i++) { 641 service.firstBarrier.await(); 642 assertEquals(i, service.numIterations.get()); 643 service.secondBarrier.await(); 644 } 645 Thread.sleep(1000); 646 try { 647 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS); 648 fail(); 649 } catch (IllegalStateException e) { 650 assertEquals(State.FAILED, service.state()); 651 } 652 } 653 654 private static class TestFailingCustomScheduledService extends AbstractScheduledService { 655 final AtomicInteger numIterations = new AtomicInteger(0); 656 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 657 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 658 659 @Override runOneIteration()660 protected void runOneIteration() throws Exception { 661 numIterations.incrementAndGet(); 662 firstBarrier.await(); 663 secondBarrier.await(); 664 } 665 666 @Override executor()667 protected ScheduledExecutorService executor() { 668 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 669 return Executors.newScheduledThreadPool(10); 670 } 671 672 @Override scheduler()673 protected Scheduler scheduler() { 674 return new CustomScheduler() { 675 @Override 676 protected Schedule getNextSchedule() throws Exception { 677 if (numIterations.get() > 2) { 678 throw new IllegalStateException("Failed"); 679 } 680 return new Schedule(delay, unit); 681 } 682 }; 683 } 684 } 685 } 686 } 687