1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package libcore.java.net; 18 19 import java.io.Closeable; 20 import java.io.IOException; 21 import java.net.DatagramPacket; 22 import java.net.DatagramSocket; 23 import java.net.InetAddress; 24 import java.net.InetSocketAddress; 25 import java.net.ServerSocket; 26 import java.net.Socket; 27 import java.net.SocketAddress; 28 import java.net.SocketException; 29 import java.net.UnknownHostException; 30 import java.nio.channels.AsynchronousCloseException; 31 import java.nio.channels.ClosedChannelException; 32 import java.nio.channels.SocketChannel; 33 import java.util.ArrayList; 34 import java.util.List; 35 import java.util.concurrent.CopyOnWriteArrayList; 36 import java.util.concurrent.atomic.AtomicLong; 37 38 /** 39 * Test that Socket.close called on another thread interrupts a thread that's blocked doing 40 * network I/O. 41 */ 42 public class ConcurrentCloseTest extends junit.framework.TestCase { 43 private static final InetSocketAddress UNREACHABLE_ADDRESS 44 = new InetSocketAddress("192.0.2.0", 80); // RFC 5737 45 test_accept()46 public void test_accept() throws Exception { 47 ServerSocket ss = new ServerSocket(0); 48 new Killer(ss).start(); 49 try { 50 System.err.println("accept..."); 51 Socket s = ss.accept(); 52 fail("accept returned " + s + "!"); 53 } catch (SocketException expected) { 54 assertEquals("Socket closed", expected.getMessage()); 55 } 56 } 57 test_connect()58 public void test_connect() throws Exception { 59 Socket s = new Socket(); 60 new Killer(s).start(); 61 try { 62 System.err.println("connect..."); 63 s.connect(UNREACHABLE_ADDRESS); 64 fail("connect returned: " + s + "!"); 65 } catch (SocketException expected) { 66 assertEquals("Socket closed", expected.getMessage()); 67 } 68 } 69 test_connect_timeout()70 public void test_connect_timeout() throws Exception { 71 Socket s = new Socket(); 72 new Killer(s).start(); 73 try { 74 System.err.println("connect (with timeout)..."); 75 s.connect(UNREACHABLE_ADDRESS, 3600 * 1000); 76 fail("connect returned: " + s + "!"); 77 } catch (SocketException expected) { 78 assertEquals("Socket closed", expected.getMessage()); 79 } 80 } 81 test_connect_nonBlocking()82 public void test_connect_nonBlocking() throws Exception { 83 SocketChannel s = SocketChannel.open(); 84 new Killer(s.socket()).start(); 85 try { 86 System.err.println("connect (non-blocking)..."); 87 s.configureBlocking(false); 88 s.connect(UNREACHABLE_ADDRESS); 89 while (!s.finishConnect()) { 90 // Spin like a mad thing! 91 } 92 fail("connect returned: " + s + "!"); 93 } catch (SocketException expected) { 94 assertEquals("Socket closed", expected.getMessage()); 95 } catch (AsynchronousCloseException alsoOkay) { 96 // See below. 97 } catch (ClosedChannelException alsoOkay) { 98 // For now, I'm assuming that we're happy as long as we get any reasonable exception. 99 // It may be that we're supposed to guarantee only one or the other. 100 } 101 } 102 test_read()103 public void test_read() throws Exception { 104 SilentServer ss = new SilentServer(); 105 Socket s = new Socket(); 106 s.connect(ss.getLocalSocketAddress()); 107 new Killer(s).start(); 108 try { 109 System.err.println("read..."); 110 int i = s.getInputStream().read(); 111 fail("read returned: " + i); 112 } catch (SocketException expected) { 113 assertEquals("Socket closed", expected.getMessage()); 114 } 115 ss.close(); 116 } 117 test_read_multiple()118 public void test_read_multiple() throws Throwable { 119 SilentServer ss = new SilentServer(); 120 final Socket s = new Socket(); 121 s.connect(ss.getLocalSocketAddress()); 122 123 // We want to test that we unblock *all* the threads blocked on a socket, not just one. 124 // We know the implementation uses the same mechanism for all blocking calls, so we just 125 // test read(2) because it's the easiest to test. (recv(2), for example, is only accessible 126 // from Java via a synchronized method.) 127 final ArrayList<Thread> threads = new ArrayList<Thread>(); 128 final List<Throwable> thrownExceptions = new CopyOnWriteArrayList<Throwable>(); 129 for (int i = 0; i < 10; ++i) { 130 Thread t = new Thread(new Runnable() { 131 public void run() { 132 try { 133 try { 134 System.err.println("read..."); 135 int i = s.getInputStream().read(); 136 fail("read returned: " + i); 137 } catch (SocketException expected) { 138 assertEquals("Socket closed", expected.getMessage()); 139 } 140 } catch (Throwable ex) { 141 thrownExceptions.add(ex); 142 } 143 } 144 }); 145 threads.add(t); 146 } 147 for (Thread t : threads) { 148 t.start(); 149 } 150 new Killer(s).start(); 151 for (Thread t : threads) { 152 t.join(); 153 } 154 for (Throwable exception : thrownExceptions) { 155 throw exception; 156 } 157 158 ss.close(); 159 } 160 test_recv()161 public void test_recv() throws Exception { 162 DatagramSocket s = new DatagramSocket(); 163 byte[] buf = new byte[200]; 164 DatagramPacket p = new DatagramPacket(buf, 200); 165 new Killer(s).start(); 166 try { 167 System.err.println("receive..."); 168 s.receive(p); 169 fail("receive returned!"); 170 } catch (SocketException expected) { 171 assertEquals("Socket closed", expected.getMessage()); 172 } 173 } 174 test_write()175 public void test_write() throws Exception { 176 final SilentServer ss = new SilentServer(128); // Minimal receive buffer size. 177 178 // The test needs to send enough data to cause the write to block for long enough for 179 // the socket close to occur. Start with a large enough factor and, if that doesn't 180 // seem to be sufficient to cause the block, double it and try again. A factor of 2 181 // used to be OK, then that needed to be increased to 4 and now this is also not 182 // enough in some cases. (b/356850236) 183 int bufferFactor = 4; 184 185 while (true) { 186 Socket s = new Socket(); 187 188 // Set the send buffer size really small, to ensure we block. 189 int sendBufferSize = 1024; 190 s.setSendBufferSize(sendBufferSize); 191 sendBufferSize = s.getSendBufferSize(); // How big is the buffer really, Linux? 192 193 sendBufferSize *= bufferFactor; 194 195 s.connect(ss.getLocalSocketAddress()); 196 Killer killer = new Killer(s); 197 killer.start(); 198 try { 199 System.err.println("write..."); 200 // Write too much so the buffer is full and we block, 201 // waiting for the server to read (which it never will). 202 // If the asynchronous close fails, we'll see a test timeout here. 203 byte[] buf = new byte[sendBufferSize]; 204 s.getOutputStream().write(buf); 205 if (killer.wasDefinitelyKilled()) { 206 fail("Socket close happened before write completed successfully"); 207 } else { 208 // Increase the bytes sent to try and cause write() to block for enough time. 209 bufferFactor *= 2; 210 } 211 } catch (SocketException expected) { 212 // We throw "Connection reset by peer", which I don't _think_ is a problem. 213 // assertEquals("Socket closed", expected.getMessage()); 214 break; 215 } 216 } 217 ss.close(); 218 } 219 220 // This server accepts connections, but doesn't read or write anything. 221 // It holds on to the Socket connecting to the client so it won't be GCed. 222 // Call "close" to close both the server socket and its client connection. 223 static class SilentServer { 224 private final ServerSocket ss; 225 private Socket client; 226 SilentServer()227 public SilentServer() throws IOException { 228 this(0); 229 } 230 SilentServer(int receiveBufferSize)231 public SilentServer(int receiveBufferSize) throws IOException { 232 ss = new ServerSocket(0); 233 if (receiveBufferSize != 0) { 234 ss.setReceiveBufferSize(receiveBufferSize); 235 } 236 new Thread(new Runnable() { 237 public void run() { 238 try { 239 client = ss.accept(); 240 } catch (Exception ex) { 241 ex.printStackTrace(); 242 } 243 } 244 }).start(); 245 } 246 getLocalSocketAddress()247 public SocketAddress getLocalSocketAddress() { 248 return ss.getLocalSocketAddress(); 249 } 250 close()251 public void close() throws IOException { 252 client.close(); 253 ss.close(); 254 } 255 } 256 257 // This thread calls the "close" method on the supplied T after 2s. 258 static class Killer<T extends Closeable> extends Thread { 259 private final T s; 260 private final AtomicLong killedTs = new AtomicLong(0); 261 Killer(T s)262 public Killer(T s) { 263 this.s = s; 264 } 265 run()266 public void run() { 267 try { 268 System.err.println("sleep..."); 269 Thread.sleep(2000); 270 System.err.println("close..."); 271 s.close(); 272 killedTs.set(System.nanoTime()); 273 } catch (Exception ex) { 274 ex.printStackTrace(); 275 } 276 } 277 wasDefinitelyKilled()278 public boolean wasDefinitelyKilled() { 279 final long minThresholdNs = 500 * 1000 * 1000; 280 final long now = System.nanoTime(); 281 final long killed = killedTs.get(); 282 return (killed > 0 && now - killed > minThresholdNs); 283 } 284 } 285 } 286