1 /*
2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include "BinarySemaphore.h"
30 #include "CoreIPCMessageKinds.h"
31 #include "RunLoop.h"
32 #include "WebProcess.h"
33 #include "WorkItem.h"
34 #include <wtf/CurrentTime.h>
35
36 using namespace std;
37
38 namespace CoreIPC {
39
40 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
41 public:
42 static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
43 ~SyncMessageState();
44
wakeUpClientRunLoop()45 void wakeUpClientRunLoop()
46 {
47 m_waitForSyncReplySemaphore.signal();
48 }
49
wait(double absoluteTime)50 bool wait(double absoluteTime)
51 {
52 return m_waitForSyncReplySemaphore.wait(absoluteTime);
53 }
54
55 #if PLATFORM(WIN)
waitWhileDispatchingSentWin32Messages(double absoluteTime,const Vector<HWND> & windowsToReceiveMessages)56 bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
57 {
58 return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
59 }
60 #endif
61
62 // Returns true if this message will be handled on a client thread that is currently
63 // waiting for a reply to a synchronous message.
64 bool processIncomingMessage(Connection*, IncomingMessage&);
65
66 void dispatchMessages();
67
68 private:
69 explicit SyncMessageState(RunLoop*);
70
71 typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
syncMessageStateMap()72 static SyncMessageStateMap& syncMessageStateMap()
73 {
74 DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
75 return syncMessageStateMap;
76 }
77
syncMessageStateMapMutex()78 static Mutex& syncMessageStateMapMutex()
79 {
80 DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
81 return syncMessageStateMapMutex;
82 }
83
84 void dispatchMessageAndResetDidScheduleDispatchMessagesWork();
85
86 RunLoop* m_runLoop;
87 BinarySemaphore m_waitForSyncReplySemaphore;
88
89 // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
90 Mutex m_mutex;
91
92 bool m_didScheduleDispatchMessagesWork;
93
94 struct ConnectionAndIncomingMessage {
95 RefPtr<Connection> connection;
96 IncomingMessage incomingMessage;
97 };
98 Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
99 };
100
getOrCreate(RunLoop * runLoop)101 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
102 {
103 MutexLocker locker(syncMessageStateMapMutex());
104 pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
105
106 if (!result.second) {
107 ASSERT(result.first->second);
108 return result.first->second;
109 }
110
111 RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
112 result.first->second = syncMessageState.get();
113
114 return syncMessageState.release();
115 }
116
SyncMessageState(RunLoop * runLoop)117 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
118 : m_runLoop(runLoop)
119 , m_didScheduleDispatchMessagesWork(false)
120 {
121 }
122
~SyncMessageState()123 Connection::SyncMessageState::~SyncMessageState()
124 {
125 MutexLocker locker(syncMessageStateMapMutex());
126
127 ASSERT(syncMessageStateMap().contains(m_runLoop));
128 syncMessageStateMap().remove(m_runLoop);
129 }
130
processIncomingMessage(Connection * connection,IncomingMessage & incomingMessage)131 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
132 {
133 MessageID messageID = incomingMessage.messageID();
134 if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
135 return false;
136
137 ConnectionAndIncomingMessage connectionAndIncomingMessage;
138 connectionAndIncomingMessage.connection = connection;
139 connectionAndIncomingMessage.incomingMessage = incomingMessage;
140
141 {
142 MutexLocker locker(m_mutex);
143
144 if (!m_didScheduleDispatchMessagesWork) {
145 m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
146 m_didScheduleDispatchMessagesWork = true;
147 }
148
149 m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
150 }
151
152 wakeUpClientRunLoop();
153
154 return true;
155 }
156
dispatchMessages()157 void Connection::SyncMessageState::dispatchMessages()
158 {
159 ASSERT(m_runLoop == RunLoop::current());
160
161 Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
162
163 {
164 MutexLocker locker(m_mutex);
165 m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
166 }
167
168 for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
169 ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
170 connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
171 }
172 }
173
dispatchMessageAndResetDidScheduleDispatchMessagesWork()174 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
175 {
176 {
177 MutexLocker locker(m_mutex);
178 ASSERT(m_didScheduleDispatchMessagesWork);
179 m_didScheduleDispatchMessagesWork = false;
180 }
181
182 dispatchMessages();
183 }
184
createServerConnection(Identifier identifier,Client * client,RunLoop * clientRunLoop)185 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
186 {
187 return adoptRef(new Connection(identifier, true, client, clientRunLoop));
188 }
189
createClientConnection(Identifier identifier,Client * client,RunLoop * clientRunLoop)190 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
191 {
192 return adoptRef(new Connection(identifier, false, client, clientRunLoop));
193 }
194
Connection(Identifier identifier,bool isServer,Client * client,RunLoop * clientRunLoop)195 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
196 : m_client(client)
197 , m_isServer(isServer)
198 , m_syncRequestID(0)
199 , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
200 , m_shouldExitOnSyncMessageSendFailure(false)
201 , m_didCloseOnConnectionWorkQueueCallback(0)
202 , m_isConnected(false)
203 , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
204 , m_clientRunLoop(clientRunLoop)
205 , m_inDispatchMessageCount(0)
206 , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
207 , m_didReceiveInvalidMessage(false)
208 , m_defaultSyncMessageTimeout(NoTimeout)
209 , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
210 , m_shouldWaitForSyncReplies(true)
211 {
212 ASSERT(m_client);
213
214 platformInitialize(identifier);
215 }
216
~Connection()217 Connection::~Connection()
218 {
219 ASSERT(!isValid());
220
221 m_connectionQueue.invalidate();
222 }
223
setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)224 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
225 {
226 ASSERT(!m_isConnected);
227
228 m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
229 }
230
setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)231 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
232 {
233 ASSERT(!m_isConnected);
234
235 m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
236 }
237
setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)238 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
239 {
240 ASSERT(!m_isConnected);
241
242 m_didCloseOnConnectionWorkQueueCallback = callback;
243 }
244
invalidate()245 void Connection::invalidate()
246 {
247 if (!isValid()) {
248 // Someone already called invalidate().
249 return;
250 }
251
252 // Reset the client.
253 m_client = 0;
254
255 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
256 }
257
markCurrentlyDispatchedMessageAsInvalid()258 void Connection::markCurrentlyDispatchedMessageAsInvalid()
259 {
260 // This should only be called while processing a message.
261 ASSERT(m_inDispatchMessageCount > 0);
262
263 m_didReceiveInvalidMessage = true;
264 }
265
setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)266 void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
267 {
268 ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
269
270 m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
271 }
272
createSyncMessageArgumentEncoder(uint64_t destinationID,uint64_t & syncRequestID)273 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
274 {
275 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
276
277 // Encode the sync request ID.
278 syncRequestID = ++m_syncRequestID;
279 argumentEncoder->encode(syncRequestID);
280
281 return argumentEncoder.release();
282 }
283
sendMessage(MessageID messageID,PassOwnPtr<ArgumentEncoder> arguments,unsigned messageSendFlags)284 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
285 {
286 if (!isValid())
287 return false;
288
289 if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
290 && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
291 || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
292 messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
293
294 MutexLocker locker(m_outgoingMessagesLock);
295 m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
296
297 // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
298 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
299 return true;
300 }
301
sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)302 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
303 {
304 return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
305 }
306
waitForMessage(MessageID messageID,uint64_t destinationID,double timeout)307 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
308 {
309 // First, check if this message is already in the incoming messages queue.
310 {
311 MutexLocker locker(m_incomingMessagesLock);
312
313 for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
314 const IncomingMessage& message = m_incomingMessages[i];
315
316 if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
317 OwnPtr<ArgumentDecoder> arguments(message.arguments());
318
319 // Erase the incoming message.
320 m_incomingMessages.remove(i);
321 return arguments.release();
322 }
323 }
324 }
325
326 double absoluteTime = currentTime() + timeout;
327
328 std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
329
330 {
331 MutexLocker locker(m_waitForMessageMutex);
332
333 // We don't support having multiple clients wait for the same message.
334 ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
335
336 // Insert our pending wait.
337 m_waitForMessageMap.set(messageAndDestination, 0);
338 }
339
340 // Now wait for it to be set.
341 while (true) {
342 MutexLocker locker(m_waitForMessageMutex);
343
344 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
345 if (it->second) {
346 OwnPtr<ArgumentDecoder> arguments(it->second);
347 m_waitForMessageMap.remove(it);
348
349 return arguments.release();
350 }
351
352 // Now we wait.
353 if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
354 // We timed out, now remove the pending wait.
355 m_waitForMessageMap.remove(messageAndDestination);
356
357 break;
358 }
359 }
360
361 return PassOwnPtr<ArgumentDecoder>();
362 }
363
sendSyncMessage(MessageID messageID,uint64_t syncRequestID,PassOwnPtr<ArgumentEncoder> encoder,double timeout)364 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
365 {
366 // We only allow sending sync messages from the client run loop.
367 ASSERT(RunLoop::current() == m_clientRunLoop);
368
369 if (!isValid()) {
370 didFailToSendSyncMessage();
371 return 0;
372 }
373
374 // Push the pending sync reply information on our stack.
375 {
376 MutexLocker locker(m_syncReplyStateMutex);
377 if (!m_shouldWaitForSyncReplies) {
378 didFailToSendSyncMessage();
379 return 0;
380 }
381
382 m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
383 }
384
385 // First send the message.
386 sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
387
388 // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
389 // keep an extra reference to the connection here in case it's invalidated.
390 RefPtr<Connection> protect(this);
391 OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
392
393 // Finally, pop the pending sync reply information.
394 {
395 MutexLocker locker(m_syncReplyStateMutex);
396 ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
397 m_pendingSyncReplies.removeLast();
398 }
399
400 if (!reply)
401 didFailToSendSyncMessage();
402
403 return reply.release();
404 }
405
waitForSyncReply(uint64_t syncRequestID,double timeout)406 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
407 {
408 if (timeout == DefaultTimeout)
409 timeout = m_defaultSyncMessageTimeout;
410
411 // Use a really long timeout.
412 if (timeout == NoTimeout)
413 timeout = 1e10;
414
415 double absoluteTime = currentTime() + timeout;
416
417 bool timedOut = false;
418 while (!timedOut) {
419 // First, check if we have any messages that we need to process.
420 m_syncMessageState->dispatchMessages();
421
422 {
423 MutexLocker locker(m_syncReplyStateMutex);
424
425 // Second, check if there is a sync reply at the top of the stack.
426 ASSERT(!m_pendingSyncReplies.isEmpty());
427
428 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
429 ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
430
431 // We found the sync reply, or the connection was closed.
432 if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
433 return pendingSyncReply.releaseReplyDecoder();
434 }
435
436 // We didn't find a sync reply yet, keep waiting.
437 #if PLATFORM(WIN)
438 timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
439 #else
440 timedOut = !m_syncMessageState->wait(absoluteTime);
441 #endif
442 }
443
444 // We timed out.
445 if (m_client)
446 m_client->syncMessageSendTimedOut(this);
447
448 return 0;
449 }
450
processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)451 void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
452 {
453 MutexLocker locker(m_syncReplyStateMutex);
454 ASSERT(!m_pendingSyncReplies.isEmpty());
455
456 // Go through the stack of sync requests that have pending replies and see which one
457 // this reply is for.
458 for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
459 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
460
461 if (pendingSyncReply.syncRequestID != arguments->destinationID())
462 continue;
463
464 ASSERT(!pendingSyncReply.replyDecoder);
465
466 pendingSyncReply.replyDecoder = arguments.leakPtr();
467 pendingSyncReply.didReceiveReply = true;
468
469 // We got a reply to the last send message, wake up the client run loop so it can be processed.
470 if (i == m_pendingSyncReplies.size())
471 m_syncMessageState->wakeUpClientRunLoop();
472
473 return;
474 }
475
476 // We got a reply for a message we never sent.
477 // FIXME: Dispatch a didReceiveInvalidMessage callback on the client.
478 ASSERT_NOT_REACHED();
479 }
480
processIncomingMessage(MessageID messageID,PassOwnPtr<ArgumentDecoder> arguments)481 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
482 {
483 // Check if this is a sync reply.
484 if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
485 processIncomingSyncReply(arguments);
486 return;
487 }
488
489 IncomingMessage incomingMessage(messageID, arguments);
490
491 // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
492 // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
493 // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
494 if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
495 return;
496
497 // Check if we're waiting for this message.
498 {
499 MutexLocker locker(m_waitForMessageMutex);
500
501 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
502 if (it != m_waitForMessageMap.end()) {
503 it->second = incomingMessage.releaseArguments().leakPtr();
504 ASSERT(it->second);
505
506 m_waitForMessageCondition.signal();
507 return;
508 }
509 }
510
511 enqueueIncomingMessage(incomingMessage);
512 }
513
connectionDidClose()514 void Connection::connectionDidClose()
515 {
516 // The connection is now invalid.
517 platformInvalidate();
518
519 {
520 MutexLocker locker(m_syncReplyStateMutex);
521
522 ASSERT(m_shouldWaitForSyncReplies);
523 m_shouldWaitForSyncReplies = false;
524
525 if (!m_pendingSyncReplies.isEmpty())
526 m_syncMessageState->wakeUpClientRunLoop();
527 }
528
529 if (m_didCloseOnConnectionWorkQueueCallback)
530 m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
531
532 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
533 }
534
dispatchConnectionDidClose()535 void Connection::dispatchConnectionDidClose()
536 {
537 // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
538 // then the client will be null here.
539 if (!m_client)
540 return;
541
542
543 // Because we define a connection as being "valid" based on wheter it has a null client, we null out
544 // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
545 // will then wait indefinitely for a reply.
546 Client* client = m_client;
547 m_client = 0;
548
549 client->didClose(this);
550 }
551
canSendOutgoingMessages() const552 bool Connection::canSendOutgoingMessages() const
553 {
554 return m_isConnected && platformCanSendOutgoingMessages();
555 }
556
sendOutgoingMessages()557 void Connection::sendOutgoingMessages()
558 {
559 if (!canSendOutgoingMessages())
560 return;
561
562 while (true) {
563 OutgoingMessage message;
564 {
565 MutexLocker locker(m_outgoingMessagesLock);
566 if (m_outgoingMessages.isEmpty())
567 break;
568 message = m_outgoingMessages.takeFirst();
569 }
570
571 if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
572 break;
573 }
574 }
575
dispatchSyncMessage(MessageID messageID,ArgumentDecoder * arguments)576 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
577 {
578 ASSERT(messageID.isSync());
579
580 // Decode the sync request ID.
581 uint64_t syncRequestID = 0;
582
583 if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
584 // We received an invalid sync message.
585 arguments->markInvalid();
586 return;
587 }
588
589 // Create our reply encoder.
590 ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
591
592 // Hand off both the decoder and encoder to the client..
593 SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
594
595 // FIXME: If the message was invalid, we should send back a SyncMessageError.
596 ASSERT(!arguments->isInvalid());
597
598 if (syncReplyMode == ManualReply) {
599 // The client will take ownership of the reply encoder and send it at some point in the future.
600 // We won't do anything here.
601 return;
602 }
603
604 // Send the reply.
605 sendSyncReply(replyEncoder);
606 }
607
didFailToSendSyncMessage()608 void Connection::didFailToSendSyncMessage()
609 {
610 if (!m_shouldExitOnSyncMessageSendFailure)
611 return;
612
613 exit(0);
614 }
615
enqueueIncomingMessage(IncomingMessage & incomingMessage)616 void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
617 {
618 MutexLocker locker(m_incomingMessagesLock);
619 m_incomingMessages.append(incomingMessage);
620
621 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
622 }
623
dispatchMessage(IncomingMessage & message)624 void Connection::dispatchMessage(IncomingMessage& message)
625 {
626 OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
627
628 // If there's no client, return. We do this after calling releaseArguments so that
629 // the ArgumentDecoder message will be freed.
630 if (!m_client)
631 return;
632
633 m_inDispatchMessageCount++;
634
635 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
636 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
637
638 bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
639 m_didReceiveInvalidMessage = false;
640
641 if (message.messageID().isSync())
642 dispatchSyncMessage(message.messageID(), arguments.get());
643 else
644 m_client->didReceiveMessage(this, message.messageID(), arguments.get());
645
646 m_didReceiveInvalidMessage |= arguments->isInvalid();
647 m_inDispatchMessageCount--;
648
649 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
650 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
651
652 if (m_didReceiveInvalidMessage && m_client)
653 m_client->didReceiveInvalidMessage(this, message.messageID());
654
655 m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
656 }
657
dispatchMessages()658 void Connection::dispatchMessages()
659 {
660 Vector<IncomingMessage> incomingMessages;
661
662 {
663 MutexLocker locker(m_incomingMessagesLock);
664 m_incomingMessages.swap(incomingMessages);
665 }
666
667 for (size_t i = 0; i < incomingMessages.size(); ++i)
668 dispatchMessage(incomingMessages[i]);
669 }
670
671 } // namespace CoreIPC
672