1 /* 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 package org.appspot.apprtc; 12 13 import android.support.annotation.Nullable; 14 import android.util.Log; 15 import java.io.BufferedReader; 16 import java.io.IOException; 17 import java.io.InputStreamReader; 18 import java.io.OutputStreamWriter; 19 import java.io.PrintWriter; 20 import java.net.InetAddress; 21 import java.net.ServerSocket; 22 import java.net.Socket; 23 import java.net.UnknownHostException; 24 import java.nio.charset.Charset; 25 import java.util.concurrent.ExecutorService; 26 import org.webrtc.ThreadUtils; 27 28 /** 29 * Replacement for WebSocketChannelClient for direct communication between two IP addresses. Handles 30 * the signaling between the two clients using a TCP connection. 31 * <p> 32 * All public methods should be called from a looper executor thread 33 * passed in a constructor, otherwise exception will be thrown. 34 * All events are dispatched on the same thread. 35 */ 36 public class TCPChannelClient { 37 private static final String TAG = "TCPChannelClient"; 38 39 private final ExecutorService executor; 40 private final ThreadUtils.ThreadChecker executorThreadCheck; 41 private final TCPChannelEvents eventListener; 42 private TCPSocket socket; 43 44 /** 45 * Callback interface for messages delivered on TCP Connection. All callbacks are invoked from the 46 * looper executor thread. 47 */ 48 public interface TCPChannelEvents { onTCPConnected(boolean server)49 void onTCPConnected(boolean server); onTCPMessage(String message)50 void onTCPMessage(String message); onTCPError(String description)51 void onTCPError(String description); onTCPClose()52 void onTCPClose(); 53 } 54 55 /** 56 * Initializes the TCPChannelClient. If IP is a local IP address, starts a listening server on 57 * that IP. If not, instead connects to the IP. 58 * 59 * @param eventListener Listener that will receive events from the client. 60 * @param ip IP address to listen on or connect to. 61 * @param port Port to listen on or connect to. 62 */ TCPChannelClient( ExecutorService executor, TCPChannelEvents eventListener, String ip, int port)63 public TCPChannelClient( 64 ExecutorService executor, TCPChannelEvents eventListener, String ip, int port) { 65 this.executor = executor; 66 executorThreadCheck = new ThreadUtils.ThreadChecker(); 67 executorThreadCheck.detachThread(); 68 this.eventListener = eventListener; 69 70 InetAddress address; 71 try { 72 address = InetAddress.getByName(ip); 73 } catch (UnknownHostException e) { 74 reportError("Invalid IP address."); 75 return; 76 } 77 78 if (address.isAnyLocalAddress()) { 79 socket = new TCPSocketServer(address, port); 80 } else { 81 socket = new TCPSocketClient(address, port); 82 } 83 84 socket.start(); 85 } 86 87 /** 88 * Disconnects the client if not already disconnected. This will fire the onTCPClose event. 89 */ disconnect()90 public void disconnect() { 91 executorThreadCheck.checkIsOnValidThread(); 92 93 socket.disconnect(); 94 } 95 96 /** 97 * Sends a message on the socket. 98 * 99 * @param message Message to be sent. 100 */ send(String message)101 public void send(String message) { 102 executorThreadCheck.checkIsOnValidThread(); 103 104 socket.send(message); 105 } 106 107 /** 108 * Helper method for firing onTCPError events. Calls onTCPError on the executor thread. 109 */ reportError(final String message)110 private void reportError(final String message) { 111 Log.e(TAG, "TCP Error: " + message); 112 executor.execute(new Runnable() { 113 @Override 114 public void run() { 115 eventListener.onTCPError(message); 116 } 117 }); 118 } 119 120 /** 121 * Base class for server and client sockets. Contains a listening thread that will call 122 * eventListener.onTCPMessage on new messages. 123 */ 124 private abstract class TCPSocket extends Thread { 125 // Lock for editing out and rawSocket 126 protected final Object rawSocketLock; 127 @Nullable 128 private PrintWriter out; 129 @Nullable 130 private Socket rawSocket; 131 132 /** 133 * Connect to the peer, potentially a slow operation. 134 * 135 * @return Socket connection, null if connection failed. 136 */ 137 @Nullable connect()138 public abstract Socket connect(); 139 140 /** Returns true if sockets is a server rawSocket. */ isServer()141 public abstract boolean isServer(); 142 TCPSocket()143 TCPSocket() { 144 rawSocketLock = new Object(); 145 } 146 147 /** 148 * The listening thread. 149 */ 150 @Override run()151 public void run() { 152 Log.d(TAG, "Listening thread started..."); 153 154 // Receive connection to temporary variable first, so we don't block. 155 Socket tempSocket = connect(); 156 BufferedReader in; 157 158 Log.d(TAG, "TCP connection established."); 159 160 synchronized (rawSocketLock) { 161 if (rawSocket != null) { 162 Log.e(TAG, "Socket already existed and will be replaced."); 163 } 164 165 rawSocket = tempSocket; 166 167 // Connecting failed, error has already been reported, just exit. 168 if (rawSocket == null) { 169 return; 170 } 171 172 try { 173 out = new PrintWriter( 174 new OutputStreamWriter(rawSocket.getOutputStream(), Charset.forName("UTF-8")), true); 175 in = new BufferedReader( 176 new InputStreamReader(rawSocket.getInputStream(), Charset.forName("UTF-8"))); 177 } catch (IOException e) { 178 reportError("Failed to open IO on rawSocket: " + e.getMessage()); 179 return; 180 } 181 } 182 183 Log.v(TAG, "Execute onTCPConnected"); 184 executor.execute(new Runnable() { 185 @Override 186 public void run() { 187 Log.v(TAG, "Run onTCPConnected"); 188 eventListener.onTCPConnected(isServer()); 189 } 190 }); 191 192 while (true) { 193 final String message; 194 try { 195 message = in.readLine(); 196 } catch (IOException e) { 197 synchronized (rawSocketLock) { 198 // If socket was closed, this is expected. 199 if (rawSocket == null) { 200 break; 201 } 202 } 203 204 reportError("Failed to read from rawSocket: " + e.getMessage()); 205 break; 206 } 207 208 // No data received, rawSocket probably closed. 209 if (message == null) { 210 break; 211 } 212 213 executor.execute(new Runnable() { 214 @Override 215 public void run() { 216 Log.v(TAG, "Receive: " + message); 217 eventListener.onTCPMessage(message); 218 } 219 }); 220 } 221 222 Log.d(TAG, "Receiving thread exiting..."); 223 224 // Close the rawSocket if it is still open. 225 disconnect(); 226 } 227 228 /** Closes the rawSocket if it is still open. Also fires the onTCPClose event. */ disconnect()229 public void disconnect() { 230 try { 231 synchronized (rawSocketLock) { 232 if (rawSocket != null) { 233 rawSocket.close(); 234 rawSocket = null; 235 out = null; 236 237 executor.execute(new Runnable() { 238 @Override 239 public void run() { 240 eventListener.onTCPClose(); 241 } 242 }); 243 } 244 } 245 } catch (IOException e) { 246 reportError("Failed to close rawSocket: " + e.getMessage()); 247 } 248 } 249 250 /** 251 * Sends a message on the socket. Should only be called on the executor thread. 252 */ send(String message)253 public void send(String message) { 254 Log.v(TAG, "Send: " + message); 255 256 synchronized (rawSocketLock) { 257 if (out == null) { 258 reportError("Sending data on closed socket."); 259 return; 260 } 261 262 out.write(message + "\n"); 263 out.flush(); 264 } 265 } 266 } 267 268 private class TCPSocketServer extends TCPSocket { 269 // Server socket is also guarded by rawSocketLock. 270 @Nullable 271 private ServerSocket serverSocket; 272 273 final private InetAddress address; 274 final private int port; 275 TCPSocketServer(InetAddress address, int port)276 public TCPSocketServer(InetAddress address, int port) { 277 this.address = address; 278 this.port = port; 279 } 280 281 /** Opens a listening socket and waits for a connection. */ 282 @Nullable 283 @Override connect()284 public Socket connect() { 285 Log.d(TAG, "Listening on [" + address.getHostAddress() + "]:" + Integer.toString(port)); 286 287 final ServerSocket tempSocket; 288 try { 289 tempSocket = new ServerSocket(port, 0, address); 290 } catch (IOException e) { 291 reportError("Failed to create server socket: " + e.getMessage()); 292 return null; 293 } 294 295 synchronized (rawSocketLock) { 296 if (serverSocket != null) { 297 Log.e(TAG, "Server rawSocket was already listening and new will be opened."); 298 } 299 300 serverSocket = tempSocket; 301 } 302 303 try { 304 return tempSocket.accept(); 305 } catch (IOException e) { 306 reportError("Failed to receive connection: " + e.getMessage()); 307 return null; 308 } 309 } 310 311 /** Closes the listening socket and calls super. */ 312 @Override disconnect()313 public void disconnect() { 314 try { 315 synchronized (rawSocketLock) { 316 if (serverSocket != null) { 317 serverSocket.close(); 318 serverSocket = null; 319 } 320 } 321 } catch (IOException e) { 322 reportError("Failed to close server socket: " + e.getMessage()); 323 } 324 325 super.disconnect(); 326 } 327 328 @Override isServer()329 public boolean isServer() { 330 return true; 331 } 332 } 333 334 private class TCPSocketClient extends TCPSocket { 335 final private InetAddress address; 336 final private int port; 337 TCPSocketClient(InetAddress address, int port)338 public TCPSocketClient(InetAddress address, int port) { 339 this.address = address; 340 this.port = port; 341 } 342 343 /** Connects to the peer. */ 344 @Nullable 345 @Override connect()346 public Socket connect() { 347 Log.d(TAG, "Connecting to [" + address.getHostAddress() + "]:" + Integer.toString(port)); 348 349 try { 350 return new Socket(address, port); 351 } catch (IOException e) { 352 reportError("Failed to connect: " + e.getMessage()); 353 return null; 354 } 355 } 356 357 @Override isServer()358 public boolean isServer() { 359 return false; 360 } 361 } 362 } 363