• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2018 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package com.google.common.util.concurrent;
17 
18 import static com.google.common.truth.Truth.assertThat;
19 
20 import java.nio.channels.spi.AbstractInterruptibleChannel;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.LockSupport;
24 import junit.framework.TestCase;
25 
26 public final class InterruptibleTaskTest extends TestCase {
27 
28   // Regression test for a deadlock where a task could be stuck busy waiting for the task to
29   // transition to DONE
testInterruptThrows()30   public void testInterruptThrows() throws Exception {
31     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
32     InterruptibleTask<Void> task =
33         new InterruptibleTask<Void>() {
34           @Override
35           Void runInterruptibly() throws Exception {
36             BrokenChannel bc = new BrokenChannel();
37             bc.doBegin();
38             isInterruptibleRegistered.countDown();
39             new CountDownLatch(1).await(); // the interrupt will wake us up
40             return null;
41           }
42 
43           @Override
44           boolean isDone() {
45             return false;
46           }
47 
48           @Override
49           String toPendingString() {
50             return "";
51           }
52 
53           @Override
54           void afterRanInterruptibly(Void result, Throwable error) {}
55         };
56     Thread runner = new Thread(task);
57     runner.start();
58     isInterruptibleRegistered.await();
59     try {
60       task.interruptTask();
61       fail();
62     } catch (RuntimeException expected) {
63       assertThat(expected)
64           .hasMessageThat()
65           .isEqualTo("I bet you didn't think Thread.interrupt could throw");
66     }
67     // We need to wait for the runner to exit.  It used to be that the runner would get stuck in the
68     // busy loop when interrupt threw.
69     runner.join(TimeUnit.SECONDS.toMillis(10));
70   }
71 
72   static final class BrokenChannel extends AbstractInterruptibleChannel {
73     @Override
implCloseChannel()74     protected void implCloseChannel() {
75       throw new RuntimeException("I bet you didn't think Thread.interrupt could throw");
76     }
77 
doBegin()78     void doBegin() {
79       super.begin();
80     }
81   }
82 
83   /**
84    * Because Thread.interrupt() can invoke arbitrary code, it can be slow (e.g. perform IO). To
85    * protect ourselves from that we want to make sure that tasks don't spin too much waiting for the
86    * interrupting thread to complete the protocol.
87    */
testInterruptIsSlow()88   public void testInterruptIsSlow() throws Exception {
89     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
90     final SlowChannel slowChannel = new SlowChannel();
91     final InterruptibleTask<Void> task =
92         new InterruptibleTask<Void>() {
93           @Override
94           Void runInterruptibly() throws Exception {
95             slowChannel.doBegin();
96             isInterruptibleRegistered.countDown();
97             try {
98               new CountDownLatch(1).await(); // the interrupt will wake us up
99             } catch (InterruptedException ie) {
100               // continue
101             }
102             LockSupport.unpark(Thread.currentThread()); // simulate a spurious wakeup.
103             return null;
104           }
105 
106           @Override
107           boolean isDone() {
108             return false;
109           }
110 
111           @Override
112           String toPendingString() {
113             return "";
114           }
115 
116           @Override
117           void afterRanInterruptibly(Void result, Throwable error) {}
118         };
119     Thread runner = new Thread(task, "runner");
120     runner.start();
121     isInterruptibleRegistered.await();
122     // trigger the interrupt on another thread since it will block
123     new Thread("Interrupter") {
124       @Override
125       public void run() {
126         task.interruptTask();
127       }
128     }.start();
129     // this will happen once the interrupt has been set which means that
130     // 1. the runner has been woken up
131     // 2. the interrupter is stuck in the call the Thread.interrupt()
132 
133     // after some period of time the runner thread should become blocked on the task because it is
134     // waiting for the slow interrupting thread to complete Thread.interrupt
135     awaitBlockedOn(runner, task);
136 
137     slowChannel.exitClose.countDown(); // release the interrupter
138 
139     // We need to wait for the runner to exit.  To make sure that the interrupting thread wakes it
140     // back up.
141     runner.join(TimeUnit.SECONDS.toMillis(10));
142   }
143 
144   // waits for the given thread to be blocked on the given object
awaitBlockedOn(Thread t, Object blocker)145   private static void awaitBlockedOn(Thread t, Object blocker) throws InterruptedException {
146     while (!isThreadBlockedOn(t, blocker)) {
147       if (t.getState() == Thread.State.TERMINATED) {
148         throw new RuntimeException("Thread " + t + " exited unexpectedly");
149       }
150       Thread.sleep(1);
151     }
152   }
153 
isThreadBlockedOn(Thread t, Object blocker)154   private static boolean isThreadBlockedOn(Thread t, Object blocker) {
155     return t.getState() == Thread.State.WAITING && LockSupport.getBlocker(t) == blocker;
156   }
157 
158   static final class SlowChannel extends AbstractInterruptibleChannel {
159     final CountDownLatch exitClose = new CountDownLatch(1);
160 
161     @Override
implCloseChannel()162     protected void implCloseChannel() {
163       Uninterruptibles.awaitUninterruptibly(exitClose);
164     }
165 
doBegin()166     void doBegin() {
167       super.begin();
168     }
169   }
170 }
171