• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 package org.appspot.apprtc;
12 
13 import android.support.annotation.Nullable;
14 import android.util.Log;
15 import java.io.BufferedReader;
16 import java.io.IOException;
17 import java.io.InputStreamReader;
18 import java.io.OutputStreamWriter;
19 import java.io.PrintWriter;
20 import java.net.InetAddress;
21 import java.net.ServerSocket;
22 import java.net.Socket;
23 import java.net.UnknownHostException;
24 import java.nio.charset.Charset;
25 import java.util.concurrent.ExecutorService;
26 import org.webrtc.ThreadUtils;
27 
28 /**
29  * Replacement for WebSocketChannelClient for direct communication between two IP addresses. Handles
30  * the signaling between the two clients using a TCP connection.
31  * <p>
32  * All public methods should be called from a looper executor thread
33  * passed in a constructor, otherwise exception will be thrown.
34  * All events are dispatched on the same thread.
35  */
36 public class TCPChannelClient {
37   private static final String TAG = "TCPChannelClient";
38 
39   private final ExecutorService executor;
40   private final ThreadUtils.ThreadChecker executorThreadCheck;
41   private final TCPChannelEvents eventListener;
42   private TCPSocket socket;
43 
44   /**
45    * Callback interface for messages delivered on TCP Connection. All callbacks are invoked from the
46    * looper executor thread.
47    */
48   public interface TCPChannelEvents {
onTCPConnected(boolean server)49     void onTCPConnected(boolean server);
onTCPMessage(String message)50     void onTCPMessage(String message);
onTCPError(String description)51     void onTCPError(String description);
onTCPClose()52     void onTCPClose();
53   }
54 
55   /**
56    * Initializes the TCPChannelClient. If IP is a local IP address, starts a listening server on
57    * that IP. If not, instead connects to the IP.
58    *
59    * @param eventListener Listener that will receive events from the client.
60    * @param ip            IP address to listen on or connect to.
61    * @param port          Port to listen on or connect to.
62    */
TCPChannelClient( ExecutorService executor, TCPChannelEvents eventListener, String ip, int port)63   public TCPChannelClient(
64       ExecutorService executor, TCPChannelEvents eventListener, String ip, int port) {
65     this.executor = executor;
66     executorThreadCheck = new ThreadUtils.ThreadChecker();
67     executorThreadCheck.detachThread();
68     this.eventListener = eventListener;
69 
70     InetAddress address;
71     try {
72       address = InetAddress.getByName(ip);
73     } catch (UnknownHostException e) {
74       reportError("Invalid IP address.");
75       return;
76     }
77 
78     if (address.isAnyLocalAddress()) {
79       socket = new TCPSocketServer(address, port);
80     } else {
81       socket = new TCPSocketClient(address, port);
82     }
83 
84     socket.start();
85   }
86 
87   /**
88    * Disconnects the client if not already disconnected. This will fire the onTCPClose event.
89    */
disconnect()90   public void disconnect() {
91     executorThreadCheck.checkIsOnValidThread();
92 
93     socket.disconnect();
94   }
95 
96   /**
97    * Sends a message on the socket.
98    *
99    * @param message Message to be sent.
100    */
send(String message)101   public void send(String message) {
102     executorThreadCheck.checkIsOnValidThread();
103 
104     socket.send(message);
105   }
106 
107   /**
108    * Helper method for firing onTCPError events. Calls onTCPError on the executor thread.
109    */
reportError(final String message)110   private void reportError(final String message) {
111     Log.e(TAG, "TCP Error: " + message);
112     executor.execute(new Runnable() {
113       @Override
114       public void run() {
115         eventListener.onTCPError(message);
116       }
117     });
118   }
119 
120   /**
121    * Base class for server and client sockets. Contains a listening thread that will call
122    * eventListener.onTCPMessage on new messages.
123    */
124   private abstract class TCPSocket extends Thread {
125     // Lock for editing out and rawSocket
126     protected final Object rawSocketLock;
127     @Nullable
128     private PrintWriter out;
129     @Nullable
130     private Socket rawSocket;
131 
132     /**
133      * Connect to the peer, potentially a slow operation.
134      *
135      * @return Socket connection, null if connection failed.
136      */
137     @Nullable
connect()138     public abstract Socket connect();
139 
140     /** Returns true if sockets is a server rawSocket. */
isServer()141     public abstract boolean isServer();
142 
TCPSocket()143     TCPSocket() {
144       rawSocketLock = new Object();
145     }
146 
147     /**
148      * The listening thread.
149      */
150     @Override
run()151     public void run() {
152       Log.d(TAG, "Listening thread started...");
153 
154       // Receive connection to temporary variable first, so we don't block.
155       Socket tempSocket = connect();
156       BufferedReader in;
157 
158       Log.d(TAG, "TCP connection established.");
159 
160       synchronized (rawSocketLock) {
161         if (rawSocket != null) {
162           Log.e(TAG, "Socket already existed and will be replaced.");
163         }
164 
165         rawSocket = tempSocket;
166 
167         // Connecting failed, error has already been reported, just exit.
168         if (rawSocket == null) {
169           return;
170         }
171 
172         try {
173           out = new PrintWriter(
174               new OutputStreamWriter(rawSocket.getOutputStream(), Charset.forName("UTF-8")), true);
175           in = new BufferedReader(
176               new InputStreamReader(rawSocket.getInputStream(), Charset.forName("UTF-8")));
177         } catch (IOException e) {
178           reportError("Failed to open IO on rawSocket: " + e.getMessage());
179           return;
180         }
181       }
182 
183       Log.v(TAG, "Execute onTCPConnected");
184       executor.execute(new Runnable() {
185         @Override
186         public void run() {
187           Log.v(TAG, "Run onTCPConnected");
188           eventListener.onTCPConnected(isServer());
189         }
190       });
191 
192       while (true) {
193         final String message;
194         try {
195           message = in.readLine();
196         } catch (IOException e) {
197           synchronized (rawSocketLock) {
198             // If socket was closed, this is expected.
199             if (rawSocket == null) {
200               break;
201             }
202           }
203 
204           reportError("Failed to read from rawSocket: " + e.getMessage());
205           break;
206         }
207 
208         // No data received, rawSocket probably closed.
209         if (message == null) {
210           break;
211         }
212 
213         executor.execute(new Runnable() {
214           @Override
215           public void run() {
216             Log.v(TAG, "Receive: " + message);
217             eventListener.onTCPMessage(message);
218           }
219         });
220       }
221 
222       Log.d(TAG, "Receiving thread exiting...");
223 
224       // Close the rawSocket if it is still open.
225       disconnect();
226     }
227 
228     /** Closes the rawSocket if it is still open. Also fires the onTCPClose event. */
disconnect()229     public void disconnect() {
230       try {
231         synchronized (rawSocketLock) {
232           if (rawSocket != null) {
233             rawSocket.close();
234             rawSocket = null;
235             out = null;
236 
237             executor.execute(new Runnable() {
238               @Override
239               public void run() {
240                 eventListener.onTCPClose();
241               }
242             });
243           }
244         }
245       } catch (IOException e) {
246         reportError("Failed to close rawSocket: " + e.getMessage());
247       }
248     }
249 
250     /**
251      * Sends a message on the socket. Should only be called on the executor thread.
252      */
send(String message)253     public void send(String message) {
254       Log.v(TAG, "Send: " + message);
255 
256       synchronized (rawSocketLock) {
257         if (out == null) {
258           reportError("Sending data on closed socket.");
259           return;
260         }
261 
262         out.write(message + "\n");
263         out.flush();
264       }
265     }
266   }
267 
268   private class TCPSocketServer extends TCPSocket {
269     // Server socket is also guarded by rawSocketLock.
270     @Nullable
271     private ServerSocket serverSocket;
272 
273     final private InetAddress address;
274     final private int port;
275 
TCPSocketServer(InetAddress address, int port)276     public TCPSocketServer(InetAddress address, int port) {
277       this.address = address;
278       this.port = port;
279     }
280 
281     /** Opens a listening socket and waits for a connection. */
282     @Nullable
283     @Override
connect()284     public Socket connect() {
285       Log.d(TAG, "Listening on [" + address.getHostAddress() + "]:" + Integer.toString(port));
286 
287       final ServerSocket tempSocket;
288       try {
289         tempSocket = new ServerSocket(port, 0, address);
290       } catch (IOException e) {
291         reportError("Failed to create server socket: " + e.getMessage());
292         return null;
293       }
294 
295       synchronized (rawSocketLock) {
296         if (serverSocket != null) {
297           Log.e(TAG, "Server rawSocket was already listening and new will be opened.");
298         }
299 
300         serverSocket = tempSocket;
301       }
302 
303       try {
304         return tempSocket.accept();
305       } catch (IOException e) {
306         reportError("Failed to receive connection: " + e.getMessage());
307         return null;
308       }
309     }
310 
311     /** Closes the listening socket and calls super. */
312     @Override
disconnect()313     public void disconnect() {
314       try {
315         synchronized (rawSocketLock) {
316           if (serverSocket != null) {
317             serverSocket.close();
318             serverSocket = null;
319           }
320         }
321       } catch (IOException e) {
322         reportError("Failed to close server socket: " + e.getMessage());
323       }
324 
325       super.disconnect();
326     }
327 
328     @Override
isServer()329     public boolean isServer() {
330       return true;
331     }
332   }
333 
334   private class TCPSocketClient extends TCPSocket {
335     final private InetAddress address;
336     final private int port;
337 
TCPSocketClient(InetAddress address, int port)338     public TCPSocketClient(InetAddress address, int port) {
339       this.address = address;
340       this.port = port;
341     }
342 
343     /** Connects to the peer. */
344     @Nullable
345     @Override
connect()346     public Socket connect() {
347       Log.d(TAG, "Connecting to [" + address.getHostAddress() + "]:" + Integer.toString(port));
348 
349       try {
350         return new Socket(address, port);
351       } catch (IOException e) {
352         reportError("Failed to connect: " + e.getMessage());
353         return null;
354       }
355     }
356 
357     @Override
isServer()358     public boolean isServer() {
359       return false;
360     }
361   }
362 }
363