• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 package org.appspot.apprtc;
12 
13 import android.os.Handler;
14 import android.support.annotation.Nullable;
15 import android.util.Log;
16 import de.tavendo.autobahn.WebSocket.WebSocketConnectionObserver;
17 import de.tavendo.autobahn.WebSocketConnection;
18 import de.tavendo.autobahn.WebSocketException;
19 import java.net.URI;
20 import java.net.URISyntaxException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import org.appspot.apprtc.util.AsyncHttpURLConnection;
24 import org.appspot.apprtc.util.AsyncHttpURLConnection.AsyncHttpEvents;
25 import org.json.JSONException;
26 import org.json.JSONObject;
27 
28 /**
29  * WebSocket client implementation.
30  *
31  * <p>All public methods should be called from a looper executor thread
32  * passed in a constructor, otherwise exception will be thrown.
33  * All events are dispatched on the same thread.
34  */
35 public class WebSocketChannelClient {
36   private static final String TAG = "WSChannelRTCClient";
37   private static final int CLOSE_TIMEOUT = 1000;
38   private final WebSocketChannelEvents events;
39   private final Handler handler;
40   private WebSocketConnection ws;
41   private String wsServerUrl;
42   private String postServerUrl;
43   @Nullable
44   private String roomID;
45   @Nullable
46   private String clientID;
47   private WebSocketConnectionState state;
48   // Do not remove this member variable. If this is removed, the observer gets garbage collected and
49   // this causes test breakages.
50   private WebSocketObserver wsObserver;
51   private final Object closeEventLock = new Object();
52   private boolean closeEvent;
53   // WebSocket send queue. Messages are added to the queue when WebSocket
54   // client is not registered and are consumed in register() call.
55   private final List<String> wsSendQueue = new ArrayList<>();
56 
57   /**
58    * Possible WebSocket connection states.
59    */
60   public enum WebSocketConnectionState { NEW, CONNECTED, REGISTERED, CLOSED, ERROR }
61 
62   /**
63    * Callback interface for messages delivered on WebSocket.
64    * All events are dispatched from a looper executor thread.
65    */
66   public interface WebSocketChannelEvents {
onWebSocketMessage(final String message)67     void onWebSocketMessage(final String message);
onWebSocketClose()68     void onWebSocketClose();
onWebSocketError(final String description)69     void onWebSocketError(final String description);
70   }
71 
WebSocketChannelClient(Handler handler, WebSocketChannelEvents events)72   public WebSocketChannelClient(Handler handler, WebSocketChannelEvents events) {
73     this.handler = handler;
74     this.events = events;
75     roomID = null;
76     clientID = null;
77     state = WebSocketConnectionState.NEW;
78   }
79 
getState()80   public WebSocketConnectionState getState() {
81     return state;
82   }
83 
connect(final String wsUrl, final String postUrl)84   public void connect(final String wsUrl, final String postUrl) {
85     checkIfCalledOnValidThread();
86     if (state != WebSocketConnectionState.NEW) {
87       Log.e(TAG, "WebSocket is already connected.");
88       return;
89     }
90     wsServerUrl = wsUrl;
91     postServerUrl = postUrl;
92     closeEvent = false;
93 
94     Log.d(TAG, "Connecting WebSocket to: " + wsUrl + ". Post URL: " + postUrl);
95     ws = new WebSocketConnection();
96     wsObserver = new WebSocketObserver();
97     try {
98       ws.connect(new URI(wsServerUrl), wsObserver);
99     } catch (URISyntaxException e) {
100       reportError("URI error: " + e.getMessage());
101     } catch (WebSocketException e) {
102       reportError("WebSocket connection error: " + e.getMessage());
103     }
104   }
105 
register(final String roomID, final String clientID)106   public void register(final String roomID, final String clientID) {
107     checkIfCalledOnValidThread();
108     this.roomID = roomID;
109     this.clientID = clientID;
110     if (state != WebSocketConnectionState.CONNECTED) {
111       Log.w(TAG, "WebSocket register() in state " + state);
112       return;
113     }
114     Log.d(TAG, "Registering WebSocket for room " + roomID + ". ClientID: " + clientID);
115     JSONObject json = new JSONObject();
116     try {
117       json.put("cmd", "register");
118       json.put("roomid", roomID);
119       json.put("clientid", clientID);
120       Log.d(TAG, "C->WSS: " + json.toString());
121       ws.sendTextMessage(json.toString());
122       state = WebSocketConnectionState.REGISTERED;
123       // Send any previously accumulated messages.
124       for (String sendMessage : wsSendQueue) {
125         send(sendMessage);
126       }
127       wsSendQueue.clear();
128     } catch (JSONException e) {
129       reportError("WebSocket register JSON error: " + e.getMessage());
130     }
131   }
132 
send(String message)133   public void send(String message) {
134     checkIfCalledOnValidThread();
135     switch (state) {
136       case NEW:
137       case CONNECTED:
138         // Store outgoing messages and send them after websocket client
139         // is registered.
140         Log.d(TAG, "WS ACC: " + message);
141         wsSendQueue.add(message);
142         return;
143       case ERROR:
144       case CLOSED:
145         Log.e(TAG, "WebSocket send() in error or closed state : " + message);
146         return;
147       case REGISTERED:
148         JSONObject json = new JSONObject();
149         try {
150           json.put("cmd", "send");
151           json.put("msg", message);
152           message = json.toString();
153           Log.d(TAG, "C->WSS: " + message);
154           ws.sendTextMessage(message);
155         } catch (JSONException e) {
156           reportError("WebSocket send JSON error: " + e.getMessage());
157         }
158         break;
159     }
160   }
161 
162   // This call can be used to send WebSocket messages before WebSocket
163   // connection is opened.
post(String message)164   public void post(String message) {
165     checkIfCalledOnValidThread();
166     sendWSSMessage("POST", message);
167   }
168 
disconnect(boolean waitForComplete)169   public void disconnect(boolean waitForComplete) {
170     checkIfCalledOnValidThread();
171     Log.d(TAG, "Disconnect WebSocket. State: " + state);
172     if (state == WebSocketConnectionState.REGISTERED) {
173       // Send "bye" to WebSocket server.
174       send("{\"type\": \"bye\"}");
175       state = WebSocketConnectionState.CONNECTED;
176       // Send http DELETE to http WebSocket server.
177       sendWSSMessage("DELETE", "");
178     }
179     // Close WebSocket in CONNECTED or ERROR states only.
180     if (state == WebSocketConnectionState.CONNECTED || state == WebSocketConnectionState.ERROR) {
181       ws.disconnect();
182       state = WebSocketConnectionState.CLOSED;
183 
184       // Wait for websocket close event to prevent websocket library from
185       // sending any pending messages to deleted looper thread.
186       if (waitForComplete) {
187         synchronized (closeEventLock) {
188           while (!closeEvent) {
189             try {
190               closeEventLock.wait(CLOSE_TIMEOUT);
191               break;
192             } catch (InterruptedException e) {
193               Log.e(TAG, "Wait error: " + e.toString());
194             }
195           }
196         }
197       }
198     }
199     Log.d(TAG, "Disconnecting WebSocket done.");
200   }
201 
reportError(final String errorMessage)202   private void reportError(final String errorMessage) {
203     Log.e(TAG, errorMessage);
204     handler.post(new Runnable() {
205       @Override
206       public void run() {
207         if (state != WebSocketConnectionState.ERROR) {
208           state = WebSocketConnectionState.ERROR;
209           events.onWebSocketError(errorMessage);
210         }
211       }
212     });
213   }
214 
215   // Asynchronously send POST/DELETE to WebSocket server.
sendWSSMessage(final String method, final String message)216   private void sendWSSMessage(final String method, final String message) {
217     String postUrl = postServerUrl + "/" + roomID + "/" + clientID;
218     Log.d(TAG, "WS " + method + " : " + postUrl + " : " + message);
219     AsyncHttpURLConnection httpConnection =
220         new AsyncHttpURLConnection(method, postUrl, message, new AsyncHttpEvents() {
221           @Override
222           public void onHttpError(String errorMessage) {
223             reportError("WS " + method + " error: " + errorMessage);
224           }
225 
226           @Override
227           public void onHttpComplete(String response) {}
228         });
229     httpConnection.send();
230   }
231 
232   // Helper method for debugging purposes. Ensures that WebSocket method is
233   // called on a looper thread.
checkIfCalledOnValidThread()234   private void checkIfCalledOnValidThread() {
235     if (Thread.currentThread() != handler.getLooper().getThread()) {
236       throw new IllegalStateException("WebSocket method is not called on valid thread");
237     }
238   }
239 
240   private class WebSocketObserver implements WebSocketConnectionObserver {
241     @Override
onOpen()242     public void onOpen() {
243       Log.d(TAG, "WebSocket connection opened to: " + wsServerUrl);
244       handler.post(new Runnable() {
245         @Override
246         public void run() {
247           state = WebSocketConnectionState.CONNECTED;
248           // Check if we have pending register request.
249           if (roomID != null && clientID != null) {
250             register(roomID, clientID);
251           }
252         }
253       });
254     }
255 
256     @Override
onClose(WebSocketCloseNotification code, String reason)257     public void onClose(WebSocketCloseNotification code, String reason) {
258       Log.d(TAG, "WebSocket connection closed. Code: " + code + ". Reason: " + reason + ". State: "
259               + state);
260       synchronized (closeEventLock) {
261         closeEvent = true;
262         closeEventLock.notify();
263       }
264       handler.post(new Runnable() {
265         @Override
266         public void run() {
267           if (state != WebSocketConnectionState.CLOSED) {
268             state = WebSocketConnectionState.CLOSED;
269             events.onWebSocketClose();
270           }
271         }
272       });
273     }
274 
275     @Override
onTextMessage(String payload)276     public void onTextMessage(String payload) {
277       Log.d(TAG, "WSS->C: " + payload);
278       final String message = payload;
279       handler.post(new Runnable() {
280         @Override
281         public void run() {
282           if (state == WebSocketConnectionState.CONNECTED
283               || state == WebSocketConnectionState.REGISTERED) {
284             events.onWebSocketMessage(message);
285           }
286         }
287       });
288     }
289 
290     @Override
onRawTextMessage(byte[] payload)291     public void onRawTextMessage(byte[] payload) {}
292 
293     @Override
onBinaryMessage(byte[] payload)294     public void onBinaryMessage(byte[] payload) {}
295   }
296 }
297