1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_channel_proxy.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "build/build_config.h"
20 #include "ipc/ipc_channel_factory.h"
21 #include "ipc/ipc_listener.h"
22 #include "ipc/ipc_logging.h"
23 #include "ipc/ipc_message_macros.h"
24 #include "ipc/message_filter.h"
25 #include "ipc/message_filter_router.h"
26
27 namespace IPC {
28
29 //------------------------------------------------------------------------------
30
Context(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)31 ChannelProxy::Context::Context(
32 Listener* listener,
33 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
34 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
35 : listener_task_runner_(listener_task_runner),
36 listener_(listener),
37 ipc_task_runner_(ipc_task_runner),
38 channel_connected_called_(false),
39 message_filter_router_(new MessageFilterRouter()),
40 peer_pid_(base::kNullProcessId) {
41 DCHECK(ipc_task_runner_.get());
42 // The Listener thread where Messages are handled must be a separate thread
43 // to avoid oversubscribing the IO thread. If you trigger this error, you
44 // need to either:
45 // 1) Create the ChannelProxy on a different thread, or
46 // 2) Just use Channel
47 // Note, we currently make an exception for a NULL listener. That usage
48 // basically works, but is outside the intent of ChannelProxy. This support
49 // will disappear, so please don't rely on it. See crbug.com/364241
50 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
51 }
52
// Defaulted out of line: no explicit teardown is performed here; members are
// released by their own destructors.
ChannelProxy::Context::~Context() = default;
54
ClearIPCTaskRunner()55 void ChannelProxy::Context::ClearIPCTaskRunner() {
56 ipc_task_runner_ = NULL;
57 }
58
CreateChannel(std::unique_ptr<ChannelFactory> factory)59 void ChannelProxy::Context::CreateChannel(
60 std::unique_ptr<ChannelFactory> factory) {
61 base::AutoLock l(channel_lifetime_lock_);
62 DCHECK(!channel_);
63 DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);
64 channel_ = factory->BuildChannel(this);
65
66 Channel::AssociatedInterfaceSupport* support =
67 channel_->GetAssociatedInterfaceSupport();
68 if (support) {
69 thread_safe_channel_ = support->CreateThreadSafeChannel();
70
71 base::AutoLock l(pending_filters_lock_);
72 for (auto& entry : pending_io_thread_interfaces_)
73 support->AddGenericAssociatedInterface(entry.first, entry.second);
74 pending_io_thread_interfaces_.clear();
75 }
76 }
77
TryFilters(const Message & message)78 bool ChannelProxy::Context::TryFilters(const Message& message) {
79 DCHECK(message_filter_router_);
80 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
81 Logging* logger = Logging::GetInstance();
82 if (logger->Enabled())
83 logger->OnPreDispatchMessage(message);
84 #endif
85
86 if (message_filter_router_->TryFilters(message)) {
87 if (message.dispatch_error()) {
88 listener_task_runner_->PostTask(
89 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
90 }
91 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
92 if (logger->Enabled())
93 logger->OnPostDispatchMessage(message);
94 #endif
95 return true;
96 }
97 return false;
98 }
99
100 // Called on the IPC::Channel thread
PauseChannel()101 void ChannelProxy::Context::PauseChannel() {
102 DCHECK(channel_);
103 channel_->Pause();
104 }
105
106 // Called on the IPC::Channel thread
UnpauseChannel(bool flush)107 void ChannelProxy::Context::UnpauseChannel(bool flush) {
108 DCHECK(channel_);
109 channel_->Unpause(flush);
110 }
111
112 // Called on the IPC::Channel thread
FlushChannel()113 void ChannelProxy::Context::FlushChannel() {
114 DCHECK(channel_);
115 channel_->Flush();
116 }
117
118 // Called on the IPC::Channel thread
OnMessageReceived(const Message & message)119 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
120 // First give a chance to the filters to process this message.
121 if (!TryFilters(message))
122 OnMessageReceivedNoFilter(message);
123 return true;
124 }
125
126 // Called on the IPC::Channel thread
OnMessageReceivedNoFilter(const Message & message)127 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
128 listener_task_runner_->PostTask(
129 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
130 return true;
131 }
132
133 // Called on the IPC::Channel thread
OnChannelConnected(int32_t peer_pid)134 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
135 // We cache off the peer_pid so it can be safely accessed from both threads.
136 {
137 base::AutoLock l(peer_pid_lock_);
138 peer_pid_ = peer_pid;
139 }
140
141 // Add any pending filters. This avoids a race condition where someone
142 // creates a ChannelProxy, calls AddFilter, and then right after starts the
143 // peer process. The IO thread could receive a message before the task to add
144 // the filter is run on the IO thread.
145 OnAddFilter();
146
147 // See above comment about using listener_task_runner_ here.
148 listener_task_runner_->PostTask(
149 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
150 }
151
152 // Called on the IPC::Channel thread
OnChannelError()153 void ChannelProxy::Context::OnChannelError() {
154 for (size_t i = 0; i < filters_.size(); ++i)
155 filters_[i]->OnChannelError();
156
157 // See above comment about using listener_task_runner_ here.
158 listener_task_runner_->PostTask(
159 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
160 }
161
162 // Called on the IPC::Channel thread
OnAssociatedInterfaceRequest(const std::string & interface_name,mojo::ScopedInterfaceEndpointHandle handle)163 void ChannelProxy::Context::OnAssociatedInterfaceRequest(
164 const std::string& interface_name,
165 mojo::ScopedInterfaceEndpointHandle handle) {
166 listener_task_runner_->PostTask(
167 FROM_HERE, base::Bind(&Context::OnDispatchAssociatedInterfaceRequest,
168 this, interface_name, base::Passed(&handle)));
169 }
170
171 // Called on the IPC::Channel thread
OnChannelOpened()172 void ChannelProxy::Context::OnChannelOpened() {
173 DCHECK(channel_ != NULL);
174
175 // Assume a reference to ourselves on behalf of this thread. This reference
176 // will be released when we are closed.
177 AddRef();
178
179 if (!channel_->Connect()) {
180 OnChannelError();
181 return;
182 }
183
184 for (size_t i = 0; i < filters_.size(); ++i)
185 filters_[i]->OnFilterAdded(channel_.get());
186 }
187
188 // Called on the IPC::Channel thread
OnChannelClosed()189 void ChannelProxy::Context::OnChannelClosed() {
190 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
191 // would result in this branch being taken.
192 if (!channel_)
193 return;
194
195 for (auto& filter : pending_filters_) {
196 filter->OnChannelClosing();
197 filter->OnFilterRemoved();
198 }
199 for (auto& filter : filters_) {
200 filter->OnChannelClosing();
201 filter->OnFilterRemoved();
202 }
203
204 // We don't need the filters anymore.
205 message_filter_router_->Clear();
206 filters_.clear();
207 // We don't need the lock, because at this point, the listener thread can't
208 // access it any more.
209 pending_filters_.clear();
210
211 ClearChannel();
212
213 // Balance with the reference taken during startup. This may result in
214 // self-destruction.
215 Release();
216 }
217
Clear()218 void ChannelProxy::Context::Clear() {
219 listener_ = NULL;
220 }
221
222 // Called on the IPC::Channel thread
OnSendMessage(std::unique_ptr<Message> message)223 void ChannelProxy::Context::OnSendMessage(std::unique_ptr<Message> message) {
224 if (!channel_) {
225 OnChannelClosed();
226 return;
227 }
228
229 if (!channel_->Send(message.release()))
230 OnChannelError();
231 }
232
233 // Called on the IPC::Channel thread
OnAddFilter()234 void ChannelProxy::Context::OnAddFilter() {
235 // Our OnChannelConnected method has not yet been called, so we can't be
236 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
237 // it invokes OnAddFilter, so any pending filter(s) will be added at that
238 // time.
239 // No lock necessary for |peer_pid_| because it is only modified on this
240 // thread.
241 if (peer_pid_ == base::kNullProcessId)
242 return;
243
244 std::vector<scoped_refptr<MessageFilter> > new_filters;
245 {
246 base::AutoLock auto_lock(pending_filters_lock_);
247 new_filters.swap(pending_filters_);
248 }
249
250 for (size_t i = 0; i < new_filters.size(); ++i) {
251 filters_.push_back(new_filters[i]);
252
253 message_filter_router_->AddFilter(new_filters[i].get());
254
255 // The channel has already been created and connected, so we need to
256 // inform the filters right now.
257 new_filters[i]->OnFilterAdded(channel_.get());
258 new_filters[i]->OnChannelConnected(peer_pid_);
259 }
260 }
261
262 // Called on the IPC::Channel thread
OnRemoveFilter(MessageFilter * filter)263 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
264 // No lock necessary for |peer_pid_| because it is only modified on this
265 // thread.
266 if (peer_pid_ == base::kNullProcessId) {
267 // The channel is not yet connected, so any filters are still pending.
268 base::AutoLock auto_lock(pending_filters_lock_);
269 for (size_t i = 0; i < pending_filters_.size(); ++i) {
270 if (pending_filters_[i].get() == filter) {
271 filter->OnFilterRemoved();
272 pending_filters_.erase(pending_filters_.begin() + i);
273 return;
274 }
275 }
276 return;
277 }
278 if (!channel_)
279 return; // The filters have already been deleted.
280
281 message_filter_router_->RemoveFilter(filter);
282
283 for (size_t i = 0; i < filters_.size(); ++i) {
284 if (filters_[i].get() == filter) {
285 filter->OnFilterRemoved();
286 filters_.erase(filters_.begin() + i);
287 return;
288 }
289 }
290
291 NOTREACHED() << "filter to be removed not found";
292 }
293
294 // Called on the listener's thread
AddFilter(MessageFilter * filter)295 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
296 base::AutoLock auto_lock(pending_filters_lock_);
297 pending_filters_.push_back(base::WrapRefCounted(filter));
298 ipc_task_runner_->PostTask(
299 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
300 }
301
302 // Called on the listener's thread
OnDispatchMessage(const Message & message)303 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
304 if (!listener_)
305 return;
306
307 OnDispatchConnected();
308
309 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
310 Logging* logger = Logging::GetInstance();
311 if (message.type() == IPC_LOGGING_ID) {
312 logger->OnReceivedLoggingMessage(message);
313 return;
314 }
315
316 if (logger->Enabled())
317 logger->OnPreDispatchMessage(message);
318 #endif
319
320 listener_->OnMessageReceived(message);
321 if (message.dispatch_error())
322 listener_->OnBadMessageReceived(message);
323
324 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
325 if (logger->Enabled())
326 logger->OnPostDispatchMessage(message);
327 #endif
328 }
329
330 // Called on the listener's thread
OnDispatchConnected()331 void ChannelProxy::Context::OnDispatchConnected() {
332 if (channel_connected_called_)
333 return;
334
335 base::ProcessId peer_pid;
336 {
337 base::AutoLock l(peer_pid_lock_);
338 peer_pid = peer_pid_;
339 }
340 channel_connected_called_ = true;
341 if (listener_)
342 listener_->OnChannelConnected(peer_pid);
343 }
344
345 // Called on the listener's thread
OnDispatchError()346 void ChannelProxy::Context::OnDispatchError() {
347 if (listener_)
348 listener_->OnChannelError();
349 }
350
351 // Called on the listener's thread
OnDispatchBadMessage(const Message & message)352 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
353 if (listener_)
354 listener_->OnBadMessageReceived(message);
355 }
356
357 // Called on the listener's thread
OnDispatchAssociatedInterfaceRequest(const std::string & interface_name,mojo::ScopedInterfaceEndpointHandle handle)358 void ChannelProxy::Context::OnDispatchAssociatedInterfaceRequest(
359 const std::string& interface_name,
360 mojo::ScopedInterfaceEndpointHandle handle) {
361 if (listener_)
362 listener_->OnAssociatedInterfaceRequest(interface_name, std::move(handle));
363 }
364
ClearChannel()365 void ChannelProxy::Context::ClearChannel() {
366 base::AutoLock l(channel_lifetime_lock_);
367 channel_.reset();
368 }
369
AddGenericAssociatedInterfaceForIOThread(const std::string & name,const GenericAssociatedInterfaceFactory & factory)370 void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread(
371 const std::string& name,
372 const GenericAssociatedInterfaceFactory& factory) {
373 base::AutoLock l(channel_lifetime_lock_);
374 if (!channel_) {
375 base::AutoLock l(pending_filters_lock_);
376 pending_io_thread_interfaces_.emplace_back(name, factory);
377 return;
378 }
379 Channel::AssociatedInterfaceSupport* support =
380 channel_->GetAssociatedInterfaceSupport();
381 if (support)
382 support->AddGenericAssociatedInterface(name, factory);
383 }
384
Send(Message * message)385 void ChannelProxy::Context::Send(Message* message) {
386 ipc_task_runner()->PostTask(
387 FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
388 base::Passed(base::WrapUnique(message))));
389 }
390
391 //-----------------------------------------------------------------------------
392
393 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)394 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
395 const IPC::ChannelHandle& channel_handle,
396 Channel::Mode mode,
397 Listener* listener,
398 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
399 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
400 std::unique_ptr<ChannelProxy> channel(
401 new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
402 channel->Init(channel_handle, mode, true);
403 return channel;
404 }
405
406 // static
Create(std::unique_ptr<ChannelFactory> factory,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)407 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
408 std::unique_ptr<ChannelFactory> factory,
409 Listener* listener,
410 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
411 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
412 std::unique_ptr<ChannelProxy> channel(
413 new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
414 channel->Init(std::move(factory), true);
415 return channel;
416 }
417
ChannelProxy(Context * context)418 ChannelProxy::ChannelProxy(Context* context)
419 : context_(context), did_init_(false) {
420 #if defined(ENABLE_IPC_FUZZER)
421 outgoing_message_filter_ = NULL;
422 #endif
423 }
424
ChannelProxy(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)425 ChannelProxy::ChannelProxy(
426 Listener* listener,
427 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
428 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
429 : context_(new Context(listener, ipc_task_runner, listener_task_runner)),
430 did_init_(false) {
431 #if defined(ENABLE_IPC_FUZZER)
432 outgoing_message_filter_ = NULL;
433 #endif
434 }
435
~ChannelProxy()436 ChannelProxy::~ChannelProxy() {
437 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
438
439 Close();
440 }
441
Init(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,bool create_pipe_now)442 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
443 Channel::Mode mode,
444 bool create_pipe_now) {
445 #if defined(OS_POSIX) || defined(OS_FUCHSIA)
446 // When we are creating a server on POSIX, we need its file descriptor
447 // to be created immediately so that it can be accessed and passed
448 // to other processes. Forcing it to be created immediately avoids
449 // race conditions that may otherwise arise.
450 if (mode & Channel::MODE_SERVER_FLAG) {
451 create_pipe_now = true;
452 }
453 #endif // defined(OS_POSIX) || defined(OS_FUCHSIA)
454 Init(
455 ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()),
456 create_pipe_now);
457 }
458
Init(std::unique_ptr<ChannelFactory> factory,bool create_pipe_now)459 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
460 bool create_pipe_now) {
461 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
462 DCHECK(!did_init_);
463
464 if (create_pipe_now) {
465 // Create the channel immediately. This effectively sets up the
466 // low-level pipe so that the client can connect. Without creating
467 // the pipe immediately, it is possible for a listener to attempt
468 // to connect and get an error since the pipe doesn't exist yet.
469 context_->CreateChannel(std::move(factory));
470 } else {
471 context_->ipc_task_runner()->PostTask(
472 FROM_HERE, base::Bind(&Context::CreateChannel, context_,
473 base::Passed(&factory)));
474 }
475
476 // complete initialization on the background thread
477 context_->ipc_task_runner()->PostTask(
478 FROM_HERE,
479 base::Bind(&Context::OnChannelOpened, context_));
480
481 did_init_ = true;
482 OnChannelInit();
483 }
484
Pause()485 void ChannelProxy::Pause() {
486 context_->ipc_task_runner()->PostTask(
487 FROM_HERE, base::Bind(&Context::PauseChannel, context_));
488 }
489
Unpause(bool flush)490 void ChannelProxy::Unpause(bool flush) {
491 context_->ipc_task_runner()->PostTask(
492 FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush));
493 }
494
Flush()495 void ChannelProxy::Flush() {
496 context_->ipc_task_runner()->PostTask(
497 FROM_HERE, base::Bind(&Context::FlushChannel, context_));
498 }
499
Close()500 void ChannelProxy::Close() {
501 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
502
503 // Clear the backpointer to the listener so that any pending calls to
504 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
505 // possible that the channel could be closed while it is receiving messages!
506 context_->Clear();
507
508 if (context_->ipc_task_runner()) {
509 context_->ipc_task_runner()->PostTask(
510 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_));
511 }
512 }
513
Send(Message * message)514 bool ChannelProxy::Send(Message* message) {
515 DCHECK(!message->is_sync()) << "Need to use IPC::SyncChannel";
516 SendInternal(message);
517 return true;
518 }
519
SendInternal(Message * message)520 void ChannelProxy::SendInternal(Message* message) {
521 DCHECK(did_init_);
522
523 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
524 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
525
526 #ifdef ENABLE_IPC_FUZZER
527 // In IPC fuzzing builds, it is possible to define a filter to apply to
528 // outgoing messages. It will either rewrite the message and return a new
529 // one, freeing the original, or return the message unchanged.
530 if (outgoing_message_filter())
531 message = outgoing_message_filter()->Rewrite(message);
532 #endif
533
534 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
535 Logging::GetInstance()->OnSendMessage(message);
536 #endif
537
538 // See https://crbug.com/766032. This is to ensure that senders of oversized
539 // messages can be caught more easily in the wild.
540 CHECK_LE(message->size(), Channel::kMaximumMessageSize);
541
542 context_->Send(message);
543 }
544
AddFilter(MessageFilter * filter)545 void ChannelProxy::AddFilter(MessageFilter* filter) {
546 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
547
548 context_->AddFilter(filter);
549 }
550
RemoveFilter(MessageFilter * filter)551 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
552 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
553
554 context_->ipc_task_runner()->PostTask(
555 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_,
556 base::RetainedRef(filter)));
557 }
558
AddGenericAssociatedInterfaceForIOThread(const std::string & name,const GenericAssociatedInterfaceFactory & factory)559 void ChannelProxy::AddGenericAssociatedInterfaceForIOThread(
560 const std::string& name,
561 const GenericAssociatedInterfaceFactory& factory) {
562 context()->AddGenericAssociatedInterfaceForIOThread(name, factory);
563 }
564
GetGenericRemoteAssociatedInterface(const std::string & name,mojo::ScopedInterfaceEndpointHandle handle)565 void ChannelProxy::GetGenericRemoteAssociatedInterface(
566 const std::string& name,
567 mojo::ScopedInterfaceEndpointHandle handle) {
568 DCHECK(did_init_);
569 context()->thread_safe_channel().GetAssociatedInterface(
570 name, mojom::GenericInterfaceAssociatedRequest(std::move(handle)));
571 }
572
ClearIPCTaskRunner()573 void ChannelProxy::ClearIPCTaskRunner() {
574 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
575 context()->ClearIPCTaskRunner();
576 }
577
OnChannelInit()578 void ChannelProxy::OnChannelInit() {
579 }
580
581 //-----------------------------------------------------------------------------
582
583 } // namespace IPC
584