• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.common.util.concurrent;
17 
18 import static com.google.common.truth.Truth.assertThat;
19 import static org.junit.Assert.assertThrows;
20 
21 import com.google.common.util.concurrent.InterruptibleTask.Blocker;
22 import java.nio.channels.spi.AbstractInterruptibleChannel;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.LockSupport;
26 import junit.framework.TestCase;
27 import org.checkerframework.checker.nullness.qual.Nullable;
28 
29 public final class InterruptibleTaskTest extends TestCase {
30 
31   // Regression test for a deadlock where a task could be stuck busy waiting for the task to
32   // transition to DONE
testInterruptThrows()33   public void testInterruptThrows() throws Exception {
34     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
35     InterruptibleTask<@Nullable Void> task =
36         new InterruptibleTask<@Nullable Void>() {
37           @Override
38           @Nullable Void runInterruptibly() throws Exception {
39             BrokenChannel bc = new BrokenChannel();
40             bc.doBegin();
41             isInterruptibleRegistered.countDown();
42             new CountDownLatch(1).await(); // the interrupt will wake us up
43             return null;
44           }
45 
46           @Override
47           boolean isDone() {
48             return false;
49           }
50 
51           @Override
52           String toPendingString() {
53             return "";
54           }
55 
56           @Override
57           void afterRanInterruptiblySuccess(@Nullable Void result) {}
58 
59           @Override
60           void afterRanInterruptiblyFailure(Throwable error) {}
61         };
62     Thread runner = new Thread(task);
63     runner.start();
64     isInterruptibleRegistered.await();
65     RuntimeException expected = assertThrows(RuntimeException.class, () -> task.interruptTask());
66     assertThat(expected)
67         .hasMessageThat()
68         .isEqualTo("I bet you didn't think Thread.interrupt could throw");
69     // We need to wait for the runner to exit.  It used to be that the runner would get stuck in the
70     // busy loop when interrupt threw.
71     runner.join(TimeUnit.SECONDS.toMillis(10));
72   }
73 
74   static final class BrokenChannel extends AbstractInterruptibleChannel {
75     @Override
implCloseChannel()76     protected void implCloseChannel() {
77       throw new RuntimeException("I bet you didn't think Thread.interrupt could throw");
78     }
79 
doBegin()80     void doBegin() {
81       super.begin();
82     }
83   }
84 
85   /**
86    * Because Thread.interrupt() can invoke arbitrary code, it can be slow (e.g. perform IO). To
87    * protect ourselves from that we want to make sure that tasks don't spin too much waiting for the
88    * interrupting thread to complete the protocol.
89    */
90   /*
91    * This test hangs (or maybe is just *very* slow) under Android.
92    *
93    * TODO(b/218700094): Ideally, get this to pass under Android. Failing that, convince ourselves
94    * that the test isn't exposing a real problem with InterruptibleTask, one that could matter in
95    * prod.
96    */
97   @AndroidIncompatible
testInterruptIsSlow()98   public void testInterruptIsSlow() throws Exception {
99     final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
100     final SlowChannel slowChannel = new SlowChannel();
101     final InterruptibleTask<@Nullable Void> task =
102         new InterruptibleTask<@Nullable Void>() {
103           @Override
104           @Nullable Void runInterruptibly() throws Exception {
105             slowChannel.doBegin();
106             isInterruptibleRegistered.countDown();
107             try {
108               new CountDownLatch(1).await(); // the interrupt will wake us up
109             } catch (InterruptedException ie) {
110               // continue
111             }
112             LockSupport.unpark(Thread.currentThread()); // simulate a spurious wakeup.
113             return null;
114           }
115 
116           @Override
117           boolean isDone() {
118             return false;
119           }
120 
121           @Override
122           String toPendingString() {
123             return "";
124           }
125 
126           @Override
127           void afterRanInterruptiblySuccess(@Nullable Void result) {}
128 
129           @Override
130           void afterRanInterruptiblyFailure(Throwable error) {}
131         };
132     Thread runner = new Thread(task, "runner");
133     runner.start();
134     isInterruptibleRegistered.await();
135     // trigger the interrupt on another thread since it will block
136     Thread interrupter =
137         new Thread("Interrupter") {
138           @Override
139           public void run() {
140             task.interruptTask();
141           }
142         };
143     interrupter.start();
144     // this will happen once the interrupt has been set which means that
145     // 1. the runner has been woken up
146     // 2. the interrupter is stuck in the call the Thread.interrupt()
147 
148     // after some period of time the runner thread should become blocked on the task because it is
149     // waiting for the slow interrupting thread to complete Thread.interrupt
150     awaitBlockedOnInstanceOf(runner, InterruptibleTask.Blocker.class);
151 
152     Blocker blocker = (Blocker) LockSupport.getBlocker(runner);
153     Thread owner = blocker.getOwner();
154     assertThat(owner).isSameInstanceAs(interrupter);
155 
156     slowChannel.exitClose.countDown(); // release the interrupter
157 
158     // We need to wait for the runner to exit.  To make sure that the interrupting thread wakes it
159     // back up.
160     runner.join(TimeUnit.SECONDS.toMillis(10));
161   }
162 
163   // waits for the given thread to be blocked on the given object
awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)164   private static void awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)
165       throws InterruptedException {
166     while (!isThreadBlockedOnInstanceOf(t, blocker)) {
167       if (t.getState() == Thread.State.TERMINATED) {
168         throw new RuntimeException("Thread " + t + " exited unexpectedly");
169       }
170       Thread.sleep(1);
171     }
172   }
173 
isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker)174   private static boolean isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker) {
175     return t.getState() == Thread.State.WAITING && blocker.isInstance(LockSupport.getBlocker(t));
176   }
177 
178   static final class SlowChannel extends AbstractInterruptibleChannel {
179     final CountDownLatch exitClose = new CountDownLatch(1);
180 
181     @Override
implCloseChannel()182     protected void implCloseChannel() {
183       Uninterruptibles.awaitUninterruptibly(exitClose);
184     }
185 
doBegin()186     void doBegin() {
187       super.begin();
188     }
189   }
190 }
191