1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "jingle/glue/thread_wrapper.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/libjingle/source/talk/base/nullsocketserver.h"
12
13 namespace jingle_glue {
14
15 struct JingleThreadWrapper::PendingSend {
PendingSendjingle_glue::JingleThreadWrapper::PendingSend16 PendingSend(const talk_base::Message& message_value)
17 : sending_thread(JingleThreadWrapper::current()),
18 message(message_value),
19 done_event(true, false) {
20 DCHECK(sending_thread);
21 }
22
23 JingleThreadWrapper* sending_thread;
24 talk_base::Message message;
25 base::WaitableEvent done_event;
26 };
27
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30
31 // static
EnsureForCurrentMessageLoop()32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == NULL) {
34 base::MessageLoop* message_loop = base::MessageLoop::current();
35 g_jingle_thread_wrapper.Get()
36 .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
37 message_loop->AddDestructionObserver(current());
38 }
39
40 DCHECK_EQ(talk_base::Thread::Current(), current());
41 }
42
43 // static
current()44 JingleThreadWrapper* JingleThreadWrapper::current() {
45 return g_jingle_thread_wrapper.Get().Get();
46 }
47
JingleThreadWrapper(scoped_refptr<base::SingleThreadTaskRunner> task_runner)48 JingleThreadWrapper::JingleThreadWrapper(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
50 : talk_base::Thread(new talk_base::NullSocketServer()),
51 task_runner_(task_runner),
52 send_allowed_(false),
53 last_task_id_(0),
54 pending_send_event_(true, false),
55 weak_ptr_factory_(this) {
56 DCHECK(task_runner->BelongsToCurrentThread());
57 DCHECK(!talk_base::Thread::Current());
58 weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
59 talk_base::MessageQueueManager::Add(this);
60 WrapCurrent();
61 }
62
~JingleThreadWrapper()63 JingleThreadWrapper::~JingleThreadWrapper() {
64 Clear(NULL, talk_base::MQID_ANY, NULL);
65 }
66
WillDestroyCurrentMessageLoop()67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68 DCHECK_EQ(talk_base::Thread::Current(), current());
69 UnwrapCurrent();
70 g_jingle_thread_wrapper.Get().Set(NULL);
71 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
72 talk_base::MessageQueueManager::Remove(this);
73 talk_base::SocketServer* ss = socketserver();
74 delete this;
75 delete ss;
76 }
77
Post(talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data,bool time_sensitive)78 void JingleThreadWrapper::Post(
79 talk_base::MessageHandler* handler, uint32 message_id,
80 talk_base::MessageData* data, bool time_sensitive) {
81 PostTaskInternal(0, handler, message_id, data);
82 }
83
PostDelayed(int delay_ms,talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data)84 void JingleThreadWrapper::PostDelayed(
85 int delay_ms, talk_base::MessageHandler* handler,
86 uint32 message_id, talk_base::MessageData* data) {
87 PostTaskInternal(delay_ms, handler, message_id, data);
88 }
89
Clear(talk_base::MessageHandler * handler,uint32 id,talk_base::MessageList * removed)90 void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id,
91 talk_base::MessageList* removed) {
92 base::AutoLock auto_lock(lock_);
93
94 for (MessagesQueue::iterator it = messages_.begin();
95 it != messages_.end();) {
96 MessagesQueue::iterator next = it;
97 ++next;
98
99 if (it->second.Match(handler, id)) {
100 if (removed) {
101 removed->push_back(it->second);
102 } else {
103 delete it->second.pdata;
104 }
105 messages_.erase(it);
106 }
107
108 it = next;
109 }
110
111 for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
112 it != pending_send_messages_.end();) {
113 std::list<PendingSend*>::iterator next = it;
114 ++next;
115
116 if ((*it)->message.Match(handler, id)) {
117 if (removed) {
118 removed ->push_back((*it)->message);
119 } else {
120 delete (*it)->message.pdata;
121 }
122 (*it)->done_event.Signal();
123 pending_send_messages_.erase(it);
124 }
125
126 it = next;
127 }
128 }
129
Send(talk_base::MessageHandler * handler,uint32 id,talk_base::MessageData * data)130 void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id,
131 talk_base::MessageData *data) {
132 if (fStop_)
133 return;
134
135 JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
136 DCHECK(current_thread != NULL) << "Send() can be called only from a "
137 "thread that has JingleThreadWrapper.";
138
139 talk_base::Message message;
140 message.phandler = handler;
141 message.message_id = id;
142 message.pdata = data;
143
144 if (current_thread == this) {
145 handler->OnMessage(&message);
146 return;
147 }
148
149 // Send message from a thread different than |this|.
150
151 // Allow inter-thread send only from threads that have
152 // |send_allowed_| flag set.
153 DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
154 "messages is not allowed from the current thread.";
155
156 PendingSend pending_send(message);
157 {
158 base::AutoLock auto_lock(lock_);
159 pending_send_messages_.push_back(&pending_send);
160 }
161
162 // Need to signal |pending_send_event_| here in case the thread is
163 // sending message to another thread.
164 pending_send_event_.Signal();
165 task_runner_->PostTask(FROM_HERE,
166 base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167 weak_ptr_));
168
169
170 while (!pending_send.done_event.IsSignaled()) {
171 base::WaitableEvent* events[] = {&pending_send.done_event,
172 ¤t_thread->pending_send_event_};
173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174 DCHECK(event == 0 || event == 1);
175
176 if (event == 1)
177 current_thread->ProcessPendingSends();
178 }
179 }
180
ProcessPendingSends()181 void JingleThreadWrapper::ProcessPendingSends() {
182 while (true) {
183 PendingSend* pending_send = NULL;
184 {
185 base::AutoLock auto_lock(lock_);
186 if (!pending_send_messages_.empty()) {
187 pending_send = pending_send_messages_.front();
188 pending_send_messages_.pop_front();
189 } else {
190 // Reset the event while |lock_| is still locked.
191 pending_send_event_.Reset();
192 break;
193 }
194 }
195 if (pending_send) {
196 pending_send->message.phandler->OnMessage(&pending_send->message);
197 pending_send->done_event.Signal();
198 }
199 }
200 }
201
PostTaskInternal(int delay_ms,talk_base::MessageHandler * handler,uint32 message_id,talk_base::MessageData * data)202 void JingleThreadWrapper::PostTaskInternal(
203 int delay_ms, talk_base::MessageHandler* handler,
204 uint32 message_id, talk_base::MessageData* data) {
205 int task_id;
206 talk_base::Message message;
207 message.phandler = handler;
208 message.message_id = message_id;
209 message.pdata = data;
210 {
211 base::AutoLock auto_lock(lock_);
212 task_id = ++last_task_id_;
213 messages_.insert(std::pair<int, talk_base::Message>(task_id, message));
214 }
215
216 if (delay_ms <= 0) {
217 task_runner_->PostTask(FROM_HERE,
218 base::Bind(&JingleThreadWrapper::RunTask,
219 weak_ptr_, task_id));
220 } else {
221 task_runner_->PostDelayedTask(FROM_HERE,
222 base::Bind(&JingleThreadWrapper::RunTask,
223 weak_ptr_, task_id),
224 base::TimeDelta::FromMilliseconds(delay_ms));
225 }
226 }
227
RunTask(int task_id)228 void JingleThreadWrapper::RunTask(int task_id) {
229 bool have_message = false;
230 talk_base::Message message;
231 {
232 base::AutoLock auto_lock(lock_);
233 MessagesQueue::iterator it = messages_.find(task_id);
234 if (it != messages_.end()) {
235 have_message = true;
236 message = it->second;
237 messages_.erase(it);
238 }
239 }
240
241 if (have_message) {
242 if (message.message_id == talk_base::MQID_DISPOSE) {
243 DCHECK(message.phandler == NULL);
244 delete message.pdata;
245 } else {
246 message.phandler->OnMessage(&message);
247 }
248 }
249 }
250
251 // All methods below are marked as not reached. See comments in the
252 // header for more details.
Quit()253 void JingleThreadWrapper::Quit() {
254 NOTREACHED();
255 }
256
IsQuitting()257 bool JingleThreadWrapper::IsQuitting() {
258 NOTREACHED();
259 return false;
260 }
261
Restart()262 void JingleThreadWrapper::Restart() {
263 NOTREACHED();
264 }
265
Get(talk_base::Message *,int,bool)266 bool JingleThreadWrapper::Get(talk_base::Message*, int, bool) {
267 NOTREACHED();
268 return false;
269 }
270
Peek(talk_base::Message *,int)271 bool JingleThreadWrapper::Peek(talk_base::Message*, int) {
272 NOTREACHED();
273 return false;
274 }
275
PostAt(uint32,talk_base::MessageHandler *,uint32,talk_base::MessageData *)276 void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*,
277 uint32, talk_base::MessageData*) {
278 NOTREACHED();
279 }
280
Dispatch(talk_base::Message * message)281 void JingleThreadWrapper::Dispatch(talk_base::Message* message) {
282 NOTREACHED();
283 }
284
ReceiveSends()285 void JingleThreadWrapper::ReceiveSends() {
286 NOTREACHED();
287 }
288
GetDelay()289 int JingleThreadWrapper::GetDelay() {
290 NOTREACHED();
291 return 0;
292 }
293
Stop()294 void JingleThreadWrapper::Stop() {
295 NOTREACHED();
296 }
297
Run()298 void JingleThreadWrapper::Run() {
299 NOTREACHED();
300 }
301
302 } // namespace jingle_glue
303