1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #if defined(WEBRTC_POSIX)
12 #include <sys/time.h>
13 #endif
14
15 #include "webrtc/base/common.h"
16 #include "webrtc/base/logging.h"
17 #include "webrtc/base/messagequeue.h"
18 #if defined(__native_client__)
19 #include "webrtc/base/nullsocketserver.h"
20 typedef rtc::NullSocketServer DefaultSocketServer;
21 #else
22 #include "webrtc/base/physicalsocketserver.h"
23 typedef rtc::PhysicalSocketServer DefaultSocketServer;
24 #endif
25
26 namespace rtc {
27
28 const uint32 kMaxMsgLatency = 150; // 150 ms
29
30 //------------------------------------------------------------------
31 // MessageQueueManager
32
33 MessageQueueManager* MessageQueueManager::instance_ = NULL;
34
Instance()35 MessageQueueManager* MessageQueueManager::Instance() {
36 // Note: This is not thread safe, but it is first called before threads are
37 // spawned.
38 if (!instance_)
39 instance_ = new MessageQueueManager;
40 return instance_;
41 }
42
IsInitialized()43 bool MessageQueueManager::IsInitialized() {
44 return instance_ != NULL;
45 }
46
MessageQueueManager()47 MessageQueueManager::MessageQueueManager() {
48 }
49
~MessageQueueManager()50 MessageQueueManager::~MessageQueueManager() {
51 }
52
Add(MessageQueue * message_queue)53 void MessageQueueManager::Add(MessageQueue *message_queue) {
54 return Instance()->AddInternal(message_queue);
55 }
AddInternal(MessageQueue * message_queue)56 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
57 // MessageQueueManager methods should be non-reentrant, so we
58 // ASSERT that is the case. If any of these ASSERT, please
59 // contact bpm or jbeda.
60 ASSERT(!crit_.CurrentThreadIsOwner());
61 CritScope cs(&crit_);
62 message_queues_.push_back(message_queue);
63 }
64
Remove(MessageQueue * message_queue)65 void MessageQueueManager::Remove(MessageQueue *message_queue) {
66 // If there isn't a message queue manager instance, then there isn't a queue
67 // to remove.
68 if (!instance_) return;
69 return Instance()->RemoveInternal(message_queue);
70 }
RemoveInternal(MessageQueue * message_queue)71 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
72 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
73 // If this is the last MessageQueue, destroy the manager as well so that
74 // we don't leak this object at program shutdown. As mentioned above, this is
75 // not thread-safe, but this should only happen at program termination (when
76 // the ThreadManager is destroyed, and threads are no longer active).
77 bool destroy = false;
78 {
79 CritScope cs(&crit_);
80 std::vector<MessageQueue *>::iterator iter;
81 iter = std::find(message_queues_.begin(), message_queues_.end(),
82 message_queue);
83 if (iter != message_queues_.end()) {
84 message_queues_.erase(iter);
85 }
86 destroy = message_queues_.empty();
87 }
88 if (destroy) {
89 instance_ = NULL;
90 delete this;
91 }
92 }
93
Clear(MessageHandler * handler)94 void MessageQueueManager::Clear(MessageHandler *handler) {
95 // If there isn't a message queue manager instance, then there aren't any
96 // queues to remove this handler from.
97 if (!instance_) return;
98 return Instance()->ClearInternal(handler);
99 }
ClearInternal(MessageHandler * handler)100 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
101 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
102 CritScope cs(&crit_);
103 std::vector<MessageQueue *>::iterator iter;
104 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
105 (*iter)->Clear(handler);
106 }
107
108 //------------------------------------------------------------------
109 // MessageQueue
110
MessageQueue(SocketServer * ss)111 MessageQueue::MessageQueue(SocketServer* ss)
112 : ss_(ss), fStop_(false), fPeekKeep_(false),
113 dmsgq_next_num_(0) {
114 if (!ss_) {
115 // Currently, MessageQueue holds a socket server, and is the base class for
116 // Thread. It seems like it makes more sense for Thread to hold the socket
117 // server, and provide it to the MessageQueue, since the Thread controls
118 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
119 // messagequeue_unittest to depend on network libraries... yuck.
120 default_ss_.reset(new DefaultSocketServer());
121 ss_ = default_ss_.get();
122 }
123 ss_->SetMessageQueue(this);
124 MessageQueueManager::Add(this);
125 }
126
~MessageQueue()127 MessageQueue::~MessageQueue() {
128 // The signal is done from here to ensure
129 // that it always gets called when the queue
130 // is going away.
131 SignalQueueDestroyed();
132 MessageQueueManager::Remove(this);
133 Clear(NULL);
134 if (ss_) {
135 ss_->SetMessageQueue(NULL);
136 }
137 }
138
set_socketserver(SocketServer * ss)139 void MessageQueue::set_socketserver(SocketServer* ss) {
140 ss_ = ss ? ss : default_ss_.get();
141 ss_->SetMessageQueue(this);
142 }
143
Quit()144 void MessageQueue::Quit() {
145 fStop_ = true;
146 ss_->WakeUp();
147 }
148
IsQuitting()149 bool MessageQueue::IsQuitting() {
150 return fStop_;
151 }
152
Restart()153 void MessageQueue::Restart() {
154 fStop_ = false;
155 }
156
Peek(Message * pmsg,int cmsWait)157 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
158 if (fPeekKeep_) {
159 *pmsg = msgPeek_;
160 return true;
161 }
162 if (!Get(pmsg, cmsWait))
163 return false;
164 msgPeek_ = *pmsg;
165 fPeekKeep_ = true;
166 return true;
167 }
168
Get(Message * pmsg,int cmsWait,bool process_io)169 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
170 // Return and clear peek if present
171 // Always return the peek if it exists so there is Peek/Get symmetry
172
173 if (fPeekKeep_) {
174 *pmsg = msgPeek_;
175 fPeekKeep_ = false;
176 return true;
177 }
178
179 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
180
181 int cmsTotal = cmsWait;
182 int cmsElapsed = 0;
183 uint32 msStart = Time();
184 uint32 msCurrent = msStart;
185 while (true) {
186 // Check for sent messages
187 ReceiveSends();
188
189 // Check for posted events
190 int cmsDelayNext = kForever;
191 bool first_pass = true;
192 while (true) {
193 // All queue operations need to be locked, but nothing else in this loop
194 // (specifically handling disposed message) can happen inside the crit.
195 // Otherwise, disposed MessageHandlers will cause deadlocks.
196 {
197 CritScope cs(&crit_);
198 // On the first pass, check for delayed messages that have been
199 // triggered and calculate the next trigger time.
200 if (first_pass) {
201 first_pass = false;
202 while (!dmsgq_.empty()) {
203 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
204 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
205 break;
206 }
207 msgq_.push_back(dmsgq_.top().msg_);
208 dmsgq_.pop();
209 }
210 }
211 // Pull a message off the message queue, if available.
212 if (msgq_.empty()) {
213 break;
214 } else {
215 *pmsg = msgq_.front();
216 msgq_.pop_front();
217 }
218 } // crit_ is released here.
219
220 // Log a warning for time-sensitive messages that we're late to deliver.
221 if (pmsg->ts_sensitive) {
222 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
223 if (delay > 0) {
224 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
225 << (delay + kMaxMsgLatency) << "ms";
226 }
227 }
228 // If this was a dispose message, delete it and skip it.
229 if (MQID_DISPOSE == pmsg->message_id) {
230 ASSERT(NULL == pmsg->phandler);
231 delete pmsg->pdata;
232 *pmsg = Message();
233 continue;
234 }
235 return true;
236 }
237
238 if (fStop_)
239 break;
240
241 // Which is shorter, the delay wait or the asked wait?
242
243 int cmsNext;
244 if (cmsWait == kForever) {
245 cmsNext = cmsDelayNext;
246 } else {
247 cmsNext = _max(0, cmsTotal - cmsElapsed);
248 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
249 cmsNext = cmsDelayNext;
250 }
251
252 // Wait and multiplex in the meantime
253 if (!ss_->Wait(cmsNext, process_io))
254 return false;
255
256 // If the specified timeout expired, return
257
258 msCurrent = Time();
259 cmsElapsed = TimeDiff(msCurrent, msStart);
260 if (cmsWait != kForever) {
261 if (cmsElapsed >= cmsWait)
262 return false;
263 }
264 }
265 return false;
266 }
267
ReceiveSends()268 void MessageQueue::ReceiveSends() {
269 }
270
Post(MessageHandler * phandler,uint32 id,MessageData * pdata,bool time_sensitive)271 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
272 MessageData *pdata, bool time_sensitive) {
273 if (fStop_)
274 return;
275
276 // Keep thread safe
277 // Add the message to the end of the queue
278 // Signal for the multiplexer to return
279
280 CritScope cs(&crit_);
281 Message msg;
282 msg.phandler = phandler;
283 msg.message_id = id;
284 msg.pdata = pdata;
285 if (time_sensitive) {
286 msg.ts_sensitive = Time() + kMaxMsgLatency;
287 }
288 msgq_.push_back(msg);
289 ss_->WakeUp();
290 }
291
DoDelayPost(int cmsDelay,uint32 tstamp,MessageHandler * phandler,uint32 id,MessageData * pdata)292 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
293 MessageHandler *phandler, uint32 id, MessageData* pdata) {
294 if (fStop_)
295 return;
296
297 // Keep thread safe
298 // Add to the priority queue. Gets sorted soonest first.
299 // Signal for the multiplexer to return.
300
301 CritScope cs(&crit_);
302 Message msg;
303 msg.phandler = phandler;
304 msg.message_id = id;
305 msg.pdata = pdata;
306 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
307 dmsgq_.push(dmsg);
308 // If this message queue processes 1 message every millisecond for 50 days,
309 // we will wrap this number. Even then, only messages with identical times
310 // will be misordered, and then only briefly. This is probably ok.
311 VERIFY(0 != ++dmsgq_next_num_);
312 ss_->WakeUp();
313 }
314
GetDelay()315 int MessageQueue::GetDelay() {
316 CritScope cs(&crit_);
317
318 if (!msgq_.empty())
319 return 0;
320
321 if (!dmsgq_.empty()) {
322 int delay = TimeUntil(dmsgq_.top().msTrigger_);
323 if (delay < 0)
324 delay = 0;
325 return delay;
326 }
327
328 return kForever;
329 }
330
Clear(MessageHandler * phandler,uint32 id,MessageList * removed)331 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
332 MessageList* removed) {
333 CritScope cs(&crit_);
334
335 // Remove messages with phandler
336
337 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
338 if (removed) {
339 removed->push_back(msgPeek_);
340 } else {
341 delete msgPeek_.pdata;
342 }
343 fPeekKeep_ = false;
344 }
345
346 // Remove from ordered message queue
347
348 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
349 if (it->Match(phandler, id)) {
350 if (removed) {
351 removed->push_back(*it);
352 } else {
353 delete it->pdata;
354 }
355 it = msgq_.erase(it);
356 } else {
357 ++it;
358 }
359 }
360
361 // Remove from priority queue. Not directly iterable, so use this approach
362
363 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
364 for (PriorityQueue::container_type::iterator it = new_end;
365 it != dmsgq_.container().end(); ++it) {
366 if (it->msg_.Match(phandler, id)) {
367 if (removed) {
368 removed->push_back(it->msg_);
369 } else {
370 delete it->msg_.pdata;
371 }
372 } else {
373 *new_end++ = *it;
374 }
375 }
376 dmsgq_.container().erase(new_end, dmsgq_.container().end());
377 dmsgq_.reheap();
378 }
379
Dispatch(Message * pmsg)380 void MessageQueue::Dispatch(Message *pmsg) {
381 pmsg->phandler->OnMessage(pmsg);
382 }
383
384 } // namespace rtc
385