1 /*
2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above
11 * copyright notice, this list of conditions and the following disclaimer
12 * in the documentation and/or other materials provided with the
13 * distribution.
14 * * Neither the name of Google Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived from
16 * this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "config.h"
32
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
34
35 #include "bindings/core/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/dom/ExecutionContextTask.h"
40 #include "core/fileapi/Blob.h"
41 #include "core/inspector/ScriptCallFrame.h"
42 #include "core/inspector/ScriptCallStack.h"
43 #include "core/workers/WorkerGlobalScope.h"
44 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerThread.h"
46 #include "modules/websockets/MainThreadWebSocketChannel.h"
47 #include "modules/websockets/NewWebSocketChannelImpl.h"
48 #include "platform/RuntimeEnabledFeatures.h"
49 #include "public/platform/Platform.h"
50 #include "public/platform/WebWaitableEvent.h"
51 #include "wtf/ArrayBuffer.h"
52 #include "wtf/Assertions.h"
53 #include "wtf/Functional.h"
54 #include "wtf/MainThread.h"
55 #include "wtf/text/WTFString.h"
56
57 namespace blink {
58
59 typedef WorkerThreadableWebSocketChannel::Bridge Bridge;
60 typedef WorkerThreadableWebSocketChannel::Peer Peer;
61
62 // Created and destroyed on the worker thread. All setters of this class are
63 // called on the main thread, while all getters are called on the worker
64 // thread. signalWorkerThread() must be called before any getters are called.
65 class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<ThreadableWebSocketChannelSyncHelper> {
66 public:
create(PassOwnPtr<WebWaitableEvent> event)67 static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
68 {
69 return new ThreadableWebSocketChannelSyncHelper(event);
70 }
71
~ThreadableWebSocketChannelSyncHelper()72 ~ThreadableWebSocketChannelSyncHelper()
73 {
74 }
75
76 // All setters are called on the main thread.
setConnectRequestResult(bool connectRequestResult)77 void setConnectRequestResult(bool connectRequestResult)
78 {
79 m_connectRequestResult = connectRequestResult;
80 }
81
82 // All getter are called on the worker thread.
connectRequestResult() const83 bool connectRequestResult() const
84 {
85 return m_connectRequestResult;
86 }
87
88 // This should be called after all setters are called and before any
89 // getters are called.
signalWorkerThread()90 void signalWorkerThread()
91 {
92 m_event->signal();
93 }
wait()94 void wait()
95 {
96 m_event->wait();
97 }
98
trace(Visitor * visitor)99 void trace(Visitor* visitor) { }
100
101 private:
ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)102 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
103 : m_event(event)
104 , m_connectRequestResult(false)
105 {
106 }
107
108 OwnPtr<WebWaitableEvent> m_event;
109 bool m_connectRequestResult;
110 };
111
WorkerThreadableWebSocketChannel(WorkerGlobalScope & workerGlobalScope,WebSocketChannelClient * client,const String & sourceURL,unsigned lineNumber)112 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
113 : m_bridge(new Bridge(client, workerGlobalScope))
114 , m_sourceURLAtConnection(sourceURL)
115 , m_lineNumberAtConnection(lineNumber)
116 {
117 m_bridge->initialize(sourceURL, lineNumber);
118 }
119
~WorkerThreadableWebSocketChannel()120 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
121 {
122 ASSERT(!m_bridge);
123 }
124
connect(const KURL & url,const String & protocol)125 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
126 {
127 ASSERT(m_bridge);
128 return m_bridge->connect(url, protocol);
129 }
130
send(const String & message)131 void WorkerThreadableWebSocketChannel::send(const String& message)
132 {
133 ASSERT(m_bridge);
134 m_bridge->send(message);
135 }
136
send(const ArrayBuffer & binaryData,unsigned byteOffset,unsigned byteLength)137 void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
138 {
139 ASSERT(m_bridge);
140 m_bridge->send(binaryData, byteOffset, byteLength);
141 }
142
send(PassRefPtr<BlobDataHandle> blobData)143 void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
144 {
145 ASSERT(m_bridge);
146 m_bridge->send(blobData);
147 }
148
close(int code,const String & reason)149 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
150 {
151 ASSERT(m_bridge);
152 m_bridge->close(code, reason);
153 }
154
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)155 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
156 {
157 if (!m_bridge)
158 return;
159
160 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
161 if (callStack && callStack->size()) {
162 // In order to emulate the ConsoleMessage behavior,
163 // we should ignore the specified url and line number if
164 // we can get the JavaScript context.
165 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
166 } else if (sourceURL.isEmpty() && !lineNumber) {
167 // No information is specified by the caller - use the url
168 // and the line number at the connection.
169 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
170 } else {
171 // Use the specified information.
172 m_bridge->fail(reason, level, sourceURL, lineNumber);
173 }
174 }
175
disconnect()176 void WorkerThreadableWebSocketChannel::disconnect()
177 {
178 m_bridge->disconnect();
179 m_bridge.clear();
180 }
181
trace(Visitor * visitor)182 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
183 {
184 visitor->trace(m_bridge);
185 WebSocketChannel::trace(visitor);
186 }
187
Peer(Bridge * bridge,WorkerLoaderProxy & loaderProxy,ThreadableWebSocketChannelSyncHelper * syncHelper)188 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
189 : m_bridge(bridge)
190 , m_loaderProxy(loaderProxy)
191 , m_mainWebSocketChannel(nullptr)
192 , m_syncHelper(syncHelper)
193 {
194 ASSERT(!isMainThread());
195 }
196
~Peer()197 Peer::~Peer()
198 {
199 ASSERT(!isMainThread());
200 }
201
initializeInternal(ExecutionContext * context,const String & sourceURL,unsigned lineNumber)202 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
203 {
204 ASSERT(isMainThread());
205 Document* document = toDocument(context);
206 if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
207 m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
208 } else {
209 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
210 }
211 m_syncHelper->signalWorkerThread();
212 }
213
connect(const KURL & url,const String & protocol)214 void Peer::connect(const KURL& url, const String& protocol)
215 {
216 ASSERT(isMainThread());
217 ASSERT(m_syncHelper);
218 if (!m_mainWebSocketChannel) {
219 m_syncHelper->setConnectRequestResult(false);
220 } else {
221 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
222 m_syncHelper->setConnectRequestResult(connectRequestResult);
223 }
224 m_syncHelper->signalWorkerThread();
225 }
226
send(const String & message)227 void Peer::send(const String& message)
228 {
229 ASSERT(isMainThread());
230 if (m_mainWebSocketChannel)
231 m_mainWebSocketChannel->send(message);
232 }
233
sendArrayBuffer(PassOwnPtr<Vector<char>> data)234 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
235 {
236 ASSERT(isMainThread());
237 if (m_mainWebSocketChannel)
238 m_mainWebSocketChannel->send(data);
239 }
240
sendBlob(PassRefPtr<BlobDataHandle> blobData)241 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
242 {
243 ASSERT(isMainThread());
244 if (m_mainWebSocketChannel)
245 m_mainWebSocketChannel->send(blobData);
246 }
247
close(int code,const String & reason)248 void Peer::close(int code, const String& reason)
249 {
250 ASSERT(isMainThread());
251 ASSERT(m_syncHelper);
252 if (!m_mainWebSocketChannel)
253 return;
254 m_mainWebSocketChannel->close(code, reason);
255 }
256
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)257 void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
258 {
259 ASSERT(isMainThread());
260 ASSERT(m_syncHelper);
261 if (!m_mainWebSocketChannel)
262 return;
263 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
264 }
265
disconnect()266 void Peer::disconnect()
267 {
268 ASSERT(isMainThread());
269 ASSERT(m_syncHelper);
270 if (m_mainWebSocketChannel) {
271 m_mainWebSocketChannel->disconnect();
272 m_mainWebSocketChannel = nullptr;
273 }
274 m_syncHelper->signalWorkerThread();
275 }
276
workerGlobalScopeDidConnect(ExecutionContext * context,Bridge * bridge,const String & subprotocol,const String & extensions)277 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridge, const String& subprotocol, const String& extensions)
278 {
279 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
280 if (bridge->client())
281 bridge->client()->didConnect(subprotocol, extensions);
282 }
283
didConnect(const String & subprotocol,const String & extensions)284 void Peer::didConnect(const String& subprotocol, const String& extensions)
285 {
286 ASSERT(isMainThread());
287 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConnect, m_bridge, subprotocol, extensions));
288 }
289
workerGlobalScopeDidReceiveMessage(ExecutionContext * context,Bridge * bridge,const String & message)290 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, Bridge* bridge, const String& message)
291 {
292 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
293 if (bridge->client())
294 bridge->client()->didReceiveMessage(message);
295 }
296
didReceiveMessage(const String & message)297 void Peer::didReceiveMessage(const String& message)
298 {
299 ASSERT(isMainThread());
300 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_bridge, message));
301 }
302
workerGlobalScopeDidReceiveBinaryData(ExecutionContext * context,Bridge * bridge,PassOwnPtr<Vector<char>> binaryData)303 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > binaryData)
304 {
305 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
306 if (bridge->client())
307 bridge->client()->didReceiveBinaryData(binaryData);
308 }
309
didReceiveBinaryData(PassOwnPtr<Vector<char>> binaryData)310 void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
311 {
312 ASSERT(isMainThread());
313 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_bridge, binaryData));
314 }
315
workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext * context,Bridge * bridge,unsigned long consumed)316 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed)
317 {
318 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
319 if (bridge->client())
320 bridge->client()->didConsumeBufferedAmount(consumed);
321 }
322
didConsumeBufferedAmount(unsigned long consumed)323 void Peer::didConsumeBufferedAmount(unsigned long consumed)
324 {
325 ASSERT(isMainThread());
326 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_bridge, consumed));
327 }
328
workerGlobalScopeDidStartClosingHandshake(ExecutionContext * context,Bridge * bridge)329 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge)
330 {
331 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
332 if (bridge->client())
333 bridge->client()->didStartClosingHandshake();
334 }
335
didStartClosingHandshake()336 void Peer::didStartClosingHandshake()
337 {
338 ASSERT(isMainThread());
339 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_bridge));
340 }
341
workerGlobalScopeDidClose(ExecutionContext * context,Bridge * bridge,WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion,unsigned short code,const String & reason)342 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
343 {
344 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
345 if (bridge->client())
346 bridge->client()->didClose(closingHandshakeCompletion, code, reason);
347 }
348
didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion,unsigned short code,const String & reason)349 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
350 {
351 ASSERT(isMainThread());
352 if (m_mainWebSocketChannel) {
353 m_mainWebSocketChannel->disconnect();
354 m_mainWebSocketChannel = nullptr;
355 }
356 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason));
357 }
358
workerGlobalScopeDidReceiveMessageError(ExecutionContext * context,Bridge * bridge)359 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, Bridge* bridge)
360 {
361 ASSERT_UNUSED(context, context->isWorkerGlobalScope());
362 if (bridge->client())
363 bridge->client()->didReceiveMessageError();
364 }
365
didReceiveMessageError()366 void Peer::didReceiveMessageError()
367 {
368 ASSERT(isMainThread());
369 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_bridge));
370 }
371
trace(Visitor * visitor)372 void Peer::trace(Visitor* visitor)
373 {
374 visitor->trace(m_bridge);
375 visitor->trace(m_mainWebSocketChannel);
376 visitor->trace(m_syncHelper);
377 WebSocketChannelClient::trace(visitor);
378 }
379
Bridge(WebSocketChannelClient * client,WorkerGlobalScope & workerGlobalScope)380 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalScope)
381 : m_client(client)
382 , m_workerGlobalScope(workerGlobalScope)
383 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
384 , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
385 , m_peer(new Peer(this, m_loaderProxy, m_syncHelper))
386 {
387 }
388
~Bridge()389 Bridge::~Bridge()
390 {
391 ASSERT(!m_peer);
392 }
393
initialize(const String & sourceURL,unsigned lineNumber)394 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
395 {
396 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber))) {
397 // The worker thread has been signalled to shutdown before method completion.
398 disconnect();
399 }
400 }
401
connect(const KURL & url,const String & protocol)402 bool Bridge::connect(const KURL& url, const String& protocol)
403 {
404 if (!m_peer)
405 return false;
406
407 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
408 return false;
409
410 return m_syncHelper->connectRequestResult();
411 }
412
send(const String & message)413 void Bridge::send(const String& message)
414 {
415 ASSERT(m_peer);
416 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
417 }
418
send(const ArrayBuffer & binaryData,unsigned byteOffset,unsigned byteLength)419 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
420 {
421 ASSERT(m_peer);
422 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
423 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
424 if (binaryData.byteLength())
425 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
426
427 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
428 }
429
send(PassRefPtr<BlobDataHandle> data)430 void Bridge::send(PassRefPtr<BlobDataHandle> data)
431 {
432 ASSERT(m_peer);
433 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
434 }
435
close(int code,const String & reason)436 void Bridge::close(int code, const String& reason)
437 {
438 ASSERT(m_peer);
439 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
440 }
441
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)442 void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
443 {
444 ASSERT(m_peer);
445 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
446 }
447
disconnect()448 void Bridge::disconnect()
449 {
450 if (!m_peer)
451 return;
452
453 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
454 // Here |m_peer| is detached from the main thread and we can delete it.
455
456 m_client = nullptr;
457 m_peer = nullptr;
458 m_syncHelper = nullptr;
459 // We won't use this any more.
460 m_workerGlobalScope.clear();
461 }
462
463 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
464 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)465 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
466 {
467 ASSERT(m_workerGlobalScope);
468 ASSERT(m_syncHelper);
469
470 m_loaderProxy.postTaskToLoader(task);
471
472 // We wait for the syncHelper event even if a shutdown event is fired.
473 // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
474 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
475 m_syncHelper->wait();
476 // This is checking whether a shutdown event is fired or not.
477 return !m_workerGlobalScope->thread()->terminated();
478 }
479
trace(Visitor * visitor)480 void Bridge::trace(Visitor* visitor)
481 {
482 visitor->trace(m_client);
483 visitor->trace(m_workerGlobalScope);
484 visitor->trace(m_syncHelper);
485 visitor->trace(m_peer);
486 }
487
488 } // namespace blink
489