• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #if defined(_MSC_VER) && _MSC_VER < 1300
29 #pragma warning(disable:4786)
30 #endif
31 
32 #ifdef POSIX
33 #include <sys/time.h>
34 #endif
35 
36 #include "talk/base/common.h"
37 #include "talk/base/logging.h"
38 #include "talk/base/messagequeue.h"
39 #include "talk/base/physicalsocketserver.h"
40 
41 
42 namespace talk_base {
43 
44 const uint32 kMaxMsgLatency = 150;  // 150 ms
45 
46 //------------------------------------------------------------------
47 // MessageQueueManager
48 
49 MessageQueueManager* MessageQueueManager::instance_;
50 
Instance()51 MessageQueueManager* MessageQueueManager::Instance() {
52   // Note: This is not thread safe, but it is first called before threads are
53   // spawned.
54   if (!instance_)
55     instance_ = new MessageQueueManager;
56   return instance_;
57 }
58 
MessageQueueManager()59 MessageQueueManager::MessageQueueManager() {
60 }
61 
~MessageQueueManager()62 MessageQueueManager::~MessageQueueManager() {
63 }
64 
Add(MessageQueue * message_queue)65 void MessageQueueManager::Add(MessageQueue *message_queue) {
66   // MessageQueueManager methods should be non-reentrant, so we
67   // ASSERT that is the case.  If any of these ASSERT, please
68   // contact bpm or jbeda.
69   ASSERT(!crit_.CurrentThreadIsOwner());
70   CritScope cs(&crit_);
71   message_queues_.push_back(message_queue);
72 }
73 
Remove(MessageQueue * message_queue)74 void MessageQueueManager::Remove(MessageQueue *message_queue) {
75   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
76   // If this is the last MessageQueue, destroy the manager as well so that
77   // we don't leak this object at program shutdown. As mentioned above, this is
78   // not thread-safe, but this should only happen at program termination (when
79   // the ThreadManager is destroyed, and threads are no longer active).
80   bool destroy = false;
81   {
82     CritScope cs(&crit_);
83     std::vector<MessageQueue *>::iterator iter;
84     iter = std::find(message_queues_.begin(), message_queues_.end(),
85                      message_queue);
86     if (iter != message_queues_.end()) {
87       message_queues_.erase(iter);
88     }
89     destroy = message_queues_.empty();
90   }
91   if (destroy) {
92     instance_ = NULL;
93     delete this;
94   }
95 }
96 
Clear(MessageHandler * handler)97 void MessageQueueManager::Clear(MessageHandler *handler) {
98   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
99   CritScope cs(&crit_);
100   std::vector<MessageQueue *>::iterator iter;
101   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
102     (*iter)->Clear(handler);
103 }
104 
105 //------------------------------------------------------------------
106 // MessageQueue
107 
MessageQueue(SocketServer * ss)108 MessageQueue::MessageQueue(SocketServer* ss)
109     : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
110       dmsgq_next_num_(0) {
111   if (!ss_) {
112     // Currently, MessageQueue holds a socket server, and is the base class for
113     // Thread.  It seems like it makes more sense for Thread to hold the socket
114     // server, and provide it to the MessageQueue, since the Thread controls
115     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
116     // messagequeue_unittest to depend on network libraries... yuck.
117     default_ss_.reset(new PhysicalSocketServer());
118     ss_ = default_ss_.get();
119   }
120   ss_->SetMessageQueue(this);
121 }
122 
~MessageQueue()123 MessageQueue::~MessageQueue() {
124   // The signal is done from here to ensure
125   // that it always gets called when the queue
126   // is going away.
127   SignalQueueDestroyed();
128   if (active_) {
129     MessageQueueManager::Instance()->Remove(this);
130     Clear(NULL);
131   }
132   if (ss_) {
133     ss_->SetMessageQueue(NULL);
134   }
135 }
136 
set_socketserver(SocketServer * ss)137 void MessageQueue::set_socketserver(SocketServer* ss) {
138   ss_ = ss ? ss : default_ss_.get();
139   ss_->SetMessageQueue(this);
140 }
141 
Quit()142 void MessageQueue::Quit() {
143   fStop_ = true;
144   ss_->WakeUp();
145 }
146 
IsQuitting()147 bool MessageQueue::IsQuitting() {
148   return fStop_;
149 }
150 
Restart()151 void MessageQueue::Restart() {
152   fStop_ = false;
153 }
154 
Peek(Message * pmsg,int cmsWait)155 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
156   if (fPeekKeep_) {
157     *pmsg = msgPeek_;
158     return true;
159   }
160   if (!Get(pmsg, cmsWait))
161     return false;
162   msgPeek_ = *pmsg;
163   fPeekKeep_ = true;
164   return true;
165 }
166 
Get(Message * pmsg,int cmsWait,bool process_io)167 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
168   // Return and clear peek if present
169   // Always return the peek if it exists so there is Peek/Get symmetry
170 
171   if (fPeekKeep_) {
172     *pmsg = msgPeek_;
173     fPeekKeep_ = false;
174     return true;
175   }
176 
177   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
178 
179   int cmsTotal = cmsWait;
180   int cmsElapsed = 0;
181   uint32 msStart = Time();
182   uint32 msCurrent = msStart;
183   while (true) {
184     // Check for sent messages
185 
186     ReceiveSends();
187 
188     // Check queues
189 
190     int cmsDelayNext = kForever;
191     {
192       CritScope cs(&crit_);
193 
194       // Check for delayed messages that have been triggered
195       // Calc the next trigger too
196 
197       while (!dmsgq_.empty()) {
198         if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
199           cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
200           break;
201         }
202         msgq_.push_back(dmsgq_.top().msg_);
203         dmsgq_.pop();
204       }
205 
206       // Check for posted events
207 
208       while (!msgq_.empty()) {
209         *pmsg = msgq_.front();
210         if (pmsg->ts_sensitive) {
211           long delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
212           if (delay > 0) {
213             LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
214                               << (delay + kMaxMsgLatency) << "ms";
215           }
216         }
217         msgq_.pop_front();
218         if (MQID_DISPOSE == pmsg->message_id) {
219           ASSERT(NULL == pmsg->phandler);
220           delete pmsg->pdata;
221           continue;
222         }
223         return true;
224       }
225     }
226 
227     if (fStop_)
228       break;
229 
230     // Which is shorter, the delay wait or the asked wait?
231 
232     int cmsNext;
233     if (cmsWait == kForever) {
234       cmsNext = cmsDelayNext;
235     } else {
236       cmsNext = _max(0, cmsTotal - cmsElapsed);
237       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
238         cmsNext = cmsDelayNext;
239     }
240 
241     // Wait and multiplex in the meantime
242     if (!ss_->Wait(cmsNext, process_io))
243       return false;
244 
245     // If the specified timeout expired, return
246 
247     msCurrent = Time();
248     cmsElapsed = TimeDiff(msCurrent, msStart);
249     if (cmsWait != kForever) {
250       if (cmsElapsed >= cmsWait)
251         return false;
252     }
253   }
254   return false;
255 }
256 
ReceiveSends()257 void MessageQueue::ReceiveSends() {
258 }
259 
Post(MessageHandler * phandler,uint32 id,MessageData * pdata,bool time_sensitive)260 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
261     MessageData *pdata, bool time_sensitive) {
262   if (fStop_)
263     return;
264 
265   // Keep thread safe
266   // Add the message to the end of the queue
267   // Signal for the multiplexer to return
268 
269   CritScope cs(&crit_);
270   EnsureActive();
271   Message msg;
272   msg.phandler = phandler;
273   msg.message_id = id;
274   msg.pdata = pdata;
275   if (time_sensitive) {
276     msg.ts_sensitive = Time() + kMaxMsgLatency;
277   }
278   msgq_.push_back(msg);
279   ss_->WakeUp();
280 }
281 
DoDelayPost(int cmsDelay,uint32 tstamp,MessageHandler * phandler,uint32 id,MessageData * pdata)282 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
283     MessageHandler *phandler, uint32 id, MessageData* pdata) {
284   if (fStop_)
285     return;
286 
287   // Keep thread safe
288   // Add to the priority queue. Gets sorted soonest first.
289   // Signal for the multiplexer to return.
290 
291   CritScope cs(&crit_);
292   EnsureActive();
293   Message msg;
294   msg.phandler = phandler;
295   msg.message_id = id;
296   msg.pdata = pdata;
297   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
298   dmsgq_.push(dmsg);
299   // If this message queue processes 1 message every millisecond for 50 days,
300   // we will wrap this number.  Even then, only messages with identical times
301   // will be misordered, and then only briefly.  This is probably ok.
302   VERIFY(0 != ++dmsgq_next_num_);
303   ss_->WakeUp();
304 }
305 
GetDelay()306 int MessageQueue::GetDelay() {
307   CritScope cs(&crit_);
308 
309   if (!msgq_.empty())
310     return 0;
311 
312   if (!dmsgq_.empty()) {
313     int delay = TimeUntil(dmsgq_.top().msTrigger_);
314     if (delay < 0)
315       delay = 0;
316     return delay;
317   }
318 
319   return kForever;
320 }
321 
Clear(MessageHandler * phandler,uint32 id,MessageList * removed)322 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
323                          MessageList* removed) {
324   CritScope cs(&crit_);
325 
326   // Remove messages with phandler
327 
328   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
329     if (removed) {
330       removed->push_back(msgPeek_);
331     } else {
332       delete msgPeek_.pdata;
333     }
334     fPeekKeep_ = false;
335   }
336 
337   // Remove from ordered message queue
338 
339   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
340     if (it->Match(phandler, id)) {
341       if (removed) {
342         removed->push_back(*it);
343       } else {
344         delete it->pdata;
345       }
346       it = msgq_.erase(it);
347     } else {
348       ++it;
349     }
350   }
351 
352   // Remove from priority queue. Not directly iterable, so use this approach
353 
354   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
355   for (PriorityQueue::container_type::iterator it = new_end;
356        it != dmsgq_.container().end(); ++it) {
357     if (it->msg_.Match(phandler, id)) {
358       if (removed) {
359         removed->push_back(it->msg_);
360       } else {
361         delete it->msg_.pdata;
362       }
363     } else {
364       *new_end++ = *it;
365     }
366   }
367   dmsgq_.container().erase(new_end, dmsgq_.container().end());
368   dmsgq_.reheap();
369 }
370 
Dispatch(Message * pmsg)371 void MessageQueue::Dispatch(Message *pmsg) {
372   pmsg->phandler->OnMessage(pmsg);
373 }
374 
EnsureActive()375 void MessageQueue::EnsureActive() {
376   ASSERT(crit_.CurrentThreadIsOwner());
377   if (!active_) {
378     active_ = true;
379     MessageQueueManager::Instance()->Add(this);
380   }
381 }
382 
383 }  // namespace talk_base
384