1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.util.concurrent.MoreExecutors.newSequentialExecutor; 21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 22 23 import com.google.common.collect.ImmutableList; 24 import com.google.common.collect.Lists; 25 import com.google.common.collect.Queues; 26 import java.util.List; 27 import java.util.Queue; 28 import java.util.concurrent.CountDownLatch; 29 import java.util.concurrent.CyclicBarrier; 30 import java.util.concurrent.ExecutionException; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.ExecutorService; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.Future; 35 import java.util.concurrent.RejectedExecutionException; 36 import java.util.concurrent.TimeUnit; 37 import java.util.concurrent.atomic.AtomicBoolean; 38 import java.util.concurrent.atomic.AtomicInteger; 39 import junit.framework.TestCase; 40 41 /** 42 * Tests {@link SequentialExecutor}. 43 * 44 * @author JJ Furman 45 */ 46 public class SequentialExecutorTest extends TestCase { 47 48 private static class FakeExecutor implements Executor { 49 Queue<Runnable> tasks = Queues.newArrayDeque(); 50 51 @Override execute(Runnable command)52 public void execute(Runnable command) { 53 tasks.add(command); 54 } 55 hasNext()56 boolean hasNext() { 57 return !tasks.isEmpty(); 58 } 59 runNext()60 void runNext() { 61 assertTrue("expected at least one task to run", hasNext()); 62 tasks.remove().run(); 63 } 64 runAll()65 void runAll() { 66 while (hasNext()) { 67 runNext(); 68 } 69 } 70 } 71 72 private FakeExecutor fakePool; 73 private SequentialExecutor e; 74 75 @Override setUp()76 public void setUp() { 77 fakePool = new FakeExecutor(); 78 e = new SequentialExecutor(fakePool); 79 } 80 testConstructingWithNullExecutor_fails()81 public void testConstructingWithNullExecutor_fails() { 82 try { 83 new SequentialExecutor(null); 84 fail("Should have failed with NullPointerException."); 85 } catch (NullPointerException expected) { 86 } 87 } 88 testBasics()89 public void testBasics() { 90 final AtomicInteger totalCalls = new AtomicInteger(); 91 Runnable intCounter = 92 new Runnable() { 93 @Override 94 public void run() { 95 totalCalls.incrementAndGet(); 96 // Make sure that no other tasks are scheduled to run while this is running. 97 assertFalse(fakePool.hasNext()); 98 } 99 }; 100 101 assertFalse(fakePool.hasNext()); 102 e.execute(intCounter); 103 // A task should have been scheduled 104 assertTrue(fakePool.hasNext()); 105 e.execute(intCounter); 106 // Our executor hasn't run any tasks yet. 107 assertEquals(0, totalCalls.get()); 108 fakePool.runAll(); 109 assertEquals(2, totalCalls.get()); 110 // Queue is empty so no runner should be scheduled. 111 assertFalse(fakePool.hasNext()); 112 113 // Check that execute can be safely repeated 114 e.execute(intCounter); 115 e.execute(intCounter); 116 e.execute(intCounter); 117 // No change yet. 118 assertEquals(2, totalCalls.get()); 119 fakePool.runAll(); 120 assertEquals(5, totalCalls.get()); 121 assertFalse(fakePool.hasNext()); 122 } 123 testOrdering()124 public void testOrdering() { 125 final List<Integer> callOrder = Lists.newArrayList(); 126 127 class FakeOp implements Runnable { 128 final int op; 129 130 FakeOp(int op) { 131 this.op = op; 132 } 133 134 @Override 135 public void run() { 136 callOrder.add(op); 137 } 138 } 139 140 e.execute(new FakeOp(0)); 141 e.execute(new FakeOp(1)); 142 e.execute(new FakeOp(2)); 143 fakePool.runAll(); 144 145 assertEquals(ImmutableList.of(0, 1, 2), callOrder); 146 } 147 testRuntimeException_doesNotStopExecution()148 public void testRuntimeException_doesNotStopExecution() { 149 150 final AtomicInteger numCalls = new AtomicInteger(); 151 152 Runnable runMe = 153 new Runnable() { 154 @Override 155 public void run() { 156 numCalls.incrementAndGet(); 157 throw new RuntimeException("FAKE EXCEPTION!"); 158 } 159 }; 160 161 e.execute(runMe); 162 e.execute(runMe); 163 fakePool.runAll(); 164 165 assertEquals(2, numCalls.get()); 166 } 167 testInterrupt_beforeRunRestoresInterruption()168 public void testInterrupt_beforeRunRestoresInterruption() throws Exception { 169 // Run a task on the composed Executor that interrupts its thread (i.e. this thread). 170 fakePool.execute( 171 new Runnable() { 172 @Override 173 public void run() { 174 Thread.currentThread().interrupt(); 175 } 176 }); 177 // Run a task that expects that it is not interrupted while it is running. 178 e.execute( 179 new Runnable() { 180 @Override 181 public void run() { 182 assertThat(Thread.currentThread().isInterrupted()).isFalse(); 183 } 184 }); 185 186 // Run these together. 187 fakePool.runAll(); 188 189 // Check that this thread has been marked as interrupted again now that the thread has been 190 // returned by SequentialExecutor. Clear the bit while checking so that the test doesn't hose 191 // JUnit or some other test case. 192 assertThat(Thread.interrupted()).isTrue(); 193 } 194 testInterrupt_doesNotInterruptSubsequentTask()195 public void testInterrupt_doesNotInterruptSubsequentTask() throws Exception { 196 // Run a task that interrupts its thread (i.e. this thread). 197 e.execute( 198 new Runnable() { 199 @Override 200 public void run() { 201 Thread.currentThread().interrupt(); 202 } 203 }); 204 // Run a task that expects that it is not interrupted while it is running. 205 e.execute( 206 new Runnable() { 207 @Override 208 public void run() { 209 assertThat(Thread.currentThread().isInterrupted()).isFalse(); 210 } 211 }); 212 213 // Run those tasks together. 214 fakePool.runAll(); 215 216 // Check that the interruption of a SequentialExecutor's task is restored to the thread once 217 // it is yielded. Clear the bit while checking so that the test doesn't hose JUnit or some other 218 // test case. 219 assertThat(Thread.interrupted()).isTrue(); 220 } 221 testInterrupt_doesNotStopExecution()222 public void testInterrupt_doesNotStopExecution() { 223 224 final AtomicInteger numCalls = new AtomicInteger(); 225 226 Runnable runMe = 227 new Runnable() { 228 @Override 229 public void run() { 230 numCalls.incrementAndGet(); 231 } 232 }; 233 234 Thread.currentThread().interrupt(); 235 236 e.execute(runMe); 237 e.execute(runMe); 238 fakePool.runAll(); 239 240 assertEquals(2, numCalls.get()); 241 242 assertTrue(Thread.interrupted()); 243 } 244 testDelegateRejection()245 public void testDelegateRejection() { 246 final AtomicInteger numCalls = new AtomicInteger(); 247 final AtomicBoolean reject = new AtomicBoolean(true); 248 final SequentialExecutor executor = 249 new SequentialExecutor( 250 new Executor() { 251 @Override 252 public void execute(Runnable r) { 253 if (reject.get()) { 254 throw new RejectedExecutionException(); 255 } 256 r.run(); 257 } 258 }); 259 Runnable task = 260 new Runnable() { 261 @Override 262 public void run() { 263 numCalls.incrementAndGet(); 264 } 265 }; 266 try { 267 executor.execute(task); 268 fail(); 269 } catch (RejectedExecutionException expected) { 270 } 271 assertEquals(0, numCalls.get()); 272 reject.set(false); 273 executor.execute(task); 274 assertEquals(1, numCalls.get()); 275 } 276 277 testTaskThrowsError()278 public void testTaskThrowsError() throws Exception { 279 class MyError extends Error {} 280 final CyclicBarrier barrier = new CyclicBarrier(2); 281 // we need to make sure the error gets thrown on a different thread. 282 ExecutorService service = Executors.newSingleThreadExecutor(); 283 try { 284 final SequentialExecutor executor = new SequentialExecutor(service); 285 Runnable errorTask = 286 new Runnable() { 287 @Override 288 public void run() { 289 throw new MyError(); 290 } 291 }; 292 Runnable barrierTask = 293 new Runnable() { 294 @Override 295 public void run() { 296 try { 297 barrier.await(); 298 } catch (Exception e) { 299 throw new RuntimeException(e); 300 } 301 } 302 }; 303 executor.execute(errorTask); 304 service.execute(barrierTask); // submit directly to the service 305 // the barrier task runs after the error task so we know that the error has been observed by 306 // SequentialExecutor by the time the barrier is satified 307 barrier.await(1, TimeUnit.SECONDS); 308 executor.execute(barrierTask); 309 // timeout means the second task wasn't even tried 310 barrier.await(1, TimeUnit.SECONDS); 311 } finally { 312 service.shutdown(); 313 } 314 } 315 316 testRejectedExecutionThrownWithMultipleCalls()317 public void testRejectedExecutionThrownWithMultipleCalls() throws Exception { 318 final CountDownLatch latch = new CountDownLatch(1); 319 final SettableFuture<?> future = SettableFuture.create(); 320 final Executor delegate = 321 new Executor() { 322 @Override 323 public void execute(Runnable task) { 324 if (future.set(null)) { 325 awaitUninterruptibly(latch); 326 } 327 throw new RejectedExecutionException(); 328 } 329 }; 330 final SequentialExecutor executor = new SequentialExecutor(delegate); 331 final ExecutorService blocked = Executors.newCachedThreadPool(); 332 Future<?> first = 333 blocked.submit( 334 new Runnable() { 335 @Override 336 public void run() { 337 executor.execute(Runnables.doNothing()); 338 } 339 }); 340 future.get(10, TimeUnit.SECONDS); 341 try { 342 executor.execute(Runnables.doNothing()); 343 fail(); 344 } catch (RejectedExecutionException expected) { 345 } 346 latch.countDown(); 347 try { 348 first.get(10, TimeUnit.SECONDS); 349 fail(); 350 } catch (ExecutionException expected) { 351 assertThat(expected).hasCauseThat().isInstanceOf(RejectedExecutionException.class); 352 } 353 } 354 testToString()355 public void testToString() { 356 final Runnable[] currentTask = new Runnable[1]; 357 final Executor delegate = 358 new Executor() { 359 @Override 360 public void execute(Runnable task) { 361 currentTask[0] = task; 362 task.run(); 363 currentTask[0] = null; 364 } 365 366 @Override 367 public String toString() { 368 return "theDelegate"; 369 } 370 }; 371 Executor sequential1 = newSequentialExecutor(delegate); 372 Executor sequential2 = newSequentialExecutor(delegate); 373 assertThat(sequential1.toString()).contains("theDelegate"); 374 assertThat(sequential1.toString()).isNotEqualTo(sequential2.toString()); 375 final String[] whileRunningToString = new String[1]; 376 sequential1.execute( 377 new Runnable() { 378 @Override 379 public void run() { 380 whileRunningToString[0] = "" + currentTask[0]; 381 } 382 383 @Override 384 public String toString() { 385 return "my runnable's toString"; 386 } 387 }); 388 assertThat(whileRunningToString[0]).contains("my runnable's toString"); 389 } 390 } 391