• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.common.util.concurrent;
17 
18 import static com.google.common.truth.Truth.assertThat;
19 
20 import java.lang.reflect.Method;
21 import java.nio.channels.spi.AbstractInterruptibleChannel;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.AbstractOwnableSynchronizer;
25 import java.util.concurrent.locks.LockSupport;
26 import junit.framework.TestCase;
27 
28 
29 public final class InterruptibleTaskTest extends TestCase {
30 
31   // Regression test for a deadlock where a task could be stuck busy waiting for the task to
32   // transition to DONE
testInterruptThrows()33   public void testInterruptThrows() throws Exception {
34     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
35     InterruptibleTask<Void> task =
36         new InterruptibleTask<Void>() {
37           @Override
38           Void runInterruptibly() throws Exception {
39             BrokenChannel bc = new BrokenChannel();
40             bc.doBegin();
41             isInterruptibleRegistered.countDown();
42             new CountDownLatch(1).await(); // the interrupt will wake us up
43             return null;
44           }
45 
46           @Override
47           boolean isDone() {
48             return false;
49           }
50 
51           @Override
52           String toPendingString() {
53             return "";
54           }
55 
56           @Override
57           void afterRanInterruptiblySuccess(Void result) {}
58 
59           @Override
60           void afterRanInterruptiblyFailure(Throwable error) {}
61         };
62     Thread runner = new Thread(task);
63     runner.start();
64     isInterruptibleRegistered.await();
65     try {
66       task.interruptTask();
67       fail();
68     } catch (RuntimeException expected) {
69       assertThat(expected)
70           .hasMessageThat()
71           .isEqualTo("I bet you didn't think Thread.interrupt could throw");
72     }
73     // We need to wait for the runner to exit.  It used to be that the runner would get stuck in the
74     // busy loop when interrupt threw.
75     runner.join(TimeUnit.SECONDS.toMillis(10));
76   }
77 
78   static final class BrokenChannel extends AbstractInterruptibleChannel {
79     @Override
implCloseChannel()80     protected void implCloseChannel() {
81       throw new RuntimeException("I bet you didn't think Thread.interrupt could throw");
82     }
83 
doBegin()84     void doBegin() {
85       super.begin();
86     }
87   }
88 
89   /**
90    * Because Thread.interrupt() can invoke arbitrary code, it can be slow (e.g. perform IO). To
91    * protect ourselves from that we want to make sure that tasks don't spin too much waiting for the
92    * interrupting thread to complete the protocol.
93    */
testInterruptIsSlow()94   public void testInterruptIsSlow() throws Exception {
95     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
96     final SlowChannel slowChannel = new SlowChannel();
97     final InterruptibleTask<Void> task =
98         new InterruptibleTask<Void>() {
99           @Override
100           Void runInterruptibly() throws Exception {
101             slowChannel.doBegin();
102             isInterruptibleRegistered.countDown();
103             try {
104               new CountDownLatch(1).await(); // the interrupt will wake us up
105             } catch (InterruptedException ie) {
106               // continue
107             }
108             LockSupport.unpark(Thread.currentThread()); // simulate a spurious wakeup.
109             return null;
110           }
111 
112           @Override
113           boolean isDone() {
114             return false;
115           }
116 
117           @Override
118           String toPendingString() {
119             return "";
120           }
121 
122           @Override
123           void afterRanInterruptiblySuccess(Void result) {}
124 
125           @Override
126           void afterRanInterruptiblyFailure(Throwable error) {}
127         };
128     Thread runner = new Thread(task, "runner");
129     runner.start();
130     isInterruptibleRegistered.await();
131     // trigger the interrupt on another thread since it will block
132     Thread interrupter =
133         new Thread("Interrupter") {
134           @Override
135           public void run() {
136             task.interruptTask();
137           }
138         };
139     interrupter.start();
140     // this will happen once the interrupt has been set which means that
141     // 1. the runner has been woken up
142     // 2. the interrupter is stuck in the call the Thread.interrupt()
143 
144     // after some period of time the runner thread should become blocked on the task because it is
145     // waiting for the slow interrupting thread to complete Thread.interrupt
146     awaitBlockedOnInstanceOf(runner, InterruptibleTask.Blocker.class);
147 
148     Object blocker = LockSupport.getBlocker(runner);
149     assertThat(blocker).isInstanceOf(AbstractOwnableSynchronizer.class);
150     Method getExclusiveOwnerThread =
151         AbstractOwnableSynchronizer.class.getDeclaredMethod("getExclusiveOwnerThread");
152     getExclusiveOwnerThread.setAccessible(true);
153     Thread owner = (Thread) getExclusiveOwnerThread.invoke(blocker);
154     assertThat(owner).isSameInstanceAs(interrupter);
155 
156     slowChannel.exitClose.countDown(); // release the interrupter
157 
158     // We need to wait for the runner to exit.  To make sure that the interrupting thread wakes it
159     // back up.
160     runner.join(TimeUnit.SECONDS.toMillis(10));
161   }
162 
163   // waits for the given thread to be blocked on the given object
awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)164   private static void awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)
165       throws InterruptedException {
166     while (!isThreadBlockedOnInstanceOf(t, blocker)) {
167       if (t.getState() == Thread.State.TERMINATED) {
168         throw new RuntimeException("Thread " + t + " exited unexpectedly");
169       }
170       Thread.sleep(1);
171     }
172   }
173 
isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker)174   private static boolean isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker) {
175     return t.getState() == Thread.State.WAITING && blocker.isInstance(LockSupport.getBlocker(t));
176   }
177 
178   static final class SlowChannel extends AbstractInterruptibleChannel {
179     final CountDownLatch exitClose = new CountDownLatch(1);
180 
181     @Override
implCloseChannel()182     protected void implCloseChannel() {
183       Uninterruptibles.awaitUninterruptibly(exitClose);
184     }
185 
doBegin()186     void doBegin() {
187       super.begin();
188     }
189   }
190 }
191