/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThrows;

import com.google.common.util.concurrent.AbstractScheduledService.Cancellable;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.testing.TestingExecutors;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Unit test for {@link AbstractScheduledService}.
 *
 * @author Luke Sandberg
 */
public class AbstractScheduledServiceTest extends TestCase {

  /** Schedule handed out by {@link NullService} and {@link TestService}. */
  volatile Scheduler configuration = newFixedDelaySchedule(0, 10, MILLISECONDS);

  /** Last future returned by {@link #executor}'s {@code scheduleWithFixedDelay}. */
  volatile @Nullable ScheduledFuture<?> future = null;

  // NOTE(review): these three flags are neither read nor written anywhere in this file; they are
  // kept only in case something else in the package refers to them.
  volatile boolean atFixedRateCalled = false;
  volatile boolean withFixedDelayCalled = false;
  volatile boolean scheduleCalled = false;

  /** Shared executor that records the fixed-delay future it creates in {@link #future}. */
  final ScheduledExecutorService executor =
      new ScheduledThreadPoolExecutor(10) {
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command, long initialDelay, long delay, TimeUnit unit) {
          return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
      };

  /** Shuts down the shared executor so its worker threads do not outlive the test. */
  @Override
  protected void tearDown() throws Exception {
    executor.shutdownNow();
    super.tearDown();
  }

  /** The scheduled future is live while running and cancelled once the service has terminated. */
  public void testServiceStartStop() throws Exception {
    NullService service = new NullService();
    service.startAsync().awaitRunning();
    assertFalse(future.isDone());
    service.stopAsync().awaitTerminated();
    assertTrue(future.isCancelled());
  }

  /** Service with an empty iteration that uses the test's shared schedule and executor. */
  private class NullService extends AbstractScheduledService {
    @Override
    protected void runOneIteration() throws Exception {}

    @Override
    protected Scheduler scheduler() {
      return configuration;
    }

    @Override
    protected ScheduledExecutorService executor() {
      return executor;
    }
  }

  /** An exception thrown by {@code runOneIteration} fails the service. */
  public void testFailOnExceptionFromRun() throws Exception {
    TestService service = new TestService();
    service.runException = new Exception();
    service.startAsync().awaitRunning();
    service.runFirstBarrier.await();
    service.runSecondBarrier.await();
    // The scheduled task ends up cancelled rather than completing exceptionally.
    assertThrows(CancellationException.class, () -> future.get());
    // The service reports our original exception, unwrapped, as its failure cause.
    assertEquals(service.runException, service.failureCause());
    assertEquals(Service.State.FAILED, service.state());
  }

  /** An exception thrown by {@code startUp} fails the service before any iteration runs. */
  public void testFailOnExceptionFromStartUp() {
    TestService service = new TestService();
    service.startUpException = new Exception();
    IllegalStateException e =
        assertThrows(IllegalStateException.class, () -> service.startAsync().awaitRunning());
    assertThat(e).hasCauseThat().isEqualTo(service.startUpException);
    assertEquals(0, service.numberOfTimesRunCalled.get());
    assertEquals(Service.State.FAILED, service.state());
  }

  /** An {@link Error} thrown by a {@code running()} listener fails the service. */
  public void testFailOnErrorFromStartUpListener() throws InterruptedException {
    final Error error = new Error();
    final CountDownLatch latch = new CountDownLatch(1);
    TestService service = new TestService();
    service.addListener(
        new Service.Listener() {
          @Override
          public void running() {
            throw error;
          }

          @Override
          public void failed(State from, Throwable failure) {
            assertEquals(State.RUNNING, from);
            assertEquals(error, failure);
            latch.countDown();
          }
        },
        directExecutor());
    service.startAsync();
    latch.await();

    assertEquals(0, service.numberOfTimesRunCalled.get());
    assertEquals(Service.State.FAILED, service.state());
  }

