• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static org.junit.Assert.assertSame;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.inOrder;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.verify;
27 
28 import com.google.common.util.concurrent.testing.TestingExecutors;
29 import io.grpc.SynchronizationContext.ScheduledHandle;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.junit.After;
39 import org.junit.Rule;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 import org.junit.runners.JUnit4;
43 import org.mockito.InOrder;
44 import org.mockito.Mock;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.junit.MockitoJUnit;
47 import org.mockito.junit.MockitoRule;
48 import org.mockito.stubbing.Answer;
49 
50 /**
51  * Unit tests for {@link SynchronizationContext}.
52  */
53 @RunWith(JUnit4.class)
54 public class SynchronizationContextTest {
55   private final BlockingQueue<Throwable> uncaughtErrors = new LinkedBlockingQueue<>();
56   private final SynchronizationContext syncContext = new SynchronizationContext(
57       new Thread.UncaughtExceptionHandler() {
58         @Override
59         public void uncaughtException(Thread t, Throwable e) {
60           uncaughtErrors.add(e);
61         }
62       });
63 
64   @Rule
65   public final MockitoRule mocks = MockitoJUnit.rule();
66 
67   @Mock
68   private Runnable task1;
69 
70   @Mock
71   private Runnable task2;
72 
73   @Mock
74   private Runnable task3;
75 
tearDown()76   @After public void tearDown() {
77     assertThat(uncaughtErrors).isEmpty();
78   }
79 
80   @Test
singleThread()81   public void singleThread() {
82     syncContext.executeLater(task1);
83     syncContext.executeLater(task2);
84     InOrder inOrder = inOrder(task1, task2, task3);
85     inOrder.verifyNoMoreInteractions();
86     syncContext.drain();
87     inOrder.verify(task1).run();
88     inOrder.verify(task2).run();
89 
90     syncContext.executeLater(task3);
91     inOrder.verifyNoMoreInteractions();
92     syncContext.drain();
93     inOrder.verify(task3).run();
94   }
95 
96   @Test
multiThread()97   public void multiThread() throws Exception {
98     InOrder inOrder = inOrder(task1, task2);
99 
100     final CountDownLatch task1Added = new CountDownLatch(1);
101     final CountDownLatch task1Running = new CountDownLatch(1);
102     final CountDownLatch task1Proceed = new CountDownLatch(1);
103     final CountDownLatch sideThreadDone = new CountDownLatch(1);
104     final AtomicReference<Thread> task1Thread = new AtomicReference<>();
105     final AtomicReference<Thread> task2Thread = new AtomicReference<>();
106 
107     doAnswer(new Answer<Void>() {
108         @Override
109         public Void answer(InvocationOnMock invocation) {
110           task1Thread.set(Thread.currentThread());
111           task1Running.countDown();
112           try {
113             assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
114           } catch (InterruptedException e) {
115             throw new RuntimeException(e);
116           }
117           return null;
118         }
119       }).when(task1).run();
120 
121     doAnswer(new Answer<Void>() {
122         @Override
123         public Void answer(InvocationOnMock invocation) {
124           task2Thread.set(Thread.currentThread());
125           return null;
126         }
127       }).when(task2).run();
128 
129     Thread sideThread = new Thread() {
130         @Override
131         public void run() {
132           syncContext.executeLater(task1);
133           task1Added.countDown();
134           syncContext.drain();
135           sideThreadDone.countDown();
136         }
137       };
138     sideThread.start();
139 
140     assertTrue(task1Added.await(5, TimeUnit.SECONDS));
141     syncContext.executeLater(task2);
142     assertTrue(task1Running.await(5, TimeUnit.SECONDS));
143     // This will do nothing because task1 is running until task1Proceed is set
144     syncContext.drain();
145 
146     inOrder.verify(task1).run();
147     inOrder.verifyNoMoreInteractions();
148 
149     task1Proceed.countDown();
150     // drain() on the side thread has returned, which runs task2
151     assertTrue(sideThreadDone.await(5, TimeUnit.SECONDS));
152     inOrder.verify(task2).run();
153 
154     assertSame(sideThread, task1Thread.get());
155     assertSame(sideThread, task2Thread.get());
156   }
157 
158   @Test
throwIfNotInThisSynchronizationContext()159   public void throwIfNotInThisSynchronizationContext() throws Exception {
160     final AtomicBoolean taskSuccess = new AtomicBoolean(false);
161     final CountDownLatch task1Running = new CountDownLatch(1);
162     final CountDownLatch task1Proceed = new CountDownLatch(1);
163 
164     doAnswer(new Answer<Void>() {
165         @Override
166         public Void answer(InvocationOnMock invocation) {
167           task1Running.countDown();
168           syncContext.throwIfNotInThisSynchronizationContext();
169           try {
170             assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
171           } catch (InterruptedException e) {
172             throw new RuntimeException(e);
173           }
174           taskSuccess.set(true);
175           return null;
176         }
177       }).when(task1).run();
178 
179     Thread sideThread = new Thread() {
180         @Override
181         public void run() {
182           syncContext.execute(task1);
183         }
184       };
185     sideThread.start();
186 
187     assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue();
188 
189     // syncContext is draining, but the current thread is not in the context
190     try {
191       syncContext.throwIfNotInThisSynchronizationContext();
192       fail("Should throw");
193     } catch (IllegalStateException e) {
194       assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext");
195     }
196 
197     // Let task1 finish
198     task1Proceed.countDown();
199     sideThread.join();
200 
201     // throwIfNotInThisSynchronizationContext() didn't throw in task1
202     assertThat(taskSuccess.get()).isTrue();
203 
204     // syncContext is not draining, but the current thread is not in the context
205     try {
206       syncContext.throwIfNotInThisSynchronizationContext();
207       fail("Should throw");
208     } catch (IllegalStateException e) {
209       assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext");
210     }
211   }
212 
213   @Test
taskThrows()214   public void taskThrows() {
215     InOrder inOrder = inOrder(task1, task2, task3);
216     final RuntimeException e = new RuntimeException("Simulated");
217     doAnswer(new Answer<Void>() {
218         @Override
219         public Void answer(InvocationOnMock invocation) {
220           throw e;
221         }
222       }).when(task2).run();
223     syncContext.executeLater(task1);
224     syncContext.executeLater(task2);
225     syncContext.executeLater(task3);
226     syncContext.drain();
227     inOrder.verify(task1).run();
228     inOrder.verify(task2).run();
229     inOrder.verify(task3).run();
230     assertThat(uncaughtErrors).containsExactly(e);
231     uncaughtErrors.clear();
232   }
233 
234   @Test
schedule()235   public void schedule() {
236     MockScheduledExecutorService executorService = new MockScheduledExecutorService();
237     ScheduledHandle handle =
238         syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, executorService);
239     assertThat(executorService.delay)
240         .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS));
241     assertThat(handle.isPending()).isTrue();
242     verify(task1, never()).run();
243 
244     executorService.command.run();
245     assertThat(handle.isPending()).isFalse();
246     verify(task1).run();
247   }
248 
249   @Test
scheduleDueImmediately()250   public void scheduleDueImmediately() {
251     MockScheduledExecutorService executorService = new MockScheduledExecutorService();
252     ScheduledHandle handle = syncContext.schedule(task1, -1, TimeUnit.NANOSECONDS, executorService);
253     assertThat(executorService.delay)
254         .isEqualTo(executorService.unit.convert(-1, TimeUnit.NANOSECONDS));
255     verify(task1, never()).run();
256     assertThat(handle.isPending()).isTrue();
257 
258     executorService.command.run();
259     assertThat(handle.isPending()).isFalse();
260     verify(task1).run();
261   }
262 
263   @Test
scheduleHandle_cancel()264   public void scheduleHandle_cancel() {
265     MockScheduledExecutorService executorService = new MockScheduledExecutorService();
266     ScheduledHandle handle =
267         syncContext.schedule(task1, 110, TimeUnit.NANOSECONDS, executorService);
268     assertThat(handle.isPending()).isTrue();
269     assertThat(executorService.delay)
270         .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS));
271 
272     handle.cancel();
273     assertThat(handle.isPending()).isFalse();
274     syncContext.drain();
275     assertThat(executorService.future.isCancelled()).isTrue();
276     verify(task1, never()).run();
277   }
278 
279   // Test that a scheduled task is cancelled after the timer has expired on the
280   // ScheduledExecutorService, but before the task is run.
281   @Test
scheduledHandle_cancelRacesWithTimerExpiration()282   public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception {
283     MockScheduledExecutorService executorService = new MockScheduledExecutorService();
284 
285     final CountDownLatch task1Running = new CountDownLatch(1);
286     final LinkedBlockingQueue<ScheduledHandle> task2HandleQueue = new LinkedBlockingQueue<>();
287     final AtomicBoolean task1Done = new AtomicBoolean();
288     final CountDownLatch sideThreadDone = new CountDownLatch(1);
289 
290     doAnswer(new Answer<Void>() {
291         @Override
292         public Void answer(InvocationOnMock invocation) {
293           task1Running.countDown();
294           try {
295             ScheduledHandle task2Handle;
296             assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull();
297             task2Handle.cancel();
298           } catch (InterruptedException e) {
299             throw new RuntimeException(e);
300           }
301           task1Done.set(true);
302           return null;
303         }
304       }).when(task1).run();
305 
306     Thread sideThread = new Thread() {
307         @Override
308         public void run() {
309           syncContext.execute(task1);
310           sideThreadDone.countDown();
311         }
312       };
313 
314     ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService);
315     // This will execute and block in task1
316     sideThread.start();
317 
318     // Make sure task1 is running and blocking the execution
319     assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue();
320 
321     // Timer expires. task2 will be enqueued, but blocked by task1
322     assertThat(executorService.delay)
323         .isEqualTo(executorService.unit.convert(10, TimeUnit.NANOSECONDS));
324     executorService.command.run();
325     assertThat(handle.isPending()).isTrue();
326 
327     // Enqueue task3 following task2
328     syncContext.executeLater(task3);
329 
330     // Let task1 proceed and cancel task2
331     task2HandleQueue.add(handle);
332 
333     // Wait until sideThread is done, which would have finished task1 and task3, while skipping
334     // task2.
335     assertThat(sideThreadDone.await(5, TimeUnit.SECONDS)).isTrue();
336     assertThat(task1Done.get()).isTrue();
337     assertThat(handle.isPending()).isFalse();
338     verify(task2, never()).run();
339     verify(task3).run();
340   }
341 
342   static class MockScheduledExecutorService extends ForwardingScheduledExecutorService {
343     private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor();
344 
345     Runnable command;
346     long delay;
347     TimeUnit unit;
348     ScheduledFuture<?> future;
349 
delegate()350     @Override public ScheduledExecutorService delegate() {
351       return delegate;
352     }
353 
schedule(Runnable command, long delay, TimeUnit unit)354     @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
355       this.command = command;
356       this.delay = delay;
357       this.unit = unit;
358       return future = super.schedule(command, delay, unit);
359     }
360   }
361 }
362