• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(WEBRTC_POSIX)
12 #include <sys/time.h>
13 #endif
14 
15 #include "webrtc/base/common.h"
16 #include "webrtc/base/logging.h"
17 #include "webrtc/base/messagequeue.h"
18 #if defined(__native_client__)
19 #include "webrtc/base/nullsocketserver.h"
20 typedef rtc::NullSocketServer DefaultSocketServer;
21 #else
22 #include "webrtc/base/physicalsocketserver.h"
23 typedef rtc::PhysicalSocketServer DefaultSocketServer;
24 #endif
25 
26 namespace rtc {
27 
28 const uint32 kMaxMsgLatency = 150;  // 150 ms
29 
30 //------------------------------------------------------------------
31 // MessageQueueManager
32 
33 MessageQueueManager* MessageQueueManager::instance_ = NULL;
34 
Instance()35 MessageQueueManager* MessageQueueManager::Instance() {
36   // Note: This is not thread safe, but it is first called before threads are
37   // spawned.
38   if (!instance_)
39     instance_ = new MessageQueueManager;
40   return instance_;
41 }
42 
IsInitialized()43 bool MessageQueueManager::IsInitialized() {
44   return instance_ != NULL;
45 }
46 
MessageQueueManager()47 MessageQueueManager::MessageQueueManager() {
48 }
49 
~MessageQueueManager()50 MessageQueueManager::~MessageQueueManager() {
51 }
52 
Add(MessageQueue * message_queue)53 void MessageQueueManager::Add(MessageQueue *message_queue) {
54   return Instance()->AddInternal(message_queue);
55 }
AddInternal(MessageQueue * message_queue)56 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
57   // MessageQueueManager methods should be non-reentrant, so we
58   // ASSERT that is the case.  If any of these ASSERT, please
59   // contact bpm or jbeda.
60   ASSERT(!crit_.CurrentThreadIsOwner());
61   CritScope cs(&crit_);
62   message_queues_.push_back(message_queue);
63 }
64 
Remove(MessageQueue * message_queue)65 void MessageQueueManager::Remove(MessageQueue *message_queue) {
66   // If there isn't a message queue manager instance, then there isn't a queue
67   // to remove.
68   if (!instance_) return;
69   return Instance()->RemoveInternal(message_queue);
70 }
RemoveInternal(MessageQueue * message_queue)71 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
72   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
73   // If this is the last MessageQueue, destroy the manager as well so that
74   // we don't leak this object at program shutdown. As mentioned above, this is
75   // not thread-safe, but this should only happen at program termination (when
76   // the ThreadManager is destroyed, and threads are no longer active).
77   bool destroy = false;
78   {
79     CritScope cs(&crit_);
80     std::vector<MessageQueue *>::iterator iter;
81     iter = std::find(message_queues_.begin(), message_queues_.end(),
82                      message_queue);
83     if (iter != message_queues_.end()) {
84       message_queues_.erase(iter);
85     }
86     destroy = message_queues_.empty();
87   }
88   if (destroy) {
89     instance_ = NULL;
90     delete this;
91   }
92 }
93 
Clear(MessageHandler * handler)94 void MessageQueueManager::Clear(MessageHandler *handler) {
95   // If there isn't a message queue manager instance, then there aren't any
96   // queues to remove this handler from.
97   if (!instance_) return;
98   return Instance()->ClearInternal(handler);
99 }
ClearInternal(MessageHandler * handler)100 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
101   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
102   CritScope cs(&crit_);
103   std::vector<MessageQueue *>::iterator iter;
104   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
105     (*iter)->Clear(handler);
106 }
107 
108 //------------------------------------------------------------------
109 // MessageQueue
110 
MessageQueue(SocketServer * ss)111 MessageQueue::MessageQueue(SocketServer* ss)
112     : ss_(ss), fStop_(false), fPeekKeep_(false),
113       dmsgq_next_num_(0) {
114   if (!ss_) {
115     // Currently, MessageQueue holds a socket server, and is the base class for
116     // Thread.  It seems like it makes more sense for Thread to hold the socket
117     // server, and provide it to the MessageQueue, since the Thread controls
118     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
119     // messagequeue_unittest to depend on network libraries... yuck.
120     default_ss_.reset(new DefaultSocketServer());
121     ss_ = default_ss_.get();
122   }
123   ss_->SetMessageQueue(this);
124   MessageQueueManager::Add(this);
125 }
126 
~MessageQueue()127 MessageQueue::~MessageQueue() {
128   // The signal is done from here to ensure
129   // that it always gets called when the queue
130   // is going away.
131   SignalQueueDestroyed();
132   MessageQueueManager::Remove(this);
133   Clear(NULL);
134   if (ss_) {
135     ss_->SetMessageQueue(NULL);
136   }
137 }
138 
set_socketserver(SocketServer * ss)139 void MessageQueue::set_socketserver(SocketServer* ss) {
140   ss_ = ss ? ss : default_ss_.get();
141   ss_->SetMessageQueue(this);
142 }
143 
Quit()144 void MessageQueue::Quit() {
145   fStop_ = true;
146   ss_->WakeUp();
147 }
148 
IsQuitting()149 bool MessageQueue::IsQuitting() {
150   return fStop_;
151 }
152 
Restart()153 void MessageQueue::Restart() {
154   fStop_ = false;
155 }
156 
Peek(Message * pmsg,int cmsWait)157 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
158   if (fPeekKeep_) {
159     *pmsg = msgPeek_;
160     return true;
161   }
162   if (!Get(pmsg, cmsWait))
163     return false;
164   msgPeek_ = *pmsg;
165   fPeekKeep_ = true;
166   return true;
167 }
168 
Get(Message * pmsg,int cmsWait,bool process_io)169 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
170   // Return and clear peek if present
171   // Always return the peek if it exists so there is Peek/Get symmetry
172 
173   if (fPeekKeep_) {
174     *pmsg = msgPeek_;
175     fPeekKeep_ = false;
176     return true;
177   }
178 
179   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
180 
181   int cmsTotal = cmsWait;
182   int cmsElapsed = 0;
183   uint32 msStart = Time();
184   uint32 msCurrent = msStart;
185   while (true) {
186     // Check for sent messages
187     ReceiveSends();
188 
189     // Check for posted events
190     int cmsDelayNext = kForever;
191     bool first_pass = true;
192     while (true) {
193       // All queue operations need to be locked, but nothing else in this loop
194       // (specifically handling disposed message) can happen inside the crit.
195       // Otherwise, disposed MessageHandlers will cause deadlocks.
196       {
197         CritScope cs(&crit_);
198         // On the first pass, check for delayed messages that have been
199         // triggered and calculate the next trigger time.
200         if (first_pass) {
201           first_pass = false;
202           while (!dmsgq_.empty()) {
203             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
204               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
205               break;
206             }
207             msgq_.push_back(dmsgq_.top().msg_);
208             dmsgq_.pop();
209           }
210         }
211         // Pull a message off the message queue, if available.
212         if (msgq_.empty()) {
213           break;
214         } else {
215           *pmsg = msgq_.front();
216           msgq_.pop_front();
217         }
218       }  // crit_ is released here.
219 
220       // Log a warning for time-sensitive messages that we're late to deliver.
221       if (pmsg->ts_sensitive) {
222         int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
223         if (delay > 0) {
224           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
225                             << (delay + kMaxMsgLatency) << "ms";
226         }
227       }
228       // If this was a dispose message, delete it and skip it.
229       if (MQID_DISPOSE == pmsg->message_id) {
230         ASSERT(NULL == pmsg->phandler);
231         delete pmsg->pdata;
232         *pmsg = Message();
233         continue;
234       }
235       return true;
236     }
237 
238     if (fStop_)
239       break;
240 
241     // Which is shorter, the delay wait or the asked wait?
242 
243     int cmsNext;
244     if (cmsWait == kForever) {
245       cmsNext = cmsDelayNext;
246     } else {
247       cmsNext = _max(0, cmsTotal - cmsElapsed);
248       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
249         cmsNext = cmsDelayNext;
250     }
251 
252     // Wait and multiplex in the meantime
253     if (!ss_->Wait(cmsNext, process_io))
254       return false;
255 
256     // If the specified timeout expired, return
257 
258     msCurrent = Time();
259     cmsElapsed = TimeDiff(msCurrent, msStart);
260     if (cmsWait != kForever) {
261       if (cmsElapsed >= cmsWait)
262         return false;
263     }
264   }
265   return false;
266 }
267 
ReceiveSends()268 void MessageQueue::ReceiveSends() {
269 }
270 
Post(MessageHandler * phandler,uint32 id,MessageData * pdata,bool time_sensitive)271 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
272     MessageData *pdata, bool time_sensitive) {
273   if (fStop_)
274     return;
275 
276   // Keep thread safe
277   // Add the message to the end of the queue
278   // Signal for the multiplexer to return
279 
280   CritScope cs(&crit_);
281   Message msg;
282   msg.phandler = phandler;
283   msg.message_id = id;
284   msg.pdata = pdata;
285   if (time_sensitive) {
286     msg.ts_sensitive = Time() + kMaxMsgLatency;
287   }
288   msgq_.push_back(msg);
289   ss_->WakeUp();
290 }
291 
DoDelayPost(int cmsDelay,uint32 tstamp,MessageHandler * phandler,uint32 id,MessageData * pdata)292 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
293     MessageHandler *phandler, uint32 id, MessageData* pdata) {
294   if (fStop_)
295     return;
296 
297   // Keep thread safe
298   // Add to the priority queue. Gets sorted soonest first.
299   // Signal for the multiplexer to return.
300 
301   CritScope cs(&crit_);
302   Message msg;
303   msg.phandler = phandler;
304   msg.message_id = id;
305   msg.pdata = pdata;
306   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
307   dmsgq_.push(dmsg);
308   // If this message queue processes 1 message every millisecond for 50 days,
309   // we will wrap this number.  Even then, only messages with identical times
310   // will be misordered, and then only briefly.  This is probably ok.
311   VERIFY(0 != ++dmsgq_next_num_);
312   ss_->WakeUp();
313 }
314 
GetDelay()315 int MessageQueue::GetDelay() {
316   CritScope cs(&crit_);
317 
318   if (!msgq_.empty())
319     return 0;
320 
321   if (!dmsgq_.empty()) {
322     int delay = TimeUntil(dmsgq_.top().msTrigger_);
323     if (delay < 0)
324       delay = 0;
325     return delay;
326   }
327 
328   return kForever;
329 }
330 
Clear(MessageHandler * phandler,uint32 id,MessageList * removed)331 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
332                          MessageList* removed) {
333   CritScope cs(&crit_);
334 
335   // Remove messages with phandler
336 
337   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
338     if (removed) {
339       removed->push_back(msgPeek_);
340     } else {
341       delete msgPeek_.pdata;
342     }
343     fPeekKeep_ = false;
344   }
345 
346   // Remove from ordered message queue
347 
348   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
349     if (it->Match(phandler, id)) {
350       if (removed) {
351         removed->push_back(*it);
352       } else {
353         delete it->pdata;
354       }
355       it = msgq_.erase(it);
356     } else {
357       ++it;
358     }
359   }
360 
361   // Remove from priority queue. Not directly iterable, so use this approach
362 
363   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
364   for (PriorityQueue::container_type::iterator it = new_end;
365        it != dmsgq_.container().end(); ++it) {
366     if (it->msg_.Match(phandler, id)) {
367       if (removed) {
368         removed->push_back(it->msg_);
369       } else {
370         delete it->msg_.pdata;
371       }
372     } else {
373       *new_end++ = *it;
374     }
375   }
376   dmsgq_.container().erase(new_end, dmsgq_.container().end());
377   dmsgq_.reheap();
378 }
379 
Dispatch(Message * pmsg)380 void MessageQueue::Dispatch(Message *pmsg) {
381   pmsg->phandler->OnMessage(pmsg);
382 }
383 
384 }  // namespace rtc
385