  /** An exception thrown by {@code shutDown} fails the service instead of terminating it. */
  public void testFailOnExceptionFromShutDown() throws Exception {
    TestService service = new TestService();
    service.shutDownException = new Exception();
    service.startAsync().awaitRunning();
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    IllegalStateException e =
        assertThrows(IllegalStateException.class, () -> service.awaitTerminated());
    assertThat(e).hasCauseThat().isEqualTo(service.shutDownException);
    assertEquals(Service.State.FAILED, service.state());
  }

  /** {@code runOneIteration} is invoked repeatedly, once per barrier round trip. */
  public void testRunOneIterationCalledMultipleTimes() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    for (int i = 1; i < 10; i++) {
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.stopAsync().awaitTerminated();
  }

  /** {@code executor()} is consulted exactly once over the whole service lifecycle. */
  public void testExecutorOnlyCalledOnce() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    // It should be called once during startup.
    assertEquals(1, service.numberOfTimesExecutorCalled.get());
    for (int i = 1; i < 10; i++) {
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.stopAsync().awaitTerminated();
    // Only called once overall.
    assertEquals(1, service.numberOfTimesExecutorCalled.get());
  }

  /** The executor obtained from the default {@code executor()} terminates after a normal stop. */
  public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
    final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
    AbstractScheduledService service =
        new AbstractScheduledService() {
          @Override
          protected void runOneIteration() throws Exception {}

          @Override
          protected ScheduledExecutorService executor() {
            executor.set(super.executor());
            return executor.get();
          }

          @Override
          protected Scheduler scheduler() {
            return newFixedDelaySchedule(0, 1, MILLISECONDS);
          }
        };

