• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "content/child/webmessageportchannel_impl.h"
6 
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
14 
15 using blink::WebMessagePortChannel;
16 using blink::WebMessagePortChannelArray;
17 using blink::WebMessagePortChannelClient;
18 using blink::WebString;
19 
20 namespace content {
21 
WebMessagePortChannelImpl(const scoped_refptr<base::MessageLoopProxy> & child_thread_loop)22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
24     : client_(NULL),
25       route_id_(MSG_ROUTING_NONE),
26       message_port_id_(MSG_ROUTING_NONE),
27       child_thread_loop_(child_thread_loop) {
28   AddRef();
29   Init();
30 }
31 
WebMessagePortChannelImpl(int route_id,int message_port_id,const scoped_refptr<base::MessageLoopProxy> & child_thread_loop)32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
33     int route_id,
34     int message_port_id,
35     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop)
36     : client_(NULL),
37       route_id_(route_id),
38       message_port_id_(message_port_id),
39       child_thread_loop_(child_thread_loop) {
40   AddRef();
41   Init();
42 }
43 
~WebMessagePortChannelImpl()44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45   // If we have any queued messages with attached ports, manually destroy them.
46   while (!message_queue_.empty()) {
47     const std::vector<WebMessagePortChannelImpl*>& channel_array =
48         message_queue_.front().ports;
49     for (size_t i = 0; i < channel_array.size(); i++) {
50       channel_array[i]->destroy();
51     }
52     message_queue_.pop();
53   }
54 
55   if (message_port_id_ != MSG_ROUTING_NONE)
56     Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
57 
58   if (route_id_ != MSG_ROUTING_NONE)
59     ChildThread::current()->GetRouter()->RemoveRoute(route_id_);
60 }
61 
62 // static
CreatePair(const scoped_refptr<base::MessageLoopProxy> & child_thread_loop,blink::WebMessagePortChannel ** channel1,blink::WebMessagePortChannel ** channel2)63 void WebMessagePortChannelImpl::CreatePair(
64     const scoped_refptr<base::MessageLoopProxy>& child_thread_loop,
65     blink::WebMessagePortChannel** channel1,
66     blink::WebMessagePortChannel** channel2) {
67   WebMessagePortChannelImpl* impl1 =
68       new WebMessagePortChannelImpl(child_thread_loop);
69   WebMessagePortChannelImpl* impl2 =
70       new WebMessagePortChannelImpl(child_thread_loop);
71 
72   impl1->Entangle(impl2);
73   impl2->Entangle(impl1);
74 
75   *channel1 = impl1;
76   *channel2 = impl2;
77 }
78 
79 // static
ExtractMessagePortIDs(WebMessagePortChannelArray * channels)80 std::vector<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81     WebMessagePortChannelArray* channels) {
82   std::vector<int> message_port_ids;
83   if (channels) {
84     message_port_ids.resize(channels->size());
85     // Extract the port IDs from the source array, then free it.
86     for (size_t i = 0; i < channels->size(); ++i) {
87       WebMessagePortChannelImpl* webchannel =
88           static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
89       // The message port ids might not be set up yet if this channel
90       // wasn't created on the main thread.
91       DCHECK(webchannel->child_thread_loop_->BelongsToCurrentThread());
92       message_port_ids[i] = webchannel->message_port_id();
93       webchannel->QueueMessages();
94       DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
95     }
96     delete channels;
97   }
98   return message_port_ids;
99 }
100 
setClient(WebMessagePortChannelClient * client)101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
102   // Must lock here since client_ is called on the main thread.
103   base::AutoLock auto_lock(lock_);
104   client_ = client;
105 }
106 
destroy()107 void WebMessagePortChannelImpl::destroy() {
108   setClient(NULL);
109 
110   // Release the object on the main thread, since the destructor might want to
111   // send an IPC, and that has to happen on the main thread.
112   child_thread_loop_->ReleaseSoon(FROM_HERE, this);
113 }
114 
postMessage(const WebString & message,WebMessagePortChannelArray * channels)115 void WebMessagePortChannelImpl::postMessage(
116     const WebString& message,
117     WebMessagePortChannelArray* channels) {
118   if (!child_thread_loop_->BelongsToCurrentThread()) {
119     child_thread_loop_->PostTask(
120         FROM_HERE,
121         base::Bind(
122             &WebMessagePortChannelImpl::PostMessage, this,
123             static_cast<base::string16>(message), channels));
124   } else {
125     PostMessage(message, channels);
126   }
127 }
128 
PostMessage(const base::string16 & message,WebMessagePortChannelArray * channels)129 void WebMessagePortChannelImpl::PostMessage(
130     const base::string16& message,
131     WebMessagePortChannelArray* channels) {
132   IPC::Message* msg = new MessagePortHostMsg_PostMessage(
133       message_port_id_, message, ExtractMessagePortIDs(channels));
134   Send(msg);
135 }
136 
tryGetMessage(WebString * message,WebMessagePortChannelArray & channels)137 bool WebMessagePortChannelImpl::tryGetMessage(
138     WebString* message,
139     WebMessagePortChannelArray& channels) {
140   base::AutoLock auto_lock(lock_);
141   if (message_queue_.empty())
142     return false;
143 
144   *message = message_queue_.front().message;
145   const std::vector<WebMessagePortChannelImpl*>& channel_array =
146       message_queue_.front().ports;
147   WebMessagePortChannelArray result_ports(channel_array.size());
148   for (size_t i = 0; i < channel_array.size(); i++) {
149     result_ports[i] = channel_array[i];
150   }
151 
152   channels.swap(result_ports);
153   message_queue_.pop();
154   return true;
155 }
156 
Init()157 void WebMessagePortChannelImpl::Init() {
158   if (!child_thread_loop_->BelongsToCurrentThread()) {
159     child_thread_loop_->PostTask(
160         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
161     return;
162   }
163 
164   if (route_id_ == MSG_ROUTING_NONE) {
165     DCHECK(message_port_id_ == MSG_ROUTING_NONE);
166     Send(new MessagePortHostMsg_CreateMessagePort(
167         &route_id_, &message_port_id_));
168   }
169 
170   ChildThread::current()->GetRouter()->AddRoute(route_id_, this);
171 }
172 
Entangle(scoped_refptr<WebMessagePortChannelImpl> channel)173 void WebMessagePortChannelImpl::Entangle(
174     scoped_refptr<WebMessagePortChannelImpl> channel) {
175   // The message port ids might not be set up yet, if this channel wasn't
176   // created on the main thread.  So need to wait until we're on the main thread
177   // before getting the other message port id.
178   if (!child_thread_loop_->BelongsToCurrentThread()) {
179     child_thread_loop_->PostTask(
180         FROM_HERE,
181         base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
182     return;
183   }
184 
185   Send(new MessagePortHostMsg_Entangle(
186       message_port_id_, channel->message_port_id()));
187 }
188 
QueueMessages()189 void WebMessagePortChannelImpl::QueueMessages() {
190   if (!child_thread_loop_->BelongsToCurrentThread()) {
191     child_thread_loop_->PostTask(
192         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
193     return;
194   }
195   // This message port is being sent elsewhere (perhaps to another process).
196   // The new endpoint needs to receive the queued messages, including ones that
197   // could still be in-flight.  So we tell the browser to queue messages, and it
198   // sends us an ack, whose receipt we know means that no more messages are
199   // in-flight.  We then send the queued messages to the browser, which prepends
200   // them to the ones it queued and it sends them to the new endpoint.
201   Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
202 
203   // The process could potentially go away while we're still waiting for
204   // in-flight messages.  Ensure it stays alive.
205   ChildProcess::current()->AddRefProcess();
206 }
207 
Send(IPC::Message * message)208 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
209   if (!child_thread_loop_->BelongsToCurrentThread()) {
210     DCHECK(!message->is_sync());
211     child_thread_loop_->PostTask(
212         FROM_HERE,
213         base::Bind(&WebMessagePortChannelImpl::Send, this, message));
214     return;
215   }
216 
217   ChildThread::current()->GetRouter()->Send(message);
218 }
219 
OnMessageReceived(const IPC::Message & message)220 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
221   bool handled = true;
222   IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
223     IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
224     IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
225     IPC_MESSAGE_UNHANDLED(handled = false)
226   IPC_END_MESSAGE_MAP()
227   return handled;
228 }
229 
OnMessage(const base::string16 & message,const std::vector<int> & sent_message_port_ids,const std::vector<int> & new_routing_ids)230 void WebMessagePortChannelImpl::OnMessage(
231     const base::string16& message,
232     const std::vector<int>& sent_message_port_ids,
233     const std::vector<int>& new_routing_ids) {
234   base::AutoLock auto_lock(lock_);
235   Message msg;
236   msg.message = message;
237   if (!sent_message_port_ids.empty()) {
238     msg.ports.resize(sent_message_port_ids.size());
239     for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
240       msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
241                                                    sent_message_port_ids[i],
242                                                    child_thread_loop_.get());
243     }
244   }
245 
246   bool was_empty = message_queue_.empty();
247   message_queue_.push(msg);
248   if (client_ && was_empty)
249     client_->messageAvailable();
250 }
251 
OnMessagesQueued()252 void WebMessagePortChannelImpl::OnMessagesQueued() {
253   std::vector<QueuedMessage> queued_messages;
254 
255   {
256     base::AutoLock auto_lock(lock_);
257     queued_messages.reserve(message_queue_.size());
258     while (!message_queue_.empty()) {
259       base::string16 message = message_queue_.front().message;
260       const std::vector<WebMessagePortChannelImpl*>& channel_array =
261           message_queue_.front().ports;
262       std::vector<int> port_ids(channel_array.size());
263       for (size_t i = 0; i < channel_array.size(); ++i) {
264         port_ids[i] = channel_array[i]->message_port_id();
265         channel_array[i]->QueueMessages();
266       }
267       queued_messages.push_back(std::make_pair(message, port_ids));
268       message_queue_.pop();
269     }
270   }
271 
272   Send(new MessagePortHostMsg_SendQueuedMessages(
273       message_port_id_, queued_messages));
274 
275   message_port_id_ = MSG_ROUTING_NONE;
276 
277   Release();
278   ChildProcess::current()->ReleaseProcess();
279 }
280 
Message()281 WebMessagePortChannelImpl::Message::Message() {}
282 
~Message()283 WebMessagePortChannelImpl::Message::~Message() {}
284 
285 }  // namespace content
286