1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/node_controller.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/histogram_macros.h"
16 #include "base/process/process_handle.h"
17 #include "base/rand_util.h"
18 #include "base/time/time.h"
19 #include "base/timer/elapsed_timer.h"
20 #include "mojo/edk/embedder/embedder_internal.h"
21 #include "mojo/edk/embedder/platform_channel_pair.h"
22 #include "mojo/edk/system/broker.h"
23 #include "mojo/edk/system/broker_host.h"
24 #include "mojo/edk/system/core.h"
25 #include "mojo/edk/system/ports_message.h"
26 #include "mojo/edk/system/request_context.h"
27
28 #if defined(OS_MACOSX) && !defined(OS_IOS)
29 #include "mojo/edk/system/mach_port_relay.h"
30 #endif
31
32 #if !defined(OS_NACL)
33 #include "crypto/random.h"
34 #endif
35
36 namespace mojo {
37 namespace edk {
38
39 namespace {
40
41 #if defined(OS_NACL)
42 template <typename T>
GenerateRandomName(T * out)43 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
44 #else
45 template <typename T>
46 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
47 #endif
48
GetRandomNodeName()49 ports::NodeName GetRandomNodeName() {
50 ports::NodeName name;
51 GenerateRandomName(&name);
52 return name;
53 }
54
RecordPeerCount(size_t count)55 void RecordPeerCount(size_t count) {
56 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
57
58 // 8k is the maximum number of file descriptors allowed in Chrome.
59 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
60 static_cast<int32_t>(count),
61 0 /* min */,
62 8000 /* max */,
63 50 /* bucket count */);
64 }
65
RecordPendingChildCount(size_t count)66 void RecordPendingChildCount(size_t count) {
67 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
68
69 // 8k is the maximum number of file descriptors allowed in Chrome.
70 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
71 static_cast<int32_t>(count),
72 0 /* min */,
73 8000 /* max */,
74 50 /* bucket count */);
75 }
76
ParsePortsMessage(Channel::Message * message,void ** data,size_t * num_data_bytes,size_t * num_header_bytes,size_t * num_payload_bytes,size_t * num_ports_bytes)77 bool ParsePortsMessage(Channel::Message* message,
78 void** data,
79 size_t* num_data_bytes,
80 size_t* num_header_bytes,
81 size_t* num_payload_bytes,
82 size_t* num_ports_bytes) {
83 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
84 num_ports_bytes);
85
86 NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
87 if (!*num_data_bytes)
88 return false;
89
90 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
91 num_payload_bytes, num_ports_bytes)) {
92 return false;
93 }
94
95 return true;
96 }
97
98 // Used by NodeController to watch for shutdown. Since no IO can happen once
99 // the IO thread is killed, the NodeController can cleanly drop all its peers
100 // at that time.
101 class ThreadDestructionObserver :
102 public base::MessageLoop::DestructionObserver {
103 public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)104 static void Create(scoped_refptr<base::TaskRunner> task_runner,
105 const base::Closure& callback) {
106 if (task_runner->RunsTasksOnCurrentThread()) {
107 // Owns itself.
108 new ThreadDestructionObserver(callback);
109 } else {
110 task_runner->PostTask(FROM_HERE,
111 base::Bind(&Create, task_runner, callback));
112 }
113 }
114
115 private:
ThreadDestructionObserver(const base::Closure & callback)116 explicit ThreadDestructionObserver(const base::Closure& callback)
117 : callback_(callback) {
118 base::MessageLoop::current()->AddDestructionObserver(this);
119 }
120
~ThreadDestructionObserver()121 ~ThreadDestructionObserver() override {
122 base::MessageLoop::current()->RemoveDestructionObserver(this);
123 }
124
125 // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()126 void WillDestroyCurrentMessageLoop() override {
127 callback_.Run();
128 delete this;
129 }
130
131 const base::Closure callback_;
132
133 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
134 };
135
136 } // namespace
137
~NodeController()138 NodeController::~NodeController() {}
139
// Constructs a controller for |core|. The controller picks a random name for
// itself and creates the ports::Node that carries that name, registering
// |this| with the node as the second constructor argument.
NodeController::NodeController(Core* core)
    : core_(core),
      name_(GetRandomNodeName()),
      node_(new ports::Node(name_, this)) {
  DVLOG(1) << "Initializing node " << name_;
}
146
#if defined(OS_MACOSX) && !defined(OS_IOS)
// Creates the MachPortRelay for this controller from |port_provider|. Only
// compiled on macOS (not iOS). A relay must not already exist (DCHECKed).
void NodeController::CreateMachPortRelay(
    base::PortProvider* port_provider) {
  // |mach_port_relay_| is only touched while holding |mach_port_relay_lock_|.
  base::AutoLock lock(mach_port_relay_lock_);
  DCHECK(!mach_port_relay_);
  mach_port_relay_.reset(new MachPortRelay(port_provider));
}
#endif
155
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)156 void NodeController::SetIOTaskRunner(
157 scoped_refptr<base::TaskRunner> task_runner) {
158 io_task_runner_ = task_runner;
159 ThreadDestructionObserver::Create(
160 io_task_runner_,
161 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
162 }
163
ConnectToChild(base::ProcessHandle process_handle,ScopedPlatformHandle platform_handle,const std::string & child_token,const ProcessErrorCallback & process_error_callback)164 void NodeController::ConnectToChild(
165 base::ProcessHandle process_handle,
166 ScopedPlatformHandle platform_handle,
167 const std::string& child_token,
168 const ProcessErrorCallback& process_error_callback) {
169 // Generate the temporary remote node name here so that it can be associated
170 // with the embedder's child_token. If an error occurs in the child process
171 // after it is launched, but before any reserved ports are connected, this can
172 // be used to clean up any dangling ports.
173 ports::NodeName node_name;
174 GenerateRandomName(&node_name);
175
176 {
177 base::AutoLock lock(reserved_ports_lock_);
178 bool inserted = pending_child_tokens_.insert(
179 std::make_pair(node_name, child_token)).second;
180 DCHECK(inserted);
181 }
182
183 #if defined(OS_WIN)
184 // On Windows, we need to duplicate the process handle because we have no
185 // control over its lifetime and it may become invalid by the time the posted
186 // task runs.
187 HANDLE dup_handle = INVALID_HANDLE_VALUE;
188 BOOL ok = ::DuplicateHandle(
189 base::GetCurrentProcessHandle(), process_handle,
190 base::GetCurrentProcessHandle(), &dup_handle,
191 0, FALSE, DUPLICATE_SAME_ACCESS);
192 DPCHECK(ok);
193 process_handle = dup_handle;
194 #endif
195
196 io_task_runner_->PostTask(
197 FROM_HERE,
198 base::Bind(&NodeController::ConnectToChildOnIOThread,
199 base::Unretained(this),
200 process_handle,
201 base::Passed(&platform_handle),
202 node_name,
203 process_error_callback));
204 }
205
CloseChildPorts(const std::string & child_token)206 void NodeController::CloseChildPorts(const std::string& child_token) {
207 std::vector<ports::PortRef> ports_to_close;
208 {
209 std::vector<std::string> port_tokens;
210 base::AutoLock lock(reserved_ports_lock_);
211 for (const auto& port : reserved_ports_) {
212 if (port.second.child_token == child_token) {
213 DVLOG(1) << "Closing reserved port " << port.second.port.name();
214 ports_to_close.push_back(port.second.port);
215 port_tokens.push_back(port.first);
216 }
217 }
218
219 for (const auto& token : port_tokens)
220 reserved_ports_.erase(token);
221 }
222
223 for (const auto& port : ports_to_close)
224 node_->ClosePort(port);
225
226 // Ensure local port closure messages are processed.
227 AcceptIncomingMessages();
228 }
229
ConnectToParent(ScopedPlatformHandle platform_handle)230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
231 // TODO(amistry): Consider the need for a broker on Windows.
232 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
233 // On posix, use the bootstrap channel for the broker and receive the node's
234 // channel synchronously as the first message from the broker.
235 base::ElapsedTimer timer;
236 broker_.reset(new Broker(std::move(platform_handle)));
237 platform_handle = broker_->GetParentPlatformHandle();
238 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
239 timer.Elapsed());
240
241 if (!platform_handle.is_valid()) {
242 // Most likely the browser side of the channel has already been closed and
243 // the broker was unable to negotiate a NodeChannel pipe. In this case we
244 // can cancel parent connection.
245 DVLOG(1) << "Cannot connect to invalid parent channel.";
246 return;
247 }
248 #endif
249
250 io_task_runner_->PostTask(
251 FROM_HERE,
252 base::Bind(&NodeController::ConnectToParentOnIOThread,
253 base::Unretained(this),
254 base::Passed(&platform_handle)));
255 }
256
// Attaches |observer| to |port| by storing it as the port's user data on the
// node. ClosePort() passes nullptr here to detach the observer.
void NodeController::SetPortObserver(
    const ports::PortRef& port,
    const scoped_refptr<PortObserver>& observer) {
  node_->SetUserData(port, observer);
}
262
ClosePort(const ports::PortRef & port)263 void NodeController::ClosePort(const ports::PortRef& port) {
264 SetPortObserver(port, nullptr);
265 int rv = node_->ClosePort(port);
266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
267
268 AcceptIncomingMessages();
269 }
270
SendMessage(const ports::PortRef & port,std::unique_ptr<PortsMessage> message)271 int NodeController::SendMessage(const ports::PortRef& port,
272 std::unique_ptr<PortsMessage> message) {
273 ports::ScopedMessage ports_message(message.release());
274 int rv = node_->SendMessage(port, std::move(ports_message));
275
276 AcceptIncomingMessages();
277 return rv;
278 }
279
ReservePort(const std::string & token,const ports::PortRef & port,const std::string & child_token)280 void NodeController::ReservePort(const std::string& token,
281 const ports::PortRef& port,
282 const std::string& child_token) {
283 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
284 << token;
285
286 base::AutoLock lock(reserved_ports_lock_);
287 auto result = reserved_ports_.insert(
288 std::make_pair(token, ReservedPort{port, child_token}));
289 DCHECK(result.second);
290 }
291
MergePortIntoParent(const std::string & token,const ports::PortRef & port)292 void NodeController::MergePortIntoParent(const std::string& token,
293 const ports::PortRef& port) {
294 bool was_merged = false;
295 {
296 // This request may be coming from within the process that reserved the
297 // "parent" side (e.g. for Chrome single-process mode), so if this token is
298 // reserved locally, merge locally instead.
299 base::AutoLock lock(reserved_ports_lock_);
300 auto it = reserved_ports_.find(token);
301 if (it != reserved_ports_.end()) {
302 node_->MergePorts(port, name_, it->second.port.name());
303 reserved_ports_.erase(it);
304 was_merged = true;
305 }
306 }
307 if (was_merged) {
308 AcceptIncomingMessages();
309 return;
310 }
311
312 scoped_refptr<NodeChannel> parent;
313 bool reject_merge = false;
314 {
315 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
316 // there is a race where the parent can be set, and |pending_port_merges_|
317 // be processed between retrieving |parent| and adding the merge to
318 // |pending_port_merges_|.
319 base::AutoLock lock(pending_port_merges_lock_);
320 parent = GetParentChannel();
321 if (reject_pending_merges_) {
322 reject_merge = true;
323 } else if (!parent) {
324 pending_port_merges_.push_back(std::make_pair(token, port));
325 return;
326 }
327 }
328 if (reject_merge) {
329 node_->ClosePort(port);
330 DVLOG(2) << "Rejecting port merge for token " << token
331 << " due to closed parent channel.";
332 AcceptIncomingMessages();
333 return;
334 }
335
336 parent->RequestPortMerge(port.name(), token);
337 }
338
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)339 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
340 const ports::PortRef& port1) {
341 int rv = node_->MergeLocalPorts(port0, port1);
342 AcceptIncomingMessages();
343 return rv;
344 }
345
CreateSharedBuffer(size_t num_bytes)346 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
347 size_t num_bytes) {
348 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
349 // Shared buffer creation failure is fatal, so always use the broker when we
350 // have one. This does mean that a non-root process that has children will use
351 // the broker for shared buffer creation even though that process is
352 // privileged.
353 if (broker_) {
354 return broker_->GetSharedBuffer(num_bytes);
355 }
356 #endif
357 return PlatformSharedBuffer::Create(num_bytes);
358 }
359
// Stores |callback| as the shutdown callback, sets the shutdown flag, and
// then immediately attempts the shutdown via AttemptShutdownIfRequested().
void NodeController::RequestShutdown(const base::Closure& callback) {
  {
    // |shutdown_lock_| is held only while the callback and flag are updated;
    // it is released before the shutdown attempt below.
    base::AutoLock lock(shutdown_lock_);
    shutdown_callback_ = callback;
    shutdown_callback_flag_.Set(true);
  }

  AttemptShutdownIfRequested();
}
369
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)370 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
371 const std::string& error) {
372 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
373 if (peer)
374 peer->NotifyBadMessage(error);
375 }
376
ConnectToChildOnIOThread(base::ProcessHandle process_handle,ScopedPlatformHandle platform_handle,ports::NodeName token,const ProcessErrorCallback & process_error_callback)377 void NodeController::ConnectToChildOnIOThread(
378 base::ProcessHandle process_handle,
379 ScopedPlatformHandle platform_handle,
380 ports::NodeName token,
381 const ProcessErrorCallback& process_error_callback) {
382 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
383
384 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL)
385 PlatformChannelPair node_channel;
386 // BrokerHost owns itself.
387 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
388 broker_host->SendChannel(node_channel.PassClientHandle());
389 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
390 this, node_channel.PassServerHandle(), io_task_runner_,
391 process_error_callback);
392 #else
393 scoped_refptr<NodeChannel> channel =
394 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
395 process_error_callback);
396 #endif
397
398 // We set up the child channel with a temporary name so it can be identified
399 // as a pending child if it writes any messages to the channel. We may start
400 // receiving messages from it (though we shouldn't) as soon as Start() is
401 // called below.
402
403 pending_children_.insert(std::make_pair(token, channel));
404 RecordPendingChildCount(pending_children_.size());
405
406 channel->SetRemoteNodeName(token);
407 channel->SetRemoteProcessHandle(process_handle);
408 channel->Start();
409
410 channel->AcceptChild(name_, token);
411 }
412
// IO-thread half of ConnectToParent(). Creates the bootstrap channel to the
// parent over |platform_handle| and starts it. Must run on the IO thread and
// only while no parent name has been recorded yet (both DCHECKed).
void NodeController::ConnectToParentOnIOThread(
    ScopedPlatformHandle platform_handle) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(parent_lock_);
    DCHECK(parent_name_ == ports::kInvalidNodeName);

    // At this point we don't know the parent's name, so we can't yet insert it
    // into our |peers_| map. That will happen as soon as we receive an
    // AcceptChild message from them.
    bootstrap_parent_channel_ =
        NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
                            ProcessErrorCallback());
    // Prevent the parent pipe handle from being closed on shutdown. Pipe
    // closure is used by the parent to detect the child process has exited.
    // Relying on message pipes to be closed is not enough because the parent
    // may see the message pipe closure before the child is dead, causing the
    // child process to be unexpectedly SIGKILL'd.
    bootstrap_parent_channel_->LeakHandleOnShutdown();
  }
  // The channel is started only after |parent_lock_| has been released.
  bootstrap_parent_channel_->Start();
}
436
GetPeerChannel(const ports::NodeName & name)437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438 const ports::NodeName& name) {
439 base::AutoLock lock(peers_lock_);
440 auto it = peers_.find(name);
441 if (it == peers_.end())
442 return nullptr;
443 return it->second;
444 }
445
GetParentChannel()446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
447 ports::NodeName parent_name;
448 {
449 base::AutoLock lock(parent_lock_);
450 parent_name = parent_name_;
451 }
452 return GetPeerChannel(parent_name);
453 }
454
GetBrokerChannel()455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
456 ports::NodeName broker_name;
457 {
458 base::AutoLock lock(broker_lock_);
459 broker_name = broker_name_;
460 }
461 return GetPeerChannel(broker_name);
462 }
463
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)464 void NodeController::AddPeer(const ports::NodeName& name,
465 scoped_refptr<NodeChannel> channel,
466 bool start_channel) {
467 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
468
469 DCHECK(name != ports::kInvalidNodeName);
470 DCHECK(channel);
471
472 channel->SetRemoteNodeName(name);
473
474 OutgoingMessageQueue pending_messages;
475 {
476 base::AutoLock lock(peers_lock_);
477 if (peers_.find(name) != peers_.end()) {
478 // This can happen normally if two nodes race to be introduced to each
479 // other. The losing pipe will be silently closed and introduction should
480 // not be affected.
481 DVLOG(1) << "Ignoring duplicate peer name " << name;
482 return;
483 }
484
485 auto result = peers_.insert(std::make_pair(name, channel));
486 DCHECK(result.second);
487
488 DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
489
490 RecordPeerCount(peers_.size());
491
492 auto it = pending_peer_messages_.find(name);
493 if (it != pending_peer_messages_.end()) {
494 std::swap(pending_messages, it->second);
495 pending_peer_messages_.erase(it);
496 }
497 }
498
499 if (start_channel)
500 channel->Start();
501
502 // Flush any queued message we need to deliver to this node.
503 while (!pending_messages.empty()) {
504 channel->PortsMessage(std::move(pending_messages.front()));
505 pending_messages.pop();
506 }
507 }
508
// Forgets everything associated with the peer named |name|: its entry in
// |peers_|, messages queued for it, its pending-child entry, and any reserved
// ports tied to its child token. If the peer is the parent, all pending port
// merges are cancelled as well. Finally the node is told that the connection
// to |name| is lost. Must run on the IO thread.
//
// |channel| is only used to recognize the bootstrap parent channel; callers
// in this file pass nullptr when they have no channel at hand.
void NodeController::DropPeer(const ports::NodeName& name,
                              NodeChannel* channel) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());

  {
    base::AutoLock lock(peers_lock_);
    auto it = peers_.find(name);

    if (it != peers_.end()) {
      // Copy the name before erasing so it can still be logged.
      ports::NodeName peer = it->first;
      peers_.erase(it);
      DVLOG(1) << "Dropped peer " << peer;
    }

    pending_peer_messages_.erase(name);
    pending_children_.erase(name);

    RecordPeerCount(peers_.size());
    RecordPendingChildCount(pending_children_.size());
  }

  // Ports gathered from the sections below; they are closed only after all
  // locks have been released.
  std::vector<ports::PortRef> ports_to_close;
  {
    // Clean up any reserved ports.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = pending_child_tokens_.find(name);
    if (it != pending_child_tokens_.end()) {
      // This reference stays valid until the erase(it) at the end of this
      // scope.
      const std::string& child_token = it->second;

      std::vector<std::string> port_tokens;
      for (const auto& port : reserved_ports_) {
        if (port.second.child_token == child_token) {
          DVLOG(1) << "Closing reserved port: " << port.second.port.name();
          ports_to_close.push_back(port.second.port);
          port_tokens.push_back(port.first);
        }
      }

      // We have to erase reserved ports in a two-step manner because the usual
      // manner of using the returned iterator from map::erase isn't technically
      // valid in C++11 (although it is in C++14).
      for (const auto& token : port_tokens)
        reserved_ports_.erase(token);

      pending_child_tokens_.erase(it);
    }
  }

  bool is_parent;
  {
    base::AutoLock lock(parent_lock_);
    // NOTE(review): callers in this file pass a null |channel|. If
    // |bootstrap_parent_channel_| is also null, the second comparison
    // presumably evaluates to true -- confirm that this is intended.
    is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
  }
  // If the error comes from the parent channel, we also need to cancel any
  // port merge requests, so that errors can be propagated to the message
  // pipes.
  if (is_parent) {
    base::AutoLock lock(pending_port_merges_lock_);
    // From now on MergePortIntoParent() rejects new merge requests.
    reject_pending_merges_ = true;

    for (const auto& port : pending_port_merges_)
      ports_to_close.push_back(port.second);
    pending_port_merges_.clear();
  }

  for (const auto& port : ports_to_close)
    node_->ClosePort(port);

  node_->LostConnectionToNode(name);

  AcceptIncomingMessages();
}
581
// Delivers |message| to the peer node named |name|.
//
// Depending on the platform and on the attachments the message carries, it is
// either relayed through the broker, sent directly on the peer's channel, or
// queued until the peer becomes known (asking the broker for an introduction
// when it is the first message queued for that peer).
void NodeController::SendPeerMessage(const ports::NodeName& name,
                                     ports::ScopedMessage message) {
  // Take the underlying channel message out of the ports message.
  Channel::MessagePtr channel_message =
      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();

  scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
#if defined(OS_WIN)
  if (channel_message->has_handles()) {
    // If we're sending a message with handles and we aren't the destination
    // node's parent or broker (i.e. we don't know its process handle), ask
    // the broker to relay for us.
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!peer || !peer->HasRemoteProcessHandle()) {
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        // No broker yet: hold the message in |pending_relay_messages_|.
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#elif defined(OS_MACOSX) && !defined(OS_IOS)
  if (channel_message->has_mach_ports()) {
    // Messages containing Mach ports are always routed through the broker, even
    // if the broker process is the intended recipient.
    bool use_broker = false;
    {
      base::AutoLock lock(parent_lock_);
      use_broker = (bootstrap_parent_channel_ ||
                    parent_name_ != ports::kInvalidNodeName);
    }
    if (use_broker) {
      scoped_refptr<NodeChannel> broker = GetBrokerChannel();
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        // No broker yet: hold the message in |pending_relay_messages_|.
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#endif  // defined(OS_WIN) / defined(OS_MACOSX) && !defined(OS_IOS)

  if (peer) {
    peer->PortsMessage(std::move(channel_message));
    return;
  }

  // If we don't know who the peer is, queue the message for delivery. If this
  // is the first message queued for the peer, we also ask the broker to
  // introduce us to them.

  bool needs_introduction = false;
  {
    base::AutoLock lock(peers_lock_);
    auto& queue = pending_peer_messages_[name];
    needs_introduction = queue.empty();
    queue.emplace(std::move(channel_message));
  }

  if (needs_introduction) {
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!broker) {
      // NOTE(review): despite the log text, the message was already placed in
      // |pending_peer_messages_| above and is not removed here.
      DVLOG(1) << "Dropping message for unknown peer: " << name;
      return;
    }
    broker->RequestIntroduction(name);
  }
}
653
// Hands every message currently in |incoming_messages_| to the node, then
// attempts shutdown if one has been requested.
void NodeController::AcceptIncomingMessages() {
  {
    base::AutoLock lock(messages_lock_);
    if (!incoming_messages_.empty()) {
      // libstdc++'s deque creates an internal buffer on construction, even when
      // the size is 0. So avoid creating it until it is necessary.
      std::queue<ports::ScopedMessage> messages;
      std::swap(messages, incoming_messages_);
      // Release |messages_lock_| while the swapped-out batch is handed to the
      // node; the lock is re-acquired when |unlock| goes out of scope.
      base::AutoUnlock unlock(messages_lock_);

      while (!messages.empty()) {
        node_->AcceptMessage(std::move(messages.front()));
        messages.pop();
      }
    }
  }

  AttemptShutdownIfRequested();
}
673
// Task posted by ForwardMessage() to drain the local message queue. Clears
// the "task posted" flag and then processes the queue inside a SYSTEM
// request context.
void NodeController::ProcessIncomingMessages() {
  RequestContext request_context(RequestContext::Source::SYSTEM);

  {
    base::AutoLock lock(messages_lock_);
    // Allow a new incoming messages processing task to be posted. This can't be
    // done after AcceptIncomingMessages() otherwise a message might be missed.
    // Doing it here may result in at most two tasks existing at the same time;
    // this running one, and one pending in the task runner.
    incoming_messages_task_posted_ = false;
  }

  AcceptIncomingMessages();
}
688
DropAllPeers()689 void NodeController::DropAllPeers() {
690 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
691
692 std::vector<scoped_refptr<NodeChannel>> all_peers;
693 {
694 base::AutoLock lock(parent_lock_);
695 if (bootstrap_parent_channel_) {
696 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
697 // existence to determine whether or not this is the root node. Once
698 // bootstrap_parent_channel_->ShutDown() has been called,
699 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
700 // matter if it's deleted now or when |this| is deleted.
701 // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
702 all_peers.push_back(bootstrap_parent_channel_);
703 }
704 }
705
706 {
707 base::AutoLock lock(peers_lock_);
708 for (const auto& peer : peers_)
709 all_peers.push_back(peer.second);
710 for (const auto& peer : pending_children_)
711 all_peers.push_back(peer.second);
712 peers_.clear();
713 pending_children_.clear();
714 pending_peer_messages_.clear();
715 }
716
717 for (const auto& peer : all_peers)
718 peer->ShutDown();
719
720 if (destroy_on_io_thread_shutdown_)
721 delete this;
722 }
723
// Fills |*port_name| with random bytes using the file-local
// GenerateRandomName() helper.
void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
  GenerateRandomName(port_name);
}
727
// Replaces |*message| with a newly allocated PortsMessage constructed from
// |num_header_bytes|; the remaining constructor arguments are 0, 0 and
// nullptr (presumably no payload, ports or existing channel message -- TODO
// confirm against the PortsMessage constructor).
void NodeController::AllocMessage(size_t num_header_bytes,
                                  ports::ScopedMessage* message) {
  message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
}
732
ForwardMessage(const ports::NodeName & node,ports::ScopedMessage message)733 void NodeController::ForwardMessage(const ports::NodeName& node,
734 ports::ScopedMessage message) {
735 DCHECK(message);
736 bool schedule_pump_task = false;
737 if (node == name_) {
738 // NOTE: We need to avoid re-entering the Node instance within
739 // ForwardMessage. Because ForwardMessage is only ever called
740 // (synchronously) in response to Node's ClosePort, SendMessage, or
741 // AcceptMessage, we flush the queue after calling any of those methods.
742 base::AutoLock lock(messages_lock_);
743 // |io_task_runner_| may be null in tests or processes that don't require
744 // multi-process Mojo.
745 schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
746 !incoming_messages_task_posted_;
747 incoming_messages_task_posted_ |= schedule_pump_task;
748 incoming_messages_.emplace(std::move(message));
749 } else {
750 SendPeerMessage(node, std::move(message));
751 }
752
753 if (schedule_pump_task) {
754 // Normally, the queue is processed after the action that added the local
755 // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
756 // possible for a local message to be added as a result of a remote message,
757 // and OnChannelMessage() doesn't process this queue (although
758 // OnPortsMessage() does). There may also be other code paths, now or added
759 // in the future, which cause local messages to be added but don't process
760 // this message queue.
761 //
762 // Instead of adding a call to AcceptIncomingMessages() on every possible
763 // code path, post a task to the IO thread to process the queue. If the
764 // current call stack processes the queue, this may end up doing nothing.
765 io_task_runner_->PostTask(
766 FROM_HERE,
767 base::Bind(&NodeController::ProcessIncomingMessages,
768 base::Unretained(this)));
769 }
770 }
771
BroadcastMessage(ports::ScopedMessage message)772 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
773 CHECK_EQ(message->num_ports(), 0u);
774 Channel::MessagePtr channel_message =
775 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
776 CHECK(!channel_message->has_handles());
777
778 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
779 if (broker)
780 broker->Broadcast(std::move(channel_message));
781 else
782 OnBroadcast(name_, std::move(channel_message));
783 }
784
PortStatusChanged(const ports::PortRef & port)785 void NodeController::PortStatusChanged(const ports::PortRef& port) {
786 scoped_refptr<ports::UserData> user_data;
787 node_->GetUserData(port, &user_data);
788
789 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
790 if (observer) {
791 observer->OnPortStatusChanged();
792 } else {
793 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
794 << "doesn't have an observer.";
795 }
796 }
797
OnAcceptChild(const ports::NodeName & from_node,const ports::NodeName & parent_name,const ports::NodeName & token)798 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
799 const ports::NodeName& parent_name,
800 const ports::NodeName& token) {
801 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
802
803 scoped_refptr<NodeChannel> parent;
804 {
805 base::AutoLock lock(parent_lock_);
806 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
807 parent_name_ = parent_name;
808 parent = bootstrap_parent_channel_;
809 }
810 }
811
812 if (!parent) {
813 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
814 DropPeer(from_node, nullptr);
815 return;
816 }
817
818 parent->SetRemoteNodeName(parent_name);
819 parent->AcceptParent(token, name_);
820
821 // NOTE: The child does not actually add its parent as a peer until
822 // receiving an AcceptBrokerClient message from the broker. The parent
823 // will request that said message be sent upon receiving AcceptParent.
824
825 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
826 }
827
OnAcceptParent(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & child_name)828 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
829 const ports::NodeName& token,
830 const ports::NodeName& child_name) {
831 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
832
833 auto it = pending_children_.find(from_node);
834 if (it == pending_children_.end() || token != from_node) {
835 DLOG(ERROR) << "Received unexpected AcceptParent message from "
836 << from_node;
837 DropPeer(from_node, nullptr);
838 return;
839 }
840
841 scoped_refptr<NodeChannel> channel = it->second;
842 pending_children_.erase(it);
843
844 DCHECK(channel);
845
846 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
847
848 AddPeer(child_name, channel, false /* start_channel */);
849
850 // TODO(rockot/amistry): We could simplify child initialization if we could
851 // synchronously get a new async broker channel from the broker. For now we do
852 // it asynchronously since it's only used to facilitate handle passing, not
853 // handle creation.
854 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
855 if (broker) {
856 // Inform the broker of this new child.
857 broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
858 } else {
859 // If we have no broker, either we need to wait for one, or we *are* the
860 // broker.
861 scoped_refptr<NodeChannel> parent = GetParentChannel();
862 if (!parent) {
863 base::AutoLock lock(parent_lock_);
864 parent = bootstrap_parent_channel_;
865 }
866
867 if (!parent) {
868 // Yes, we're the broker. We can initialize the child directly.
869 channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
870 } else {
871 // We aren't the broker, so wait for a broker connection.
872 base::AutoLock lock(broker_lock_);
873 pending_broker_clients_.push(child_name);
874 }
875 }
876 }
877
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)878 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
879 const ports::NodeName& client_name,
880 base::ProcessHandle process_handle) {
881 #if defined(OS_WIN)
882 // Scoped handle to avoid leaks on error.
883 ScopedPlatformHandle scoped_process_handle =
884 ScopedPlatformHandle(PlatformHandle(process_handle));
885 #endif
886 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
887 if (!sender) {
888 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
889 return;
890 }
891
892 if (GetPeerChannel(client_name)) {
893 DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
894 DropPeer(from_node, nullptr);
895 return;
896 }
897
898 PlatformChannelPair broker_channel;
899 scoped_refptr<NodeChannel> client = NodeChannel::Create(
900 this, broker_channel.PassServerHandle(), io_task_runner_,
901 ProcessErrorCallback());
902
903 #if defined(OS_WIN)
904 // The broker must have a working handle to the client process in order to
905 // properly copy other handles to and from the client.
906 if (!scoped_process_handle.is_valid()) {
907 DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
908 return;
909 }
910 client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
911 #else
912 client->SetRemoteProcessHandle(process_handle);
913 #endif
914
915 AddPeer(client_name, client, true /* start_channel */);
916
917 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
918 << " from peer " << from_node;
919
920 sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
921 }
922
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,ScopedPlatformHandle broker_channel)923 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
924 const ports::NodeName& client_name,
925 ScopedPlatformHandle broker_channel) {
926 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
927 if (!client) {
928 DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
929 return;
930 }
931
932 // This should have come from our own broker.
933 if (GetBrokerChannel() != GetPeerChannel(from_node)) {
934 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
935 return;
936 }
937
938 DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
939
940 client->AcceptBrokerClient(from_node, std::move(broker_channel));
941 }
942
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,ScopedPlatformHandle broker_channel)943 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
944 const ports::NodeName& broker_name,
945 ScopedPlatformHandle broker_channel) {
946 // This node should already have a parent in bootstrap mode.
947 ports::NodeName parent_name;
948 scoped_refptr<NodeChannel> parent;
949 {
950 base::AutoLock lock(parent_lock_);
951 parent_name = parent_name_;
952 parent = bootstrap_parent_channel_;
953 bootstrap_parent_channel_ = nullptr;
954 }
955 DCHECK(parent_name == from_node);
956 DCHECK(parent);
957
958 std::queue<ports::NodeName> pending_broker_clients;
959 std::unordered_map<ports::NodeName, OutgoingMessageQueue>
960 pending_relay_messages;
961 {
962 base::AutoLock lock(broker_lock_);
963 broker_name_ = broker_name;
964 std::swap(pending_broker_clients, pending_broker_clients_);
965 std::swap(pending_relay_messages, pending_relay_messages_);
966 }
967 DCHECK(broker_name != ports::kInvalidNodeName);
968
969 // It's now possible to add both the broker and the parent as peers.
970 // Note that the broker and parent may be the same node.
971 scoped_refptr<NodeChannel> broker;
972 if (broker_name == parent_name) {
973 DCHECK(!broker_channel.is_valid());
974 broker = parent;
975 } else {
976 DCHECK(broker_channel.is_valid());
977 broker = NodeChannel::Create(this, std::move(broker_channel),
978 io_task_runner_, ProcessErrorCallback());
979 AddPeer(broker_name, broker, true /* start_channel */);
980 }
981
982 AddPeer(parent_name, parent, false /* start_channel */);
983
984 {
985 // Complete any port merge requests we have waiting for the parent.
986 base::AutoLock lock(pending_port_merges_lock_);
987 for (const auto& request : pending_port_merges_)
988 parent->RequestPortMerge(request.second.name(), request.first);
989 pending_port_merges_.clear();
990 }
991
992 // Feed the broker any pending children of our own.
993 while (!pending_broker_clients.empty()) {
994 const ports::NodeName& child_name = pending_broker_clients.front();
995 auto it = pending_children_.find(child_name);
996 DCHECK(it != pending_children_.end());
997 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
998 pending_broker_clients.pop();
999 }
1000
1001 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1002 // Have the broker relay any messages we have waiting.
1003 for (auto& entry : pending_relay_messages) {
1004 const ports::NodeName& destination = entry.first;
1005 auto& message_queue = entry.second;
1006 while (!message_queue.empty()) {
1007 broker->RelayPortsMessage(destination, std::move(message_queue.front()));
1008 message_queue.pop();
1009 }
1010 }
1011 #endif
1012
1013 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1014 }
1015
OnPortsMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)1016 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1017 Channel::MessagePtr channel_message) {
1018 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1019
1020 void* data;
1021 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1022 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1023 &num_header_bytes, &num_payload_bytes,
1024 &num_ports_bytes)) {
1025 DropPeer(from_node, nullptr);
1026 return;
1027 }
1028
1029 CHECK(channel_message);
1030 std::unique_ptr<PortsMessage> ports_message(
1031 new PortsMessage(num_header_bytes,
1032 num_payload_bytes,
1033 num_ports_bytes,
1034 std::move(channel_message)));
1035 ports_message->set_source_node(from_node);
1036 node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1037 AcceptIncomingMessages();
1038 }
1039
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & token)1040 void NodeController::OnRequestPortMerge(
1041 const ports::NodeName& from_node,
1042 const ports::PortName& connector_port_name,
1043 const std::string& token) {
1044 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1045
1046 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
1047 << token << " and port " << connector_port_name << "@" << from_node;
1048
1049 ports::PortRef local_port;
1050 {
1051 base::AutoLock lock(reserved_ports_lock_);
1052 auto it = reserved_ports_.find(token);
1053 if (it == reserved_ports_.end()) {
1054 DVLOG(1) << "Ignoring request to connect to port for unknown token "
1055 << token;
1056 return;
1057 }
1058 local_port = it->second.port;
1059 }
1060
1061 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1062 if (rv != ports::OK)
1063 DLOG(ERROR) << "MergePorts failed: " << rv;
1064
1065 AcceptIncomingMessages();
1066 }
1067
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1068 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1069 const ports::NodeName& name) {
1070 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1071
1072 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1073 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1074 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1075 << from_node;
1076 DropPeer(from_node, nullptr);
1077 return;
1078 }
1079
1080 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1081 if (!new_friend) {
1082 // We don't know who they're talking about!
1083 requestor->Introduce(name, ScopedPlatformHandle());
1084 } else {
1085 PlatformChannelPair new_channel;
1086 requestor->Introduce(name, new_channel.PassServerHandle());
1087 new_friend->Introduce(from_node, new_channel.PassClientHandle());
1088 }
1089 }
1090
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,ScopedPlatformHandle channel_handle)1091 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1092 const ports::NodeName& name,
1093 ScopedPlatformHandle channel_handle) {
1094 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1095
1096 if (!channel_handle.is_valid()) {
1097 node_->LostConnectionToNode(name);
1098
1099 DLOG(ERROR) << "Could not be introduced to peer " << name;
1100 base::AutoLock lock(peers_lock_);
1101 pending_peer_messages_.erase(name);
1102 return;
1103 }
1104
1105 scoped_refptr<NodeChannel> channel =
1106 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
1107 ProcessErrorCallback());
1108
1109 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1110 AddPeer(name, channel, true /* start_channel */);
1111 }
1112
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1113 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1114 Channel::MessagePtr message) {
1115 DCHECK(!message->has_handles());
1116
1117 void* data;
1118 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1119 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1120 &num_header_bytes, &num_payload_bytes,
1121 &num_ports_bytes)) {
1122 DropPeer(from_node, nullptr);
1123 return;
1124 }
1125
1126 // Broadcast messages must not contain ports.
1127 if (num_ports_bytes > 0) {
1128 DropPeer(from_node, nullptr);
1129 return;
1130 }
1131
1132 base::AutoLock lock(peers_lock_);
1133 for (auto& iter : peers_) {
1134 // Copy and send the message to each known peer.
1135 Channel::MessagePtr peer_message(
1136 new Channel::Message(message->payload_size(), 0));
1137 memcpy(peer_message->mutable_payload(), message->payload(),
1138 message->payload_size());
1139 iter.second->PortsMessage(std::move(peer_message));
1140 }
1141 }
1142
1143 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayPortsMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1144 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1145 base::ProcessHandle from_process,
1146 const ports::NodeName& destination,
1147 Channel::MessagePtr message) {
1148 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1149
1150 if (GetBrokerChannel()) {
1151 // Only the broker should be asked to relay a message.
1152 LOG(ERROR) << "Non-broker refusing to relay message.";
1153 DropPeer(from_node, nullptr);
1154 return;
1155 }
1156
1157 // The parent should always know which process this came from.
1158 DCHECK(from_process != base::kNullProcessHandle);
1159
1160 #if defined(OS_WIN)
1161 // Rewrite the handles to this (the parent) process. If the message is
1162 // destined for another child process, the handles will be rewritten to that
1163 // process before going out (see NodeChannel::WriteChannelMessage).
1164 //
1165 // TODO: We could avoid double-duplication.
1166 //
1167 // Note that we explicitly mark the handles as being owned by the sending
1168 // process before rewriting them, in order to accommodate RewriteHandles'
1169 // internal sanity checks.
1170 ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
1171 for (size_t i = 0; i < handles->size(); ++i)
1172 (*handles)[i].owning_process = from_process;
1173 if (!Channel::Message::RewriteHandles(from_process,
1174 base::GetCurrentProcessHandle(),
1175 handles.get())) {
1176 DLOG(ERROR) << "Failed to relay one or more handles.";
1177 }
1178 message->SetHandles(std::move(handles));
1179 #else
1180 MachPortRelay* relay = GetMachPortRelay();
1181 if (!relay) {
1182 LOG(ERROR) << "Receiving Mach ports without a port relay from "
1183 << from_node << ". Dropping message.";
1184 return;
1185 }
1186 if (!relay->ExtractPortRights(message.get(), from_process)) {
1187 // NodeChannel should ensure that MachPortRelay is ready for the remote
1188 // process. At this point, if the port extraction failed, either something
1189 // went wrong in the mach stuff, or the remote process died.
1190 LOG(ERROR) << "Error on receiving Mach ports " << from_node
1191 << ". Dropping message.";
1192 return;
1193 }
1194 #endif // defined(OS_WIN)
1195
1196 if (destination == name_) {
1197 // Great, we can deliver this message locally.
1198 OnPortsMessage(from_node, std::move(message));
1199 return;
1200 }
1201
1202 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1203 if (peer)
1204 peer->PortsMessageFromRelay(from_node, std::move(message));
1205 else
1206 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1207 }
1208
OnPortsMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1209 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1210 const ports::NodeName& source_node,
1211 Channel::MessagePtr message) {
1212 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1213 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1214 DropPeer(from_node, nullptr);
1215 return;
1216 }
1217
1218 OnPortsMessage(source_node, std::move(message));
1219 }
1220 #endif
1221
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1222 void NodeController::OnChannelError(const ports::NodeName& from_node,
1223 NodeChannel* channel) {
1224 if (io_task_runner_->RunsTasksOnCurrentThread()) {
1225 DropPeer(from_node, channel);
1226 // DropPeer may have caused local port closures, so be sure to process any
1227 // pending local messages.
1228 AcceptIncomingMessages();
1229 } else {
1230 io_task_runner_->PostTask(
1231 FROM_HERE,
1232 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1233 from_node, channel));
1234 }
1235 }
1236
1237 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1238 MachPortRelay* NodeController::GetMachPortRelay() {
1239 {
1240 base::AutoLock lock(parent_lock_);
1241 // Return null if we're not the root.
1242 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1243 return nullptr;
1244 }
1245
1246 base::AutoLock lock(mach_port_relay_lock_);
1247 return mach_port_relay_.get();
1248 }
1249 #endif
1250
DestroyOnIOThreadShutdown()1251 void NodeController::DestroyOnIOThreadShutdown() {
1252 destroy_on_io_thread_shutdown_ = true;
1253 }
1254
AttemptShutdownIfRequested()1255 void NodeController::AttemptShutdownIfRequested() {
1256 if (!shutdown_callback_flag_)
1257 return;
1258
1259 base::Closure callback;
1260 {
1261 base::AutoLock lock(shutdown_lock_);
1262 if (shutdown_callback_.is_null())
1263 return;
1264 if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
1265 DVLOG(2) << "Unable to cleanly shut down node " << name_;
1266 return;
1267 }
1268
1269 callback = shutdown_callback_;
1270 shutdown_callback_.Reset();
1271 shutdown_callback_flag_.Set(false);
1272 }
1273
1274 DCHECK(!callback.is_null());
1275
1276 callback.Run();
1277 }
1278
1279 } // namespace edk
1280 } // namespace mojo
1281