/*
 * Copyright (C) 2018 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;

import java.lang.reflect.Method;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;
import junit.framework.TestCase;

/**
 * Tests of how {@link InterruptibleTask} behaves when the call to {@code Thread.interrupt()} made
 * on its behalf either throws or takes a long time to return.
 */
public final class InterruptibleTaskTest extends TestCase {

  /** Upper bound, in seconds, on how long a test waits for a helper thread to exit. */
  private static final long JOIN_TIMEOUT_SECONDS = 10;

  // Regression test for a deadlock where a task could be stuck busy waiting for the task to
  // transition to DONE
  public void testInterruptThrows() throws Exception {
    final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
    InterruptibleTask<Void> task =
        new InterruptibleTask<Void>() {
          @Override
          Void runInterruptibly() throws Exception {
            BrokenChannel bc = new BrokenChannel();
            bc.doBegin();
            isInterruptibleRegistered.countDown();
            new CountDownLatch(1).await(); // the interrupt will wake us up
            return null;
          }

          @Override
          boolean isDone() {
            return false;
          }

          @Override
          String toPendingString() {
            return "";
          }

          @Override
          void afterRanInterruptiblySuccess(Void result) {}

          @Override
          void afterRanInterruptiblyFailure(Throwable error) {}
        };
    Thread runner = new Thread(task);
    runner.start();
    isInterruptibleRegistered.await();
    try {
      task.interruptTask();
      fail();
    } catch (RuntimeException expected) {
      assertThat(expected)
          .hasMessageThat()
          .isEqualTo("I bet you didn't think Thread.interrupt could throw");
    }
    // We need to wait for the runner to exit. It used to be that the runner would get stuck in the
    // busy loop when interrupt threw. A timed join returns silently on timeout, so the helper also
    // verifies that the thread really terminated.
    joinAndAssertTerminated(runner);
  }

  /** A channel whose close hook always throws; the tests expect interruption to trigger it. */
  static final class BrokenChannel extends AbstractInterruptibleChannel {
    @Override
    protected void implCloseChannel() {
      throw new RuntimeException("I bet you didn't think Thread.interrupt could throw");
    }

    /** Exposes the protected {@code begin()} so that a test task can call it. */
    void doBegin() {
      super.begin();
    }
  }

  /**
   * Because Thread.interrupt() can invoke arbitrary code, it can be slow (e.g. perform IO). To
   * protect ourselves from that we want to make sure that tasks don't spin too much waiting for the
   * interrupting thread to complete the protocol.
   */
  public void testInterruptIsSlow() throws Exception {
    final CountDownLatch isInterruptibleRegistered = new CountDownLatch(1);
    final SlowChannel slowChannel = new SlowChannel();
    final InterruptibleTask<Void> task =
        new InterruptibleTask<Void>() {
          @Override
          Void runInterruptibly() throws Exception {
            slowChannel.doBegin();
            isInterruptibleRegistered.countDown();
            try {
              new CountDownLatch(1).await(); // the interrupt will wake us up
            } catch (InterruptedException expected) {
              // Deliberately swallowed: being interrupted here is the point of the test, and the
              // task must carry on to return normally.
            }
            LockSupport.unpark(Thread.currentThread()); // simulate a spurious wakeup.
            return null;
          }

          @Override
          boolean isDone() {
            return false;
          }

          @Override
          String toPendingString() {
            return "";
          }

          @Override
          void afterRanInterruptiblySuccess(Void result) {}

          @Override
          void afterRanInterruptiblyFailure(Throwable error) {}
        };
    Thread runner = new Thread(task, "runner");
    runner.start();
    isInterruptibleRegistered.await();
    // trigger the interrupt on another thread since it will block
    Thread interrupter =
        new Thread("Interrupter") {
          @Override
          public void run() {
            task.interruptTask();
          }
        };
    interrupter.start();
    // this will happen once the interrupt has been set which means that
    // 1. the runner has been woken up
    // 2. the interrupter is stuck in the call to Thread.interrupt()

    // after some period of time the runner thread should become blocked on the task because it is
    // waiting for the slow interrupting thread to complete Thread.interrupt
    awaitBlockedOnInstanceOf(runner, InterruptibleTask.Blocker.class);

    // The object the runner is parked on should name the interrupter as its exclusive owner. The
    // accessor is not public, hence the reflective call.
    Object blocker = LockSupport.getBlocker(runner);
    assertThat(blocker).isInstanceOf(AbstractOwnableSynchronizer.class);
    Method getExclusiveOwnerThread =
        AbstractOwnableSynchronizer.class.getDeclaredMethod("getExclusiveOwnerThread");
    getExclusiveOwnerThread.setAccessible(true);
    Thread owner = (Thread) getExclusiveOwnerThread.invoke(blocker);
    assertThat(owner).isSameInstanceAs(interrupter);

    slowChannel.exitClose.countDown(); // release the interrupter

    // We need to wait for the runner to exit, to make sure that the interrupting thread wakes it
    // back up. The interrupter is joined as well so that it does not outlive the test.
    joinAndAssertTerminated(runner);
    joinAndAssertTerminated(interrupter);
  }

  /**
   * Waits up to {@link #JOIN_TIMEOUT_SECONDS} seconds for {@code t} to exit and fails the test if
   * it is still alive afterwards.
   *
   * @param t the thread to wait for
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private static void joinAndAssertTerminated(Thread t) throws InterruptedException {
    t.join(TimeUnit.SECONDS.toMillis(JOIN_TIMEOUT_SECONDS));
    assertFalse(
        "Thread " + t + " did not exit within " + JOIN_TIMEOUT_SECONDS + " seconds", t.isAlive());
  }

  /**
   * Polls, sleeping 1ms between checks, until {@code t} is waiting on a blocker that is an instance
   * of {@code blocker}.
   *
   * @param t the thread to observe
   * @param blocker the expected type of the object that {@code t} is parked on
   * @throws InterruptedException if the calling thread is interrupted while sleeping
   * @throws RuntimeException if {@code t} terminates before it is seen blocked
   */
  private static void awaitBlockedOnInstanceOf(Thread t, Class<?> blocker)
      throws InterruptedException {
    while (!isThreadBlockedOnInstanceOf(t, blocker)) {
      if (t.getState() == Thread.State.TERMINATED) {
        throw new RuntimeException("Thread " + t + " exited unexpectedly");
      }
      Thread.sleep(1);
    }
  }

  /**
   * Returns whether {@code t} is in the {@code WAITING} state and the blocker reported for it by
   * {@link LockSupport#getBlocker} is an instance of {@code blocker}.
   */
  private static boolean isThreadBlockedOnInstanceOf(Thread t, Class<?> blocker) {
    return t.getState() == Thread.State.WAITING && blocker.isInstance(LockSupport.getBlocker(t));
  }

  /** A channel whose close hook blocks until the test counts down {@link #exitClose}. */
  static final class SlowChannel extends AbstractInterruptibleChannel {
    /** Counted down by the test to let {@link #implCloseChannel} return. */
    final CountDownLatch exitClose = new CountDownLatch(1);

    @Override
    protected void implCloseChannel() {
      Uninterruptibles.awaitUninterruptibly(exitClose);
    }

    /** Exposes the protected {@code begin()} so that a test task can call it. */
    void doBegin() {
      super.begin();
    }
  }
}