    service.startAsync();
    assertFalse(service.executor().isShutdown());
    service.awaitRunning();
    service.stopAsync();
    service.awaitTerminated();
    assertTrue(executor.get().awaitTermination(100, MILLISECONDS));
  }

  /** The executor obtained from the default {@code executor()} terminates after a failure. */
  public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
    final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
    AbstractScheduledService service =
        new AbstractScheduledService() {
          @Override
          protected void startUp() throws Exception {
            throw new Exception("Failed");
          }

          @Override
          protected void runOneIteration() throws Exception {}

          @Override
          protected ScheduledExecutorService executor() {
            executor.set(super.executor());
            return executor.get();
          }

          @Override
          protected Scheduler scheduler() {
            return newFixedDelaySchedule(0, 1, MILLISECONDS);
          }
        };

    assertThrows(IllegalStateException.class, () -> service.startAsync().awaitRunning());

    assertTrue(executor.get().awaitTermination(100, MILLISECONDS));
  }

  /** {@code scheduler()} is consulted exactly once over the whole service lifecycle. */
  public void testSchedulerOnlyCalledOnce() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    // It should be called once during startup.
    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    for (int i = 1; i < 10; i++) {
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.awaitTerminated();
    // Only called once overall.
    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
  }

  /** {@code awaitRunning} with a timeout reports the service name and current state. */
  public void testTimeout() {
    // Create a service whose executor will never run its commands
    Service service =
        new AbstractScheduledService() {
          @Override
          protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, NANOSECONDS);
          }

          @Override
          protected ScheduledExecutorService executor() {
            return TestingExecutors.noOpScheduledExecutor();
          }

          @Override
          protected void runOneIteration() throws Exception {}

          @Override
          protected String serviceName() {
            return "Foo";
          }
        };
    TimeoutException e =
        assertThrows(
            TimeoutException.class, () -> service.startAsync().awaitRunning(1, MILLISECONDS));
    assertThat(e)
        .hasMessageThat()
        .isEqualTo("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }

  /**
   * Instrumented service: counts lifecycle callbacks, asserts they happen in the expected order
   * and state, steps each iteration through two barriers, and can be told to throw from any
   * lifecycle method.
   */
  private class TestService extends AbstractScheduledService {
    CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
    CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

    volatile boolean startUpCalled = false;
    volatile boolean shutDownCalled = false;
    AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
    AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
    AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
    volatile @Nullable Exception runException = null;
    volatile @Nullable Exception startUpException = null;
    volatile @Nullable Exception shutDownException = null;

    @Override
    protected void runOneIteration() throws Exception {
      assertTrue(startUpCalled);
      assertFalse(shutDownCalled);
      numberOfTimesRunCalled.incrementAndGet();
      assertEquals(State.RUNNING, state());
      // Rendezvous twice with the test thread so it can observe state mid-iteration.
      runFirstBarrier.await();
      runSecondBarrier.await();
      if (runException != null) {
        throw runException;
      }
    }

    @Override
    protected void startUp() throws Exception {
      assertFalse(startUpCalled);
      assertFalse(shutDownCalled);
      startUpCalled = true;
      assertEquals(State.STARTING, state());
      if (startUpException != null) {
        throw startUpException;
      }
    }

    @Override
    protected void shutDown() throws Exception {
      assertTrue(startUpCalled);
      assertFalse(shutDownCalled);
      shutDownCalled = true;
      if (shutDownException != null) {
        throw shutDownException;
      }
    }

    @Override
    protected ScheduledExecutorService executor() {
      numberOfTimesExecutorCalled.incrementAndGet();
      return executor;
    }

    @Override
    protected Scheduler scheduler() {
      numberOfTimesSchedulerCalled.incrementAndGet();
      return configuration;
    }
  }

  // Tests for Scheduler:

  // These constants are arbitrary and just used to make sure that the correct method is called
  // with the correct parameters.
  private static final int INITIAL_DELAY = 10;
  private static final int DELAY = 20;
  private static final TimeUnit UNIT = MILLISECONDS;

  // Unique runnable object used for comparison.
  final Runnable testRunnable =
      new Runnable() {
        @Override
        public void run() {}
      };

  /** Set by {@link #assertSingleCallWithCorrectParameters} the first time it is invoked. */
  boolean called = false;

  /**
   * Asserts that this is the first invocation and that the arguments match {@link #testRunnable},
   * {@link #INITIAL_DELAY}, {@link #DELAY} and {@link #UNIT}.
   */
  private void assertSingleCallWithCorrectParameters(
      Runnable command, long initialDelay, long delay, TimeUnit unit) {
    assertFalse(called); // only called once.
    called = true;
    assertEquals(INITIAL_DELAY, initialDelay);
    assertEquals(DELAY, delay);
    assertEquals(UNIT, unit);
    assertEquals(testRunnable, command);
  }

  /** A fixed-rate schedule delegates to {@code scheduleAtFixedRate} with the given parameters. */
  public void testFixedRateSchedule() {
    Scheduler schedule = Scheduler.newFixedRateSchedule(INITIAL_DELAY, DELAY, UNIT);
    Cancellable unused =
        schedule.schedule(
            null,
            new ScheduledThreadPoolExecutor(1) {
              @Override
              public ScheduledFuture<?> scheduleAtFixedRate(
                  Runnable command, long initialDelay, long period, TimeUnit unit) {
                assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
                return new ThrowingScheduledFuture<>();
              }
            },
            testRunnable);
    assertTrue(called);
  }

  /** A fixed-delay schedule delegates to {@code scheduleWithFixedDelay} with the given parameters. */
  public void testFixedDelaySchedule() {
    Scheduler schedule = newFixedDelaySchedule(INITIAL_DELAY, DELAY, UNIT);
    Cancellable unused =
        schedule.schedule(
            null,
            new ScheduledThreadPoolExecutor(10) {
              @Override
              public ScheduledFuture<?> scheduleWithFixedDelay(
                  Runnable command, long initialDelay, long delay, TimeUnit unit) {
                assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
                return new ThrowingScheduledFuture<>();
              }
            },
            testRunnable);
    assertTrue(called);
  }

  /** Placeholder future whose own methods all throw {@link UnsupportedOperationException}. */
  private static final class ThrowingScheduledFuture<V> extends ForwardingFuture<V>
      implements ScheduledFuture<V> {
    @Override
    protected Future<V> delegate() {
      throw new UnsupportedOperationException("test should not care about this");
    }

    @Override
    public long getDelay(TimeUnit unit) {
      throw new UnsupportedOperationException("test should not care about this");
    }

    @Override
    public int compareTo(Delayed other) {
      throw new UnsupportedOperationException("test should not care about this");
    }
  }

  /** A fixed-delay schedule of {@code Long.MAX_VALUE} seconds never runs an iteration. */
  public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
      throws Exception {
    TestAbstractScheduledCustomService service =
        new TestAbstractScheduledCustomService() {
          @Override
          protected Scheduler scheduler() {
            return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
          }
        };
    service.startAsync().awaitRunning();
    assertThrows(TimeoutException.class, () -> service.firstBarrier.await(5, SECONDS));
    assertEquals(0, service.numIterations.get());
    service.stopAsync();
    service.awaitTerminated();
  }

  /** A custom schedule of {@code Long.MAX_VALUE} seconds never runs an iteration. */
  public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
      throws Exception {
    TestAbstractScheduledCustomService service =
        new TestAbstractScheduledCustomService() {
          @Override
          protected Scheduler scheduler() {
            return new AbstractScheduledService.CustomScheduler() {
              @Override
              protected Schedule getNextSchedule() throws Exception {
                return new Schedule(Long.MAX_VALUE, SECONDS);
              }
            };
          }
        };
    service.startAsync().awaitRunning();
    assertThrows(TimeoutException.class, () -> service.firstBarrier.await(5, SECONDS));
    assertEquals(0, service.numIterations.get());
    service.stopAsync();
    service.awaitTerminated();
  }

  /** Custom scheduler with a zero delay that counts calls to {@code getNextSchedule}. */
  private static class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
    public AtomicInteger scheduleCounter = new AtomicInteger(0);

    @Override
    protected Schedule getNextSchedule() throws Exception {
      scheduleCounter.incrementAndGet();
      return new Schedule(0, SECONDS);
    }
  }

  /** {@code getNextSchedule} is asked once before each execution of the scheduled task. */
  public void testCustomSchedule_startStop() throws Exception {
    final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    final AtomicBoolean shouldWait = new AtomicBoolean(true);
    Runnable task =
        new Runnable() {
          @Override
          public void run() {
            try {
              if (shouldWait.get()) {
                firstBarrier.await();
                secondBarrier.await();
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        };
    TestCustomScheduler scheduler = new TestCustomScheduler();
    // Keep a handle on the pool so its threads can be released when the test is over.
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
    try {
      Cancellable scheduled = scheduler.schedule(null, pool, task);
      firstBarrier.await();
      assertEquals(1, scheduler.scheduleCounter.get());
      secondBarrier.await();
      firstBarrier.await();
      assertEquals(2, scheduler.scheduleCounter.get());
      shouldWait.set(false);
      secondBarrier.await();
      scheduled.cancel(false);
    } finally {
      pool.shutdownNow();
    }
  }

  /** No further iteration runs after a custom-scheduled service has been stopped. */
  public void testCustomSchedulerServiceStop() throws Exception {
    TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
    service.startAsync().awaitRunning();
    service.firstBarrier.await();
    assertEquals(1, service.numIterations.get());
    service.stopAsync();
    service.secondBarrier.await();
    service.awaitTerminated();
    // Sleep for a while just to ensure that our task wasn't called again.
    Thread.sleep(UNIT.toMillis(3 * DELAY));
    assertEquals(1, service.numIterations.get());
  }

  /** Repeatedly races {@code stopAsync} against a {@code getNextSchedule} that throws. */
  public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
    final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
    // This will flakily deadlock, so run it multiple times to increase the flake likelihood
    for (int i = 0; i < 1000; i++) {
      Service service =
          new AbstractScheduledService() {
            @Override
            protected void runOneIteration() {}

            @Override
            protected Scheduler scheduler() {
              return new CustomScheduler() {
                @Override
                protected Schedule getNextSchedule() throws Exception {
                  if (state() != State.STARTING) {
                    inGetNextSchedule.await();
                    Thread.yield();
                    throw new RuntimeException("boom");
                  }
                  return new Schedule(0, NANOSECONDS);
                }
              };
            }
          };
      service.startAsync().awaitRunning();
      inGetNextSchedule.await();
      service.stopAsync();
    }
  }

  /** After many zero-delay iterations, stopping the service prevents any further iteration. */
  public void testBig() throws Exception {
    TestAbstractScheduledCustomService service =
        new TestAbstractScheduledCustomService() {
          @Override
          protected Scheduler scheduler() {
            return new AbstractScheduledService.CustomScheduler() {
              @Override
              protected Schedule getNextSchedule() throws Exception {
                // Explicitly yield to increase the probability of a pathological scheduling.
                Thread.yield();
                return new Schedule(0, SECONDS);
              }
            };
          }
        };
    service.useBarriers = false;
    service.startAsync().awaitRunning();
    Thread.sleep(50);
    service.useBarriers = true;
    service.firstBarrier.await();
    int numIterations = service.numIterations.get();
    service.stopAsync();
    service.secondBarrier.await();
    service.awaitTerminated();
    assertEquals(numIterations, service.numIterations.get());
  }

  /**
   * Custom-scheduled service that counts iterations and, while {@code useBarriers} is set, steps
   * each iteration through two barriers shared with the test thread.
   */
  private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
    final AtomicInteger numIterations = new AtomicInteger(0);
    volatile boolean useBarriers = true;
    final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    final CyclicBarrier secondBarrier = new CyclicBarrier(2);

    @Override
    protected void runOneIteration() throws Exception {
      numIterations.incrementAndGet();
      if (useBarriers) {
        firstBarrier.await();
        secondBarrier.await();
      }
    }

    @Override
    protected ScheduledExecutorService executor() {
      // use a bunch of threads so that weird overlapping schedules are more likely to happen.
      final ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
      // Release the pool's threads once the service reaches a terminal state.
      addListener(
          new Service.Listener() {
            @Override
            public void terminated(State from) {
              pool.shutdown();
            }

            @Override
            public void failed(State from, Throwable failure) {
              pool.shutdown();
            }
          },
          directExecutor());
      return pool;
    }

    @Override
    protected Scheduler scheduler() {
      return new CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          return new Schedule(DELAY, UNIT);
        }
      };
    }
  }

  /** A {@code getNextSchedule} that starts throwing moves the service to {@code FAILED}. */
  public void testCustomSchedulerFailure() throws Exception {
    TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
    service.startAsync().awaitRunning();
    for (int i = 1; i < 4; i++) {
      service.firstBarrier.await();
      assertEquals(i, service.numIterations.get());
      service.secondBarrier.await();
    }
    Thread.sleep(1000);
    assertThrows(
        IllegalStateException.class, () -> service.stopAsync().awaitTerminated(100, SECONDS));
    assertEquals(State.FAILED, service.state());
  }

  /**
   * Custom-scheduled service whose scheduler throws from {@code getNextSchedule} once more than
   * two iterations have been counted.
   */
  private static class TestFailingCustomScheduledService extends AbstractScheduledService {
    final AtomicInteger numIterations = new AtomicInteger(0);
    final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    final CyclicBarrier secondBarrier = new CyclicBarrier(2);

    @Override
    protected void runOneIteration() throws Exception {
      numIterations.incrementAndGet();
      firstBarrier.await();
      secondBarrier.await();
    }

    @Override
    protected ScheduledExecutorService executor() {
      // use a bunch of threads so that weird overlapping schedules are more likely to happen.
      final ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
      // Release the pool's threads once the service reaches a terminal state.
      addListener(
          new Service.Listener() {
            @Override
            public void terminated(State from) {
              pool.shutdown();
            }

            @Override
            public void failed(State from, Throwable failure) {
              pool.shutdown();
            }
          },
          directExecutor());
      return pool;
    }

    @Override
    protected Scheduler scheduler() {
      return new CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          if (numIterations.get() > 2) {
            throw new IllegalStateException("Failed");
          }
          return new Schedule(DELAY, UNIT);
        }
      };
    }
  }
}