1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/node_controller.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/containers/queue.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/message_loop/message_loop_current.h"
17 #include "base/metrics/histogram_macros.h"
18 #include "base/process/process_handle.h"
19 #include "base/rand_util.h"
20 #include "base/time/time.h"
21 #include "base/timer/elapsed_timer.h"
22 #include "mojo/core/broker.h"
23 #include "mojo/core/broker_host.h"
24 #include "mojo/core/configuration.h"
25 #include "mojo/core/core.h"
26 #include "mojo/core/request_context.h"
27 #include "mojo/core/user_message_impl.h"
28 #include "mojo/public/cpp/platform/named_platform_channel.h"
29 #include "mojo/public/cpp/platform/platform_channel.h"
30
31 #if defined(OS_WIN)
32 #include <windows.h>
33 #endif
34
35 #if defined(OS_MACOSX) && !defined(OS_IOS)
36 #include "mojo/core/mach_port_relay.h"
37 #endif
38
39 #if !defined(OS_NACL)
40 #include "crypto/random.h"
41 #endif
42
43 namespace mojo {
44 namespace core {
45
46 namespace {
47
48 #if defined(OS_NACL)
49 template <typename T>
GenerateRandomName(T * out)50 void GenerateRandomName(T* out) {
51 base::RandBytes(out, sizeof(T));
52 }
53 #else
54 template <typename T>
55 void GenerateRandomName(T* out) {
56 crypto::RandBytes(out, sizeof(T));
57 }
58 #endif
59
GetRandomNodeName()60 ports::NodeName GetRandomNodeName() {
61 ports::NodeName name;
62 GenerateRandomName(&name);
63 return name;
64 }
65
SerializeEventMessage(ports::ScopedEvent event)66 Channel::MessagePtr SerializeEventMessage(ports::ScopedEvent event) {
67 if (event->type() == ports::Event::Type::kUserMessage) {
68 // User message events must already be partially serialized.
69 return UserMessageImpl::FinalizeEventMessage(
70 ports::Event::Cast<ports::UserMessageEvent>(&event));
71 }
72
73 void* data;
74 size_t size = event->GetSerializedSize();
75 auto message = NodeChannel::CreateEventMessage(size, size, &data, 0);
76 event->Serialize(data);
77 return message;
78 }
79
// Reconstructs a ports event from |channel_message|, which was received from
// |from_node|.
//
// Returns null if the event payload cannot be deserialized, or if a user
// message cannot be built from the bytes that follow the event. For user
// message events the returned event owns |channel_message| through its
// attached UserMessageImpl.
ports::ScopedEvent DeserializeEventMessage(
    const ports::NodeName& from_node,
    Channel::MessagePtr channel_message) {
  void* data;
  size_t size;
  NodeChannel::GetEventMessageData(channel_message.get(), &data, &size);
  auto event = ports::Event::Deserialize(data, size);
  if (!event)
    return nullptr;

  if (event->type() != ports::Event::Type::kUserMessage)
    return event;

  // User messages require extra parsing.
  const size_t event_size = event->GetSerializedSize();

  // Note that if this weren't true, the event couldn't have been deserialized
  // in the first place.
  DCHECK_LE(event_size, size);

  auto message_event = ports::Event::Cast<ports::UserMessageEvent>(&event);
  auto message = UserMessageImpl::CreateFromChannelMessage(
      message_event.get(), std::move(channel_message),
      static_cast<uint8_t*>(data) + event_size, size - event_size);

  // The bytes being parsed come from another node and cannot be trusted.
  // NOTE(review): CreateFromChannelMessage()'s failure mode is not visible
  // from this file; treat a null result as a malformed message instead of
  // dereferencing it.
  if (!message)
    return nullptr;

  message->set_source_node(from_node);

  message_event->AttachMessage(std::move(message));
  return std::move(message_event);
}
109
110 // Used by NodeController to watch for shutdown. Since no IO can happen once
111 // the IO thread is killed, the NodeController can cleanly drop all its peers
112 // at that time.
113 class ThreadDestructionObserver
114 : public base::MessageLoopCurrent::DestructionObserver {
115 public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)116 static void Create(scoped_refptr<base::TaskRunner> task_runner,
117 const base::Closure& callback) {
118 if (task_runner->RunsTasksInCurrentSequence()) {
119 // Owns itself.
120 new ThreadDestructionObserver(callback);
121 } else {
122 task_runner->PostTask(FROM_HERE,
123 base::Bind(&Create, task_runner, callback));
124 }
125 }
126
127 private:
ThreadDestructionObserver(const base::Closure & callback)128 explicit ThreadDestructionObserver(const base::Closure& callback)
129 : callback_(callback) {
130 base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
131 }
132
~ThreadDestructionObserver()133 ~ThreadDestructionObserver() override {
134 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
135 }
136
137 // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()138 void WillDestroyCurrentMessageLoop() override {
139 callback_.Run();
140 delete this;
141 }
142
143 const base::Closure callback_;
144
145 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
146 };
147
148 } // namespace
149
~NodeController()150 NodeController::~NodeController() {}
151
NodeController(Core * core)152 NodeController::NodeController(Core* core)
153 : core_(core),
154 name_(GetRandomNodeName()),
155 node_(new ports::Node(name_, this)) {
156 DVLOG(1) << "Initializing node " << name_;
157 }
158
#if defined(OS_MACOSX) && !defined(OS_IOS)
// Creates the Mach port relay backed by |port_provider|. Must be called at
// most once; a second call trips the DCHECK below.
void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) {
  base::AutoLock relay_lock(mach_port_relay_lock_);
  DCHECK(!mach_port_relay_);
  mach_port_relay_ = std::make_unique<MachPortRelay>(port_provider);
}
#endif
166
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)167 void NodeController::SetIOTaskRunner(
168 scoped_refptr<base::TaskRunner> task_runner) {
169 io_task_runner_ = task_runner;
170 ThreadDestructionObserver::Create(
171 io_task_runner_,
172 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
173 }
174
SendBrokerClientInvitation(base::ProcessHandle target_process,ConnectionParams connection_params,const std::vector<std::pair<std::string,ports::PortRef>> & attached_ports,const ProcessErrorCallback & process_error_callback)175 void NodeController::SendBrokerClientInvitation(
176 base::ProcessHandle target_process,
177 ConnectionParams connection_params,
178 const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
179 const ProcessErrorCallback& process_error_callback) {
180 // Generate the temporary remote node name here so that it can be associated
181 // with the ports "attached" to this invitation.
182 ports::NodeName temporary_node_name;
183 GenerateRandomName(&temporary_node_name);
184
185 {
186 base::AutoLock lock(reserved_ports_lock_);
187 PortMap& port_map = reserved_ports_[temporary_node_name];
188 for (auto& entry : attached_ports) {
189 auto result = port_map.emplace(entry.first, entry.second);
190 DCHECK(result.second) << "Duplicate attachment: " << entry.first;
191 }
192 }
193
194 ScopedProcessHandle scoped_target_process =
195 ScopedProcessHandle::CloneFrom(target_process);
196 io_task_runner_->PostTask(
197 FROM_HERE,
198 base::BindOnce(&NodeController::SendBrokerClientInvitationOnIOThread,
199 base::Unretained(this), std::move(scoped_target_process),
200 std::move(connection_params), temporary_node_name,
201 process_error_callback));
202 }
203
AcceptBrokerClientInvitation(ConnectionParams connection_params)204 void NodeController::AcceptBrokerClientInvitation(
205 ConnectionParams connection_params) {
206 DCHECK(!GetConfiguration().is_broker_process);
207 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
208 // Use the bootstrap channel for the broker and receive the node's channel
209 // synchronously as the first message from the broker.
210 DCHECK(connection_params.endpoint().is_valid());
211 base::ElapsedTimer timer;
212 broker_ = std::make_unique<Broker>(
213 connection_params.TakeEndpoint().TakePlatformHandle());
214 PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint();
215
216 if (!endpoint.is_valid()) {
217 // Most likely the inviter's side of the channel has already been closed and
218 // the broker was unable to negotiate a NodeChannel pipe. In this case we
219 // can cancel our connection to our inviter.
220 DVLOG(1) << "Cannot connect to invalid inviter channel.";
221 CancelPendingPortMerges();
222 return;
223 }
224 connection_params = ConnectionParams(std::move(endpoint));
225 #endif
226
227 io_task_runner_->PostTask(
228 FROM_HERE,
229 base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread,
230 base::Unretained(this), std::move(connection_params)));
231 }
232
ConnectIsolated(ConnectionParams connection_params,const ports::PortRef & port,base::StringPiece connection_name)233 void NodeController::ConnectIsolated(ConnectionParams connection_params,
234 const ports::PortRef& port,
235 base::StringPiece connection_name) {
236 io_task_runner_->PostTask(
237 FROM_HERE,
238 base::BindOnce(&NodeController::ConnectIsolatedOnIOThread,
239 base::Unretained(this), base::Passed(&connection_params),
240 port, connection_name.as_string()));
241 }
242
SetPortObserver(const ports::PortRef & port,scoped_refptr<PortObserver> observer)243 void NodeController::SetPortObserver(const ports::PortRef& port,
244 scoped_refptr<PortObserver> observer) {
245 node_->SetUserData(port, std::move(observer));
246 }
247
ClosePort(const ports::PortRef & port)248 void NodeController::ClosePort(const ports::PortRef& port) {
249 SetPortObserver(port, nullptr);
250 int rv = node_->ClosePort(port);
251 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
252 }
253
SendUserMessage(const ports::PortRef & port,std::unique_ptr<ports::UserMessageEvent> message)254 int NodeController::SendUserMessage(
255 const ports::PortRef& port,
256 std::unique_ptr<ports::UserMessageEvent> message) {
257 return node_->SendUserMessage(port, std::move(message));
258 }
259
MergePortIntoInviter(const std::string & name,const ports::PortRef & port)260 void NodeController::MergePortIntoInviter(const std::string& name,
261 const ports::PortRef& port) {
262 scoped_refptr<NodeChannel> inviter;
263 bool reject_merge = false;
264 {
265 // Hold |pending_port_merges_lock_| while getting |inviter|. Otherwise,
266 // there is a race where the inviter can be set, and |pending_port_merges_|
267 // be processed between retrieving |inviter| and adding the merge to
268 // |pending_port_merges_|.
269 base::AutoLock lock(pending_port_merges_lock_);
270 inviter = GetInviterChannel();
271 if (reject_pending_merges_) {
272 reject_merge = true;
273 } else if (!inviter) {
274 pending_port_merges_.push_back(std::make_pair(name, port));
275 return;
276 }
277 }
278 if (reject_merge) {
279 node_->ClosePort(port);
280 DVLOG(2) << "Rejecting port merge for name " << name
281 << " due to closed inviter channel.";
282 return;
283 }
284
285 inviter->RequestPortMerge(port.name(), name);
286 }
287
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)288 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
289 const ports::PortRef& port1) {
290 return node_->MergeLocalPorts(port0, port1);
291 }
292
CreateSharedBuffer(size_t num_bytes)293 base::WritableSharedMemoryRegion NodeController::CreateSharedBuffer(
294 size_t num_bytes) {
295 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
296 // Shared buffer creation failure is fatal, so always use the broker when we
297 // have one; unless of course the embedder forces us not to.
298 if (!GetConfiguration().force_direct_shared_memory_allocation && broker_)
299 return broker_->GetWritableSharedMemoryRegion(num_bytes);
300 #endif
301 return base::WritableSharedMemoryRegion::Create(num_bytes);
302 }
303
RequestShutdown(const base::Closure & callback)304 void NodeController::RequestShutdown(const base::Closure& callback) {
305 {
306 base::AutoLock lock(shutdown_lock_);
307 shutdown_callback_ = callback;
308 shutdown_callback_flag_.Set(true);
309 }
310
311 AttemptShutdownIfRequested();
312 }
313
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)314 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
315 const std::string& error) {
316 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
317 if (peer)
318 peer->NotifyBadMessage(error);
319 }
320
SendBrokerClientInvitationOnIOThread(ScopedProcessHandle target_process,ConnectionParams connection_params,ports::NodeName temporary_node_name,const ProcessErrorCallback & process_error_callback)321 void NodeController::SendBrokerClientInvitationOnIOThread(
322 ScopedProcessHandle target_process,
323 ConnectionParams connection_params,
324 ports::NodeName temporary_node_name,
325 const ProcessErrorCallback& process_error_callback) {
326 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
327
328 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
329 PlatformChannel node_channel;
330 ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint());
331 // BrokerHost owns itself.
332 BrokerHost* broker_host =
333 new BrokerHost(target_process.get(), std::move(connection_params),
334 process_error_callback);
335 bool channel_ok = broker_host->SendChannel(
336 node_channel.TakeRemoteEndpoint().TakePlatformHandle());
337
338 #if defined(OS_WIN)
339 if (!channel_ok) {
340 // On Windows the above operation may fail if the channel is crossing a
341 // session boundary. In that case we fall back to a named pipe.
342 NamedPlatformChannel::Options options;
343 NamedPlatformChannel named_channel(options);
344 node_connection_params =
345 ConnectionParams(named_channel.TakeServerEndpoint());
346 broker_host->SendNamedChannel(named_channel.GetServerName());
347 }
348 #else
349 CHECK(channel_ok);
350 #endif // defined(OS_WIN)
351
352 scoped_refptr<NodeChannel> channel =
353 NodeChannel::Create(this, std::move(node_connection_params),
354 io_task_runner_, process_error_callback);
355
356 #else // !defined(OS_MACOSX) && !defined(OS_NACL)
357 scoped_refptr<NodeChannel> channel =
358 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
359 process_error_callback);
360 #endif // !defined(OS_MACOSX) && !defined(OS_NACL)
361
362 // We set up the invitee channel with a temporary name so it can be identified
363 // as a pending invitee if it writes any messages to the channel. We may start
364 // receiving messages from it (though we shouldn't) as soon as Start() is
365 // called below.
366
367 pending_invitations_.insert(std::make_pair(temporary_node_name, channel));
368
369 channel->SetRemoteNodeName(temporary_node_name);
370 channel->SetRemoteProcessHandle(std::move(target_process));
371 channel->Start();
372
373 channel->AcceptInvitee(name_, temporary_node_name);
374 }
375
AcceptBrokerClientInvitationOnIOThread(ConnectionParams connection_params)376 void NodeController::AcceptBrokerClientInvitationOnIOThread(
377 ConnectionParams connection_params) {
378 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
379
380 {
381 base::AutoLock lock(inviter_lock_);
382 DCHECK(inviter_name_ == ports::kInvalidNodeName);
383
384 // At this point we don't know the inviter's name, so we can't yet insert it
385 // into our |peers_| map. That will happen as soon as we receive an
386 // AcceptInvitee message from them.
387 bootstrap_inviter_channel_ =
388 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
389 ProcessErrorCallback());
390 // Prevent the inviter pipe handle from being closed on shutdown. Pipe
391 // closure may be used by the inviter to detect the invitee process has
392 // exited.
393 bootstrap_inviter_channel_->LeakHandleOnShutdown();
394 }
395 bootstrap_inviter_channel_->Start();
396 }
397
ConnectIsolatedOnIOThread(ConnectionParams connection_params,ports::PortRef port,const std::string & connection_name)398 void NodeController::ConnectIsolatedOnIOThread(
399 ConnectionParams connection_params,
400 ports::PortRef port,
401 const std::string& connection_name) {
402 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
403
404 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
405 this, std::move(connection_params), io_task_runner_, {});
406
407 RequestContext request_context;
408 ports::NodeName token;
409 GenerateRandomName(&token);
410 pending_isolated_connections_.emplace(
411 token, IsolatedConnection{channel, port, connection_name});
412 if (!connection_name.empty()) {
413 // If a connection already exists with this name, drop it.
414 auto it = named_isolated_connections_.find(connection_name);
415 if (it != named_isolated_connections_.end()) {
416 ports::NodeName connection_node = it->second;
417 if (connection_node != name_) {
418 DropPeer(connection_node, nullptr);
419 } else {
420 auto pending_it = pending_isolated_connections_.find(connection_node);
421 if (pending_it != pending_isolated_connections_.end()) {
422 node_->ClosePort(pending_it->second.local_port);
423 pending_isolated_connections_.erase(pending_it);
424 }
425 named_isolated_connections_.erase(it);
426 }
427 }
428 named_isolated_connections_.emplace(connection_name, token);
429 }
430
431 channel->SetRemoteNodeName(token);
432 channel->Start();
433
434 channel->AcceptPeer(name_, token, port.name());
435 }
436
GetPeerChannel(const ports::NodeName & name)437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438 const ports::NodeName& name) {
439 base::AutoLock lock(peers_lock_);
440 auto it = peers_.find(name);
441 if (it == peers_.end())
442 return nullptr;
443 return it->second;
444 }
445
GetInviterChannel()446 scoped_refptr<NodeChannel> NodeController::GetInviterChannel() {
447 ports::NodeName inviter_name;
448 {
449 base::AutoLock lock(inviter_lock_);
450 inviter_name = inviter_name_;
451 }
452 return GetPeerChannel(inviter_name);
453 }
454
GetBrokerChannel()455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
456 if (GetConfiguration().is_broker_process)
457 return nullptr;
458
459 ports::NodeName broker_name;
460 {
461 base::AutoLock lock(broker_lock_);
462 broker_name = broker_name_;
463 }
464 return GetPeerChannel(broker_name);
465 }
466
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)467 void NodeController::AddPeer(const ports::NodeName& name,
468 scoped_refptr<NodeChannel> channel,
469 bool start_channel) {
470 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
471
472 DCHECK(name != ports::kInvalidNodeName);
473 DCHECK(channel);
474
475 channel->SetRemoteNodeName(name);
476
477 OutgoingMessageQueue pending_messages;
478 {
479 base::AutoLock lock(peers_lock_);
480 if (peers_.find(name) != peers_.end()) {
481 // This can happen normally if two nodes race to be introduced to each
482 // other. The losing pipe will be silently closed and introduction should
483 // not be affected.
484 DVLOG(1) << "Ignoring duplicate peer name " << name;
485 return;
486 }
487
488 auto result = peers_.insert(std::make_pair(name, channel));
489 DCHECK(result.second);
490
491 DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
492
493 auto it = pending_peer_messages_.find(name);
494 if (it != pending_peer_messages_.end()) {
495 std::swap(pending_messages, it->second);
496 pending_peer_messages_.erase(it);
497 }
498 }
499
500 if (start_channel)
501 channel->Start();
502
503 // Flush any queued message we need to deliver to this node.
504 while (!pending_messages.empty()) {
505 channel->SendChannelMessage(std::move(pending_messages.front()));
506 pending_messages.pop();
507 }
508 }
509
DropPeer(const ports::NodeName & name,NodeChannel * channel)510 void NodeController::DropPeer(const ports::NodeName& name,
511 NodeChannel* channel) {
512 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
513
514 {
515 base::AutoLock lock(peers_lock_);
516 auto it = peers_.find(name);
517
518 if (it != peers_.end()) {
519 ports::NodeName peer = it->first;
520 peers_.erase(it);
521 DVLOG(1) << "Dropped peer " << peer;
522 }
523
524 pending_peer_messages_.erase(name);
525 pending_invitations_.erase(name);
526 }
527
528 std::vector<ports::PortRef> ports_to_close;
529 {
530 // Clean up any reserved ports.
531 base::AutoLock lock(reserved_ports_lock_);
532 auto it = reserved_ports_.find(name);
533 if (it != reserved_ports_.end()) {
534 for (auto& entry : it->second)
535 ports_to_close.emplace_back(entry.second);
536 reserved_ports_.erase(it);
537 }
538 }
539
540 bool is_inviter;
541 {
542 base::AutoLock lock(inviter_lock_);
543 is_inviter = (name == inviter_name_ ||
544 (channel && channel == bootstrap_inviter_channel_));
545 }
546
547 // If the error comes from the inviter channel, we also need to cancel any
548 // port merge requests, so that errors can be propagated to the message
549 // pipes.
550 if (is_inviter)
551 CancelPendingPortMerges();
552
553 auto connection_it = pending_isolated_connections_.find(name);
554 if (connection_it != pending_isolated_connections_.end()) {
555 IsolatedConnection& connection = connection_it->second;
556 ports_to_close.push_back(connection.local_port);
557 if (!connection.name.empty())
558 named_isolated_connections_.erase(connection.name);
559 pending_isolated_connections_.erase(connection_it);
560 }
561
562 for (const auto& port : ports_to_close)
563 node_->ClosePort(port);
564
565 node_->LostConnectionToNode(name);
566 AttemptShutdownIfRequested();
567 }
568
SendPeerEvent(const ports::NodeName & name,ports::ScopedEvent event)569 void NodeController::SendPeerEvent(const ports::NodeName& name,
570 ports::ScopedEvent event) {
571 Channel::MessagePtr event_message = SerializeEventMessage(std::move(event));
572 if (!event_message)
573 return;
574 scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
575 #if defined(OS_WIN)
576 if (event_message->has_handles()) {
577 // If we're sending a message with handles we aren't the destination
578 // node's inviter or broker (i.e. we don't know its process handle), ask
579 // the broker to relay for us.
580 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
581 if (!peer || !peer->HasRemoteProcessHandle()) {
582 if (!GetConfiguration().is_broker_process && broker) {
583 broker->RelayEventMessage(name, std::move(event_message));
584 } else {
585 base::AutoLock lock(broker_lock_);
586 pending_relay_messages_[name].emplace(std::move(event_message));
587 }
588 return;
589 }
590 }
591 #elif defined(OS_MACOSX) && !defined(OS_IOS)
592 if (event_message->has_mach_ports()) {
593 // Messages containing Mach ports are always routed through the broker, even
594 // if the broker process is the intended recipient.
595 bool use_broker = false;
596 if (!GetConfiguration().is_broker_process) {
597 base::AutoLock lock(inviter_lock_);
598 use_broker = (bootstrap_inviter_channel_ ||
599 inviter_name_ != ports::kInvalidNodeName);
600 }
601
602 if (use_broker) {
603 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
604 if (broker) {
605 broker->RelayEventMessage(name, std::move(event_message));
606 } else {
607 base::AutoLock lock(broker_lock_);
608 pending_relay_messages_[name].emplace(std::move(event_message));
609 }
610 return;
611 }
612 }
613 #endif // defined(OS_WIN)
614
615 if (peer) {
616 peer->SendChannelMessage(std::move(event_message));
617 return;
618 }
619
620 // If we don't know who the peer is and we are the broker, we can only assume
621 // the peer is invalid, i.e., it's either a junk name or has already been
622 // disconnected.
623 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
624 if (!broker) {
625 DVLOG(1) << "Dropping message for unknown peer: " << name;
626 return;
627 }
628
629 // If we aren't the broker, assume we just need to be introduced and queue
630 // until that can be either confirmed or denied by the broker.
631 bool needs_introduction = false;
632 {
633 base::AutoLock lock(peers_lock_);
634 // We may have been introduced on another thread by the time we get here.
635 // Double-check to be safe.
636 auto it = peers_.find(name);
637 if (it == peers_.end()) {
638 auto& queue = pending_peer_messages_[name];
639 needs_introduction = queue.empty();
640 queue.emplace(std::move(event_message));
641 } else {
642 peer = it->second;
643 }
644 }
645 if (needs_introduction)
646 broker->RequestIntroduction(name);
647 else if (peer)
648 peer->SendChannelMessage(std::move(event_message));
649 }
650
DropAllPeers()651 void NodeController::DropAllPeers() {
652 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
653
654 std::vector<scoped_refptr<NodeChannel>> all_peers;
655 {
656 base::AutoLock lock(inviter_lock_);
657 if (bootstrap_inviter_channel_) {
658 // |bootstrap_inviter_channel_| isn't null'd here becuase we rely on its
659 // existence to determine whether or not this is the root node. Once
660 // bootstrap_inviter_channel_->ShutDown() has been called,
661 // |bootstrap_inviter_channel_| is essentially a dead object and it
662 // doesn't matter if it's deleted now or when |this| is deleted. Note:
663 // |bootstrap_inviter_channel_| is only modified on the IO thread.
664 all_peers.push_back(bootstrap_inviter_channel_);
665 }
666 }
667
668 {
669 base::AutoLock lock(peers_lock_);
670 for (const auto& peer : peers_)
671 all_peers.push_back(peer.second);
672 for (const auto& peer : pending_invitations_)
673 all_peers.push_back(peer.second);
674 peers_.clear();
675 pending_invitations_.clear();
676 pending_peer_messages_.clear();
677 pending_isolated_connections_.clear();
678 named_isolated_connections_.clear();
679 }
680
681 for (const auto& peer : all_peers)
682 peer->ShutDown();
683
684 if (destroy_on_io_thread_shutdown_)
685 delete this;
686 }
687
ForwardEvent(const ports::NodeName & node,ports::ScopedEvent event)688 void NodeController::ForwardEvent(const ports::NodeName& node,
689 ports::ScopedEvent event) {
690 DCHECK(event);
691 if (node == name_)
692 node_->AcceptEvent(std::move(event));
693 else
694 SendPeerEvent(node, std::move(event));
695
696 AttemptShutdownIfRequested();
697 }
698
BroadcastEvent(ports::ScopedEvent event)699 void NodeController::BroadcastEvent(ports::ScopedEvent event) {
700 Channel::MessagePtr channel_message = SerializeEventMessage(std::move(event));
701 DCHECK(channel_message && !channel_message->has_handles());
702
703 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
704 if (broker)
705 broker->Broadcast(std::move(channel_message));
706 else
707 OnBroadcast(name_, std::move(channel_message));
708 }
709
PortStatusChanged(const ports::PortRef & port)710 void NodeController::PortStatusChanged(const ports::PortRef& port) {
711 scoped_refptr<ports::UserData> user_data;
712 node_->GetUserData(port, &user_data);
713
714 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
715 if (observer) {
716 observer->OnPortStatusChanged();
717 } else {
718 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
719 << "doesn't have an observer.";
720 }
721 }
722
OnAcceptInvitee(const ports::NodeName & from_node,const ports::NodeName & inviter_name,const ports::NodeName & token)723 void NodeController::OnAcceptInvitee(const ports::NodeName& from_node,
724 const ports::NodeName& inviter_name,
725 const ports::NodeName& token) {
726 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
727
728 scoped_refptr<NodeChannel> inviter;
729 {
730 base::AutoLock lock(inviter_lock_);
731 if (bootstrap_inviter_channel_ &&
732 inviter_name_ == ports::kInvalidNodeName) {
733 inviter_name_ = inviter_name;
734 inviter = bootstrap_inviter_channel_;
735 }
736 }
737
738 if (!inviter) {
739 DLOG(ERROR) << "Unexpected AcceptInvitee message from " << from_node;
740 DropPeer(from_node, nullptr);
741 return;
742 }
743
744 inviter->SetRemoteNodeName(inviter_name);
745 inviter->AcceptInvitation(token, name_);
746
747 // NOTE: The invitee does not actually add its inviter as a peer until
748 // receiving an AcceptBrokerClient message from the broker. The inviter will
749 // request that said message be sent upon receiving AcceptInvitation.
750
751 DVLOG(1) << "Broker client " << name_ << " accepting invitation from "
752 << inviter_name;
753 }
754
OnAcceptInvitation(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & invitee_name)755 void NodeController::OnAcceptInvitation(const ports::NodeName& from_node,
756 const ports::NodeName& token,
757 const ports::NodeName& invitee_name) {
758 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
759
760 auto it = pending_invitations_.find(from_node);
761 if (it == pending_invitations_.end() || token != from_node) {
762 DLOG(ERROR) << "Received unexpected AcceptInvitation message from "
763 << from_node;
764 DropPeer(from_node, nullptr);
765 return;
766 }
767
768 {
769 base::AutoLock lock(reserved_ports_lock_);
770 auto it = reserved_ports_.find(from_node);
771 if (it != reserved_ports_.end()) {
772 // Swap the temporary node name's reserved ports into an entry keyed by
773 // the real node name.
774 auto result =
775 reserved_ports_.emplace(invitee_name, std::move(it->second));
776 DCHECK(result.second);
777 reserved_ports_.erase(it);
778 }
779 }
780
781 scoped_refptr<NodeChannel> channel = it->second;
782 pending_invitations_.erase(it);
783
784 DCHECK(channel);
785
786 DVLOG(1) << "Node " << name_ << " accepted invitee " << invitee_name;
787
788 AddPeer(invitee_name, channel, false /* start_channel */);
789
790 // TODO(rockot): We could simplify invitee initialization if we could
791 // synchronously get a new async broker channel from the broker. For now we do
792 // it asynchronously since it's only used to facilitate handle passing, not
793 // handle creation.
794 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
795 if (broker) {
796 // Inform the broker of this new client.
797 broker->AddBrokerClient(invitee_name, channel->CloneRemoteProcessHandle());
798 } else {
799 // If we have no broker, either we need to wait for one, or we *are* the
800 // broker.
801 scoped_refptr<NodeChannel> inviter = GetInviterChannel();
802 if (!inviter) {
803 base::AutoLock lock(inviter_lock_);
804 inviter = bootstrap_inviter_channel_;
805 }
806
807 if (!inviter) {
808 // Yes, we're the broker. We can initialize the client directly.
809 channel->AcceptBrokerClient(name_, PlatformHandle());
810 } else {
811 // We aren't the broker, so wait for a broker connection.
812 base::AutoLock lock(broker_lock_);
813 pending_broker_clients_.push(invitee_name);
814 }
815 }
816 }
817
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)818 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
819 const ports::NodeName& client_name,
820 base::ProcessHandle process_handle) {
821 ScopedProcessHandle scoped_process_handle(process_handle);
822
823 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
824 if (!sender) {
825 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
826 return;
827 }
828
829 if (GetPeerChannel(client_name)) {
830 DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
831 DropPeer(from_node, nullptr);
832 return;
833 }
834
835 PlatformChannel broker_channel;
836 ConnectionParams connection_params(broker_channel.TakeLocalEndpoint());
837 scoped_refptr<NodeChannel> client =
838 NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
839 ProcessErrorCallback());
840
841 #if defined(OS_WIN)
842 // The broker must have a working handle to the client process in order to
843 // properly copy other handles to and from the client.
844 if (!scoped_process_handle.is_valid()) {
845 DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
846 return;
847 }
848 #endif
849 client->SetRemoteProcessHandle(std::move(scoped_process_handle));
850
851 AddPeer(client_name, client, true /* start_channel */);
852
853 DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
854 << " from peer " << from_node;
855
856 sender->BrokerClientAdded(
857 client_name, broker_channel.TakeRemoteEndpoint().TakePlatformHandle());
858 }
859
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,PlatformHandle broker_channel)860 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
861 const ports::NodeName& client_name,
862 PlatformHandle broker_channel) {
863 scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
864 if (!client) {
865 DLOG(ERROR) << "BrokerClientAdded for unknown client " << client_name;
866 return;
867 }
868
869 // This should have come from our own broker.
870 if (GetBrokerChannel() != GetPeerChannel(from_node)) {
871 DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
872 return;
873 }
874
875 DVLOG(1) << "Client " << client_name << " accepted by broker " << from_node;
876
877 client->AcceptBrokerClient(from_node, std::move(broker_channel));
878 }
879
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,PlatformHandle broker_channel)880 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
881 const ports::NodeName& broker_name,
882 PlatformHandle broker_channel) {
883 DCHECK(!GetConfiguration().is_broker_process);
884
885 // This node should already have an inviter in bootstrap mode.
886 ports::NodeName inviter_name;
887 scoped_refptr<NodeChannel> inviter;
888 {
889 base::AutoLock lock(inviter_lock_);
890 inviter_name = inviter_name_;
891 inviter = bootstrap_inviter_channel_;
892 bootstrap_inviter_channel_ = nullptr;
893 }
894 DCHECK(inviter_name == from_node);
895 DCHECK(inviter);
896
897 base::queue<ports::NodeName> pending_broker_clients;
898 std::unordered_map<ports::NodeName, OutgoingMessageQueue>
899 pending_relay_messages;
900 {
901 base::AutoLock lock(broker_lock_);
902 broker_name_ = broker_name;
903 std::swap(pending_broker_clients, pending_broker_clients_);
904 std::swap(pending_relay_messages, pending_relay_messages_);
905 }
906 DCHECK(broker_name != ports::kInvalidNodeName);
907
908 // It's now possible to add both the broker and the inviter as peers.
909 // Note that the broker and inviter may be the same node.
910 scoped_refptr<NodeChannel> broker;
911 if (broker_name == inviter_name) {
912 DCHECK(!broker_channel.is_valid());
913 broker = inviter;
914 } else {
915 DCHECK(broker_channel.is_valid());
916 broker = NodeChannel::Create(
917 this,
918 ConnectionParams(PlatformChannelEndpoint(std::move(broker_channel))),
919 io_task_runner_, ProcessErrorCallback());
920 AddPeer(broker_name, broker, true /* start_channel */);
921 }
922
923 AddPeer(inviter_name, inviter, false /* start_channel */);
924
925 {
926 // Complete any port merge requests we have waiting for the inviter.
927 base::AutoLock lock(pending_port_merges_lock_);
928 for (const auto& request : pending_port_merges_)
929 inviter->RequestPortMerge(request.second.name(), request.first);
930 pending_port_merges_.clear();
931 }
932
933 // Feed the broker any pending invitees of our own.
934 while (!pending_broker_clients.empty()) {
935 const ports::NodeName& invitee_name = pending_broker_clients.front();
936 auto it = peers_.find(invitee_name);
937 if (it != peers_.end()) {
938 broker->AddBrokerClient(invitee_name,
939 it->second->CloneRemoteProcessHandle());
940 }
941 pending_broker_clients.pop();
942 }
943
944 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
945 // Have the broker relay any messages we have waiting.
946 for (auto& entry : pending_relay_messages) {
947 const ports::NodeName& destination = entry.first;
948 auto& message_queue = entry.second;
949 while (!message_queue.empty()) {
950 broker->RelayEventMessage(destination, std::move(message_queue.front()));
951 message_queue.pop();
952 }
953 }
954 #endif
955
956 DVLOG(1) << "Client " << name_ << " accepted by broker " << broker_name;
957 }
958
OnEventMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)959 void NodeController::OnEventMessage(const ports::NodeName& from_node,
960 Channel::MessagePtr channel_message) {
961 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
962
963 auto event = DeserializeEventMessage(from_node, std::move(channel_message));
964 if (!event) {
965 // We silently ignore unparseable events, as they may come from a process
966 // running a newer version of Mojo.
967 DVLOG(1) << "Ignoring invalid or unknown event from " << from_node;
968 return;
969 }
970
971 node_->AcceptEvent(std::move(event));
972
973 AttemptShutdownIfRequested();
974 }
975
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & name)976 void NodeController::OnRequestPortMerge(
977 const ports::NodeName& from_node,
978 const ports::PortName& connector_port_name,
979 const std::string& name) {
980 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
981
982 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name
983 << " and port " << connector_port_name << "@" << from_node;
984
985 ports::PortRef local_port;
986 {
987 base::AutoLock lock(reserved_ports_lock_);
988 auto it = reserved_ports_.find(from_node);
989 // TODO(https://crbug.com/822034): We should send a notification back to the
990 // requestor so they can clean up their dangling port in this failure case.
991 // This requires changes to the internal protocol, which can't be made yet.
992 // Until this is done, pipes from |MojoExtractMessagePipeFromInvitation()|
993 // will never break if the given name was invalid.
994 if (it == reserved_ports_.end()) {
995 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". "
996 << "No ports reserved for that node.";
997 return;
998 }
999
1000 PortMap& port_map = it->second;
1001 auto port_it = port_map.find(name);
1002 if (port_it == port_map.end()) {
1003 DVLOG(1) << "Ignoring request to connect to port for unknown name "
1004 << name << " from node " << from_node;
1005 return;
1006 }
1007 local_port = port_it->second;
1008 port_map.erase(port_it);
1009 if (port_map.empty())
1010 reserved_ports_.erase(it);
1011 }
1012
1013 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1014 if (rv != ports::OK)
1015 DLOG(ERROR) << "MergePorts failed: " << rv;
1016 }
1017
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1018 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1019 const ports::NodeName& name) {
1020 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1021
1022 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1023 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1024 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1025 << from_node;
1026 DropPeer(from_node, nullptr);
1027 return;
1028 }
1029
1030 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1031 if (!new_friend) {
1032 // We don't know who they're talking about!
1033 requestor->Introduce(name, PlatformHandle());
1034 } else {
1035 PlatformChannel new_channel;
1036 requestor->Introduce(name,
1037 new_channel.TakeLocalEndpoint().TakePlatformHandle());
1038 new_friend->Introduce(
1039 from_node, new_channel.TakeRemoteEndpoint().TakePlatformHandle());
1040 }
1041 }
1042
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,PlatformHandle channel_handle)1043 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1044 const ports::NodeName& name,
1045 PlatformHandle channel_handle) {
1046 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1047
1048 if (!channel_handle.is_valid()) {
1049 node_->LostConnectionToNode(name);
1050
1051 DVLOG(1) << "Could not be introduced to peer " << name;
1052 base::AutoLock lock(peers_lock_);
1053 pending_peer_messages_.erase(name);
1054 return;
1055 }
1056
1057 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
1058 this,
1059 ConnectionParams(PlatformChannelEndpoint(std::move(channel_handle))),
1060 io_task_runner_, ProcessErrorCallback());
1061
1062 DVLOG(1) << "Adding new peer " << name << " via broker introduction.";
1063 AddPeer(name, channel, true /* start_channel */);
1064 }
1065
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1066 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1067 Channel::MessagePtr message) {
1068 DCHECK(!message->has_handles());
1069
1070 auto event = DeserializeEventMessage(from_node, std::move(message));
1071 if (!event) {
1072 // We silently ignore unparseable events, as they may come from a process
1073 // running a newer version of Mojo.
1074 DVLOG(1) << "Ignoring request to broadcast invalid or unknown event from "
1075 << from_node;
1076 return;
1077 }
1078
1079 base::AutoLock lock(peers_lock_);
1080 for (auto& iter : peers_) {
1081 // Clone and send the event to each known peer. Events which cannot be
1082 // cloned cannot be broadcast.
1083 ports::ScopedEvent clone = event->Clone();
1084 if (!clone) {
1085 DVLOG(1) << "Ignoring request to broadcast invalid event from "
1086 << from_node << " [type=" << static_cast<uint32_t>(event->type())
1087 << "]";
1088 return;
1089 }
1090
1091 iter.second->SendChannelMessage(SerializeEventMessage(std::move(clone)));
1092 }
1093 }
1094
1095 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayEventMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1096 void NodeController::OnRelayEventMessage(const ports::NodeName& from_node,
1097 base::ProcessHandle from_process,
1098 const ports::NodeName& destination,
1099 Channel::MessagePtr message) {
1100 // The broker should always know which process this came from.
1101 DCHECK(from_process != base::kNullProcessHandle);
1102 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1103
1104 if (GetBrokerChannel()) {
1105 // Only the broker should be asked to relay a message.
1106 LOG(ERROR) << "Non-broker refusing to relay message.";
1107 DropPeer(from_node, nullptr);
1108 return;
1109 }
1110
1111 if (destination == name_) {
1112 // Great, we can deliver this message locally.
1113 OnEventMessage(from_node, std::move(message));
1114 return;
1115 }
1116
1117 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1118 if (peer)
1119 peer->EventMessageFromRelay(from_node, std::move(message));
1120 else
1121 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1122 }
1123
OnEventMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1124 void NodeController::OnEventMessageFromRelay(const ports::NodeName& from_node,
1125 const ports::NodeName& source_node,
1126 Channel::MessagePtr message) {
1127 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1128 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1129 DropPeer(from_node, nullptr);
1130 return;
1131 }
1132
1133 OnEventMessage(source_node, std::move(message));
1134 }
1135 #endif
1136
OnAcceptPeer(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & peer_name,const ports::PortName & port_name)1137 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1138 const ports::NodeName& token,
1139 const ports::NodeName& peer_name,
1140 const ports::PortName& port_name) {
1141 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1142
1143 auto it = pending_isolated_connections_.find(from_node);
1144 if (it == pending_isolated_connections_.end()) {
1145 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1146 DropPeer(from_node, nullptr);
1147 return;
1148 }
1149
1150 IsolatedConnection& connection = it->second;
1151 scoped_refptr<NodeChannel> channel = std::move(connection.channel);
1152 ports::PortRef local_port = connection.local_port;
1153 if (!connection.name.empty())
1154 named_isolated_connections_[connection.name] = peer_name;
1155 pending_isolated_connections_.erase(it);
1156 DCHECK(channel);
1157
1158 if (name_ != peer_name) {
1159 // It's possible (e.g. in tests) that we may "connect" to ourself, in which
1160 // case we skip this |AddPeer()| call and go straight to merging ports.
1161 // Note that we explicitly drop any prior connection to the same peer so
1162 // that new isolated connections can replace old ones.
1163 DropPeer(peer_name, nullptr);
1164 AddPeer(peer_name, channel, false /* start_channel */);
1165 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1166 }
1167
1168 // We need to choose one side to initiate the port merge. It doesn't matter
1169 // who does it as long as they don't both try. Simple solution: pick the one
1170 // with the "smaller" port name.
1171 if (local_port.name() < port_name)
1172 node()->MergePorts(local_port, peer_name, port_name);
1173 }
1174
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1175 void NodeController::OnChannelError(const ports::NodeName& from_node,
1176 NodeChannel* channel) {
1177 if (io_task_runner_->RunsTasksInCurrentSequence()) {
1178 RequestContext request_context(RequestContext::Source::SYSTEM);
1179 DropPeer(from_node, channel);
1180 } else {
1181 io_task_runner_->PostTask(
1182 FROM_HERE,
1183 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1184 from_node, base::RetainedRef(channel)));
1185 }
1186 }
1187
1188 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1189 MachPortRelay* NodeController::GetMachPortRelay() {
1190 {
1191 base::AutoLock lock(inviter_lock_);
1192 // Return null if we're not the root.
1193 if (bootstrap_inviter_channel_ || inviter_name_ != ports::kInvalidNodeName)
1194 return nullptr;
1195 }
1196
1197 base::AutoLock lock(mach_port_relay_lock_);
1198 return mach_port_relay_.get();
1199 }
1200 #endif
1201
CancelPendingPortMerges()1202 void NodeController::CancelPendingPortMerges() {
1203 std::vector<ports::PortRef> ports_to_close;
1204
1205 {
1206 base::AutoLock lock(pending_port_merges_lock_);
1207 reject_pending_merges_ = true;
1208 for (const auto& port : pending_port_merges_)
1209 ports_to_close.push_back(port.second);
1210 pending_port_merges_.clear();
1211 }
1212
1213 for (const auto& port : ports_to_close)
1214 node_->ClosePort(port);
1215 }
1216
DestroyOnIOThreadShutdown()1217 void NodeController::DestroyOnIOThreadShutdown() {
1218 destroy_on_io_thread_shutdown_ = true;
1219 }
1220
AttemptShutdownIfRequested()1221 void NodeController::AttemptShutdownIfRequested() {
1222 if (!shutdown_callback_flag_)
1223 return;
1224
1225 base::Closure callback;
1226 {
1227 base::AutoLock lock(shutdown_lock_);
1228 if (shutdown_callback_.is_null())
1229 return;
1230 if (!node_->CanShutdownCleanly(
1231 ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
1232 DVLOG(2) << "Unable to cleanly shut down node " << name_;
1233 return;
1234 }
1235
1236 callback = shutdown_callback_;
1237 shutdown_callback_.Reset();
1238 shutdown_callback_flag_.Set(false);
1239 }
1240
1241 DCHECK(!callback.is_null());
1242
1243 callback.Run();
1244 }
1245
1246 NodeController::IsolatedConnection::IsolatedConnection() = default;
1247
1248 NodeController::IsolatedConnection::IsolatedConnection(
1249 const IsolatedConnection& other) = default;
1250
1251 NodeController::IsolatedConnection::IsolatedConnection(
1252 IsolatedConnection&& other) = default;
1253
IsolatedConnection(scoped_refptr<NodeChannel> channel,const ports::PortRef & local_port,base::StringPiece name)1254 NodeController::IsolatedConnection::IsolatedConnection(
1255 scoped_refptr<NodeChannel> channel,
1256 const ports::PortRef& local_port,
1257 base::StringPiece name)
1258 : channel(std::move(channel)), local_port(local_port), name(name) {}
1259
1260 NodeController::IsolatedConnection::~IsolatedConnection() = default;
1261
1262 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
1263 operator=(const IsolatedConnection& other) = default;
1264
1265 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
1266 operator=(IsolatedConnection&& other) = default;
1267
1268 } // namespace core
1269 } // namespace mojo
1270