• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package libcore.java.net;
18 
19 import java.io.Closeable;
20 import java.io.IOException;
21 import java.net.DatagramPacket;
22 import java.net.DatagramSocket;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.ServerSocket;
26 import java.net.Socket;
27 import java.net.SocketAddress;
28 import java.net.SocketException;
29 import java.net.UnknownHostException;
30 import java.nio.channels.AsynchronousCloseException;
31 import java.nio.channels.ClosedChannelException;
32 import java.nio.channels.SocketChannel;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.concurrent.CopyOnWriteArrayList;
36 import java.util.concurrent.atomic.AtomicLong;
37 
38 /**
39  * Test that Socket.close called on another thread interrupts a thread that's blocked doing
40  * network I/O.
41  */
42 public class ConcurrentCloseTest extends junit.framework.TestCase {
43     private static final InetSocketAddress UNREACHABLE_ADDRESS
44             = new InetSocketAddress("192.0.2.0", 80); // RFC 5737
45 
test_accept()46     public void test_accept() throws Exception {
47         ServerSocket ss = new ServerSocket(0);
48         new Killer(ss).start();
49         try {
50             System.err.println("accept...");
51             Socket s = ss.accept();
52             fail("accept returned " + s + "!");
53         } catch (SocketException expected) {
54             assertEquals("Socket closed", expected.getMessage());
55         }
56     }
57 
test_connect()58     public void test_connect() throws Exception {
59         Socket s = new Socket();
60         new Killer(s).start();
61         try {
62             System.err.println("connect...");
63             s.connect(UNREACHABLE_ADDRESS);
64             fail("connect returned: " + s + "!");
65         } catch (SocketException expected) {
66             assertEquals("Socket closed", expected.getMessage());
67         }
68     }
69 
test_connect_timeout()70     public void test_connect_timeout() throws Exception {
71         Socket s = new Socket();
72         new Killer(s).start();
73         try {
74             System.err.println("connect (with timeout)...");
75             s.connect(UNREACHABLE_ADDRESS, 3600 * 1000);
76             fail("connect returned: " + s + "!");
77         } catch (SocketException expected) {
78             assertEquals("Socket closed", expected.getMessage());
79         }
80     }
81 
test_connect_nonBlocking()82     public void test_connect_nonBlocking() throws Exception {
83         SocketChannel s = SocketChannel.open();
84         new Killer(s.socket()).start();
85         try {
86             System.err.println("connect (non-blocking)...");
87             s.configureBlocking(false);
88             s.connect(UNREACHABLE_ADDRESS);
89             while (!s.finishConnect()) {
90                 // Spin like a mad thing!
91             }
92             fail("connect returned: " + s + "!");
93         } catch (SocketException expected) {
94             assertEquals("Socket closed", expected.getMessage());
95         } catch (AsynchronousCloseException alsoOkay) {
96             // See below.
97         } catch (ClosedChannelException alsoOkay) {
98             // For now, I'm assuming that we're happy as long as we get any reasonable exception.
99             // It may be that we're supposed to guarantee only one or the other.
100         }
101     }
102 
test_read()103     public void test_read() throws Exception {
104         SilentServer ss = new SilentServer();
105         Socket s = new Socket();
106         s.connect(ss.getLocalSocketAddress());
107         new Killer(s).start();
108         try {
109             System.err.println("read...");
110             int i = s.getInputStream().read();
111             fail("read returned: " + i);
112         } catch (SocketException expected) {
113             assertEquals("Socket closed", expected.getMessage());
114         }
115         ss.close();
116     }
117 
test_read_multiple()118     public void test_read_multiple() throws Throwable {
119         SilentServer ss = new SilentServer();
120         final Socket s = new Socket();
121         s.connect(ss.getLocalSocketAddress());
122 
123         // We want to test that we unblock *all* the threads blocked on a socket, not just one.
124         // We know the implementation uses the same mechanism for all blocking calls, so we just
125         // test read(2) because it's the easiest to test. (recv(2), for example, is only accessible
126         // from Java via a synchronized method.)
127         final ArrayList<Thread> threads = new ArrayList<Thread>();
128         final List<Throwable> thrownExceptions = new CopyOnWriteArrayList<Throwable>();
129         for (int i = 0; i < 10; ++i) {
130             Thread t = new Thread(new Runnable() {
131                 public void run() {
132                     try {
133                         try {
134                             System.err.println("read...");
135                             int i = s.getInputStream().read();
136                             fail("read returned: " + i);
137                         } catch (SocketException expected) {
138                             assertEquals("Socket closed", expected.getMessage());
139                         }
140                     } catch (Throwable ex) {
141                         thrownExceptions.add(ex);
142                     }
143                 }
144             });
145             threads.add(t);
146         }
147         for (Thread t : threads) {
148             t.start();
149         }
150         new Killer(s).start();
151         for (Thread t : threads) {
152             t.join();
153         }
154         for (Throwable exception : thrownExceptions) {
155             throw exception;
156         }
157 
158         ss.close();
159     }
160 
test_recv()161     public void test_recv() throws Exception {
162         DatagramSocket s = new DatagramSocket();
163         byte[] buf = new byte[200];
164         DatagramPacket p = new DatagramPacket(buf, 200);
165         new Killer(s).start();
166         try {
167             System.err.println("receive...");
168             s.receive(p);
169             fail("receive returned!");
170         } catch (SocketException expected) {
171             assertEquals("Socket closed", expected.getMessage());
172         }
173     }
174 
test_write()175     public void test_write() throws Exception {
176         final SilentServer ss = new SilentServer(128); // Minimal receive buffer size.
177 
178         // The test needs to send enough data to cause the write to block for long enough for
179         // the socket close to occur. Start with a large enough factor and, if that doesn't
180         // seem to be sufficient to cause the block, double it and try again. A factor of 2
181         // used to be OK, then that needed to be increased to 4 and now this is also not
182         // enough in some cases. (b/356850236)
183         int bufferFactor = 4;
184 
185         while (true) {
186             Socket s = new Socket();
187 
188             // Set the send buffer size really small, to ensure we block.
189             int sendBufferSize = 1024;
190             s.setSendBufferSize(sendBufferSize);
191             sendBufferSize = s.getSendBufferSize(); // How big is the buffer really, Linux?
192 
193             sendBufferSize *= bufferFactor;
194 
195             s.connect(ss.getLocalSocketAddress());
196             Killer killer = new Killer(s);
197             killer.start();
198             try {
199                 System.err.println("write...");
200                 // Write too much so the buffer is full and we block,
201                 // waiting for the server to read (which it never will).
202                 // If the asynchronous close fails, we'll see a test timeout here.
203                 byte[] buf = new byte[sendBufferSize];
204                 s.getOutputStream().write(buf);
205                 if (killer.wasDefinitelyKilled()) {
206                     fail("Socket close happened before write completed successfully");
207                 } else {
208                     // Increase the bytes sent to try and cause write() to block for enough time.
209                     bufferFactor *= 2;
210                 }
211             } catch (SocketException expected) {
212                 // We throw "Connection reset by peer", which I don't _think_ is a problem.
213                 // assertEquals("Socket closed", expected.getMessage());
214                 break;
215             }
216         }
217         ss.close();
218     }
219 
220     // This server accepts connections, but doesn't read or write anything.
221     // It holds on to the Socket connecting to the client so it won't be GCed.
222     // Call "close" to close both the server socket and its client connection.
223     static class SilentServer {
224         private final ServerSocket ss;
225         private Socket client;
226 
SilentServer()227         public SilentServer() throws IOException {
228             this(0);
229         }
230 
SilentServer(int receiveBufferSize)231         public SilentServer(int receiveBufferSize) throws IOException {
232             ss = new ServerSocket(0);
233             if (receiveBufferSize != 0) {
234                 ss.setReceiveBufferSize(receiveBufferSize);
235             }
236             new Thread(new Runnable() {
237                 public void run() {
238                     try {
239                         client = ss.accept();
240                     } catch (Exception ex) {
241                         ex.printStackTrace();
242                     }
243                 }
244             }).start();
245         }
246 
getLocalSocketAddress()247         public SocketAddress getLocalSocketAddress() {
248             return ss.getLocalSocketAddress();
249         }
250 
close()251         public void close() throws IOException {
252             client.close();
253             ss.close();
254         }
255     }
256 
257     // This thread calls the "close" method on the supplied T after 2s.
258     static class Killer<T extends Closeable> extends Thread {
259         private final T s;
260         private final AtomicLong killedTs = new AtomicLong(0);
261 
Killer(T s)262         public Killer(T s) {
263             this.s = s;
264         }
265 
run()266         public void run() {
267             try {
268                 System.err.println("sleep...");
269                 Thread.sleep(2000);
270                 System.err.println("close...");
271                 s.close();
272                 killedTs.set(System.nanoTime());
273             } catch (Exception ex) {
274                 ex.printStackTrace();
275             }
276         }
277 
wasDefinitelyKilled()278         public boolean wasDefinitelyKilled() {
279             final long minThresholdNs = 500 * 1000 * 1000;
280             final long now = System.nanoTime();
281             final long killed = killedTs.get();
282             return (killed > 0 && now - killed > minThresholdNs);
283         }
284     }
285 }
286