1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static org.junit.Assert.assertSame; 21 import static org.junit.Assert.assertTrue; 22 import static org.junit.Assert.fail; 23 import static org.mockito.Mockito.doAnswer; 24 import static org.mockito.Mockito.inOrder; 25 import static org.mockito.Mockito.never; 26 import static org.mockito.Mockito.verify; 27 28 import com.google.common.util.concurrent.testing.TestingExecutors; 29 import io.grpc.SynchronizationContext.ScheduledHandle; 30 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.CountDownLatch; 32 import java.util.concurrent.LinkedBlockingQueue; 33 import java.util.concurrent.ScheduledExecutorService; 34 import java.util.concurrent.ScheduledFuture; 35 import java.util.concurrent.TimeUnit; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicReference; 38 import org.junit.After; 39 import org.junit.Rule; 40 import org.junit.Test; 41 import org.junit.runner.RunWith; 42 import org.junit.runners.JUnit4; 43 import org.mockito.InOrder; 44 import org.mockito.Mock; 45 import org.mockito.invocation.InvocationOnMock; 46 import org.mockito.junit.MockitoJUnit; 47 import org.mockito.junit.MockitoRule; 48 import org.mockito.stubbing.Answer; 49 50 /** 51 * Unit tests for {@link SynchronizationContext}. 52 */ 53 @RunWith(JUnit4.class) 54 public class SynchronizationContextTest { 55 private final BlockingQueue<Throwable> uncaughtErrors = new LinkedBlockingQueue<>(); 56 private final SynchronizationContext syncContext = new SynchronizationContext( 57 new Thread.UncaughtExceptionHandler() { 58 @Override 59 public void uncaughtException(Thread t, Throwable e) { 60 uncaughtErrors.add(e); 61 } 62 }); 63 64 @Rule 65 public final MockitoRule mocks = MockitoJUnit.rule(); 66 67 @Mock 68 private Runnable task1; 69 70 @Mock 71 private Runnable task2; 72 73 @Mock 74 private Runnable task3; 75 tearDown()76 @After public void tearDown() { 77 assertThat(uncaughtErrors).isEmpty(); 78 } 79 80 @Test singleThread()81 public void singleThread() { 82 syncContext.executeLater(task1); 83 syncContext.executeLater(task2); 84 InOrder inOrder = inOrder(task1, task2, task3); 85 inOrder.verifyNoMoreInteractions(); 86 syncContext.drain(); 87 inOrder.verify(task1).run(); 88 inOrder.verify(task2).run(); 89 90 syncContext.executeLater(task3); 91 inOrder.verifyNoMoreInteractions(); 92 syncContext.drain(); 93 inOrder.verify(task3).run(); 94 } 95 96 @Test multiThread()97 public void multiThread() throws Exception { 98 InOrder inOrder = inOrder(task1, task2); 99 100 final CountDownLatch task1Added = new CountDownLatch(1); 101 final CountDownLatch task1Running = new CountDownLatch(1); 102 final CountDownLatch task1Proceed = new CountDownLatch(1); 103 final CountDownLatch sideThreadDone = new CountDownLatch(1); 104 final AtomicReference<Thread> task1Thread = new AtomicReference<>(); 105 final AtomicReference<Thread> task2Thread = new AtomicReference<>(); 106 107 doAnswer(new Answer<Void>() { 108 @Override 109 public Void answer(InvocationOnMock invocation) { 110 task1Thread.set(Thread.currentThread()); 111 task1Running.countDown(); 112 try { 113 assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); 114 } catch (InterruptedException e) { 115 throw new RuntimeException(e); 116 } 117 return null; 118 } 119 }).when(task1).run(); 120 121 doAnswer(new Answer<Void>() { 122 @Override 123 public Void answer(InvocationOnMock invocation) { 124 task2Thread.set(Thread.currentThread()); 125 return null; 126 } 127 }).when(task2).run(); 128 129 Thread sideThread = new Thread() { 130 @Override 131 public void run() { 132 syncContext.executeLater(task1); 133 task1Added.countDown(); 134 syncContext.drain(); 135 sideThreadDone.countDown(); 136 } 137 }; 138 sideThread.start(); 139 140 assertTrue(task1Added.await(5, TimeUnit.SECONDS)); 141 syncContext.executeLater(task2); 142 assertTrue(task1Running.await(5, TimeUnit.SECONDS)); 143 // This will do nothing because task1 is running until task1Proceed is set 144 syncContext.drain(); 145 146 inOrder.verify(task1).run(); 147 inOrder.verifyNoMoreInteractions(); 148 149 task1Proceed.countDown(); 150 // drain() on the side thread has returned, which runs task2 151 assertTrue(sideThreadDone.await(5, TimeUnit.SECONDS)); 152 inOrder.verify(task2).run(); 153 154 assertSame(sideThread, task1Thread.get()); 155 assertSame(sideThread, task2Thread.get()); 156 } 157 158 @Test throwIfNotInThisSynchronizationContext()159 public void throwIfNotInThisSynchronizationContext() throws Exception { 160 final AtomicBoolean taskSuccess = new AtomicBoolean(false); 161 final CountDownLatch task1Running = new CountDownLatch(1); 162 final CountDownLatch task1Proceed = new CountDownLatch(1); 163 164 doAnswer(new Answer<Void>() { 165 @Override 166 public Void answer(InvocationOnMock invocation) { 167 task1Running.countDown(); 168 syncContext.throwIfNotInThisSynchronizationContext(); 169 try { 170 assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); 171 } catch (InterruptedException e) { 172 throw new RuntimeException(e); 173 } 174 taskSuccess.set(true); 175 return null; 176 } 177 }).when(task1).run(); 178 179 Thread sideThread = new Thread() { 180 @Override 181 public void run() { 182 syncContext.execute(task1); 183 } 184 }; 185 sideThread.start(); 186 187 assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); 188 189 // syncContext is draining, but the current thread is not in the context 190 try { 191 syncContext.throwIfNotInThisSynchronizationContext(); 192 fail("Should throw"); 193 } catch (IllegalStateException e) { 194 assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext"); 195 } 196 197 // Let task1 finish 198 task1Proceed.countDown(); 199 sideThread.join(); 200 201 // throwIfNotInThisSynchronizationContext() didn't throw in task1 202 assertThat(taskSuccess.get()).isTrue(); 203 204 // syncContext is not draining, but the current thread is not in the context 205 try { 206 syncContext.throwIfNotInThisSynchronizationContext(); 207 fail("Should throw"); 208 } catch (IllegalStateException e) { 209 assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext"); 210 } 211 } 212 213 @Test taskThrows()214 public void taskThrows() { 215 InOrder inOrder = inOrder(task1, task2, task3); 216 final RuntimeException e = new RuntimeException("Simulated"); 217 doAnswer(new Answer<Void>() { 218 @Override 219 public Void answer(InvocationOnMock invocation) { 220 throw e; 221 } 222 }).when(task2).run(); 223 syncContext.executeLater(task1); 224 syncContext.executeLater(task2); 225 syncContext.executeLater(task3); 226 syncContext.drain(); 227 inOrder.verify(task1).run(); 228 inOrder.verify(task2).run(); 229 inOrder.verify(task3).run(); 230 assertThat(uncaughtErrors).containsExactly(e); 231 uncaughtErrors.clear(); 232 } 233 234 @Test schedule()235 public void schedule() { 236 MockScheduledExecutorService executorService = new MockScheduledExecutorService(); 237 ScheduledHandle handle = 238 syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, executorService); 239 assertThat(executorService.delay) 240 .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); 241 assertThat(handle.isPending()).isTrue(); 242 verify(task1, never()).run(); 243 244 executorService.command.run(); 245 assertThat(handle.isPending()).isFalse(); 246 verify(task1).run(); 247 } 248 249 @Test scheduleDueImmediately()250 public void scheduleDueImmediately() { 251 MockScheduledExecutorService executorService = new MockScheduledExecutorService(); 252 ScheduledHandle handle = syncContext.schedule(task1, -1, TimeUnit.NANOSECONDS, executorService); 253 assertThat(executorService.delay) 254 .isEqualTo(executorService.unit.convert(-1, TimeUnit.NANOSECONDS)); 255 verify(task1, never()).run(); 256 assertThat(handle.isPending()).isTrue(); 257 258 executorService.command.run(); 259 assertThat(handle.isPending()).isFalse(); 260 verify(task1).run(); 261 } 262 263 @Test scheduleHandle_cancel()264 public void scheduleHandle_cancel() { 265 MockScheduledExecutorService executorService = new MockScheduledExecutorService(); 266 ScheduledHandle handle = 267 syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, executorService); 268 assertThat(handle.isPending()).isTrue(); 269 assertThat(executorService.delay) 270 .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); 271 272 handle.cancel(); 273 assertThat(handle.isPending()).isFalse(); 274 syncContext.drain(); 275 assertThat(executorService.future.isCancelled()).isTrue(); 276 verify(task1, never()).run(); 277 } 278 279 // Test that a scheduled task is cancelled after the timer has expired on the 280 // ScheduledExecutorService, but before the task is run. 281 @Test scheduledHandle_cancelRacesWithTimerExpiration()282 public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { 283 MockScheduledExecutorService executorService = new MockScheduledExecutorService(); 284 285 final CountDownLatch task1Running = new CountDownLatch(1); 286 final LinkedBlockingQueue<ScheduledHandle> task2HandleQueue = new LinkedBlockingQueue<>(); 287 final AtomicBoolean task1Done = new AtomicBoolean(); 288 final CountDownLatch sideThreadDone = new CountDownLatch(1); 289 290 doAnswer(new Answer<Void>() { 291 @Override 292 public Void answer(InvocationOnMock invocation) { 293 task1Running.countDown(); 294 try { 295 ScheduledHandle task2Handle; 296 assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); 297 task2Handle.cancel(); 298 } catch (InterruptedException e) { 299 throw new RuntimeException(e); 300 } 301 task1Done.set(true); 302 return null; 303 } 304 }).when(task1).run(); 305 306 Thread sideThread = new Thread() { 307 @Override 308 public void run() { 309 syncContext.execute(task1); 310 sideThreadDone.countDown(); 311 } 312 }; 313 314 ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); 315 // This will execute and block in task1 316 sideThread.start(); 317 318 // Make sure task1 is running and blocking the execution 319 assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); 320 321 // Timer expires. task2 will be enqueued, but blocked by task1 322 assertThat(executorService.delay) 323 .isEqualTo(executorService.unit.convert(10, TimeUnit.NANOSECONDS)); 324 executorService.command.run(); 325 assertThat(handle.isPending()).isTrue(); 326 327 // Enqueue task3 following task2 328 syncContext.executeLater(task3); 329 330 // Let task1 proceed and cancel task2 331 task2HandleQueue.add(handle); 332 333 // Wait until sideThread is done, which would have finished task1 and task3, while skipping 334 // task2. 335 assertThat(sideThreadDone.await(5, TimeUnit.SECONDS)).isTrue(); 336 assertThat(task1Done.get()).isTrue(); 337 assertThat(handle.isPending()).isFalse(); 338 verify(task2, never()).run(); 339 verify(task3).run(); 340 } 341 342 static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { 343 private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); 344 345 Runnable command; 346 long delay; 347 TimeUnit unit; 348 ScheduledFuture<?> future; 349 delegate()350 @Override public ScheduledExecutorService delegate() { 351 return delegate; 352 } 353 schedule(Runnable command, long delay, TimeUnit unit)354 @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 355 this.command = command; 356 this.delay = delay; 357 this.unit = unit; 358 return future = super.schedule(command, delay, unit); 359 } 360 } 361 } 362