1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.components.devtools_bridge; 6 7 import android.net.LocalSocket; 8 import android.net.LocalSocketAddress; 9 10 import java.io.IOException; 11 import java.nio.ByteBuffer; 12 import java.util.concurrent.ExecutorService; 13 import java.util.concurrent.Executors; 14 import java.util.concurrent.atomic.AtomicInteger; 15 import java.util.concurrent.atomic.AtomicReference; 16 import java.util.concurrent.locks.Lock; 17 import java.util.concurrent.locks.ReadWriteLock; 18 import java.util.concurrent.locks.ReentrantReadWriteLock; 19 20 /** 21 * Base class for client and server that tunnels DevToolsServer's UNIX socket 22 * over WebRTC data channel. 23 * 24 * Server runs on a android device with Chromium (or alike). Client runs where socket 25 * needed to be accesses (it could be the same device if socket names are different; this 26 * configuration useful for testing). 27 * 28 * Client listens LocalServerSocket and each time it receives connection it forwards 29 * CLIENT_OPEN packet to the server with newly assigned connection id. On receiving this packet 30 * server tries to connect to DevToolsServer socket. If succeeded it sends back SERVER_OPEN_ACK 31 * with the same connection id. If failed it sends SERVER_CLOSE. 32 * 33 * When input stream on client shuts down it sends CLIENT_CLOSE. The same with SERVER_CLOSE 34 * on the server side (only if SERVER_OPEN_ACK had sent). Between CLIENT_OPEN and CLIENT_CLOSE 35 * any amount of data packets may be transferred (the same for SERVER_OPEN_ACK/SERVER_CLOSE 36 * on the server side). 37 * 38 * Since all communication is reliable and ordered it's safe for client to assume that 39 * if CLIENT_CLOSE has sent and SERVER_CLOSE has received with the same connection ID this 40 * ID is safe to be reused. 41 */ 42 public abstract class SocketTunnelBase { 43 // Data channel is threadsafe but access to the reference needs synchromization. 44 private final ReadWriteLock mDataChanneliReferenceLock = new ReentrantReadWriteLock(); 45 private volatile AbstractDataChannel mDataChannel; 46 47 // Packet structure encapsulated in buildControlPacket, buildDataPacket and PacketDecoderBase. 48 // Structure of control packet: 49 // 1-st byte: CONTROL_CONNECTION_ID. 50 // 2-d byte: op code. 51 // 3-d byte: connection id. 52 // 53 // Structure of data packet: 54 // 1-st byte: connection id. 55 // 2..n: data. 56 57 private static final int CONTROL_PACKET_SIZE = 3; 58 59 // Client to server control packets. 60 protected static final byte CLIENT_OPEN = (byte) 0; 61 protected static final byte CLIENT_CLOSE = (byte) 1; 62 63 // Server to client control packets. 64 protected static final byte SERVER_OPEN_ACK = (byte) 0; 65 protected static final byte SERVER_CLOSE = (byte) 1; 66 67 // Must not exceed WebRTC limit. Exceeding it closes 68 // data channel automatically. TODO(serya): WebRTC limit supposed to be removed. 69 static final int READING_BUFFER_SIZE = 4 * 1024; 70 71 private static final int CONTROL_CONNECTION_ID = 0; 72 73 // DevTools supports up to ~10 connections at the time. A few extra IDs usefull for 74 // delays in closing acknowledgement. 75 protected static final int MIN_CONNECTION_ID = 1; 76 protected static final int MAX_CONNECTION_ID = 64; 77 78 // Signaling thread isn't accessible via API. Assumes that first caller 79 // checkCalledOnSignalingThread is called on it indeed. It also works well for tests. 80 private final AtomicReference<Thread> mSignalingThread = new AtomicReference<Thread>(); 81 82 // For writing in socket without blocking signaling thread. 83 private final ExecutorService mWritingThread = Executors.newSingleThreadExecutor(); 84 isBound()85 public boolean isBound() { 86 final Lock lock = mDataChanneliReferenceLock.readLock(); 87 lock.lock(); 88 try { 89 return mDataChannel != null; 90 } finally { 91 lock.unlock(); 92 } 93 } 94 95 /** 96 * Binds the tunnel to the data channel. Tunnel starts its activity when data channel 97 * open. 98 */ bind(AbstractDataChannel dataChannel)99 public void bind(AbstractDataChannel dataChannel) { 100 // Observer registrution must not be done in constructor. 101 final Lock lock = mDataChanneliReferenceLock.writeLock(); 102 lock.lock(); 103 try { 104 mDataChannel = dataChannel; 105 } finally { 106 lock.unlock(); 107 } 108 dataChannel.registerObserver(new DataChannelObserver()); 109 } 110 111 /** 112 * Stops all tunnel activity and returns the prevously bound data channel. 113 * It's safe to dispose the data channel after it. 114 */ unbind()115 public AbstractDataChannel unbind() { 116 final Lock lock = mDataChanneliReferenceLock.writeLock(); 117 lock.lock(); 118 final AbstractDataChannel dataChannel; 119 try { 120 dataChannel = mDataChannel; 121 mDataChannel = null; 122 } finally { 123 lock.unlock(); 124 } 125 dataChannel.unregisterObserver(); 126 mSignalingThread.set(null); 127 mWritingThread.shutdownNow(); 128 return dataChannel; 129 } 130 checkCalledOnSignalingThread()131 protected void checkCalledOnSignalingThread() { 132 if (!mSignalingThread.compareAndSet(null, Thread.currentThread())) { 133 if (mSignalingThread.get() != Thread.currentThread()) { 134 throw new RuntimeException("Must be called on signaling thread"); 135 } 136 } 137 } 138 checkConnectionId(int connectionId)139 protected static void checkConnectionId(int connectionId) throws ProtocolError { 140 if (connectionId < MIN_CONNECTION_ID || connectionId > MAX_CONNECTION_ID) { 141 throw new ProtocolError("Invalid connection id: " + Integer.toString(connectionId)); 142 } 143 } 144 onProtocolError(ProtocolError e)145 protected void onProtocolError(ProtocolError e) { 146 checkCalledOnSignalingThread(); 147 148 // When integrity of data channel is broken then best way to survive is to close it. 149 final Lock lock = mDataChanneliReferenceLock.readLock(); 150 lock.lock(); 151 try { 152 mDataChannel.close(); 153 } finally { 154 lock.unlock(); 155 } 156 } 157 onReceivedDataPacket(int connectionId, byte[] data)158 protected abstract void onReceivedDataPacket(int connectionId, byte[] data) 159 throws ProtocolError; onReceivedControlPacket(int connectionId, byte opCode)160 protected abstract void onReceivedControlPacket(int connectionId, byte opCode) 161 throws ProtocolError; onSocketException(IOException e, int connectionId)162 protected void onSocketException(IOException e, int connectionId) {} onDataChannelOpened()163 protected void onDataChannelOpened() {} onDataChannelClosed()164 protected void onDataChannelClosed() {} 165 buildControlPacket(int connectionId, byte opCode)166 static ByteBuffer buildControlPacket(int connectionId, byte opCode) { 167 ByteBuffer packet = ByteBuffer.allocateDirect(CONTROL_PACKET_SIZE); 168 packet.put((byte) CONTROL_CONNECTION_ID); 169 packet.put(opCode); 170 packet.put((byte) connectionId); 171 return packet; 172 } 173 buildDataPacket(int connectionId, byte[] buffer, int count)174 static ByteBuffer buildDataPacket(int connectionId, byte[] buffer, int count) { 175 ByteBuffer packet = ByteBuffer.allocateDirect(count + 1); 176 packet.put((byte) connectionId); 177 packet.put(buffer, 0, count); 178 return packet; 179 } 180 sendToDataChannel(ByteBuffer packet)181 protected void sendToDataChannel(ByteBuffer packet) { 182 packet.limit(packet.position()); 183 packet.position(0); 184 final Lock lock = mDataChanneliReferenceLock.readLock(); 185 lock.lock(); 186 try { 187 if (mDataChannel != null) { 188 mDataChannel.send(packet, AbstractDataChannel.MessageType.BINARY); 189 } 190 } finally { 191 lock.unlock(); 192 } 193 } 194 195 /** 196 * Packet decoding exposed for tests. 197 */ 198 abstract static class PacketDecoderBase { decodePacket(ByteBuffer packet)199 protected void decodePacket(ByteBuffer packet) throws ProtocolError { 200 if (packet.remaining() == 0) { 201 throw new ProtocolError("Empty packet"); 202 } 203 204 int connectionId = packet.get(); 205 if (connectionId != CONTROL_CONNECTION_ID) { 206 checkConnectionId(connectionId); 207 byte[] data = new byte[packet.remaining()]; 208 packet.get(data); 209 onReceivedDataPacket(connectionId, data); 210 } else { 211 if (packet.remaining() != CONTROL_PACKET_SIZE - 1) { 212 throw new ProtocolError("Invalid control packet size"); 213 } 214 215 byte opCode = packet.get(); 216 connectionId = packet.get(); 217 checkConnectionId(connectionId); 218 onReceivedControlPacket(connectionId, opCode); 219 } 220 } 221 onReceivedDataPacket(int connectionId, byte[] data)222 protected abstract void onReceivedDataPacket(int connectionId, byte[] data) 223 throws ProtocolError; onReceivedControlPacket(int connectionId, byte opcode)224 protected abstract void onReceivedControlPacket(int connectionId, byte opcode) 225 throws ProtocolError; 226 } 227 228 private final class DataChannelObserver 229 extends PacketDecoderBase implements AbstractDataChannel.Observer { 230 @Override onStateChange(AbstractDataChannel.State state)231 public void onStateChange(AbstractDataChannel.State state) { 232 checkCalledOnSignalingThread(); 233 234 if (state == AbstractDataChannel.State.OPEN) { 235 onDataChannelOpened(); 236 } else { 237 onDataChannelClosed(); 238 } 239 } 240 241 @Override onMessage(ByteBuffer message)242 public void onMessage(ByteBuffer message) { 243 checkCalledOnSignalingThread(); 244 245 try { 246 decodePacket(message); 247 } catch (ProtocolError e) { 248 onProtocolError(e); 249 } 250 } 251 252 @Override onReceivedDataPacket(int connectionId, byte[] data)253 protected void onReceivedDataPacket(int connectionId, byte[] data) throws ProtocolError { 254 checkCalledOnSignalingThread(); 255 256 SocketTunnelBase.this.onReceivedDataPacket(connectionId, data); 257 } 258 259 @Override onReceivedControlPacket(int connectionId, byte opCode)260 protected void onReceivedControlPacket(int connectionId, byte opCode) throws ProtocolError { 261 checkCalledOnSignalingThread(); 262 263 SocketTunnelBase.this.onReceivedControlPacket(connectionId, opCode); 264 } 265 } 266 267 /** 268 * Any problem happened while handling incoming message that breaks state integrity. 269 */ 270 static class ProtocolError extends Exception { ProtocolError(String description)271 public ProtocolError(String description) { 272 super(description); 273 } 274 } 275 276 /** 277 * Base utility class for client and server connections. 278 */ 279 protected abstract class ConnectionBase { 280 protected final int mId; 281 protected final LocalSocket mSocket; 282 private final AtomicInteger mOpenedStreams = new AtomicInteger(2); // input and output. 283 private volatile boolean mConnected; 284 private byte[] mBuffer; 285 ConnectionBase(int id, LocalSocket socket, boolean preconnected)286 private ConnectionBase(int id, LocalSocket socket, boolean preconnected) { 287 mId = id; 288 mSocket = socket; 289 mConnected = preconnected; 290 } 291 ConnectionBase(int id, LocalSocket socket)292 protected ConnectionBase(int id, LocalSocket socket) { 293 this(id, socket, true); 294 } 295 ConnectionBase(int id)296 protected ConnectionBase(int id) { 297 this(id, new LocalSocket(), false); 298 } 299 connect(LocalSocketAddress address)300 protected boolean connect(LocalSocketAddress address) { 301 assert !mConnected; 302 try { 303 mSocket.connect(address); 304 mConnected = true; 305 return true; 306 } catch (IOException e) { 307 onSocketException(e, mId); 308 return false; 309 } 310 } 311 runReadingLoop()312 protected void runReadingLoop() { 313 mBuffer = new byte[READING_BUFFER_SIZE]; 314 try { 315 boolean open; 316 do { 317 open = pump(); 318 } while (open); 319 } catch (IOException e) { 320 onSocketException(e, mId); 321 } finally { 322 mBuffer = null; 323 } 324 } 325 pump()326 private boolean pump() throws IOException { 327 int count = mSocket.getInputStream().read(mBuffer); 328 if (count <= 0) 329 return false; 330 sendToDataChannel(buildDataPacket(mId, mBuffer, count)); 331 return true; 332 } 333 writeData(byte[] data)334 protected void writeData(byte[] data) { 335 // Called on writing thread. 336 try { 337 mSocket.getOutputStream().write(data); 338 } catch (IOException e) { 339 onSocketException(e, mId); 340 } 341 } 342 onReceivedDataPacket(final byte[] data)343 public void onReceivedDataPacket(final byte[] data) { 344 mWritingThread.execute(new Runnable() { 345 @Override 346 public void run() { 347 writeData(data); 348 } 349 }); 350 } 351 terminate()352 public void terminate() { 353 close(); 354 } 355 shutdownOutput()356 protected void shutdownOutput() { 357 // Shutdown output on writing thread to make sure all pending writes finished. 358 mWritingThread.execute(new Runnable() { 359 @Override 360 public void run() { 361 shutdownOutputOnWritingThread(); 362 } 363 }); 364 } 365 shutdownOutputOnWritingThread()366 private void shutdownOutputOnWritingThread() { 367 try { 368 if (mConnected) mSocket.shutdownOutput(); 369 } catch (IOException e) { 370 onSocketException(e, mId); 371 } 372 releaseStream(); 373 } 374 shutdownInput()375 protected void shutdownInput() { 376 try { 377 if (mConnected) mSocket.shutdownInput(); 378 } catch (IOException e) { 379 onSocketException(e, mId); 380 } 381 releaseStream(); 382 } 383 releaseStream()384 private void releaseStream() { 385 if (mOpenedStreams.decrementAndGet() == 0) close(); 386 } 387 close()388 protected void close() { 389 try { 390 mSocket.close(); 391 } catch (IOException e) { 392 onSocketException(e, mId); 393 } 394 } 395 } 396 } 397