• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25 
26 #include "config.h"
27 #include "Connection.h"
28 
29 #include "BinarySemaphore.h"
30 #include "CoreIPCMessageKinds.h"
31 #include "RunLoop.h"
32 #include "WebProcess.h"
33 #include "WorkItem.h"
34 #include <wtf/CurrentTime.h>
35 
36 using namespace std;
37 
38 namespace CoreIPC {
39 
// State shared by the connections that use a given client RunLoop. It owns the semaphore that
// the client thread blocks on while waiting for a sync reply, and the queue of incoming
// messages that have to be dispatched even while that thread is waiting.
// Instances are registered in a mutex-protected static map keyed by RunLoop; the map stores
// raw pointers, and each instance removes its own entry in its destructor.
class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
public:
    // Returns the SyncMessageState registered for the given run loop, creating and
    // registering a new one if there is none yet.
    static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
    ~SyncMessageState();

    // Signals the semaphore that wait() blocks on.
    void wakeUpClientRunLoop()
    {
        m_waitForSyncReplySemaphore.signal();
    }

    // Waits on the semaphore until absoluteTime and forwards the semaphore's result.
    // Connection::waitForSyncReply treats a false result as a timeout.
    bool wait(double absoluteTime)
    {
        return m_waitForSyncReplySemaphore.wait(absoluteTime);
    }

#if PLATFORM(WIN)
    // Windows variant of wait(): delegates to RunLoop::dispatchSentMessagesUntil, handing it
    // the given windows, the semaphore and the deadline.
    bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
    {
        return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
    }
#endif

    // Returns true if this message will be handled on a client thread that is currently
    // waiting for a reply to a synchronous message.
    bool processIncomingMessage(Connection*, IncomingMessage&);

    // Dispatches every message queued by processIncomingMessage. Must be called on the
    // run loop this state belongs to.
    void dispatchMessages();

private:
    explicit SyncMessageState(RunLoop*);

    typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;

    // The RunLoop -> SyncMessageState registry. Access it only while holding
    // syncMessageStateMapMutex().
    static SyncMessageStateMap& syncMessageStateMap()
    {
        DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
        return syncMessageStateMap;
    }

    // Mutex guarding syncMessageStateMap().
    static Mutex& syncMessageStateMapMutex()
    {
        DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
        return syncMessageStateMapMutex;
    }

    // Work item scheduled on m_runLoop by processIncomingMessage: clears
    // m_didScheduleDispatchMessagesWork and then calls dispatchMessages().
    void dispatchMessageAndResetDidScheduleDispatchMessagesWork();

    // The run loop this state is registered for (the key in syncMessageStateMap()).
    RunLoop* m_runLoop;
    BinarySemaphore m_waitForSyncReplySemaphore;

    // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
    Mutex m_mutex;

    // True while a dispatchMessageAndResetDidScheduleDispatchMessagesWork work item is pending,
    // so that at most one is scheduled at a time.
    bool m_didScheduleDispatchMessagesWork;

    // A queued message together with a reference to the connection it arrived on.
    struct ConnectionAndIncomingMessage {
        RefPtr<Connection> connection;
        IncomingMessage incomingMessage;
    };
    Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
};
100 
getOrCreate(RunLoop * runLoop)101 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
102 {
103     MutexLocker locker(syncMessageStateMapMutex());
104     pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
105 
106     if (!result.second) {
107         ASSERT(result.first->second);
108         return result.first->second;
109     }
110 
111     RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
112     result.first->second = syncMessageState.get();
113 
114     return syncMessageState.release();
115 }
116 
SyncMessageState(RunLoop * runLoop)117 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
118     : m_runLoop(runLoop)
119     , m_didScheduleDispatchMessagesWork(false)
120 {
121 }
122 
~SyncMessageState()123 Connection::SyncMessageState::~SyncMessageState()
124 {
125     MutexLocker locker(syncMessageStateMapMutex());
126 
127     ASSERT(syncMessageStateMap().contains(m_runLoop));
128     syncMessageStateMap().remove(m_runLoop);
129 }
130 
processIncomingMessage(Connection * connection,IncomingMessage & incomingMessage)131 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
132 {
133     MessageID messageID = incomingMessage.messageID();
134     if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
135         return false;
136 
137     ConnectionAndIncomingMessage connectionAndIncomingMessage;
138     connectionAndIncomingMessage.connection = connection;
139     connectionAndIncomingMessage.incomingMessage = incomingMessage;
140 
141     {
142         MutexLocker locker(m_mutex);
143 
144         if (!m_didScheduleDispatchMessagesWork) {
145             m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
146             m_didScheduleDispatchMessagesWork = true;
147         }
148 
149         m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
150     }
151 
152     wakeUpClientRunLoop();
153 
154     return true;
155 }
156 
dispatchMessages()157 void Connection::SyncMessageState::dispatchMessages()
158 {
159     ASSERT(m_runLoop == RunLoop::current());
160 
161     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
162 
163     {
164         MutexLocker locker(m_mutex);
165         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
166     }
167 
168     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
169         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
170         connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
171     }
172 }
173 
dispatchMessageAndResetDidScheduleDispatchMessagesWork()174 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
175 {
176     {
177         MutexLocker locker(m_mutex);
178         ASSERT(m_didScheduleDispatchMessagesWork);
179         m_didScheduleDispatchMessagesWork = false;
180     }
181 
182     dispatchMessages();
183 }
184 
createServerConnection(Identifier identifier,Client * client,RunLoop * clientRunLoop)185 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
186 {
187     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
188 }
189 
createClientConnection(Identifier identifier,Client * client,RunLoop * clientRunLoop)190 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
191 {
192     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
193 }
194 
// Constructs a connection that is not yet connected (m_isConnected is false), with no
// default sync-message timeout, bound to the given client and client run loop.
// The SyncMessageState is shared with any other connection using the same clientRunLoop
// (see SyncMessageState::getOrCreate). Platform-specific setup is delegated to
// platformInitialize(identifier).
Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
    : m_client(client)
    , m_isServer(isServer)
    , m_syncRequestID(0)
    , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
    , m_shouldExitOnSyncMessageSendFailure(false)
    , m_didCloseOnConnectionWorkQueueCallback(0)
    , m_isConnected(false)
    , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
    , m_clientRunLoop(clientRunLoop)
    , m_inDispatchMessageCount(0)
    , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
    , m_didReceiveInvalidMessage(false)
    , m_defaultSyncMessageTimeout(NoTimeout)
    , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
    , m_shouldWaitForSyncReplies(true)
{
    // A connection is considered valid only while it has a client, so one is required.
    ASSERT(m_client);

    platformInitialize(identifier);
}
216 
~Connection()217 Connection::~Connection()
218 {
219     ASSERT(!isValid());
220 
221     m_connectionQueue.invalidate();
222 }
223 
setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)224 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
225 {
226     ASSERT(!m_isConnected);
227 
228     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
229 }
230 
setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)231 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
232 {
233     ASSERT(!m_isConnected);
234 
235     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
236 }
237 
setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)238 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
239 {
240     ASSERT(!m_isConnected);
241 
242     m_didCloseOnConnectionWorkQueueCallback = callback;
243 }
244 
invalidate()245 void Connection::invalidate()
246 {
247     if (!isValid()) {
248         // Someone already called invalidate().
249         return;
250     }
251 
252     // Reset the client.
253     m_client = 0;
254 
255     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
256 }
257 
markCurrentlyDispatchedMessageAsInvalid()258 void Connection::markCurrentlyDispatchedMessageAsInvalid()
259 {
260     // This should only be called while processing a message.
261     ASSERT(m_inDispatchMessageCount > 0);
262 
263     m_didReceiveInvalidMessage = true;
264 }
265 
setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)266 void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
267 {
268     ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
269 
270     m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
271 }
272 
createSyncMessageArgumentEncoder(uint64_t destinationID,uint64_t & syncRequestID)273 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
274 {
275     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
276 
277     // Encode the sync request ID.
278     syncRequestID = ++m_syncRequestID;
279     argumentEncoder->encode(syncRequestID);
280 
281     return argumentEncoder.release();
282 }
283 
sendMessage(MessageID messageID,PassOwnPtr<ArgumentEncoder> arguments,unsigned messageSendFlags)284 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
285 {
286     if (!isValid())
287         return false;
288 
289     if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
290         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
291             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
292         messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
293 
294     MutexLocker locker(m_outgoingMessagesLock);
295     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
296 
297     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
298     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
299     return true;
300 }
301 
sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)302 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
303 {
304     return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
305 }
306 
waitForMessage(MessageID messageID,uint64_t destinationID,double timeout)307 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
308 {
309     // First, check if this message is already in the incoming messages queue.
310     {
311         MutexLocker locker(m_incomingMessagesLock);
312 
313         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
314             const IncomingMessage& message = m_incomingMessages[i];
315 
316             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
317                 OwnPtr<ArgumentDecoder> arguments(message.arguments());
318 
319                 // Erase the incoming message.
320                 m_incomingMessages.remove(i);
321                 return arguments.release();
322             }
323         }
324     }
325 
326     double absoluteTime = currentTime() + timeout;
327 
328     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
329 
330     {
331         MutexLocker locker(m_waitForMessageMutex);
332 
333         // We don't support having multiple clients wait for the same message.
334         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
335 
336         // Insert our pending wait.
337         m_waitForMessageMap.set(messageAndDestination, 0);
338     }
339 
340     // Now wait for it to be set.
341     while (true) {
342         MutexLocker locker(m_waitForMessageMutex);
343 
344         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
345         if (it->second) {
346             OwnPtr<ArgumentDecoder> arguments(it->second);
347             m_waitForMessageMap.remove(it);
348 
349             return arguments.release();
350         }
351 
352         // Now we wait.
353         if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
354             // We timed out, now remove the pending wait.
355             m_waitForMessageMap.remove(messageAndDestination);
356 
357             break;
358         }
359     }
360 
361     return PassOwnPtr<ArgumentDecoder>();
362 }
363 
sendSyncMessage(MessageID messageID,uint64_t syncRequestID,PassOwnPtr<ArgumentEncoder> encoder,double timeout)364 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
365 {
366     // We only allow sending sync messages from the client run loop.
367     ASSERT(RunLoop::current() == m_clientRunLoop);
368 
369     if (!isValid()) {
370         didFailToSendSyncMessage();
371         return 0;
372     }
373 
374     // Push the pending sync reply information on our stack.
375     {
376         MutexLocker locker(m_syncReplyStateMutex);
377         if (!m_shouldWaitForSyncReplies) {
378             didFailToSendSyncMessage();
379             return 0;
380         }
381 
382         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
383     }
384 
385     // First send the message.
386     sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
387 
388     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
389     // keep an extra reference to the connection here in case it's invalidated.
390     RefPtr<Connection> protect(this);
391     OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
392 
393     // Finally, pop the pending sync reply information.
394     {
395         MutexLocker locker(m_syncReplyStateMutex);
396         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
397         m_pendingSyncReplies.removeLast();
398     }
399 
400     if (!reply)
401         didFailToSendSyncMessage();
402 
403     return reply.release();
404 }
405 
waitForSyncReply(uint64_t syncRequestID,double timeout)406 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
407 {
408     if (timeout == DefaultTimeout)
409         timeout = m_defaultSyncMessageTimeout;
410 
411     // Use a really long timeout.
412     if (timeout == NoTimeout)
413         timeout = 1e10;
414 
415     double absoluteTime = currentTime() + timeout;
416 
417     bool timedOut = false;
418     while (!timedOut) {
419         // First, check if we have any messages that we need to process.
420         m_syncMessageState->dispatchMessages();
421 
422         {
423             MutexLocker locker(m_syncReplyStateMutex);
424 
425             // Second, check if there is a sync reply at the top of the stack.
426             ASSERT(!m_pendingSyncReplies.isEmpty());
427 
428             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
429             ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
430 
431             // We found the sync reply, or the connection was closed.
432             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
433                 return pendingSyncReply.releaseReplyDecoder();
434         }
435 
436         // We didn't find a sync reply yet, keep waiting.
437 #if PLATFORM(WIN)
438         timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
439 #else
440         timedOut = !m_syncMessageState->wait(absoluteTime);
441 #endif
442     }
443 
444     // We timed out.
445     if (m_client)
446         m_client->syncMessageSendTimedOut(this);
447 
448     return 0;
449 }
450 
processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)451 void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
452 {
453     MutexLocker locker(m_syncReplyStateMutex);
454     ASSERT(!m_pendingSyncReplies.isEmpty());
455 
456     // Go through the stack of sync requests that have pending replies and see which one
457     // this reply is for.
458     for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
459         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
460 
461         if (pendingSyncReply.syncRequestID != arguments->destinationID())
462             continue;
463 
464         ASSERT(!pendingSyncReply.replyDecoder);
465 
466         pendingSyncReply.replyDecoder = arguments.leakPtr();
467         pendingSyncReply.didReceiveReply = true;
468 
469         // We got a reply to the last send message, wake up the client run loop so it can be processed.
470         if (i == m_pendingSyncReplies.size())
471             m_syncMessageState->wakeUpClientRunLoop();
472 
473         return;
474     }
475 
476     // We got a reply for a message we never sent.
477     // FIXME: Dispatch a didReceiveInvalidMessage callback on the client.
478     ASSERT_NOT_REACHED();
479 }
480 
processIncomingMessage(MessageID messageID,PassOwnPtr<ArgumentDecoder> arguments)481 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
482 {
483     // Check if this is a sync reply.
484     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
485         processIncomingSyncReply(arguments);
486         return;
487     }
488 
489     IncomingMessage incomingMessage(messageID, arguments);
490 
491     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
492     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
493     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
494     if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
495         return;
496 
497     // Check if we're waiting for this message.
498     {
499         MutexLocker locker(m_waitForMessageMutex);
500 
501         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
502         if (it != m_waitForMessageMap.end()) {
503             it->second = incomingMessage.releaseArguments().leakPtr();
504             ASSERT(it->second);
505 
506             m_waitForMessageCondition.signal();
507             return;
508         }
509     }
510 
511     enqueueIncomingMessage(incomingMessage);
512 }
513 
connectionDidClose()514 void Connection::connectionDidClose()
515 {
516     // The connection is now invalid.
517     platformInvalidate();
518 
519     {
520         MutexLocker locker(m_syncReplyStateMutex);
521 
522         ASSERT(m_shouldWaitForSyncReplies);
523         m_shouldWaitForSyncReplies = false;
524 
525         if (!m_pendingSyncReplies.isEmpty())
526             m_syncMessageState->wakeUpClientRunLoop();
527     }
528 
529     if (m_didCloseOnConnectionWorkQueueCallback)
530         m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
531 
532     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
533 }
534 
dispatchConnectionDidClose()535 void Connection::dispatchConnectionDidClose()
536 {
537     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
538     // then the client will be null here.
539     if (!m_client)
540         return;
541 
542 
543     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
544     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
545     // will then wait indefinitely for a reply.
546     Client* client = m_client;
547     m_client = 0;
548 
549     client->didClose(this);
550 }
551 
canSendOutgoingMessages() const552 bool Connection::canSendOutgoingMessages() const
553 {
554     return m_isConnected && platformCanSendOutgoingMessages();
555 }
556 
sendOutgoingMessages()557 void Connection::sendOutgoingMessages()
558 {
559     if (!canSendOutgoingMessages())
560         return;
561 
562     while (true) {
563         OutgoingMessage message;
564         {
565             MutexLocker locker(m_outgoingMessagesLock);
566             if (m_outgoingMessages.isEmpty())
567                 break;
568             message = m_outgoingMessages.takeFirst();
569         }
570 
571         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
572             break;
573     }
574 }
575 
dispatchSyncMessage(MessageID messageID,ArgumentDecoder * arguments)576 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
577 {
578     ASSERT(messageID.isSync());
579 
580     // Decode the sync request ID.
581     uint64_t syncRequestID = 0;
582 
583     if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
584         // We received an invalid sync message.
585         arguments->markInvalid();
586         return;
587     }
588 
589     // Create our reply encoder.
590     ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
591 
592     // Hand off both the decoder and encoder to the client..
593     SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
594 
595     // FIXME: If the message was invalid, we should send back a SyncMessageError.
596     ASSERT(!arguments->isInvalid());
597 
598     if (syncReplyMode == ManualReply) {
599         // The client will take ownership of the reply encoder and send it at some point in the future.
600         // We won't do anything here.
601         return;
602     }
603 
604     // Send the reply.
605     sendSyncReply(replyEncoder);
606 }
607 
didFailToSendSyncMessage()608 void Connection::didFailToSendSyncMessage()
609 {
610     if (!m_shouldExitOnSyncMessageSendFailure)
611         return;
612 
613     exit(0);
614 }
615 
enqueueIncomingMessage(IncomingMessage & incomingMessage)616 void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
617 {
618     MutexLocker locker(m_incomingMessagesLock);
619     m_incomingMessages.append(incomingMessage);
620 
621     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
622 }
623 
// Dispatches a single incoming message to the client: sync messages go through
// dispatchSyncMessage(), everything else through Client::didReceiveMessage().
// The dispatch counters and the invalid-message flag are saved and restored around the call,
// so this function can be entered again while a handler is running. If the message turned
// out to be invalid, the client (when still attached) gets didReceiveInvalidMessage().
void Connection::dispatchMessage(IncomingMessage& message)
{
    // Take ownership of the decoder so it is freed when this function returns.
    OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();

    // If there's no client, return. We do this after calling releaseArguments so that
    // the ArgumentDecoder message will be freed.
    if (!m_client)
        return;

    m_inDispatchMessageCount++;

    // sendMessage() consults this counter to decide whether outgoing messages may carry the
    // DispatchMessageWhenWaitingForSyncReply flag.
    if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;

    // Start with a clean flag for this message and remember the outer value, which belongs
    // to a message further up the stack when dispatches are nested.
    bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
    m_didReceiveInvalidMessage = false;

    if (message.messageID().isSync())
        dispatchSyncMessage(message.messageID(), arguments.get());
    else
        m_client->didReceiveMessage(this, message.messageID(), arguments.get());

    // The message is invalid if the handler called markCurrentlyDispatchedMessageAsInvalid()
    // or if the decoder itself was marked invalid.
    m_didReceiveInvalidMessage |= arguments->isInvalid();
    m_inDispatchMessageCount--;

    if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;

    // The handler may have invalidated the connection, so m_client is checked again.
    if (m_didReceiveInvalidMessage && m_client)
        m_client->didReceiveInvalidMessage(this, message.messageID());

    m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
}
657 
dispatchMessages()658 void Connection::dispatchMessages()
659 {
660     Vector<IncomingMessage> incomingMessages;
661 
662     {
663         MutexLocker locker(m_incomingMessagesLock);
664         m_incomingMessages.swap(incomingMessages);
665     }
666 
667     for (size_t i = 0; i < incomingMessages.size(); ++i)
668         dispatchMessage(incomingMessages[i]);
669 }
670 
671 } // namespace CoreIPC
672