• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "content/child/webmessageportchannel_impl.h"
6 
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
14 
15 using blink::WebMessagePortChannel;
16 using blink::WebMessagePortChannelArray;
17 using blink::WebMessagePortChannelClient;
18 using blink::WebString;
19 
20 namespace content {
21 
WebMessagePortChannelImpl(base::MessageLoopProxy * child_thread_loop)22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23     base::MessageLoopProxy* child_thread_loop)
24     : client_(NULL),
25       route_id_(MSG_ROUTING_NONE),
26       message_port_id_(MSG_ROUTING_NONE),
27       child_thread_loop_(child_thread_loop) {
28   AddRef();
29   Init();
30 }
31 
WebMessagePortChannelImpl(int route_id,int message_port_id,base::MessageLoopProxy * child_thread_loop)32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
33     int route_id,
34     int message_port_id,
35     base::MessageLoopProxy* child_thread_loop)
36     : client_(NULL),
37       route_id_(route_id),
38       message_port_id_(message_port_id),
39       child_thread_loop_(child_thread_loop) {
40   AddRef();
41   Init();
42 }
43 
~WebMessagePortChannelImpl()44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45   // If we have any queued messages with attached ports, manually destroy them.
46   while (!message_queue_.empty()) {
47     const std::vector<WebMessagePortChannelImpl*>& channel_array =
48         message_queue_.front().ports;
49     for (size_t i = 0; i < channel_array.size(); i++) {
50       channel_array[i]->destroy();
51     }
52     message_queue_.pop();
53   }
54 
55   if (message_port_id_ != MSG_ROUTING_NONE)
56     Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
57 
58   if (route_id_ != MSG_ROUTING_NONE)
59     ChildThread::current()->RemoveRoute(route_id_);
60 }
61 
setClient(WebMessagePortChannelClient * client)62 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
63   // Must lock here since client_ is called on the main thread.
64   base::AutoLock auto_lock(lock_);
65   client_ = client;
66 }
67 
destroy()68 void WebMessagePortChannelImpl::destroy() {
69   setClient(NULL);
70 
71   // Release the object on the main thread, since the destructor might want to
72   // send an IPC, and that has to happen on the main thread.
73   child_thread_loop_->ReleaseSoon(FROM_HERE, this);
74 }
75 
entangle(WebMessagePortChannel * channel)76 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel* channel) {
77   // The message port ids might not be set up yet, if this channel wasn't
78   // created on the main thread.  So need to wait until we're on the main thread
79   // before getting the other message port id.
80   scoped_refptr<WebMessagePortChannelImpl> webchannel(
81       static_cast<WebMessagePortChannelImpl*>(channel));
82   Entangle(webchannel);
83 }
84 
postMessage(const WebString & message,WebMessagePortChannelArray * channels)85 void WebMessagePortChannelImpl::postMessage(
86     const WebString& message,
87     WebMessagePortChannelArray* channels) {
88   if (!child_thread_loop_->BelongsToCurrentThread()) {
89     child_thread_loop_->PostTask(
90         FROM_HERE,
91         base::Bind(
92             &WebMessagePortChannelImpl::postMessage, this, message, channels));
93     return;
94   }
95 
96   std::vector<int> message_port_ids(channels ? channels->size() : 0);
97   if (channels) {
98     // Extract the port IDs from the source array, then free it.
99     for (size_t i = 0; i < channels->size(); ++i) {
100       WebMessagePortChannelImpl* webchannel =
101           static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
102       message_port_ids[i] = webchannel->message_port_id();
103       webchannel->QueueMessages();
104       DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
105     }
106     delete channels;
107   }
108 
109   IPC::Message* msg = new MessagePortHostMsg_PostMessage(
110       message_port_id_, message, message_port_ids);
111   Send(msg);
112 }
113 
tryGetMessage(WebString * message,WebMessagePortChannelArray & channels)114 bool WebMessagePortChannelImpl::tryGetMessage(
115     WebString* message,
116     WebMessagePortChannelArray& channels) {
117   base::AutoLock auto_lock(lock_);
118   if (message_queue_.empty())
119     return false;
120 
121   *message = message_queue_.front().message;
122   const std::vector<WebMessagePortChannelImpl*>& channel_array =
123       message_queue_.front().ports;
124   WebMessagePortChannelArray result_ports(channel_array.size());
125   for (size_t i = 0; i < channel_array.size(); i++) {
126     result_ports[i] = channel_array[i];
127   }
128 
129   channels.swap(result_ports);
130   message_queue_.pop();
131   return true;
132 }
133 
Init()134 void WebMessagePortChannelImpl::Init() {
135   if (!child_thread_loop_->BelongsToCurrentThread()) {
136     child_thread_loop_->PostTask(
137         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
138     return;
139   }
140 
141   if (route_id_ == MSG_ROUTING_NONE) {
142     DCHECK(message_port_id_ == MSG_ROUTING_NONE);
143     Send(new MessagePortHostMsg_CreateMessagePort(
144         &route_id_, &message_port_id_));
145   }
146 
147   ChildThread::current()->AddRoute(route_id_, this);
148 }
149 
Entangle(scoped_refptr<WebMessagePortChannelImpl> channel)150 void WebMessagePortChannelImpl::Entangle(
151     scoped_refptr<WebMessagePortChannelImpl> channel) {
152   if (!child_thread_loop_->BelongsToCurrentThread()) {
153     child_thread_loop_->PostTask(
154         FROM_HERE,
155         base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
156     return;
157   }
158 
159   Send(new MessagePortHostMsg_Entangle(
160       message_port_id_, channel->message_port_id()));
161 }
162 
QueueMessages()163 void WebMessagePortChannelImpl::QueueMessages() {
164   if (!child_thread_loop_->BelongsToCurrentThread()) {
165     child_thread_loop_->PostTask(
166         FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
167     return;
168   }
169   // This message port is being sent elsewhere (perhaps to another process).
170   // The new endpoint needs to receive the queued messages, including ones that
171   // could still be in-flight.  So we tell the browser to queue messages, and it
172   // sends us an ack, whose receipt we know means that no more messages are
173   // in-flight.  We then send the queued messages to the browser, which prepends
174   // them to the ones it queued and it sends them to the new endpoint.
175   Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
176 
177   // The process could potentially go away while we're still waiting for
178   // in-flight messages.  Ensure it stays alive.
179   ChildProcess::current()->AddRefProcess();
180 }
181 
Send(IPC::Message * message)182 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
183   if (!child_thread_loop_->BelongsToCurrentThread()) {
184     DCHECK(!message->is_sync());
185     child_thread_loop_->PostTask(
186         FROM_HERE,
187         base::Bind(&WebMessagePortChannelImpl::Send, this, message));
188     return;
189   }
190 
191   ChildThread::current()->Send(message);
192 }
193 
OnMessageReceived(const IPC::Message & message)194 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
195   bool handled = true;
196   IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
197     IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
198     IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
199     IPC_MESSAGE_UNHANDLED(handled = false)
200   IPC_END_MESSAGE_MAP()
201   return handled;
202 }
203 
OnMessage(const base::string16 & message,const std::vector<int> & sent_message_port_ids,const std::vector<int> & new_routing_ids)204 void WebMessagePortChannelImpl::OnMessage(
205     const base::string16& message,
206     const std::vector<int>& sent_message_port_ids,
207     const std::vector<int>& new_routing_ids) {
208   base::AutoLock auto_lock(lock_);
209   Message msg;
210   msg.message = message;
211   if (!sent_message_port_ids.empty()) {
212     msg.ports.resize(sent_message_port_ids.size());
213     for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
214       msg.ports[i] = new WebMessagePortChannelImpl(new_routing_ids[i],
215                                                    sent_message_port_ids[i],
216                                                    child_thread_loop_.get());
217     }
218   }
219 
220   bool was_empty = message_queue_.empty();
221   message_queue_.push(msg);
222   if (client_ && was_empty)
223     client_->messageAvailable();
224 }
225 
OnMessagesQueued()226 void WebMessagePortChannelImpl::OnMessagesQueued() {
227   std::vector<QueuedMessage> queued_messages;
228 
229   {
230     base::AutoLock auto_lock(lock_);
231     queued_messages.reserve(message_queue_.size());
232     while (!message_queue_.empty()) {
233       base::string16 message = message_queue_.front().message;
234       const std::vector<WebMessagePortChannelImpl*>& channel_array =
235           message_queue_.front().ports;
236       std::vector<int> port_ids(channel_array.size());
237       for (size_t i = 0; i < channel_array.size(); ++i) {
238         port_ids[i] = channel_array[i]->message_port_id();
239         channel_array[i]->QueueMessages();
240       }
241       queued_messages.push_back(std::make_pair(message, port_ids));
242       message_queue_.pop();
243     }
244   }
245 
246   Send(new MessagePortHostMsg_SendQueuedMessages(
247       message_port_id_, queued_messages));
248 
249   message_port_id_ = MSG_ROUTING_NONE;
250 
251   Release();
252   ChildProcess::current()->ReleaseProcess();
253 }
254 
Message()255 WebMessagePortChannelImpl::Message::Message() {}
256 
~Message()257 WebMessagePortChannelImpl::Message::~Message() {}
258 
259 }  // namespace content
260