1 /* 2 * Copyright (C) 2018 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.google.common.util.concurrent; 17 18 import static com.google.common.truth.Truth.assertThat; 19 import static org.junit.Assert.assertThrows; 20 21 import com.google.common.util.concurrent.InterruptibleTask.Blocker; 22 import java.nio.channels.spi.AbstractInterruptibleChannel; 23 import java.util.concurrent.CountDownLatch; 24 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.locks.LockSupport; 26 import junit.framework.TestCase; 27 import org.checkerframework.checker.nullness.qual.Nullable; 28 29 public final class InterruptibleTaskTest extends TestCase { 30 31 // Regression test for a deadlock where a task could be stuck busy waiting for the task to 32 // transition to DONE testInterruptThrows()33 public void testInterruptThrows() throws Exception { 34 final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1); 35 InterruptibleTask<@Nullable Void> task = 36 new InterruptibleTask<@Nullable Void>() { 37 @Override 38 @Nullable Void runInterruptibly() throws Exception { 39 BrokenChannel bc = new BrokenChannel(); 40 bc.doBegin(); 41 isInterruptibleRegistered.countDown(); 42 new CountDownLatch(1).await(); // the interrupt will wake us up 43 return null; 44 } 45 46 @Override 47 boolean isDone() { 48 return false; 49 } 50 51 @Override 52 String toPendingString() { 53 return ""; 54 } 55 56 @Override 57 void afterRanInterruptiblySuccess(@Nullable Void result) {} 58 59 @Override 60 void afterRanInterruptiblyFailure(Throwable error) {} 61 }; 62 Thread runner = new Thread(task); 63 runner.start(); 64 isInterruptibleRegistered.await(); 65 RuntimeException expected = assertThrows(RuntimeException.class, () -> task.interruptTask()); 66 assertThat(expected) 67 .hasMessageThat() 68 .isEqualTo("I bet you didn't think Thread.interrupt could throw"); 69 // We need to wait for the runner to exit. It used to be that the runner would get stuck in the 70 // busy loop when interrupt threw. 71 runner.join(TimeUnit.SECONDS.toMillis(10)); 72 } 73 74 static final class BrokenChannel extends AbstractInterruptibleChannel { 75 @Override implCloseChannel()76 protected void implCloseChannel() { 77 throw new RuntimeException("I bet you didn't think Thread.interrupt could throw"); 78 } 79 doBegin()80 void doBegin() { 81 super.begin(); 82 } 83 } 84 85 /** 86 * Because Thread.interrupt() can invoke arbitrary code, it can be slow (e.g. perform IO). To 87 * protect ourselves from that we want to make sure that tasks don't spin too much waiting for the 88 * interrupting thread to complete the protocol. 89 */ 90 /* 91 * This test hangs (or maybe is just *very* slow) under Android. 92 * 93 * TODO(b/218700094): Ideally, get this to pass under Android. Failing that, convince ourselves 94 * that the test isn't exposing a real problem with InterruptibleTask, one that could matter in 95 * prod. 96 */ 97 @AndroidIncompatible testInterruptIsSlow()98 public void testInterruptIsSlow() throws Exception { 99 final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1); 100 final SlowChannel slowChannel = new SlowChannel(); 101 final InterruptibleTask<@Nullable Void> task = 102 new InterruptibleTask<@Nullable Void>() { 103 @Override 104 @Nullable Void runInterruptibly() throws Exception { 105 slowChannel.doBegin(); 106 isInterruptibleRegistered.countDown(); 107 try { 108 new CountDownLatch(1).await(); // the interrupt will wake us up 109 } catch (InterruptedException ie) { 110 // continue 111 } 112 LockSupport.unpark(Thread.currentThread()); // simulate a spurious wakeup. 113 return null; 114 } 115 116 @Override 117 boolean isDone() { 118 return false; 119 } 120 121 @Override 122 String toPendingString() { 123 return ""; 124 } 125 126 @Override 127 void afterRanInterruptiblySuccess(@Nullable Void result) {} 128 129 @Override 130 void afterRanInterruptiblyFailure(Throwable error) {} 131 }; 132 Thread runner = new Thread(task, "runner"); 133 runner.start(); 134 isInterruptibleRegistered.await(); 135 // trigger the interrupt on another thread since it will block 136 Thread interrupter = 137 new Thread("Interrupter") { 138 @Override 139 public void run() { 140 task.interruptTask(); 141 } 142 }; 143 interrupter.start(); 144 // this will happen once the interrupt has been set which means that 145 // 1. the runner has been woken up 146 // 2. the interrupter is stuck in the call the Thread.interrupt() 147 148 // after some period of time the runner thread should become blocked on the task because it is 149 // waiting for the slow interrupting thread to complete Thread.interrupt 150 awaitBlockedOnInstanceOf(runner, InterruptibleTask.Blocker.class); 151 152 Blocker blocker = (Blocker) LockSupport.getBlocker(runner); 153 Thread owner = blocker.getOwner(); 154 assertThat(owner).isSameInstanceAs(interrupter); 155 156 slowChannel.exitClose.countDown(); // release the interrupter 157 158 // We need to wait for the runner to exit. To make sure that the interrupting thread wakes it 159 // back up. 160 runner.join(TimeUnit.SECONDS.toMillis(10)); 161 } 162 163 // waits for the given thread to be blocked on the given object awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)164 private static void awaitBlockedOnInstanceOf(Thread t, Class<?> blocker) 165 throws InterruptedException { 166 while (!isThreadBlockedOnInstanceOf(t, blocker)) { 167 if (t.getState() == Thread.State.TERMINATED) { 168 throw new RuntimeException("Thread " + t + " exited unexpectedly"); 169 } 170 Thread.sleep(1); 171 } 172 } 173 isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker)174 private static boolean isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker) { 175 return t.getState() == Thread.State.WAITING && blocker.isInstance(LockSupport.getBlocker(t)); 176 } 177 178 static final class SlowChannel extends AbstractInterruptibleChannel { 179 final CountDownLatch exitClose = new CountDownLatch(1); 180 181 @Override implCloseChannel()182 protected void implCloseChannel() { 183 Uninterruptibles.awaitUninterruptibly(exitClose); 184 } 185 doBegin()186 void doBegin() { 187 super.begin(); 188 } 189 } 190 } 191