• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "jingle/glue/thread_wrapper.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/libjingle/source/talk/base/nullsocketserver.h"
12 
13 namespace jingle_glue {
14 
15 struct JingleThreadWrapper::PendingSend {
PendingSendjingle_glue::JingleThreadWrapper::PendingSend16   PendingSend(const talk_base::Message& message_value)
17       : sending_thread(JingleThreadWrapper::current()),
18         message(message_value),
19         done_event(true, false) {
20     DCHECK(sending_thread);
21   }
22 
23   JingleThreadWrapper* sending_thread;
24   talk_base::Message message;
25   base::WaitableEvent done_event;
26 };
27 
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29     g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30 
31 // static
EnsureForCurrentMessageLoop()32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33   if (JingleThreadWrapper::current() == NULL) {
34     base::MessageLoop* message_loop = base::MessageLoop::current();
35     g_jingle_thread_wrapper.Get()
36         .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
37     message_loop->AddDestructionObserver(current());
38   }
39 
40   DCHECK_EQ(talk_base::Thread::Current(), current());
41 }
42 
43 // static
current()44 JingleThreadWrapper* JingleThreadWrapper::current() {
45   return g_jingle_thread_wrapper.Get().Get();
46 }
47 
JingleThreadWrapper(scoped_refptr<base::SingleThreadTaskRunner> task_runner)48 JingleThreadWrapper::JingleThreadWrapper(
49     scoped_refptr<base::SingleThreadTaskRunner> task_runner)
50     : talk_base::Thread(new talk_base::NullSocketServer()),
51       task_runner_(task_runner),
52       send_allowed_(false),
53       last_task_id_(0),
54       pending_send_event_(true, false),
55       weak_ptr_factory_(this) {
56   DCHECK(task_runner->BelongsToCurrentThread());
57   DCHECK(!talk_base::Thread::Current());
58   weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
59   talk_base::MessageQueueManager::Add(this);
60   WrapCurrent();
61 }
62 
~JingleThreadWrapper()63 JingleThreadWrapper::~JingleThreadWrapper() {
64   Clear(NULL, talk_base::MQID_ANY, NULL);
65 }
66 
WillDestroyCurrentMessageLoop()67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68   DCHECK_EQ(talk_base::Thread::Current(), current());
69   UnwrapCurrent();
70   g_jingle_thread_wrapper.Get().Set(NULL);
71   talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
72   talk_base::MessageQueueManager::Remove(this);
73   talk_base::SocketServer* ss = socketserver();
74   delete this;
75   delete ss;
76 }
77 
Post(talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data,bool time_sensitive)78 void JingleThreadWrapper::Post(
79     talk_base::MessageHandler* handler, uint32 message_id,
80     talk_base::MessageData* data, bool time_sensitive) {
81   PostTaskInternal(0, handler, message_id, data);
82 }
83 
PostDelayed(int delay_ms,talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data)84 void JingleThreadWrapper::PostDelayed(
85     int delay_ms, talk_base::MessageHandler* handler,
86     uint32 message_id, talk_base::MessageData* data) {
87   PostTaskInternal(delay_ms, handler, message_id, data);
88 }
89 
Clear(talk_base::MessageHandler * handler,uint32 id,talk_base::MessageList * removed)90 void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id,
91                                 talk_base::MessageList* removed) {
92   base::AutoLock auto_lock(lock_);
93 
94   for (MessagesQueue::iterator it = messages_.begin();
95        it != messages_.end();) {
96     MessagesQueue::iterator next = it;
97     ++next;
98 
99     if (it->second.Match(handler, id)) {
100       if (removed) {
101         removed->push_back(it->second);
102       } else {
103         delete it->second.pdata;
104       }
105       messages_.erase(it);
106     }
107 
108     it = next;
109   }
110 
111   for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
112        it != pending_send_messages_.end();) {
113     std::list<PendingSend*>::iterator next = it;
114     ++next;
115 
116     if ((*it)->message.Match(handler, id)) {
117       if (removed) {
118         removed ->push_back((*it)->message);
119       } else {
120         delete (*it)->message.pdata;
121       }
122       (*it)->done_event.Signal();
123       pending_send_messages_.erase(it);
124     }
125 
126     it = next;
127   }
128 }
129 
Send(talk_base::MessageHandler * handler,uint32 id,talk_base::MessageData * data)130 void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id,
131                                talk_base::MessageData *data) {
132   if (fStop_)
133     return;
134 
135   JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
136   DCHECK(current_thread != NULL) << "Send() can be called only from a "
137       "thread that has JingleThreadWrapper.";
138 
139   talk_base::Message message;
140   message.phandler = handler;
141   message.message_id = id;
142   message.pdata = data;
143 
144   if (current_thread == this) {
145     handler->OnMessage(&message);
146     return;
147   }
148 
149   // Send message from a thread different than |this|.
150 
151   // Allow inter-thread send only from threads that have
152   // |send_allowed_| flag set.
153   DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
154       "messages is not allowed from the current thread.";
155 
156   PendingSend pending_send(message);
157   {
158     base::AutoLock auto_lock(lock_);
159     pending_send_messages_.push_back(&pending_send);
160   }
161 
162   // Need to signal |pending_send_event_| here in case the thread is
163   // sending message to another thread.
164   pending_send_event_.Signal();
165   task_runner_->PostTask(FROM_HERE,
166                          base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167                                     weak_ptr_));
168 
169 
170   while (!pending_send.done_event.IsSignaled()) {
171     base::WaitableEvent* events[] = {&pending_send.done_event,
172                                      &current_thread->pending_send_event_};
173     size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174     DCHECK(event == 0 || event == 1);
175 
176     if (event == 1)
177       current_thread->ProcessPendingSends();
178   }
179 }
180 
ProcessPendingSends()181 void JingleThreadWrapper::ProcessPendingSends() {
182   while (true) {
183     PendingSend* pending_send = NULL;
184     {
185       base::AutoLock auto_lock(lock_);
186       if (!pending_send_messages_.empty()) {
187         pending_send = pending_send_messages_.front();
188         pending_send_messages_.pop_front();
189       } else {
190         // Reset the event while |lock_| is still locked.
191         pending_send_event_.Reset();
192         break;
193       }
194     }
195     if (pending_send) {
196       pending_send->message.phandler->OnMessage(&pending_send->message);
197       pending_send->done_event.Signal();
198     }
199   }
200 }
201 
PostTaskInternal(int delay_ms,talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data)202 void JingleThreadWrapper::PostTaskInternal(
203     int delay_ms, talk_base::MessageHandler* handler,
204     uint32 message_id, talk_base::MessageData* data) {
205   int task_id;
206   talk_base::Message message;
207   message.phandler = handler;
208   message.message_id = message_id;
209   message.pdata = data;
210   {
211     base::AutoLock auto_lock(lock_);
212     task_id = ++last_task_id_;
213     messages_.insert(std::pair<int, talk_base::Message>(task_id, message));
214   }
215 
216   if (delay_ms <= 0) {
217     task_runner_->PostTask(FROM_HERE,
218                            base::Bind(&JingleThreadWrapper::RunTask,
219                                       weak_ptr_, task_id));
220   } else {
221     task_runner_->PostDelayedTask(FROM_HERE,
222                                   base::Bind(&JingleThreadWrapper::RunTask,
223                                              weak_ptr_, task_id),
224                                   base::TimeDelta::FromMilliseconds(delay_ms));
225   }
226 }
227 
RunTask(int task_id)228 void JingleThreadWrapper::RunTask(int task_id) {
229   bool have_message = false;
230   talk_base::Message message;
231   {
232     base::AutoLock auto_lock(lock_);
233     MessagesQueue::iterator it = messages_.find(task_id);
234     if (it != messages_.end()) {
235       have_message = true;
236       message = it->second;
237       messages_.erase(it);
238     }
239   }
240 
241   if (have_message) {
242     if (message.message_id == talk_base::MQID_DISPOSE) {
243       DCHECK(message.phandler == NULL);
244       delete message.pdata;
245     } else {
246       message.phandler->OnMessage(&message);
247     }
248   }
249 }
250 
251 // All methods below are marked as not reached. See comments in the
252 // header for more details.
Quit()253 void JingleThreadWrapper::Quit() {
254   NOTREACHED();
255 }
256 
IsQuitting()257 bool JingleThreadWrapper::IsQuitting() {
258   NOTREACHED();
259   return false;
260 }
261 
Restart()262 void JingleThreadWrapper::Restart() {
263   NOTREACHED();
264 }
265 
Get(talk_base::Message *,int,bool)266 bool JingleThreadWrapper::Get(talk_base::Message*, int, bool) {
267   NOTREACHED();
268   return false;
269 }
270 
Peek(talk_base::Message *,int)271 bool JingleThreadWrapper::Peek(talk_base::Message*, int) {
272   NOTREACHED();
273   return false;
274 }
275 
PostAt(uint32,talk_base::MessageHandler *,uint32,talk_base::MessageData *)276 void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*,
277                                  uint32, talk_base::MessageData*) {
278   NOTREACHED();
279 }
280 
Dispatch(talk_base::Message * message)281 void JingleThreadWrapper::Dispatch(talk_base::Message* message) {
282   NOTREACHED();
283 }
284 
ReceiveSends()285 void JingleThreadWrapper::ReceiveSends() {
286   NOTREACHED();
287 }
288 
GetDelay()289 int JingleThreadWrapper::GetDelay() {
290   NOTREACHED();
291   return 0;
292 }
293 
Stop()294 void JingleThreadWrapper::Stop() {
295   NOTREACHED();
296 }
297 
Run()298 void JingleThreadWrapper::Run() {
299   NOTREACHED();
300 }
301 
302 }  // namespace jingle_glue
303