• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "ipc/ipc_channel_proxy.h"
6 
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_channel_factory.h"
15 #include "ipc/ipc_listener.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/message_filter.h"
19 #include "ipc/message_filter_router.h"
20 
21 namespace IPC {
22 
23 //------------------------------------------------------------------------------
24 
Context(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner)25 ChannelProxy::Context::Context(
26     Listener* listener,
27     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
28     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
29       listener_(listener),
30       ipc_task_runner_(ipc_task_runner),
31       channel_connected_called_(false),
32       message_filter_router_(new MessageFilterRouter()),
33       peer_pid_(base::kNullProcessId) {
34   DCHECK(ipc_task_runner_.get());
35   // The Listener thread where Messages are handled must be a separate thread
36   // to avoid oversubscribing the IO thread. If you trigger this error, you
37   // need to either:
38   // 1) Create the ChannelProxy on a different thread, or
39   // 2) Just use Channel
40   // Note, we currently make an exception for a NULL listener. That usage
41   // basically works, but is outside the intent of ChannelProxy. This support
42   // will disappear, so please don't rely on it. See crbug.com/364241
43   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
44 }
45 
~Context()46 ChannelProxy::Context::~Context() {
47 }
48 
ClearIPCTaskRunner()49 void ChannelProxy::Context::ClearIPCTaskRunner() {
50   ipc_task_runner_ = NULL;
51 }
52 
CreateChannel(scoped_ptr<ChannelFactory> factory)53 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
54   DCHECK(!channel_);
55   channel_id_ = factory->GetName();
56   channel_ = factory->BuildChannel(this);
57 }
58 
TryFilters(const Message & message)59 bool ChannelProxy::Context::TryFilters(const Message& message) {
60   DCHECK(message_filter_router_);
61 #ifdef IPC_MESSAGE_LOG_ENABLED
62   Logging* logger = Logging::GetInstance();
63   if (logger->Enabled())
64     logger->OnPreDispatchMessage(message);
65 #endif
66 
67   if (message_filter_router_->TryFilters(message)) {
68     if (message.dispatch_error()) {
69       listener_task_runner_->PostTask(
70           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
71     }
72 #ifdef IPC_MESSAGE_LOG_ENABLED
73     if (logger->Enabled())
74       logger->OnPostDispatchMessage(message, channel_id_);
75 #endif
76     return true;
77   }
78   return false;
79 }
80 
81 // Called on the IPC::Channel thread
OnMessageReceived(const Message & message)82 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
83   // First give a chance to the filters to process this message.
84   if (!TryFilters(message))
85     OnMessageReceivedNoFilter(message);
86   return true;
87 }
88 
89 // Called on the IPC::Channel thread
OnMessageReceivedNoFilter(const Message & message)90 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
91   listener_task_runner_->PostTask(
92       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
93   return true;
94 }
95 
96 // Called on the IPC::Channel thread
OnChannelConnected(int32 peer_pid)97 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
98   // We cache off the peer_pid so it can be safely accessed from both threads.
99   peer_pid_ = channel_->GetPeerPID();
100 
101   // Add any pending filters.  This avoids a race condition where someone
102   // creates a ChannelProxy, calls AddFilter, and then right after starts the
103   // peer process.  The IO thread could receive a message before the task to add
104   // the filter is run on the IO thread.
105   OnAddFilter();
106 
107   // See above comment about using listener_task_runner_ here.
108   listener_task_runner_->PostTask(
109       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
110 }
111 
112 // Called on the IPC::Channel thread
OnChannelError()113 void ChannelProxy::Context::OnChannelError() {
114   for (size_t i = 0; i < filters_.size(); ++i)
115     filters_[i]->OnChannelError();
116 
117   // See above comment about using listener_task_runner_ here.
118   listener_task_runner_->PostTask(
119       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
120 }
121 
122 // Called on the IPC::Channel thread
OnChannelOpened()123 void ChannelProxy::Context::OnChannelOpened() {
124   DCHECK(channel_ != NULL);
125 
126   // Assume a reference to ourselves on behalf of this thread.  This reference
127   // will be released when we are closed.
128   AddRef();
129 
130   if (!channel_->Connect()) {
131     OnChannelError();
132     return;
133   }
134 
135   for (size_t i = 0; i < filters_.size(); ++i)
136     filters_[i]->OnFilterAdded(channel_.get());
137 }
138 
139 // Called on the IPC::Channel thread
OnChannelClosed()140 void ChannelProxy::Context::OnChannelClosed() {
141   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
142   // would result in this branch being taken.
143   if (!channel_)
144     return;
145 
146   for (size_t i = 0; i < filters_.size(); ++i) {
147     filters_[i]->OnChannelClosing();
148     filters_[i]->OnFilterRemoved();
149   }
150 
151   // We don't need the filters anymore.
152   message_filter_router_->Clear();
153   filters_.clear();
154   // We don't need the lock, because at this point, the listener thread can't
155   // access it any more.
156   pending_filters_.clear();
157 
158   channel_.reset();
159 
160   // Balance with the reference taken during startup.  This may result in
161   // self-destruction.
162   Release();
163 }
164 
Clear()165 void ChannelProxy::Context::Clear() {
166   listener_ = NULL;
167 }
168 
169 // Called on the IPC::Channel thread
OnSendMessage(scoped_ptr<Message> message)170 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
171   if (!channel_) {
172     OnChannelClosed();
173     return;
174   }
175 
176   if (!channel_->Send(message.release()))
177     OnChannelError();
178 }
179 
180 // Called on the IPC::Channel thread
OnAddFilter()181 void ChannelProxy::Context::OnAddFilter() {
182   // Our OnChannelConnected method has not yet been called, so we can't be
183   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
184   // it invokes OnAddFilter, so any pending filter(s) will be added at that
185   // time.
186   if (peer_pid_ == base::kNullProcessId)
187     return;
188 
189   std::vector<scoped_refptr<MessageFilter> > new_filters;
190   {
191     base::AutoLock auto_lock(pending_filters_lock_);
192     new_filters.swap(pending_filters_);
193   }
194 
195   for (size_t i = 0; i < new_filters.size(); ++i) {
196     filters_.push_back(new_filters[i]);
197 
198     message_filter_router_->AddFilter(new_filters[i].get());
199 
200     // The channel has already been created and connected, so we need to
201     // inform the filters right now.
202     new_filters[i]->OnFilterAdded(channel_.get());
203     new_filters[i]->OnChannelConnected(peer_pid_);
204   }
205 }
206 
207 // Called on the IPC::Channel thread
OnRemoveFilter(MessageFilter * filter)208 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
209   if (peer_pid_ == base::kNullProcessId) {
210     // The channel is not yet connected, so any filters are still pending.
211     base::AutoLock auto_lock(pending_filters_lock_);
212     for (size_t i = 0; i < pending_filters_.size(); ++i) {
213       if (pending_filters_[i].get() == filter) {
214         filter->OnFilterRemoved();
215         pending_filters_.erase(pending_filters_.begin() + i);
216         return;
217       }
218     }
219     return;
220   }
221   if (!channel_)
222     return;  // The filters have already been deleted.
223 
224   message_filter_router_->RemoveFilter(filter);
225 
226   for (size_t i = 0; i < filters_.size(); ++i) {
227     if (filters_[i].get() == filter) {
228       filter->OnFilterRemoved();
229       filters_.erase(filters_.begin() + i);
230       return;
231     }
232   }
233 
234   NOTREACHED() << "filter to be removed not found";
235 }
236 
237 // Called on the listener's thread
AddFilter(MessageFilter * filter)238 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
239   base::AutoLock auto_lock(pending_filters_lock_);
240   pending_filters_.push_back(make_scoped_refptr(filter));
241   ipc_task_runner_->PostTask(
242       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
243 }
244 
245 // Called on the listener's thread
OnDispatchMessage(const Message & message)246 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
247 #ifdef IPC_MESSAGE_LOG_ENABLED
248   Logging* logger = Logging::GetInstance();
249   std::string name;
250   logger->GetMessageText(message.type(), &name, &message, NULL);
251   TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
252                "name", name);
253 #else
254   TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
255                "class", IPC_MESSAGE_ID_CLASS(message.type()),
256                "line", IPC_MESSAGE_ID_LINE(message.type()));
257 #endif
258 
259   if (!listener_)
260     return;
261 
262   OnDispatchConnected();
263 
264 #ifdef IPC_MESSAGE_LOG_ENABLED
265   if (message.type() == IPC_LOGGING_ID) {
266     logger->OnReceivedLoggingMessage(message);
267     return;
268   }
269 
270   if (logger->Enabled())
271     logger->OnPreDispatchMessage(message);
272 #endif
273 
274   listener_->OnMessageReceived(message);
275   if (message.dispatch_error())
276     listener_->OnBadMessageReceived(message);
277 
278 #ifdef IPC_MESSAGE_LOG_ENABLED
279   if (logger->Enabled())
280     logger->OnPostDispatchMessage(message, channel_id_);
281 #endif
282 }
283 
284 // Called on the listener's thread
OnDispatchConnected()285 void ChannelProxy::Context::OnDispatchConnected() {
286   if (channel_connected_called_)
287     return;
288 
289   channel_connected_called_ = true;
290   if (listener_)
291     listener_->OnChannelConnected(peer_pid_);
292 }
293 
294 // Called on the listener's thread
OnDispatchError()295 void ChannelProxy::Context::OnDispatchError() {
296   if (listener_)
297     listener_->OnChannelError();
298 }
299 
300 // Called on the listener's thread
OnDispatchBadMessage(const Message & message)301 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
302   if (listener_)
303     listener_->OnBadMessageReceived(message);
304 }
305 
306 //-----------------------------------------------------------------------------
307 
308 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner)309 scoped_ptr<ChannelProxy> ChannelProxy::Create(
310     const IPC::ChannelHandle& channel_handle,
311     Channel::Mode mode,
312     Listener* listener,
313     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
314   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
315   channel->Init(channel_handle, mode, true);
316   return channel.Pass();
317 }
318 
319 // static
Create(scoped_ptr<ChannelFactory> factory,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner)320 scoped_ptr<ChannelProxy> ChannelProxy::Create(
321     scoped_ptr<ChannelFactory> factory,
322     Listener* listener,
323     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
324   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
325   channel->Init(factory.Pass(), true);
326   return channel.Pass();
327 }
328 
ChannelProxy(Context * context)329 ChannelProxy::ChannelProxy(Context* context)
330     : context_(context),
331       did_init_(false) {
332 }
333 
ChannelProxy(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner)334 ChannelProxy::ChannelProxy(
335     Listener* listener,
336     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
337     : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
338 }
339 
~ChannelProxy()340 ChannelProxy::~ChannelProxy() {
341   DCHECK(CalledOnValidThread());
342 
343   Close();
344 }
345 
Init(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,bool create_pipe_now)346 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
347                         Channel::Mode mode,
348                         bool create_pipe_now) {
349 #if defined(OS_POSIX)
350   // When we are creating a server on POSIX, we need its file descriptor
351   // to be created immediately so that it can be accessed and passed
352   // to other processes. Forcing it to be created immediately avoids
353   // race conditions that may otherwise arise.
354   if (mode & Channel::MODE_SERVER_FLAG) {
355     create_pipe_now = true;
356   }
357 #endif  // defined(OS_POSIX)
358   Init(ChannelFactory::Create(channel_handle, mode),
359        create_pipe_now);
360 }
361 
Init(scoped_ptr<ChannelFactory> factory,bool create_pipe_now)362 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
363                         bool create_pipe_now) {
364   DCHECK(CalledOnValidThread());
365   DCHECK(!did_init_);
366 
367   if (create_pipe_now) {
368     // Create the channel immediately.  This effectively sets up the
369     // low-level pipe so that the client can connect.  Without creating
370     // the pipe immediately, it is possible for a listener to attempt
371     // to connect and get an error since the pipe doesn't exist yet.
372     context_->CreateChannel(factory.Pass());
373   } else {
374     context_->ipc_task_runner()->PostTask(
375         FROM_HERE, base::Bind(&Context::CreateChannel,
376                               context_.get(), Passed(factory.Pass())));
377   }
378 
379   // complete initialization on the background thread
380   context_->ipc_task_runner()->PostTask(
381       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
382 
383   did_init_ = true;
384 }
385 
Close()386 void ChannelProxy::Close() {
387   DCHECK(CalledOnValidThread());
388 
389   // Clear the backpointer to the listener so that any pending calls to
390   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
391   // possible that the channel could be closed while it is receiving messages!
392   context_->Clear();
393 
394   if (context_->ipc_task_runner()) {
395     context_->ipc_task_runner()->PostTask(
396         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
397   }
398 }
399 
Send(Message * message)400 bool ChannelProxy::Send(Message* message) {
401   DCHECK(did_init_);
402 
403   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
404   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
405 
406 #ifdef IPC_MESSAGE_LOG_ENABLED
407   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
408 #endif
409 
410   context_->ipc_task_runner()->PostTask(
411       FROM_HERE,
412       base::Bind(&ChannelProxy::Context::OnSendMessage,
413                  context_, base::Passed(scoped_ptr<Message>(message))));
414   return true;
415 }
416 
AddFilter(MessageFilter * filter)417 void ChannelProxy::AddFilter(MessageFilter* filter) {
418   DCHECK(CalledOnValidThread());
419 
420   context_->AddFilter(filter);
421 }
422 
RemoveFilter(MessageFilter * filter)423 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
424   DCHECK(CalledOnValidThread());
425 
426   context_->ipc_task_runner()->PostTask(
427       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
428                             make_scoped_refptr(filter)));
429 }
430 
ClearIPCTaskRunner()431 void ChannelProxy::ClearIPCTaskRunner() {
432   DCHECK(CalledOnValidThread());
433 
434   context()->ClearIPCTaskRunner();
435 }
436 
437 #if defined(OS_POSIX) && !defined(OS_NACL)
438 // See the TODO regarding lazy initialization of the channel in
439 // ChannelProxy::Init().
GetClientFileDescriptor()440 int ChannelProxy::GetClientFileDescriptor() {
441   DCHECK(CalledOnValidThread());
442 
443   Channel* channel = context_.get()->channel_.get();
444   // Channel must have been created first.
445   DCHECK(channel) << context_.get()->channel_id_;
446   return channel->GetClientFileDescriptor();
447 }
448 
TakeClientFileDescriptor()449 int ChannelProxy::TakeClientFileDescriptor() {
450   DCHECK(CalledOnValidThread());
451 
452   Channel* channel = context_.get()->channel_.get();
453   // Channel must have been created first.
454   DCHECK(channel) << context_.get()->channel_id_;
455   return channel->TakeClientFileDescriptor();
456 }
457 #endif
458 
459 //-----------------------------------------------------------------------------
460 
461 }  // namespace IPC
462