• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "config.h"
32 
33 #include "modules/websockets/WorkerThreadableWebSocketChannel.h"
34 
35 #include "bindings/core/v8/ScriptCallStackFactory.h"
36 #include "core/dom/CrossThreadTask.h"
37 #include "core/dom/Document.h"
38 #include "core/dom/ExecutionContext.h"
39 #include "core/dom/ExecutionContextTask.h"
40 #include "core/fileapi/Blob.h"
41 #include "core/inspector/ScriptCallFrame.h"
42 #include "core/inspector/ScriptCallStack.h"
43 #include "core/workers/WorkerGlobalScope.h"
44 #include "core/workers/WorkerLoaderProxy.h"
45 #include "core/workers/WorkerThread.h"
46 #include "modules/websockets/MainThreadWebSocketChannel.h"
47 #include "modules/websockets/NewWebSocketChannelImpl.h"
48 #include "platform/RuntimeEnabledFeatures.h"
49 #include "public/platform/Platform.h"
50 #include "public/platform/WebWaitableEvent.h"
51 #include "wtf/ArrayBuffer.h"
52 #include "wtf/Assertions.h"
53 #include "wtf/Functional.h"
54 #include "wtf/MainThread.h"
55 #include "wtf/text/WTFString.h"
56 
57 namespace blink {
58 
59 typedef WorkerThreadableWebSocketChannel::Bridge Bridge;
60 typedef WorkerThreadableWebSocketChannel::Peer Peer;
61 
62 // Created and destroyed on the worker thread. All setters of this class are
63 // called on the main thread, while all getters are called on the worker
64 // thread. signalWorkerThread() must be called before any getters are called.
65 class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<ThreadableWebSocketChannelSyncHelper> {
66 public:
create(PassOwnPtr<WebWaitableEvent> event)67     static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event)
68     {
69         return new ThreadableWebSocketChannelSyncHelper(event);
70     }
71 
~ThreadableWebSocketChannelSyncHelper()72     ~ThreadableWebSocketChannelSyncHelper()
73     {
74     }
75 
76     // All setters are called on the main thread.
setConnectRequestResult(bool connectRequestResult)77     void setConnectRequestResult(bool connectRequestResult)
78     {
79         m_connectRequestResult = connectRequestResult;
80     }
81 
82     // All getter are called on the worker thread.
connectRequestResult() const83     bool connectRequestResult() const
84     {
85         return m_connectRequestResult;
86     }
87 
88     // This should be called after all setters are called and before any
89     // getters are called.
signalWorkerThread()90     void signalWorkerThread()
91     {
92         m_event->signal();
93     }
wait()94     void wait()
95     {
96         m_event->wait();
97     }
98 
trace(Visitor * visitor)99     void trace(Visitor* visitor) { }
100 
101 private:
ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)102     explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event)
103         : m_event(event)
104         , m_connectRequestResult(false)
105     {
106     }
107 
108     OwnPtr<WebWaitableEvent> m_event;
109     bool m_connectRequestResult;
110 };
111 
WorkerThreadableWebSocketChannel(WorkerGlobalScope & workerGlobalScope,WebSocketChannelClient * client,const String & sourceURL,unsigned lineNumber)112 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber)
113     : m_bridge(new Bridge(client, workerGlobalScope))
114     , m_sourceURLAtConnection(sourceURL)
115     , m_lineNumberAtConnection(lineNumber)
116 {
117     m_bridge->initialize(sourceURL, lineNumber);
118 }
119 
~WorkerThreadableWebSocketChannel()120 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
121 {
122     ASSERT(!m_bridge);
123 }
124 
connect(const KURL & url,const String & protocol)125 bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
126 {
127     ASSERT(m_bridge);
128     return m_bridge->connect(url, protocol);
129 }
130 
send(const String & message)131 void WorkerThreadableWebSocketChannel::send(const String& message)
132 {
133     ASSERT(m_bridge);
134     m_bridge->send(message);
135 }
136 
send(const ArrayBuffer & binaryData,unsigned byteOffset,unsigned byteLength)137 void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
138 {
139     ASSERT(m_bridge);
140     m_bridge->send(binaryData, byteOffset, byteLength);
141 }
142 
send(PassRefPtr<BlobDataHandle> blobData)143 void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData)
144 {
145     ASSERT(m_bridge);
146     m_bridge->send(blobData);
147 }
148 
close(int code,const String & reason)149 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
150 {
151     ASSERT(m_bridge);
152     m_bridge->close(code, reason);
153 }
154 
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)155 void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
156 {
157     if (!m_bridge)
158         return;
159 
160     RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true);
161     if (callStack && callStack->size())  {
162         // In order to emulate the ConsoleMessage behavior,
163         // we should ignore the specified url and line number if
164         // we can get the JavaScript context.
165         m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber());
166     } else if (sourceURL.isEmpty() && !lineNumber) {
167         // No information is specified by the caller - use the url
168         // and the line number at the connection.
169         m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection);
170     } else {
171         // Use the specified information.
172         m_bridge->fail(reason, level, sourceURL, lineNumber);
173     }
174 }
175 
disconnect()176 void WorkerThreadableWebSocketChannel::disconnect()
177 {
178     m_bridge->disconnect();
179     m_bridge.clear();
180 }
181 
trace(Visitor * visitor)182 void WorkerThreadableWebSocketChannel::trace(Visitor* visitor)
183 {
184     visitor->trace(m_bridge);
185     WebSocketChannel::trace(visitor);
186 }
187 
Peer(Bridge * bridge,WorkerLoaderProxy & loaderProxy,ThreadableWebSocketChannelSyncHelper * syncHelper)188 Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper)
189     : m_bridge(bridge)
190     , m_loaderProxy(loaderProxy)
191     , m_mainWebSocketChannel(nullptr)
192     , m_syncHelper(syncHelper)
193 {
194     ASSERT(!isMainThread());
195 }
196 
~Peer()197 Peer::~Peer()
198 {
199     ASSERT(!isMainThread());
200 }
201 
initializeInternal(ExecutionContext * context,const String & sourceURL,unsigned lineNumber)202 void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber)
203 {
204     ASSERT(isMainThread());
205     Document* document = toDocument(context);
206     if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) {
207         m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber);
208     } else {
209         m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber);
210     }
211     m_syncHelper->signalWorkerThread();
212 }
213 
connect(const KURL & url,const String & protocol)214 void Peer::connect(const KURL& url, const String& protocol)
215 {
216     ASSERT(isMainThread());
217     ASSERT(m_syncHelper);
218     if (!m_mainWebSocketChannel) {
219         m_syncHelper->setConnectRequestResult(false);
220     } else {
221         bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol);
222         m_syncHelper->setConnectRequestResult(connectRequestResult);
223     }
224     m_syncHelper->signalWorkerThread();
225 }
226 
send(const String & message)227 void Peer::send(const String& message)
228 {
229     ASSERT(isMainThread());
230     if (m_mainWebSocketChannel)
231         m_mainWebSocketChannel->send(message);
232 }
233 
sendArrayBuffer(PassOwnPtr<Vector<char>> data)234 void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data)
235 {
236     ASSERT(isMainThread());
237     if (m_mainWebSocketChannel)
238         m_mainWebSocketChannel->send(data);
239 }
240 
sendBlob(PassRefPtr<BlobDataHandle> blobData)241 void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData)
242 {
243     ASSERT(isMainThread());
244     if (m_mainWebSocketChannel)
245         m_mainWebSocketChannel->send(blobData);
246 }
247 
close(int code,const String & reason)248 void Peer::close(int code, const String& reason)
249 {
250     ASSERT(isMainThread());
251     ASSERT(m_syncHelper);
252     if (!m_mainWebSocketChannel)
253         return;
254     m_mainWebSocketChannel->close(code, reason);
255 }
256 
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)257 void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
258 {
259     ASSERT(isMainThread());
260     ASSERT(m_syncHelper);
261     if (!m_mainWebSocketChannel)
262         return;
263     m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber);
264 }
265 
disconnect()266 void Peer::disconnect()
267 {
268     ASSERT(isMainThread());
269     ASSERT(m_syncHelper);
270     if (m_mainWebSocketChannel) {
271         m_mainWebSocketChannel->disconnect();
272         m_mainWebSocketChannel = nullptr;
273     }
274     m_syncHelper->signalWorkerThread();
275 }
276 
workerGlobalScopeDidConnect(ExecutionContext * context,Bridge * bridge,const String & subprotocol,const String & extensions)277 static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridge, const String& subprotocol, const String& extensions)
278 {
279     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
280     if (bridge->client())
281         bridge->client()->didConnect(subprotocol, extensions);
282 }
283 
didConnect(const String & subprotocol,const String & extensions)284 void Peer::didConnect(const String& subprotocol, const String& extensions)
285 {
286     ASSERT(isMainThread());
287     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConnect, m_bridge, subprotocol, extensions));
288 }
289 
workerGlobalScopeDidReceiveMessage(ExecutionContext * context,Bridge * bridge,const String & message)290 static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, Bridge* bridge, const String& message)
291 {
292     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
293     if (bridge->client())
294         bridge->client()->didReceiveMessage(message);
295 }
296 
didReceiveMessage(const String & message)297 void Peer::didReceiveMessage(const String& message)
298 {
299     ASSERT(isMainThread());
300     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_bridge, message));
301 }
302 
workerGlobalScopeDidReceiveBinaryData(ExecutionContext * context,Bridge * bridge,PassOwnPtr<Vector<char>> binaryData)303 static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > binaryData)
304 {
305     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
306     if (bridge->client())
307         bridge->client()->didReceiveBinaryData(binaryData);
308 }
309 
didReceiveBinaryData(PassOwnPtr<Vector<char>> binaryData)310 void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
311 {
312     ASSERT(isMainThread());
313     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_bridge, binaryData));
314 }
315 
workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext * context,Bridge * bridge,unsigned long consumed)316 static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed)
317 {
318     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
319     if (bridge->client())
320         bridge->client()->didConsumeBufferedAmount(consumed);
321 }
322 
didConsumeBufferedAmount(unsigned long consumed)323 void Peer::didConsumeBufferedAmount(unsigned long consumed)
324 {
325     ASSERT(isMainThread());
326     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_bridge, consumed));
327 }
328 
workerGlobalScopeDidStartClosingHandshake(ExecutionContext * context,Bridge * bridge)329 static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge)
330 {
331     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
332     if (bridge->client())
333         bridge->client()->didStartClosingHandshake();
334 }
335 
didStartClosingHandshake()336 void Peer::didStartClosingHandshake()
337 {
338     ASSERT(isMainThread());
339     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_bridge));
340 }
341 
workerGlobalScopeDidClose(ExecutionContext * context,Bridge * bridge,WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion,unsigned short code,const String & reason)342 static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
343 {
344     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
345     if (bridge->client())
346         bridge->client()->didClose(closingHandshakeCompletion, code, reason);
347 }
348 
didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion,unsigned short code,const String & reason)349 void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
350 {
351     ASSERT(isMainThread());
352     if (m_mainWebSocketChannel) {
353         m_mainWebSocketChannel->disconnect();
354         m_mainWebSocketChannel = nullptr;
355     }
356     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason));
357 }
358 
workerGlobalScopeDidReceiveMessageError(ExecutionContext * context,Bridge * bridge)359 static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, Bridge* bridge)
360 {
361     ASSERT_UNUSED(context, context->isWorkerGlobalScope());
362     if (bridge->client())
363         bridge->client()->didReceiveMessageError();
364 }
365 
didReceiveMessageError()366 void Peer::didReceiveMessageError()
367 {
368     ASSERT(isMainThread());
369     m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_bridge));
370 }
371 
trace(Visitor * visitor)372 void Peer::trace(Visitor* visitor)
373 {
374     visitor->trace(m_bridge);
375     visitor->trace(m_mainWebSocketChannel);
376     visitor->trace(m_syncHelper);
377     WebSocketChannelClient::trace(visitor);
378 }
379 
Bridge(WebSocketChannelClient * client,WorkerGlobalScope & workerGlobalScope)380 Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalScope)
381     : m_client(client)
382     , m_workerGlobalScope(workerGlobalScope)
383     , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy())
384     , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent())))
385     , m_peer(new Peer(this, m_loaderProxy, m_syncHelper))
386 {
387 }
388 
~Bridge()389 Bridge::~Bridge()
390 {
391     ASSERT(!m_peer);
392 }
393 
initialize(const String & sourceURL,unsigned lineNumber)394 void Bridge::initialize(const String& sourceURL, unsigned lineNumber)
395 {
396     if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber))) {
397         // The worker thread has been signalled to shutdown before method completion.
398         disconnect();
399     }
400 }
401 
connect(const KURL & url,const String & protocol)402 bool Bridge::connect(const KURL& url, const String& protocol)
403 {
404     if (!m_peer)
405         return false;
406 
407     if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol)))
408         return false;
409 
410     return m_syncHelper->connectRequestResult();
411 }
412 
send(const String & message)413 void Bridge::send(const String& message)
414 {
415     ASSERT(m_peer);
416     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message));
417 }
418 
send(const ArrayBuffer & binaryData,unsigned byteOffset,unsigned byteLength)419 void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
420 {
421     ASSERT(m_peer);
422     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
423     OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
424     if (binaryData.byteLength())
425         memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
426 
427     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release()));
428 }
429 
send(PassRefPtr<BlobDataHandle> data)430 void Bridge::send(PassRefPtr<BlobDataHandle> data)
431 {
432     ASSERT(m_peer);
433     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data));
434 }
435 
close(int code,const String & reason)436 void Bridge::close(int code, const String& reason)
437 {
438     ASSERT(m_peer);
439     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason));
440 }
441 
fail(const String & reason,MessageLevel level,const String & sourceURL,unsigned lineNumber)442 void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber)
443 {
444     ASSERT(m_peer);
445     m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber));
446 }
447 
disconnect()448 void Bridge::disconnect()
449 {
450     if (!m_peer)
451         return;
452 
453     waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get()));
454     // Here |m_peer| is detached from the main thread and we can delete it.
455 
456     m_client = nullptr;
457     m_peer = nullptr;
458     m_syncHelper = nullptr;
459     // We won't use this any more.
460     m_workerGlobalScope.clear();
461 }
462 
463 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
464 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)465 bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task)
466 {
467     ASSERT(m_workerGlobalScope);
468     ASSERT(m_syncHelper);
469 
470     m_loaderProxy.postTaskToLoader(task);
471 
472     // We wait for the syncHelper event even if a shutdown event is fired.
473     // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this.
474     ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack);
475     m_syncHelper->wait();
476     // This is checking whether a shutdown event is fired or not.
477     return !m_workerGlobalScope->thread()->terminated();
478 }
479 
trace(Visitor * visitor)480 void Bridge::trace(Visitor* visitor)
481 {
482     visitor->trace(m_client);
483     visitor->trace(m_workerGlobalScope);
484     visitor->trace(m_syncHelper);
485     visitor->trace(m_peer);
486 }
487 
488 } // namespace blink
489