1 /* 2 * Copyright 2014 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 package org.appspot.apprtc; 12 13 import android.os.Handler; 14 import android.support.annotation.Nullable; 15 import android.util.Log; 16 import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver; 17 import de.tavendo.autobahn.WebSocketConnection; 18 import de.tavendo.autobahn.WebSocketException; 19 import java.net.URI; 20 import java.net.URISyntaxException; 21 import java.util.ArrayList; 22 import java.util.List; 23 import org.appspot.apprtc.util.AsyncHttpURLConnection; 24 import org.appspot.apprtc.util.AsyncHttpURLConnection.AsyncHttpEvents; 25 import org.json.JSONException; 26 import org.json.JSONObject; 27 28 /** 29 * WebSocket client implementation. 30 * 31 * <p>All public methods should be called from a looper executor thread 32 * passed in a constructor, otherwise exception will be thrown. 33 * All events are dispatched on the same thread. 34 */ 35 public class WebSocketChannelClient { 36 private static final String TAG = "WSChannelRTCClient"; 37 private static final int CLOSE_TIMEOUT = 1000; 38 private final WebSocketChannelEvents events; 39 private final Handler handler; 40 private WebSocketConnection ws; 41 private String wsServerUrl; 42 private String postServerUrl; 43 @Nullable 44 private String roomID; 45 @Nullable 46 private String clientID; 47 private WebSocketConnectionState state; 48 // Do not remove this member variable. If this is removed, the observer gets garbage collected and 49 // this causes test breakages. 50 private WebSocketObserver wsObserver; 51 private final Object closeEventLock = new Object(); 52 private boolean closeEvent; 53 // WebSocket send queue. Messages are added to the queue when WebSocket 54 // client is not registered and are consumed in register() call. 55 private final List<String> wsSendQueue = new ArrayList<>(); 56 57 /** 58 * Possible WebSocket connection states. 59 */ 60 public enum WebSocketConnectionState { NEW, CONNECTED, REGISTERED, CLOSED, ERROR } 61 62 /** 63 * Callback interface for messages delivered on WebSocket. 64 * All events are dispatched from a looper executor thread. 65 */ 66 public interface WebSocketChannelEvents { onWebSocketMessage(final String message)67 void onWebSocketMessage(final String message); onWebSocketClose()68 void onWebSocketClose(); onWebSocketError(final String description)69 void onWebSocketError(final String description); 70 } 71 WebSocketChannelClient(Handler handler, WebSocketChannelEvents events)72 public WebSocketChannelClient(Handler handler, WebSocketChannelEvents events) { 73 this.handler = handler; 74 this.events = events; 75 roomID = null; 76 clientID = null; 77 state = WebSocketConnectionState.NEW; 78 } 79 getState()80 public WebSocketConnectionState getState() { 81 return state; 82 } 83 connect(final String wsUrl, final String postUrl)84 public void connect(final String wsUrl, final String postUrl) { 85 checkIfCalledOnValidThread(); 86 if (state != WebSocketConnectionState.NEW) { 87 Log.e(TAG, "WebSocket is already connected."); 88 return; 89 } 90 wsServerUrl = wsUrl; 91 postServerUrl = postUrl; 92 closeEvent = false; 93 94 Log.d(TAG, "Connecting WebSocket to: " + wsUrl + ". Post URL: " + postUrl); 95 ws = new WebSocketConnection(); 96 wsObserver = new WebSocketObserver(); 97 try { 98 ws.connect(new URI(wsServerUrl), wsObserver); 99 } catch (URISyntaxException e) { 100 reportError("URI error: " + e.getMessage()); 101 } catch (WebSocketException e) { 102 reportError("WebSocket connection error: " + e.getMessage()); 103 } 104 } 105 register(final String roomID, final String clientID)106 public void register(final String roomID, final String clientID) { 107 checkIfCalledOnValidThread(); 108 this.roomID = roomID; 109 this.clientID = clientID; 110 if (state != WebSocketConnectionState.CONNECTED) { 111 Log.w(TAG, "WebSocket register() in state " + state); 112 return; 113 } 114 Log.d(TAG, "Registering WebSocket for room " + roomID + ". ClientID: " + clientID); 115 JSONObject json = new JSONObject(); 116 try { 117 json.put("cmd", "register"); 118 json.put("roomid", roomID); 119 json.put("clientid", clientID); 120 Log.d(TAG, "C->WSS: " + json.toString()); 121 ws.sendTextMessage(json.toString()); 122 state = WebSocketConnectionState.REGISTERED; 123 // Send any previously accumulated messages. 124 for (String sendMessage : wsSendQueue) { 125 send(sendMessage); 126 } 127 wsSendQueue.clear(); 128 } catch (JSONException e) { 129 reportError("WebSocket register JSON error: " + e.getMessage()); 130 } 131 } 132 send(String message)133 public void send(String message) { 134 checkIfCalledOnValidThread(); 135 switch (state) { 136 case NEW: 137 case CONNECTED: 138 // Store outgoing messages and send them after websocket client 139 // is registered. 140 Log.d(TAG, "WS ACC: " + message); 141 wsSendQueue.add(message); 142 return; 143 case ERROR: 144 case CLOSED: 145 Log.e(TAG, "WebSocket send() in error or closed state : " + message); 146 return; 147 case REGISTERED: 148 JSONObject json = new JSONObject(); 149 try { 150 json.put("cmd", "send"); 151 json.put("msg", message); 152 message = json.toString(); 153 Log.d(TAG, "C->WSS: " + message); 154 ws.sendTextMessage(message); 155 } catch (JSONException e) { 156 reportError("WebSocket send JSON error: " + e.getMessage()); 157 } 158 break; 159 } 160 } 161 162 // This call can be used to send WebSocket messages before WebSocket 163 // connection is opened. post(String message)164 public void post(String message) { 165 checkIfCalledOnValidThread(); 166 sendWSSMessage("POST", message); 167 } 168 disconnect(boolean waitForComplete)169 public void disconnect(boolean waitForComplete) { 170 checkIfCalledOnValidThread(); 171 Log.d(TAG, "Disconnect WebSocket. State: " + state); 172 if (state == WebSocketConnectionState.REGISTERED) { 173 // Send "bye" to WebSocket server. 174 send("{\"type\": \"bye\"}"); 175 state = WebSocketConnectionState.CONNECTED; 176 // Send http DELETE to http WebSocket server. 177 sendWSSMessage("DELETE", ""); 178 } 179 // Close WebSocket in CONNECTED or ERROR states only. 180 if (state == WebSocketConnectionState.CONNECTED || state == WebSocketConnectionState.ERROR) { 181 ws.disconnect(); 182 state = WebSocketConnectionState.CLOSED; 183 184 // Wait for websocket close event to prevent websocket library from 185 // sending any pending messages to deleted looper thread. 186 if (waitForComplete) { 187 synchronized (closeEventLock) { 188 while (!closeEvent) { 189 try { 190 closeEventLock.wait(CLOSE_TIMEOUT); 191 break; 192 } catch (InterruptedException e) { 193 Log.e(TAG, "Wait error: " + e.toString()); 194 } 195 } 196 } 197 } 198 } 199 Log.d(TAG, "Disconnecting WebSocket done."); 200 } 201 reportError(final String errorMessage)202 private void reportError(final String errorMessage) { 203 Log.e(TAG, errorMessage); 204 handler.post(new Runnable() { 205 @Override 206 public void run() { 207 if (state != WebSocketConnectionState.ERROR) { 208 state = WebSocketConnectionState.ERROR; 209 events.onWebSocketError(errorMessage); 210 } 211 } 212 }); 213 } 214 215 // Asynchronously send POST/DELETE to WebSocket server. sendWSSMessage(final String method, final String message)216 private void sendWSSMessage(final String method, final String message) { 217 String postUrl = postServerUrl + "/" + roomID + "/" + clientID; 218 Log.d(TAG, "WS " + method + " : " + postUrl + " : " + message); 219 AsyncHttpURLConnection httpConnection = 220 new AsyncHttpURLConnection(method, postUrl, message, new AsyncHttpEvents() { 221 @Override 222 public void onHttpError(String errorMessage) { 223 reportError("WS " + method + " error: " + errorMessage); 224 } 225 226 @Override 227 public void onHttpComplete(String response) {} 228 }); 229 httpConnection.send(); 230 } 231 232 // Helper method for debugging purposes. Ensures that WebSocket method is 233 // called on a looper thread. checkIfCalledOnValidThread()234 private void checkIfCalledOnValidThread() { 235 if (Thread.currentThread() != handler.getLooper().getThread()) { 236 throw new IllegalStateException("WebSocket method is not called on valid thread"); 237 } 238 } 239 240 private class WebSocketObserver implements WebSocketConnectionObserver { 241 @Override onOpen()242 public void onOpen() { 243 Log.d(TAG, "WebSocket connection opened to: " + wsServerUrl); 244 handler.post(new Runnable() { 245 @Override 246 public void run() { 247 state = WebSocketConnectionState.CONNECTED; 248 // Check if we have pending register request. 249 if (roomID != null && clientID != null) { 250 register(roomID, clientID); 251 } 252 } 253 }); 254 } 255 256 @Override onClose(WebSocketCloseNotification code, String reason)257 public void onClose(WebSocketCloseNotification code, String reason) { 258 Log.d(TAG, "WebSocket connection closed. Code: " + code + ". Reason: " + reason + ". State: " 259 + state); 260 synchronized (closeEventLock) { 261 closeEvent = true; 262 closeEventLock.notify(); 263 } 264 handler.post(new Runnable() { 265 @Override 266 public void run() { 267 if (state != WebSocketConnectionState.CLOSED) { 268 state = WebSocketConnectionState.CLOSED; 269 events.onWebSocketClose(); 270 } 271 } 272 }); 273 } 274 275 @Override onTextMessage(String payload)276 public void onTextMessage(String payload) { 277 Log.d(TAG, "WSS->C: " + payload); 278 final String message = payload; 279 handler.post(new Runnable() { 280 @Override 281 public void run() { 282 if (state == WebSocketConnectionState.CONNECTED 283 || state == WebSocketConnectionState.REGISTERED) { 284 events.onWebSocketMessage(message); 285 } 286 } 287 }); 288 } 289 290 @Override onRawTextMessage(byte[] payload)291 public void onRawTextMessage(byte[] payload) {} 292 293 @Override onBinaryMessage(byte[] payload)294 public void onBinaryMessage(byte[] payload) {} 295 } 296 } 297