1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/node_controller.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/histogram_macros.h"
16 #include "base/process/process_handle.h"
17 #include "base/rand_util.h"
18 #include "base/time/time.h"
19 #include "base/timer/elapsed_timer.h"
20 #include "mojo/edk/embedder/embedder_internal.h"
21 #include "mojo/edk/embedder/named_platform_channel_pair.h"
22 #include "mojo/edk/embedder/named_platform_handle.h"
23 #include "mojo/edk/embedder/platform_channel_pair.h"
24 #include "mojo/edk/system/broker.h"
25 #include "mojo/edk/system/broker_host.h"
26 #include "mojo/edk/system/core.h"
27 #include "mojo/edk/system/ports_message.h"
28 #include "mojo/edk/system/request_context.h"
29
30 #if defined(OS_MACOSX) && !defined(OS_IOS)
31 #include "mojo/edk/system/mach_port_relay.h"
32 #endif
33
34 #if !defined(OS_NACL)
35 #include "crypto/random.h"
36 #endif
37
38 namespace mojo {
39 namespace edk {
40
41 namespace {
42
43 #if defined(OS_NACL)
44 template <typename T>
GenerateRandomName(T * out)45 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
46 #else
47 template <typename T>
48 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
49 #endif
50
GetRandomNodeName()51 ports::NodeName GetRandomNodeName() {
52 ports::NodeName name;
53 GenerateRandomName(&name);
54 return name;
55 }
56
RecordPeerCount(size_t count)57 void RecordPeerCount(size_t count) {
58 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
59
60 // 8k is the maximum number of file descriptors allowed in Chrome.
61 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
62 static_cast<int32_t>(count),
63 1 /* min */,
64 8000 /* max */,
65 50 /* bucket count */);
66 }
67
RecordPendingChildCount(size_t count)68 void RecordPendingChildCount(size_t count) {
69 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
70
71 // 8k is the maximum number of file descriptors allowed in Chrome.
72 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
73 static_cast<int32_t>(count),
74 1 /* min */,
75 8000 /* max */,
76 50 /* bucket count */);
77 }
78
ParsePortsMessage(Channel::Message * message,void ** data,size_t * num_data_bytes,size_t * num_header_bytes,size_t * num_payload_bytes,size_t * num_ports_bytes)79 bool ParsePortsMessage(Channel::Message* message,
80 void** data,
81 size_t* num_data_bytes,
82 size_t* num_header_bytes,
83 size_t* num_payload_bytes,
84 size_t* num_ports_bytes) {
85 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
86 num_ports_bytes);
87
88 NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
89 if (!*num_data_bytes)
90 return false;
91
92 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
93 num_payload_bytes, num_ports_bytes)) {
94 return false;
95 }
96
97 return true;
98 }
99
100 // Used by NodeController to watch for shutdown. Since no IO can happen once
101 // the IO thread is killed, the NodeController can cleanly drop all its peers
102 // at that time.
103 class ThreadDestructionObserver :
104 public base::MessageLoop::DestructionObserver {
105 public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)106 static void Create(scoped_refptr<base::TaskRunner> task_runner,
107 const base::Closure& callback) {
108 if (task_runner->RunsTasksOnCurrentThread()) {
109 // Owns itself.
110 new ThreadDestructionObserver(callback);
111 } else {
112 task_runner->PostTask(FROM_HERE,
113 base::Bind(&Create, task_runner, callback));
114 }
115 }
116
117 private:
ThreadDestructionObserver(const base::Closure & callback)118 explicit ThreadDestructionObserver(const base::Closure& callback)
119 : callback_(callback) {
120 base::MessageLoop::current()->AddDestructionObserver(this);
121 }
122
~ThreadDestructionObserver()123 ~ThreadDestructionObserver() override {
124 base::MessageLoop::current()->RemoveDestructionObserver(this);
125 }
126
127 // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()128 void WillDestroyCurrentMessageLoop() override {
129 callback_.Run();
130 delete this;
131 }
132
133 const base::Closure callback_;
134
135 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
136 };
137
138 } // namespace
139
~NodeController()140 NodeController::~NodeController() {}
141
NodeController(Core * core)142 NodeController::NodeController(Core* core)
143 : core_(core),
144 name_(GetRandomNodeName()),
145 node_(new ports::Node(name_, this)) {
146 DVLOG(1) << "Initializing node " << name_;
147 }
148
149 #if defined(OS_MACOSX) && !defined(OS_IOS)
CreateMachPortRelay(base::PortProvider * port_provider)150 void NodeController::CreateMachPortRelay(
151 base::PortProvider* port_provider) {
152 base::AutoLock lock(mach_port_relay_lock_);
153 DCHECK(!mach_port_relay_);
154 mach_port_relay_.reset(new MachPortRelay(port_provider));
155 }
156 #endif
157
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)158 void NodeController::SetIOTaskRunner(
159 scoped_refptr<base::TaskRunner> task_runner) {
160 io_task_runner_ = task_runner;
161 ThreadDestructionObserver::Create(
162 io_task_runner_,
163 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
164 }
165
ConnectToChild(base::ProcessHandle process_handle,ConnectionParams connection_params,const std::string & child_token,const ProcessErrorCallback & process_error_callback)166 void NodeController::ConnectToChild(
167 base::ProcessHandle process_handle,
168 ConnectionParams connection_params,
169 const std::string& child_token,
170 const ProcessErrorCallback& process_error_callback) {
171 // Generate the temporary remote node name here so that it can be associated
172 // with the embedder's child_token. If an error occurs in the child process
173 // after it is launched, but before any reserved ports are connected, this can
174 // be used to clean up any dangling ports.
175 ports::NodeName node_name;
176 GenerateRandomName(&node_name);
177
178 {
179 base::AutoLock lock(reserved_ports_lock_);
180 bool inserted = pending_child_tokens_.insert(
181 std::make_pair(node_name, child_token)).second;
182 DCHECK(inserted);
183 }
184
185 #if defined(OS_WIN)
186 // On Windows, we need to duplicate the process handle because we have no
187 // control over its lifetime and it may become invalid by the time the posted
188 // task runs.
189 HANDLE dup_handle = INVALID_HANDLE_VALUE;
190 BOOL ok = ::DuplicateHandle(
191 base::GetCurrentProcessHandle(), process_handle,
192 base::GetCurrentProcessHandle(), &dup_handle,
193 0, FALSE, DUPLICATE_SAME_ACCESS);
194 DPCHECK(ok);
195 process_handle = dup_handle;
196 #endif
197
198 io_task_runner_->PostTask(
199 FROM_HERE, base::Bind(&NodeController::ConnectToChildOnIOThread,
200 base::Unretained(this), process_handle,
201 base::Passed(&connection_params), node_name,
202 process_error_callback));
203 }
204
CloseChildPorts(const std::string & child_token)205 void NodeController::CloseChildPorts(const std::string& child_token) {
206 std::vector<ports::PortRef> ports_to_close;
207 {
208 std::vector<std::string> port_tokens;
209 base::AutoLock lock(reserved_ports_lock_);
210 for (const auto& port : reserved_ports_) {
211 if (port.second.child_token == child_token) {
212 DVLOG(1) << "Closing reserved port " << port.second.port.name();
213 ports_to_close.push_back(port.second.port);
214 port_tokens.push_back(port.first);
215 }
216 }
217
218 for (const auto& token : port_tokens)
219 reserved_ports_.erase(token);
220 }
221
222 for (const auto& port : ports_to_close)
223 node_->ClosePort(port);
224
225 // Ensure local port closure messages are processed.
226 AcceptIncomingMessages();
227 }
228
ClosePeerConnection(const std::string & peer_token)229 void NodeController::ClosePeerConnection(const std::string& peer_token) {
230 io_task_runner_->PostTask(
231 FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
232 base::Unretained(this), peer_token));
233 }
234
ConnectToParent(ConnectionParams connection_params)235 void NodeController::ConnectToParent(ConnectionParams connection_params) {
236 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
237 // Use the bootstrap channel for the broker and receive the node's channel
238 // synchronously as the first message from the broker.
239 base::ElapsedTimer timer;
240 broker_.reset(new Broker(connection_params.TakeChannelHandle()));
241 ScopedPlatformHandle platform_handle = broker_->GetParentPlatformHandle();
242 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
243 timer.Elapsed());
244
245 if (!platform_handle.is_valid()) {
246 // Most likely the browser side of the channel has already been closed and
247 // the broker was unable to negotiate a NodeChannel pipe. In this case we
248 // can cancel parent connection.
249 DVLOG(1) << "Cannot connect to invalid parent channel.";
250 CancelPendingPortMerges();
251 return;
252 }
253 connection_params = ConnectionParams(std::move(platform_handle));
254 #endif
255
256 io_task_runner_->PostTask(
257 FROM_HERE,
258 base::Bind(&NodeController::ConnectToParentOnIOThread,
259 base::Unretained(this), base::Passed(&connection_params)));
260 }
261
ConnectToPeer(ConnectionParams connection_params,const ports::PortRef & port,const std::string & peer_token)262 void NodeController::ConnectToPeer(ConnectionParams connection_params,
263 const ports::PortRef& port,
264 const std::string& peer_token) {
265 ports::NodeName node_name;
266 GenerateRandomName(&node_name);
267 io_task_runner_->PostTask(
268 FROM_HERE,
269 base::Bind(&NodeController::ConnectToPeerOnIOThread,
270 base::Unretained(this), base::Passed(&connection_params),
271 node_name, port, peer_token));
272 }
273
SetPortObserver(const ports::PortRef & port,scoped_refptr<PortObserver> observer)274 void NodeController::SetPortObserver(const ports::PortRef& port,
275 scoped_refptr<PortObserver> observer) {
276 node_->SetUserData(port, std::move(observer));
277 }
278
ClosePort(const ports::PortRef & port)279 void NodeController::ClosePort(const ports::PortRef& port) {
280 SetPortObserver(port, nullptr);
281 int rv = node_->ClosePort(port);
282 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
283
284 AcceptIncomingMessages();
285 }
286
SendMessage(const ports::PortRef & port,std::unique_ptr<PortsMessage> message)287 int NodeController::SendMessage(const ports::PortRef& port,
288 std::unique_ptr<PortsMessage> message) {
289 ports::ScopedMessage ports_message(message.release());
290 int rv = node_->SendMessage(port, std::move(ports_message));
291
292 AcceptIncomingMessages();
293 return rv;
294 }
295
ReservePort(const std::string & token,const ports::PortRef & port,const std::string & child_token)296 void NodeController::ReservePort(const std::string& token,
297 const ports::PortRef& port,
298 const std::string& child_token) {
299 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
300 << token;
301
302 base::AutoLock lock(reserved_ports_lock_);
303 auto result = reserved_ports_.insert(
304 std::make_pair(token, ReservedPort{port, child_token}));
305 DCHECK(result.second);
306 }
307
MergePortIntoParent(const std::string & token,const ports::PortRef & port)308 void NodeController::MergePortIntoParent(const std::string& token,
309 const ports::PortRef& port) {
310 bool was_merged = false;
311 {
312 // This request may be coming from within the process that reserved the
313 // "parent" side (e.g. for Chrome single-process mode), so if this token is
314 // reserved locally, merge locally instead.
315 base::AutoLock lock(reserved_ports_lock_);
316 auto it = reserved_ports_.find(token);
317 if (it != reserved_ports_.end()) {
318 node_->MergePorts(port, name_, it->second.port.name());
319 reserved_ports_.erase(it);
320 was_merged = true;
321 }
322 }
323 if (was_merged) {
324 AcceptIncomingMessages();
325 return;
326 }
327
328 scoped_refptr<NodeChannel> parent;
329 bool reject_merge = false;
330 {
331 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
332 // there is a race where the parent can be set, and |pending_port_merges_|
333 // be processed between retrieving |parent| and adding the merge to
334 // |pending_port_merges_|.
335 base::AutoLock lock(pending_port_merges_lock_);
336 parent = GetParentChannel();
337 if (reject_pending_merges_) {
338 reject_merge = true;
339 } else if (!parent) {
340 pending_port_merges_.push_back(std::make_pair(token, port));
341 return;
342 }
343 }
344 if (reject_merge) {
345 node_->ClosePort(port);
346 DVLOG(2) << "Rejecting port merge for token " << token
347 << " due to closed parent channel.";
348 AcceptIncomingMessages();
349 return;
350 }
351
352 parent->RequestPortMerge(port.name(), token);
353 }
354
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)355 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
356 const ports::PortRef& port1) {
357 int rv = node_->MergeLocalPorts(port0, port1);
358 AcceptIncomingMessages();
359 return rv;
360 }
361
CreateSharedBuffer(size_t num_bytes)362 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
363 size_t num_bytes) {
364 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
365 // Shared buffer creation failure is fatal, so always use the broker when we
366 // have one. This does mean that a non-root process that has children will use
367 // the broker for shared buffer creation even though that process is
368 // privileged.
369 if (broker_) {
370 return broker_->GetSharedBuffer(num_bytes);
371 }
372 #endif
373 return PlatformSharedBuffer::Create(num_bytes);
374 }
375
RequestShutdown(const base::Closure & callback)376 void NodeController::RequestShutdown(const base::Closure& callback) {
377 {
378 base::AutoLock lock(shutdown_lock_);
379 shutdown_callback_ = callback;
380 shutdown_callback_flag_.Set(true);
381 }
382
383 AttemptShutdownIfRequested();
384 }
385
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)386 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
387 const std::string& error) {
388 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
389 if (peer)
390 peer->NotifyBadMessage(error);
391 }
392
ConnectToChildOnIOThread(base::ProcessHandle process_handle,ConnectionParams connection_params,ports::NodeName token,const ProcessErrorCallback & process_error_callback)393 void NodeController::ConnectToChildOnIOThread(
394 base::ProcessHandle process_handle,
395 ConnectionParams connection_params,
396 ports::NodeName token,
397 const ProcessErrorCallback& process_error_callback) {
398 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
399
400 #if !defined(OS_MACOSX) && !defined(OS_NACL)
401 PlatformChannelPair node_channel;
402 ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
403 // BrokerHost owns itself.
404 BrokerHost* broker_host =
405 new BrokerHost(process_handle, connection_params.TakeChannelHandle());
406 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());
407
408 #if defined(OS_WIN)
409 if (!channel_ok) {
410 // On Windows the above operation may fail if the channel is crossing a
411 // session boundary. In that case we fall back to a named pipe.
412 NamedPlatformChannelPair named_channel;
413 server_handle = named_channel.PassServerHandle();
414 broker_host->SendNamedChannel(named_channel.handle().name);
415 }
416 #else
417 CHECK(channel_ok);
418 #endif // defined(OS_WIN)
419
420 scoped_refptr<NodeChannel> channel =
421 NodeChannel::Create(this, ConnectionParams(std::move(server_handle)),
422 io_task_runner_, process_error_callback);
423
424 #else // !defined(OS_MACOSX) && !defined(OS_NACL)
425 scoped_refptr<NodeChannel> channel =
426 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
427 process_error_callback);
428 #endif // !defined(OS_MACOSX) && !defined(OS_NACL)
429
430 // We set up the child channel with a temporary name so it can be identified
431 // as a pending child if it writes any messages to the channel. We may start
432 // receiving messages from it (though we shouldn't) as soon as Start() is
433 // called below.
434
435 pending_children_.insert(std::make_pair(token, channel));
436 RecordPendingChildCount(pending_children_.size());
437
438 channel->SetRemoteNodeName(token);
439 channel->SetRemoteProcessHandle(process_handle);
440 channel->Start();
441
442 channel->AcceptChild(name_, token);
443 }
444
ConnectToParentOnIOThread(ConnectionParams connection_params)445 void NodeController::ConnectToParentOnIOThread(
446 ConnectionParams connection_params) {
447 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
448
449 {
450 base::AutoLock lock(parent_lock_);
451 DCHECK(parent_name_ == ports::kInvalidNodeName);
452
453 // At this point we don't know the parent's name, so we can't yet insert it
454 // into our |peers_| map. That will happen as soon as we receive an
455 // AcceptChild message from them.
456 bootstrap_parent_channel_ =
457 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
458 ProcessErrorCallback());
459 // Prevent the parent pipe handle from being closed on shutdown. Pipe
460 // closure is used by the parent to detect the child process has exited.
461 // Relying on message pipes to be closed is not enough because the parent
462 // may see the message pipe closure before the child is dead, causing the
463 // child process to be unexpectedly SIGKILL'd.
464 bootstrap_parent_channel_->LeakHandleOnShutdown();
465 }
466 bootstrap_parent_channel_->Start();
467 }
468
ConnectToPeerOnIOThread(ConnectionParams connection_params,ports::NodeName token,ports::PortRef port,const std::string & peer_token)469 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
470 ports::NodeName token,
471 ports::PortRef port,
472 const std::string& peer_token) {
473 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
474
475 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
476 this, std::move(connection_params), io_task_runner_, {});
477 peer_connections_.insert(
478 {token, PeerConnection{channel, port, peer_token}});
479 peers_by_token_.insert({peer_token, token});
480
481 channel->SetRemoteNodeName(token);
482 channel->Start();
483
484 channel->AcceptPeer(name_, token, port.name());
485 }
486
ClosePeerConnectionOnIOThread(const std::string & peer_token)487 void NodeController::ClosePeerConnectionOnIOThread(
488 const std::string& peer_token) {
489 RequestContext request_context(RequestContext::Source::SYSTEM);
490 auto peer = peers_by_token_.find(peer_token);
491 // The connection may already be closed.
492 if (peer == peers_by_token_.end())
493 return;
494
495 // |peer| may be removed so make a copy of |name|.
496 ports::NodeName name = peer->second;
497 DropPeer(name, nullptr);
498 }
499
GetPeerChannel(const ports::NodeName & name)500 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
501 const ports::NodeName& name) {
502 base::AutoLock lock(peers_lock_);
503 auto it = peers_.find(name);
504 if (it == peers_.end())
505 return nullptr;
506 return it->second;
507 }
508
GetParentChannel()509 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
510 ports::NodeName parent_name;
511 {
512 base::AutoLock lock(parent_lock_);
513 parent_name = parent_name_;
514 }
515 return GetPeerChannel(parent_name);
516 }
517
GetBrokerChannel()518 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
519 ports::NodeName broker_name;
520 {
521 base::AutoLock lock(broker_lock_);
522 broker_name = broker_name_;
523 }
524 return GetPeerChannel(broker_name);
525 }
526
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)527 void NodeController::AddPeer(const ports::NodeName& name,
528 scoped_refptr<NodeChannel> channel,
529 bool start_channel) {
530 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
531
532 DCHECK(name != ports::kInvalidNodeName);
533 DCHECK(channel);
534
535 channel->SetRemoteNodeName(name);
536
537 OutgoingMessageQueue pending_messages;
538 {
539 base::AutoLock lock(peers_lock_);
540 if (peers_.find(name) != peers_.end()) {
541 // This can happen normally if two nodes race to be introduced to each
542 // other. The losing pipe will be silently closed and introduction should
543 // not be affected.
544 DVLOG(1) << "Ignoring duplicate peer name " << name;
545 return;
546 }
547
548 auto result = peers_.insert(std::make_pair(name, channel));
549 DCHECK(result.second);
550
551 DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
552
553 RecordPeerCount(peers_.size());
554
555 auto it = pending_peer_messages_.find(name);
556 if (it != pending_peer_messages_.end()) {
557 std::swap(pending_messages, it->second);
558 pending_peer_messages_.erase(it);
559 }
560 }
561
562 if (start_channel)
563 channel->Start();
564
565 // Flush any queued message we need to deliver to this node.
566 while (!pending_messages.empty()) {
567 channel->PortsMessage(std::move(pending_messages.front()));
568 pending_messages.pop();
569 }
570 }
571
DropPeer(const ports::NodeName & name,NodeChannel * channel)572 void NodeController::DropPeer(const ports::NodeName& name,
573 NodeChannel* channel) {
574 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
575
576 {
577 base::AutoLock lock(peers_lock_);
578 auto it = peers_.find(name);
579
580 if (it != peers_.end()) {
581 ports::NodeName peer = it->first;
582 peers_.erase(it);
583 DVLOG(1) << "Dropped peer " << peer;
584 }
585
586 pending_peer_messages_.erase(name);
587 pending_children_.erase(name);
588
589 RecordPeerCount(peers_.size());
590 RecordPendingChildCount(pending_children_.size());
591 }
592
593 std::vector<ports::PortRef> ports_to_close;
594 {
595 // Clean up any reserved ports.
596 base::AutoLock lock(reserved_ports_lock_);
597 auto it = pending_child_tokens_.find(name);
598 if (it != pending_child_tokens_.end()) {
599 const std::string& child_token = it->second;
600
601 std::vector<std::string> port_tokens;
602 for (const auto& port : reserved_ports_) {
603 if (port.second.child_token == child_token) {
604 DVLOG(1) << "Closing reserved port: " << port.second.port.name();
605 ports_to_close.push_back(port.second.port);
606 port_tokens.push_back(port.first);
607 }
608 }
609
610 // We have to erase reserved ports in a two-step manner because the usual
611 // manner of using the returned iterator from map::erase isn't technically
612 // valid in C++11 (although it is in C++14).
613 for (const auto& token : port_tokens)
614 reserved_ports_.erase(token);
615
616 pending_child_tokens_.erase(it);
617 }
618 }
619
620 bool is_parent;
621 {
622 base::AutoLock lock(parent_lock_);
623 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
624 }
625
626 // If the error comes from the parent channel, we also need to cancel any
627 // port merge requests, so that errors can be propagated to the message
628 // pipes.
629 if (is_parent)
630 CancelPendingPortMerges();
631
632 auto peer = peer_connections_.find(name);
633 if (peer != peer_connections_.end()) {
634 peers_by_token_.erase(peer->second.peer_token);
635 ports_to_close.push_back(peer->second.local_port);
636 peer_connections_.erase(peer);
637 }
638
639 for (const auto& port : ports_to_close)
640 node_->ClosePort(port);
641
642 node_->LostConnectionToNode(name);
643
644 AcceptIncomingMessages();
645 }
646
SendPeerMessage(const ports::NodeName & name,ports::ScopedMessage message)647 void NodeController::SendPeerMessage(const ports::NodeName& name,
648 ports::ScopedMessage message) {
649 Channel::MessagePtr channel_message =
650 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
651
652 scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
653 #if defined(OS_WIN)
654 if (channel_message->has_handles()) {
655 // If we're sending a message with handles we aren't the destination
656 // node's parent or broker (i.e. we don't know its process handle), ask
657 // the broker to relay for us.
658 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
659 if (!peer || !peer->HasRemoteProcessHandle()) {
660 if (broker) {
661 broker->RelayPortsMessage(name, std::move(channel_message));
662 } else {
663 base::AutoLock lock(broker_lock_);
664 pending_relay_messages_[name].emplace(std::move(channel_message));
665 }
666 return;
667 }
668 }
669 #elif defined(OS_MACOSX) && !defined(OS_IOS)
670 if (channel_message->has_mach_ports()) {
671 // Messages containing Mach ports are always routed through the broker, even
672 // if the broker process is the intended recipient.
673 bool use_broker = false;
674 {
675 base::AutoLock lock(parent_lock_);
676 use_broker = (bootstrap_parent_channel_ ||
677 parent_name_ != ports::kInvalidNodeName);
678 }
679 if (use_broker) {
680 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
681 if (broker) {
682 broker->RelayPortsMessage(name, std::move(channel_message));
683 } else {
684 base::AutoLock lock(broker_lock_);
685 pending_relay_messages_[name].emplace(std::move(channel_message));
686 }
687 return;
688 }
689 }
690 #endif // defined(OS_WIN)
691
692 if (peer) {
693 peer->PortsMessage(std::move(channel_message));
694 return;
695 }
696
697 // If we don't know who the peer is, queue the message for delivery. If this
698 // is the first message queued for the peer, we also ask the broker to
699 // introduce us to them.
700
701 bool needs_introduction = false;
702 {
703 base::AutoLock lock(peers_lock_);
704 auto& queue = pending_peer_messages_[name];
705 needs_introduction = queue.empty();
706 queue.emplace(std::move(channel_message));
707 }
708
709 if (needs_introduction) {
710 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
711 if (!broker) {
712 DVLOG(1) << "Dropping message for unknown peer: " << name;
713 return;
714 }
715 broker->RequestIntroduction(name);
716 }
717 }
718
AcceptIncomingMessages()719 void NodeController::AcceptIncomingMessages() {
720 // This is an impactically large value which should never be reached in
721 // practice. See the CHECK below for usage.
722 constexpr size_t kMaxAcceptedMessages = 1000000;
723
724 size_t num_messages_accepted = 0;
725 while (incoming_messages_flag_) {
726 // TODO: We may need to be more careful to avoid starving the rest of the
727 // thread here. Revisit this if it turns out to be a problem. One
728 // alternative would be to schedule a task to continue pumping messages
729 // after flushing once.
730
731 messages_lock_.Acquire();
732 if (incoming_messages_.empty()) {
733 messages_lock_.Release();
734 break;
735 }
736
737 // libstdc++'s deque creates an internal buffer on construction, even when
738 // the size is 0. So avoid creating it until it is necessary.
739 std::queue<ports::ScopedMessage> messages;
740 std::swap(messages, incoming_messages_);
741 incoming_messages_flag_.Set(false);
742 messages_lock_.Release();
743
744 num_messages_accepted += messages.size();
745 while (!messages.empty()) {
746 node_->AcceptMessage(std::move(messages.front()));
747 messages.pop();
748 }
749
750 // This is effectively a safeguard against potential bugs which might lead
751 // to runaway message cycles. If any such cycles arise, we'll start seeing
752 // crash reports from this location.
753 CHECK_LE(num_messages_accepted, kMaxAcceptedMessages);
754 }
755
756 if (num_messages_accepted >= 4) {
757 // Note: We avoid logging this histogram for the vast majority of cases.
758 // See https://crbug.com/685763 for more context.
759 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.MessagesAcceptedPerEvent",
760 static_cast<int32_t>(num_messages_accepted),
761 1 /* min */,
762 500 /* max */,
763 50 /* bucket count */);
764 }
765
766 AttemptShutdownIfRequested();
767 }
768
ProcessIncomingMessages()769 void NodeController::ProcessIncomingMessages() {
770 RequestContext request_context(RequestContext::Source::SYSTEM);
771
772 {
773 base::AutoLock lock(messages_lock_);
774 // Allow a new incoming messages processing task to be posted. This can't be
775 // done after AcceptIncomingMessages() otherwise a message might be missed.
776 // Doing it here may result in at most two tasks existing at the same time;
777 // this running one, and one pending in the task runner.
778 incoming_messages_task_posted_ = false;
779 }
780
781 AcceptIncomingMessages();
782 }
783
DropAllPeers()784 void NodeController::DropAllPeers() {
785 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
786
787 std::vector<scoped_refptr<NodeChannel>> all_peers;
788 {
789 base::AutoLock lock(parent_lock_);
790 if (bootstrap_parent_channel_) {
791 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
792 // existence to determine whether or not this is the root node. Once
793 // bootstrap_parent_channel_->ShutDown() has been called,
794 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
795 // matter if it's deleted now or when |this| is deleted.
796 // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
797 all_peers.push_back(bootstrap_parent_channel_);
798 }
799 }
800
801 {
802 base::AutoLock lock(peers_lock_);
803 for (const auto& peer : peers_)
804 all_peers.push_back(peer.second);
805 for (const auto& peer : pending_children_)
806 all_peers.push_back(peer.second);
807 peers_.clear();
808 pending_children_.clear();
809 pending_peer_messages_.clear();
810 peer_connections_.clear();
811 }
812
813 for (const auto& peer : all_peers)
814 peer->ShutDown();
815
816 if (destroy_on_io_thread_shutdown_)
817 delete this;
818 }
819
GenerateRandomPortName(ports::PortName * port_name)820 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
821 GenerateRandomName(port_name);
822 }
823
AllocMessage(size_t num_header_bytes,ports::ScopedMessage * message)824 void NodeController::AllocMessage(size_t num_header_bytes,
825 ports::ScopedMessage* message) {
826 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
827 }
828
ForwardMessage(const ports::NodeName & node,ports::ScopedMessage message)829 void NodeController::ForwardMessage(const ports::NodeName& node,
830 ports::ScopedMessage message) {
831 DCHECK(message);
832 bool schedule_pump_task = false;
833 if (node == name_) {
834 // NOTE: We need to avoid re-entering the Node instance within
835 // ForwardMessage. Because ForwardMessage is only ever called
836 // (synchronously) in response to Node's ClosePort, SendMessage, or
837 // AcceptMessage, we flush the queue after calling any of those methods.
838 base::AutoLock lock(messages_lock_);
839 // |io_task_runner_| may be null in tests or processes that don't require
840 // multi-process Mojo.
841 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
842 !incoming_messages_task_posted_;
843 incoming_messages_task_posted_ |= schedule_pump_task;
844 incoming_messages_.emplace(std::move(message));
845 incoming_messages_flag_.Set(true);
846 } else {
847 SendPeerMessage(node, std::move(message));
848 }
849
850 if (schedule_pump_task) {
851 // Normally, the queue is processed after the action that added the local
852 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
853 // possible for a local message to be added as a result of a remote message,
854 // and OnChannelMessage() doesn't process this queue (although
855 // OnPortsMessage() does). There may also be other code paths, now or added
856 // in the future, which cause local messages to be added but don't process
857 // this message queue.
858 //
859 // Instead of adding a call to AcceptIncomingMessages() on every possible
860 // code path, post a task to the IO thread to process the queue. If the
861 // current call stack processes the queue, this may end up doing nothing.
862 io_task_runner_->PostTask(
863 FROM_HERE,
864 base::Bind(&NodeController::ProcessIncomingMessages,
865 base::Unretained(this)));
866 }
867 }
868
BroadcastMessage(ports::ScopedMessage message)869 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
870 CHECK_EQ(message->num_ports(), 0u);
871 Channel::MessagePtr channel_message =
872 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
873 CHECK(!channel_message->has_handles());
874
875 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
876 if (broker)
877 broker->Broadcast(std::move(channel_message));
878 else
879 OnBroadcast(name_, std::move(channel_message));
880 }
881
PortStatusChanged(const ports::PortRef & port)882 void NodeController::PortStatusChanged(const ports::PortRef& port) {
883 scoped_refptr<ports::UserData> user_data;
884 node_->GetUserData(port, &user_data);
885
886 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
887 if (observer) {
888 observer->OnPortStatusChanged();
889 } else {
890 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
891 << "doesn't have an observer.";
892 }
893 }
894
OnAcceptChild(const ports::NodeName & from_node,const ports::NodeName & parent_name,const ports::NodeName & token)895 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
896 const ports::NodeName& parent_name,
897 const ports::NodeName& token) {
898 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
899
900 scoped_refptr<NodeChannel> parent;
901 {
902 base::AutoLock lock(parent_lock_);
903 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
904 parent_name_ = parent_name;
905 parent = bootstrap_parent_channel_;
906 }
907 }
908
909 if (!parent) {
910 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
911 DropPeer(from_node, nullptr);
912 return;
913 }
914
915 parent->SetRemoteNodeName(parent_name);
916 parent->AcceptParent(token, name_);
917
918 // NOTE: The child does not actually add its parent as a peer until
919 // receiving an AcceptBrokerClient message from the broker. The parent
920 // will request that said message be sent upon receiving AcceptParent.
921
922 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
923 }
924
OnAcceptParent(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & child_name)925 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
926 const ports::NodeName& token,
927 const ports::NodeName& child_name) {
928 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
929
930 auto it = pending_children_.find(from_node);
931 if (it == pending_children_.end() || token != from_node) {
932 DLOG(ERROR) << "Received unexpected AcceptParent message from "
933 << from_node;
934 DropPeer(from_node, nullptr);
935 return;
936 }
937
938 scoped_refptr<NodeChannel> channel = it->second;
939 pending_children_.erase(it);
940
941 DCHECK(channel);
942
943 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
944
945 AddPeer(child_name, channel, false /* start_channel */);
946
947 // TODO(rockot/amistry): We could simplify child initialization if we could
948 // synchronously get a new async broker channel from the broker. For now we do
949 // it asynchronously since it's only used to facilitate handle passing, not
950 // handle creation.
951 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
952 if (broker) {
953 // Inform the broker of this new child.
954 broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
955 } else {
956 // If we have no broker, either we need to wait for one, or we *are* the
957 // broker.
958 scoped_refptr<NodeChannel> parent = GetParentChannel();
959 if (!parent) {
960 base::AutoLock lock(parent_lock_);
961 parent = bootstrap_parent_channel_;
962 }
963
964 if (!parent) {
965 // Yes, we're the broker. We can initialize the child directly.
966 channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
967 } else {
968 // We aren't the broker, so wait for a broker connection.
969 base::AutoLock lock(broker_lock_);
970 pending_broker_clients_.push(child_name);
971 }
972 }
973 }
974
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)975 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
976 const ports::NodeName& client_name,
977 base::ProcessHandle process_handle) {
978 #if defined(OS_WIN)
979 // Scoped handle to avoid leaks on error.
980 ScopedPlatformHandle scoped_process_handle =
981 ScopedPlatformHandle(PlatformHandle(process_handle));
982 #endif
983 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
984 if (!sender) {
985 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
986 return;
987 }
988
989 if (GetPeerChannel(client_name)) {
990 DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
991 DropPeer(from_node, nullptr);
992 return;
993 }
994
995 PlatformChannelPair broker_channel;
996 ConnectionParams connection_params(broker_channel.PassServerHandle());
997 scoped_refptr<NodeChannel> client =
998 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
999 ProcessErrorCallback());
1000
1001 #if defined(OS_WIN)
1002 // The broker must have a working handle to the client process in order to
1003 // properly copy other handles to and from the client.
1004 if (!scoped_process_handle.is_valid()) {
1005 DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
1006 return;
1007 }
1008 client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
1009 #else
1010 client->SetRemoteProcessHandle(process_handle);
1011 #endif
1012
1013 AddPeer(client_name, client, true /* start_channel */);
1014
1015 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
1016 << " from peer " << from_node;
1017
1018 sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
1019 }
1020
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,ScopedPlatformHandle broker_channel)1021 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
1022 const ports::NodeName& client_name,
1023 ScopedPlatformHandle broker_channel) {
1024 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
1025 if (!client) {
1026 DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
1027 return;
1028 }
1029
1030 // This should have come from our own broker.
1031 if (GetBrokerChannel() != GetPeerChannel(from_node)) {
1032 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
1033 return;
1034 }
1035
1036 DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
1037
1038 client->AcceptBrokerClient(from_node, std::move(broker_channel));
1039 }
1040
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,ScopedPlatformHandle broker_channel)1041 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
1042 const ports::NodeName& broker_name,
1043 ScopedPlatformHandle broker_channel) {
1044 // This node should already have a parent in bootstrap mode.
1045 ports::NodeName parent_name;
1046 scoped_refptr<NodeChannel> parent;
1047 {
1048 base::AutoLock lock(parent_lock_);
1049 parent_name = parent_name_;
1050 parent = bootstrap_parent_channel_;
1051 bootstrap_parent_channel_ = nullptr;
1052 }
1053 DCHECK(parent_name == from_node);
1054 DCHECK(parent);
1055
1056 std::queue<ports::NodeName> pending_broker_clients;
1057 std::unordered_map<ports::NodeName, OutgoingMessageQueue>
1058 pending_relay_messages;
1059 {
1060 base::AutoLock lock(broker_lock_);
1061 broker_name_ = broker_name;
1062 std::swap(pending_broker_clients, pending_broker_clients_);
1063 std::swap(pending_relay_messages, pending_relay_messages_);
1064 }
1065 DCHECK(broker_name != ports::kInvalidNodeName);
1066
1067 // It's now possible to add both the broker and the parent as peers.
1068 // Note that the broker and parent may be the same node.
1069 scoped_refptr<NodeChannel> broker;
1070 if (broker_name == parent_name) {
1071 DCHECK(!broker_channel.is_valid());
1072 broker = parent;
1073 } else {
1074 DCHECK(broker_channel.is_valid());
1075 broker =
1076 NodeChannel::Create(this, ConnectionParams(std::move(broker_channel)),
1077 io_task_runner_, ProcessErrorCallback());
1078 AddPeer(broker_name, broker, true /* start_channel */);
1079 }
1080
1081 AddPeer(parent_name, parent, false /* start_channel */);
1082
1083 {
1084 // Complete any port merge requests we have waiting for the parent.
1085 base::AutoLock lock(pending_port_merges_lock_);
1086 for (const auto& request : pending_port_merges_)
1087 parent->RequestPortMerge(request.second.name(), request.first);
1088 pending_port_merges_.clear();
1089 }
1090
1091 // Feed the broker any pending children of our own.
1092 while (!pending_broker_clients.empty()) {
1093 const ports::NodeName& child_name = pending_broker_clients.front();
1094 auto it = pending_children_.find(child_name);
1095 DCHECK(it != pending_children_.end());
1096 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
1097 pending_broker_clients.pop();
1098 }
1099
1100 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1101 // Have the broker relay any messages we have waiting.
1102 for (auto& entry : pending_relay_messages) {
1103 const ports::NodeName& destination = entry.first;
1104 auto& message_queue = entry.second;
1105 while (!message_queue.empty()) {
1106 broker->RelayPortsMessage(destination, std::move(message_queue.front()));
1107 message_queue.pop();
1108 }
1109 }
1110 #endif
1111
1112 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1113 }
1114
OnPortsMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)1115 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1116 Channel::MessagePtr channel_message) {
1117 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1118
1119 void* data;
1120 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1121 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1122 &num_header_bytes, &num_payload_bytes,
1123 &num_ports_bytes)) {
1124 DropPeer(from_node, nullptr);
1125 return;
1126 }
1127
1128 CHECK(channel_message);
1129 std::unique_ptr<PortsMessage> ports_message(
1130 new PortsMessage(num_header_bytes,
1131 num_payload_bytes,
1132 num_ports_bytes,
1133 std::move(channel_message)));
1134 ports_message->set_source_node(from_node);
1135 node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1136 AcceptIncomingMessages();
1137 }
1138
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & token)1139 void NodeController::OnRequestPortMerge(
1140 const ports::NodeName& from_node,
1141 const ports::PortName& connector_port_name,
1142 const std::string& token) {
1143 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1144
1145 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
1146 << token << " and port " << connector_port_name << "@" << from_node;
1147
1148 ports::PortRef local_port;
1149 {
1150 base::AutoLock lock(reserved_ports_lock_);
1151 auto it = reserved_ports_.find(token);
1152 if (it == reserved_ports_.end()) {
1153 DVLOG(1) << "Ignoring request to connect to port for unknown token "
1154 << token;
1155 return;
1156 }
1157 local_port = it->second.port;
1158 }
1159
1160 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1161 if (rv != ports::OK)
1162 DLOG(ERROR) << "MergePorts failed: " << rv;
1163
1164 AcceptIncomingMessages();
1165 }
1166
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1167 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1168 const ports::NodeName& name) {
1169 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1170
1171 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1172 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1173 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1174 << from_node;
1175 DropPeer(from_node, nullptr);
1176 return;
1177 }
1178
1179 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1180 if (!new_friend) {
1181 // We don't know who they're talking about!
1182 requestor->Introduce(name, ScopedPlatformHandle());
1183 } else {
1184 PlatformChannelPair new_channel;
1185 requestor->Introduce(name, new_channel.PassServerHandle());
1186 new_friend->Introduce(from_node, new_channel.PassClientHandle());
1187 }
1188 }
1189
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,ScopedPlatformHandle channel_handle)1190 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1191 const ports::NodeName& name,
1192 ScopedPlatformHandle channel_handle) {
1193 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1194
1195 if (!channel_handle.is_valid()) {
1196 node_->LostConnectionToNode(name);
1197
1198 DVLOG(1) << "Could not be introduced to peer " << name;
1199 base::AutoLock lock(peers_lock_);
1200 pending_peer_messages_.erase(name);
1201 return;
1202 }
1203
1204 scoped_refptr<NodeChannel> channel =
1205 NodeChannel::Create(this, ConnectionParams(std::move(channel_handle)),
1206 io_task_runner_, ProcessErrorCallback());
1207
1208 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1209 AddPeer(name, channel, true /* start_channel */);
1210 }
1211
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1212 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1213 Channel::MessagePtr message) {
1214 DCHECK(!message->has_handles());
1215
1216 void* data;
1217 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1218 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1219 &num_header_bytes, &num_payload_bytes,
1220 &num_ports_bytes)) {
1221 DropPeer(from_node, nullptr);
1222 return;
1223 }
1224
1225 // Broadcast messages must not contain ports.
1226 if (num_ports_bytes > 0) {
1227 DropPeer(from_node, nullptr);
1228 return;
1229 }
1230
1231 base::AutoLock lock(peers_lock_);
1232 for (auto& iter : peers_) {
1233 // Copy and send the message to each known peer.
1234 Channel::MessagePtr peer_message(
1235 new Channel::Message(message->payload_size(), 0));
1236 memcpy(peer_message->mutable_payload(), message->payload(),
1237 message->payload_size());
1238 iter.second->PortsMessage(std::move(peer_message));
1239 }
1240 }
1241
1242 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayPortsMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1243 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1244 base::ProcessHandle from_process,
1245 const ports::NodeName& destination,
1246 Channel::MessagePtr message) {
1247 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1248
1249 if (GetBrokerChannel()) {
1250 // Only the broker should be asked to relay a message.
1251 LOG(ERROR) << "Non-broker refusing to relay message.";
1252 DropPeer(from_node, nullptr);
1253 return;
1254 }
1255
1256 // The parent should always know which process this came from.
1257 DCHECK(from_process != base::kNullProcessHandle);
1258
1259 #if defined(OS_WIN)
1260 // Rewrite the handles to this (the parent) process. If the message is
1261 // destined for another child process, the handles will be rewritten to that
1262 // process before going out (see NodeChannel::WriteChannelMessage).
1263 //
1264 // TODO: We could avoid double-duplication.
1265 //
1266 // Note that we explicitly mark the handles as being owned by the sending
1267 // process before rewriting them, in order to accommodate RewriteHandles'
1268 // internal sanity checks.
1269 ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
1270 for (size_t i = 0; i < handles->size(); ++i)
1271 (*handles)[i].owning_process = from_process;
1272 if (!Channel::Message::RewriteHandles(from_process,
1273 base::GetCurrentProcessHandle(),
1274 handles.get())) {
1275 DLOG(ERROR) << "Failed to relay one or more handles.";
1276 }
1277 message->SetHandles(std::move(handles));
1278 #else
1279 MachPortRelay* relay = GetMachPortRelay();
1280 if (!relay) {
1281 LOG(ERROR) << "Receiving Mach ports without a port relay from "
1282 << from_node << ". Dropping message.";
1283 return;
1284 }
1285 if (!relay->ExtractPortRights(message.get(), from_process)) {
1286 // NodeChannel should ensure that MachPortRelay is ready for the remote
1287 // process. At this point, if the port extraction failed, either something
1288 // went wrong in the mach stuff, or the remote process died.
1289 LOG(ERROR) << "Error on receiving Mach ports " << from_node
1290 << ". Dropping message.";
1291 return;
1292 }
1293 #endif // defined(OS_WIN)
1294
1295 if (destination == name_) {
1296 // Great, we can deliver this message locally.
1297 OnPortsMessage(from_node, std::move(message));
1298 return;
1299 }
1300
1301 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1302 if (peer)
1303 peer->PortsMessageFromRelay(from_node, std::move(message));
1304 else
1305 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1306 }
1307
OnPortsMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1308 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1309 const ports::NodeName& source_node,
1310 Channel::MessagePtr message) {
1311 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1312 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1313 DropPeer(from_node, nullptr);
1314 return;
1315 }
1316
1317 OnPortsMessage(source_node, std::move(message));
1318 }
1319 #endif
1320
OnAcceptPeer(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & peer_name,const ports::PortName & port_name)1321 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1322 const ports::NodeName& token,
1323 const ports::NodeName& peer_name,
1324 const ports::PortName& port_name) {
1325 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1326
1327 auto it = peer_connections_.find(from_node);
1328 if (it == peer_connections_.end()) {
1329 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1330 DropPeer(from_node, nullptr);
1331 return;
1332 }
1333
1334 scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
1335 ports::PortRef local_port = it->second.local_port;
1336 std::string peer_token = std::move(it->second.peer_token);
1337 peer_connections_.erase(it);
1338 DCHECK(channel);
1339
1340 // If the peer connection is a self connection (which is used in tests),
1341 // drop the channel to it and skip straight to merging the ports.
1342 if (name_ == peer_name) {
1343 peers_by_token_.erase(peer_token);
1344 } else {
1345 peers_by_token_[peer_token] = peer_name;
1346 peer_connections_.insert(
1347 {peer_name, PeerConnection{nullptr, local_port, peer_token}});
1348 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1349
1350 AddPeer(peer_name, channel, false /* start_channel */);
1351 }
1352
1353 // We need to choose one side to initiate the port merge. It doesn't matter
1354 // who does it as long as they don't both try. Simple solution: pick the one
1355 // with the "smaller" port name.
1356 if (local_port.name() < port_name) {
1357 node()->MergePorts(local_port, peer_name, port_name);
1358 }
1359 }
1360
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1361 void NodeController::OnChannelError(const ports::NodeName& from_node,
1362 NodeChannel* channel) {
1363 if (io_task_runner_->RunsTasksOnCurrentThread()) {
1364 DropPeer(from_node, channel);
1365 // DropPeer may have caused local port closures, so be sure to process any
1366 // pending local messages.
1367 AcceptIncomingMessages();
1368 } else {
1369 io_task_runner_->PostTask(
1370 FROM_HERE,
1371 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1372 from_node, channel));
1373 }
1374 }
1375
1376 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1377 MachPortRelay* NodeController::GetMachPortRelay() {
1378 {
1379 base::AutoLock lock(parent_lock_);
1380 // Return null if we're not the root.
1381 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1382 return nullptr;
1383 }
1384
1385 base::AutoLock lock(mach_port_relay_lock_);
1386 return mach_port_relay_.get();
1387 }
1388 #endif
1389
CancelPendingPortMerges()1390 void NodeController::CancelPendingPortMerges() {
1391 std::vector<ports::PortRef> ports_to_close;
1392
1393 {
1394 base::AutoLock lock(pending_port_merges_lock_);
1395 reject_pending_merges_ = true;
1396 for (const auto& port : pending_port_merges_)
1397 ports_to_close.push_back(port.second);
1398 pending_port_merges_.clear();
1399 }
1400
1401 for (const auto& port : ports_to_close)
1402 node_->ClosePort(port);
1403 }
1404
DestroyOnIOThreadShutdown()1405 void NodeController::DestroyOnIOThreadShutdown() {
1406 destroy_on_io_thread_shutdown_ = true;
1407 }
1408
AttemptShutdownIfRequested()1409 void NodeController::AttemptShutdownIfRequested() {
1410 if (!shutdown_callback_flag_)
1411 return;
1412
1413 base::Closure callback;
1414 {
1415 base::AutoLock lock(shutdown_lock_);
1416 if (shutdown_callback_.is_null())
1417 return;
1418 if (!node_->CanShutdownCleanly(
1419 ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
1420 DVLOG(2) << "Unable to cleanly shut down node " << name_;
1421 return;
1422 }
1423
1424 callback = shutdown_callback_;
1425 shutdown_callback_.Reset();
1426 shutdown_callback_flag_.Set(false);
1427 }
1428
1429 DCHECK(!callback.is_null());
1430
1431 callback.Run();
1432 }
1433
1434 NodeController::PeerConnection::PeerConnection() = default;
1435
1436 NodeController::PeerConnection::PeerConnection(
1437 const PeerConnection& other) = default;
1438
1439 NodeController::PeerConnection::PeerConnection(
1440 PeerConnection&& other) = default;
1441
PeerConnection(scoped_refptr<NodeChannel> channel,const ports::PortRef & local_port,const std::string & peer_token)1442 NodeController::PeerConnection::PeerConnection(
1443 scoped_refptr<NodeChannel> channel,
1444 const ports::PortRef& local_port,
1445 const std::string& peer_token)
1446 : channel(std::move(channel)),
1447 local_port(local_port),
1448 peer_token(peer_token) {}
1449
1450 NodeController::PeerConnection::~PeerConnection() = default;
1451
1452 NodeController::PeerConnection& NodeController::PeerConnection::
1453 operator=(const PeerConnection& other) = default;
1454
1455 NodeController::PeerConnection& NodeController::PeerConnection::
1456 operator=(PeerConnection&& other) = default;
1457
1458 } // namespace edk
1459 } // namespace mojo
1460