• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/node_controller.h"
6 
7 #include <algorithm>
8 #include <limits>
9 #include <vector>
10 
11 #include "base/bind.h"
12 #include "base/containers/queue.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/macros.h"
16 #include "base/message_loop/message_loop_current.h"
17 #include "base/metrics/histogram_macros.h"
18 #include "base/process/process_handle.h"
19 #include "base/rand_util.h"
20 #include "base/time/time.h"
21 #include "base/timer/elapsed_timer.h"
22 #include "mojo/core/broker.h"
23 #include "mojo/core/broker_host.h"
24 #include "mojo/core/configuration.h"
25 #include "mojo/core/core.h"
26 #include "mojo/core/request_context.h"
27 #include "mojo/core/user_message_impl.h"
28 #include "mojo/public/cpp/platform/named_platform_channel.h"
29 #include "mojo/public/cpp/platform/platform_channel.h"
30 
31 #if defined(OS_WIN)
32 #include <windows.h>
33 #endif
34 
35 #if defined(OS_MACOSX) && !defined(OS_IOS)
36 #include "mojo/core/mach_port_relay.h"
37 #endif
38 
39 #if !defined(OS_NACL)
40 #include "crypto/random.h"
41 #endif
42 
43 namespace mojo {
44 namespace core {
45 
46 namespace {
47 
48 #if defined(OS_NACL)
49 template <typename T>
GenerateRandomName(T * out)50 void GenerateRandomName(T* out) {
51   base::RandBytes(out, sizeof(T));
52 }
53 #else
54 template <typename T>
55 void GenerateRandomName(T* out) {
56   crypto::RandBytes(out, sizeof(T));
57 }
58 #endif
59 
GetRandomNodeName()60 ports::NodeName GetRandomNodeName() {
61   ports::NodeName name;
62   GenerateRandomName(&name);
63   return name;
64 }
65 
SerializeEventMessage(ports::ScopedEvent event)66 Channel::MessagePtr SerializeEventMessage(ports::ScopedEvent event) {
67   if (event->type() == ports::Event::Type::kUserMessage) {
68     // User message events must already be partially serialized.
69     return UserMessageImpl::FinalizeEventMessage(
70         ports::Event::Cast<ports::UserMessageEvent>(&event));
71   }
72 
73   void* data;
74   size_t size = event->GetSerializedSize();
75   auto message = NodeChannel::CreateEventMessage(size, size, &data, 0);
76   event->Serialize(data);
77   return message;
78 }
79 
// Reconstructs a ports event from |channel_message|, which was received from
// |from_node|. Returns null if the event payload cannot be deserialized.
//
// For user message events the bytes that follow the serialized event are
// handed to UserMessageImpl, together with ownership of |channel_message|, and
// the resulting message is tagged with |from_node| and attached to the event.
ports::ScopedEvent DeserializeEventMessage(
    const ports::NodeName& from_node,
    Channel::MessagePtr channel_message) {
  void* data;
  size_t size;
  NodeChannel::GetEventMessageData(channel_message.get(), &data, &size);
  auto event = ports::Event::Deserialize(data, size);
  if (!event)
    return nullptr;

  if (event->type() != ports::Event::Type::kUserMessage)
    return event;

  // User messages require extra parsing.
  const size_t event_size = event->GetSerializedSize();

  // Note that if this weren't true, the event couldn't have been deserialized
  // in the first place.
  DCHECK_LE(event_size, size);

  auto message_event = ports::Event::Cast<ports::UserMessageEvent>(&event);
  auto message = UserMessageImpl::CreateFromChannelMessage(
      message_event.get(), std::move(channel_message),
      static_cast<uint8_t*>(data) + event_size, size - event_size);
  if (!message) {
    // The payload originates from another node, so never dereference the
    // result blindly; report this like any other deserialization failure.
    // NOTE(review): assumes CreateFromChannelMessage() signals a malformed
    // payload by returning null - confirm against its declaration.
    return nullptr;
  }
  message->set_source_node(from_node);

  message_event->AttachMessage(std::move(message));
  // Explicit move: the local's type differs from the declared return type.
  return std::move(message_event);
}
109 
110 // Used by NodeController to watch for shutdown. Since no IO can happen once
111 // the IO thread is killed, the NodeController can cleanly drop all its peers
112 // at that time.
113 class ThreadDestructionObserver
114     : public base::MessageLoopCurrent::DestructionObserver {
115  public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)116   static void Create(scoped_refptr<base::TaskRunner> task_runner,
117                      const base::Closure& callback) {
118     if (task_runner->RunsTasksInCurrentSequence()) {
119       // Owns itself.
120       new ThreadDestructionObserver(callback);
121     } else {
122       task_runner->PostTask(FROM_HERE,
123                             base::Bind(&Create, task_runner, callback));
124     }
125   }
126 
127  private:
ThreadDestructionObserver(const base::Closure & callback)128   explicit ThreadDestructionObserver(const base::Closure& callback)
129       : callback_(callback) {
130     base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
131   }
132 
~ThreadDestructionObserver()133   ~ThreadDestructionObserver() override {
134     base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
135   }
136 
137   // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()138   void WillDestroyCurrentMessageLoop() override {
139     callback_.Run();
140     delete this;
141   }
142 
143   const base::Closure callback_;
144 
145   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
146 };
147 
148 }  // namespace
149 
~NodeController()150 NodeController::~NodeController() {}
151 
NodeController(Core * core)152 NodeController::NodeController(Core* core)
153     : core_(core),
154       name_(GetRandomNodeName()),
155       node_(new ports::Node(name_, this)) {
156   DVLOG(1) << "Initializing node " << name_;
157 }
158 
159 #if defined(OS_MACOSX) && !defined(OS_IOS)
CreateMachPortRelay(base::PortProvider * port_provider)160 void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) {
161   base::AutoLock lock(mach_port_relay_lock_);
162   DCHECK(!mach_port_relay_);
163   mach_port_relay_.reset(new MachPortRelay(port_provider));
164 }
165 #endif
166 
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)167 void NodeController::SetIOTaskRunner(
168     scoped_refptr<base::TaskRunner> task_runner) {
169   io_task_runner_ = task_runner;
170   ThreadDestructionObserver::Create(
171       io_task_runner_,
172       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
173 }
174 
SendBrokerClientInvitation(base::ProcessHandle target_process,ConnectionParams connection_params,const std::vector<std::pair<std::string,ports::PortRef>> & attached_ports,const ProcessErrorCallback & process_error_callback)175 void NodeController::SendBrokerClientInvitation(
176     base::ProcessHandle target_process,
177     ConnectionParams connection_params,
178     const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
179     const ProcessErrorCallback& process_error_callback) {
180   // Generate the temporary remote node name here so that it can be associated
181   // with the ports "attached" to this invitation.
182   ports::NodeName temporary_node_name;
183   GenerateRandomName(&temporary_node_name);
184 
185   {
186     base::AutoLock lock(reserved_ports_lock_);
187     PortMap& port_map = reserved_ports_[temporary_node_name];
188     for (auto& entry : attached_ports) {
189       auto result = port_map.emplace(entry.first, entry.second);
190       DCHECK(result.second) << "Duplicate attachment: " << entry.first;
191     }
192   }
193 
194   ScopedProcessHandle scoped_target_process =
195       ScopedProcessHandle::CloneFrom(target_process);
196   io_task_runner_->PostTask(
197       FROM_HERE,
198       base::BindOnce(&NodeController::SendBrokerClientInvitationOnIOThread,
199                      base::Unretained(this), std::move(scoped_target_process),
200                      std::move(connection_params), temporary_node_name,
201                      process_error_callback));
202 }
203 
AcceptBrokerClientInvitation(ConnectionParams connection_params)204 void NodeController::AcceptBrokerClientInvitation(
205     ConnectionParams connection_params) {
206   DCHECK(!GetConfiguration().is_broker_process);
207 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
208   // Use the bootstrap channel for the broker and receive the node's channel
209   // synchronously as the first message from the broker.
210   DCHECK(connection_params.endpoint().is_valid());
211   base::ElapsedTimer timer;
212   broker_ = std::make_unique<Broker>(
213       connection_params.TakeEndpoint().TakePlatformHandle());
214   PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint();
215 
216   if (!endpoint.is_valid()) {
217     // Most likely the inviter's side of the channel has already been closed and
218     // the broker was unable to negotiate a NodeChannel pipe. In this case we
219     // can cancel our connection to our inviter.
220     DVLOG(1) << "Cannot connect to invalid inviter channel.";
221     CancelPendingPortMerges();
222     return;
223   }
224   connection_params = ConnectionParams(std::move(endpoint));
225 #endif
226 
227   io_task_runner_->PostTask(
228       FROM_HERE,
229       base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread,
230                      base::Unretained(this), std::move(connection_params)));
231 }
232 
ConnectIsolated(ConnectionParams connection_params,const ports::PortRef & port,base::StringPiece connection_name)233 void NodeController::ConnectIsolated(ConnectionParams connection_params,
234                                      const ports::PortRef& port,
235                                      base::StringPiece connection_name) {
236   io_task_runner_->PostTask(
237       FROM_HERE,
238       base::BindOnce(&NodeController::ConnectIsolatedOnIOThread,
239                      base::Unretained(this), base::Passed(&connection_params),
240                      port, connection_name.as_string()));
241 }
242 
SetPortObserver(const ports::PortRef & port,scoped_refptr<PortObserver> observer)243 void NodeController::SetPortObserver(const ports::PortRef& port,
244                                      scoped_refptr<PortObserver> observer) {
245   node_->SetUserData(port, std::move(observer));
246 }
247 
ClosePort(const ports::PortRef & port)248 void NodeController::ClosePort(const ports::PortRef& port) {
249   SetPortObserver(port, nullptr);
250   int rv = node_->ClosePort(port);
251   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
252 }
253 
SendUserMessage(const ports::PortRef & port,std::unique_ptr<ports::UserMessageEvent> message)254 int NodeController::SendUserMessage(
255     const ports::PortRef& port,
256     std::unique_ptr<ports::UserMessageEvent> message) {
257   return node_->SendUserMessage(port, std::move(message));
258 }
259 
MergePortIntoInviter(const std::string & name,const ports::PortRef & port)260 void NodeController::MergePortIntoInviter(const std::string& name,
261                                           const ports::PortRef& port) {
262   scoped_refptr<NodeChannel> inviter;
263   bool reject_merge = false;
264   {
265     // Hold |pending_port_merges_lock_| while getting |inviter|. Otherwise,
266     // there is a race where the inviter can be set, and |pending_port_merges_|
267     // be processed between retrieving |inviter| and adding the merge to
268     // |pending_port_merges_|.
269     base::AutoLock lock(pending_port_merges_lock_);
270     inviter = GetInviterChannel();
271     if (reject_pending_merges_) {
272       reject_merge = true;
273     } else if (!inviter) {
274       pending_port_merges_.push_back(std::make_pair(name, port));
275       return;
276     }
277   }
278   if (reject_merge) {
279     node_->ClosePort(port);
280     DVLOG(2) << "Rejecting port merge for name " << name
281              << " due to closed inviter channel.";
282     return;
283   }
284 
285   inviter->RequestPortMerge(port.name(), name);
286 }
287 
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)288 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
289                                     const ports::PortRef& port1) {
290   return node_->MergeLocalPorts(port0, port1);
291 }
292 
CreateSharedBuffer(size_t num_bytes)293 base::WritableSharedMemoryRegion NodeController::CreateSharedBuffer(
294     size_t num_bytes) {
295 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
296   // Shared buffer creation failure is fatal, so always use the broker when we
297   // have one; unless of course the embedder forces us not to.
298   if (!GetConfiguration().force_direct_shared_memory_allocation && broker_)
299     return broker_->GetWritableSharedMemoryRegion(num_bytes);
300 #endif
301   return base::WritableSharedMemoryRegion::Create(num_bytes);
302 }
303 
RequestShutdown(const base::Closure & callback)304 void NodeController::RequestShutdown(const base::Closure& callback) {
305   {
306     base::AutoLock lock(shutdown_lock_);
307     shutdown_callback_ = callback;
308     shutdown_callback_flag_.Set(true);
309   }
310 
311   AttemptShutdownIfRequested();
312 }
313 
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)314 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
315                                           const std::string& error) {
316   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
317   if (peer)
318     peer->NotifyBadMessage(error);
319 }
320 
SendBrokerClientInvitationOnIOThread(ScopedProcessHandle target_process,ConnectionParams connection_params,ports::NodeName temporary_node_name,const ProcessErrorCallback & process_error_callback)321 void NodeController::SendBrokerClientInvitationOnIOThread(
322     ScopedProcessHandle target_process,
323     ConnectionParams connection_params,
324     ports::NodeName temporary_node_name,
325     const ProcessErrorCallback& process_error_callback) {
326   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
327 
328 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
329   PlatformChannel node_channel;
330   ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint());
331   // BrokerHost owns itself.
332   BrokerHost* broker_host =
333       new BrokerHost(target_process.get(), std::move(connection_params),
334                      process_error_callback);
335   bool channel_ok = broker_host->SendChannel(
336       node_channel.TakeRemoteEndpoint().TakePlatformHandle());
337 
338 #if defined(OS_WIN)
339   if (!channel_ok) {
340     // On Windows the above operation may fail if the channel is crossing a
341     // session boundary. In that case we fall back to a named pipe.
342     NamedPlatformChannel::Options options;
343     NamedPlatformChannel named_channel(options);
344     node_connection_params =
345         ConnectionParams(named_channel.TakeServerEndpoint());
346     broker_host->SendNamedChannel(named_channel.GetServerName());
347   }
348 #else
349   CHECK(channel_ok);
350 #endif  // defined(OS_WIN)
351 
352   scoped_refptr<NodeChannel> channel =
353       NodeChannel::Create(this, std::move(node_connection_params),
354                           io_task_runner_, process_error_callback);
355 
356 #else   // !defined(OS_MACOSX) && !defined(OS_NACL)
357   scoped_refptr<NodeChannel> channel =
358       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
359                           process_error_callback);
360 #endif  // !defined(OS_MACOSX) && !defined(OS_NACL)
361 
362   // We set up the invitee channel with a temporary name so it can be identified
363   // as a pending invitee if it writes any messages to the channel. We may start
364   // receiving messages from it (though we shouldn't) as soon as Start() is
365   // called below.
366 
367   pending_invitations_.insert(std::make_pair(temporary_node_name, channel));
368 
369   channel->SetRemoteNodeName(temporary_node_name);
370   channel->SetRemoteProcessHandle(std::move(target_process));
371   channel->Start();
372 
373   channel->AcceptInvitee(name_, temporary_node_name);
374 }
375 
AcceptBrokerClientInvitationOnIOThread(ConnectionParams connection_params)376 void NodeController::AcceptBrokerClientInvitationOnIOThread(
377     ConnectionParams connection_params) {
378   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
379 
380   {
381     base::AutoLock lock(inviter_lock_);
382     DCHECK(inviter_name_ == ports::kInvalidNodeName);
383 
384     // At this point we don't know the inviter's name, so we can't yet insert it
385     // into our |peers_| map. That will happen as soon as we receive an
386     // AcceptInvitee message from them.
387     bootstrap_inviter_channel_ =
388         NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
389                             ProcessErrorCallback());
390     // Prevent the inviter pipe handle from being closed on shutdown. Pipe
391     // closure may be used by the inviter to detect the invitee process has
392     // exited.
393     bootstrap_inviter_channel_->LeakHandleOnShutdown();
394   }
395   bootstrap_inviter_channel_->Start();
396 }
397 
ConnectIsolatedOnIOThread(ConnectionParams connection_params,ports::PortRef port,const std::string & connection_name)398 void NodeController::ConnectIsolatedOnIOThread(
399     ConnectionParams connection_params,
400     ports::PortRef port,
401     const std::string& connection_name) {
402   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
403 
404   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
405       this, std::move(connection_params), io_task_runner_, {});
406 
407   RequestContext request_context;
408   ports::NodeName token;
409   GenerateRandomName(&token);
410   pending_isolated_connections_.emplace(
411       token, IsolatedConnection{channel, port, connection_name});
412   if (!connection_name.empty()) {
413     // If a connection already exists with this name, drop it.
414     auto it = named_isolated_connections_.find(connection_name);
415     if (it != named_isolated_connections_.end()) {
416       ports::NodeName connection_node = it->second;
417       if (connection_node != name_) {
418         DropPeer(connection_node, nullptr);
419       } else {
420         auto pending_it = pending_isolated_connections_.find(connection_node);
421         if (pending_it != pending_isolated_connections_.end()) {
422           node_->ClosePort(pending_it->second.local_port);
423           pending_isolated_connections_.erase(pending_it);
424         }
425         named_isolated_connections_.erase(it);
426       }
427     }
428     named_isolated_connections_.emplace(connection_name, token);
429   }
430 
431   channel->SetRemoteNodeName(token);
432   channel->Start();
433 
434   channel->AcceptPeer(name_, token, port.name());
435 }
436 
GetPeerChannel(const ports::NodeName & name)437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438     const ports::NodeName& name) {
439   base::AutoLock lock(peers_lock_);
440   auto it = peers_.find(name);
441   if (it == peers_.end())
442     return nullptr;
443   return it->second;
444 }
445 
GetInviterChannel()446 scoped_refptr<NodeChannel> NodeController::GetInviterChannel() {
447   ports::NodeName inviter_name;
448   {
449     base::AutoLock lock(inviter_lock_);
450     inviter_name = inviter_name_;
451   }
452   return GetPeerChannel(inviter_name);
453 }
454 
GetBrokerChannel()455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
456   if (GetConfiguration().is_broker_process)
457     return nullptr;
458 
459   ports::NodeName broker_name;
460   {
461     base::AutoLock lock(broker_lock_);
462     broker_name = broker_name_;
463   }
464   return GetPeerChannel(broker_name);
465 }
466 
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)467 void NodeController::AddPeer(const ports::NodeName& name,
468                              scoped_refptr<NodeChannel> channel,
469                              bool start_channel) {
470   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
471 
472   DCHECK(name != ports::kInvalidNodeName);
473   DCHECK(channel);
474 
475   channel->SetRemoteNodeName(name);
476 
477   OutgoingMessageQueue pending_messages;
478   {
479     base::AutoLock lock(peers_lock_);
480     if (peers_.find(name) != peers_.end()) {
481       // This can happen normally if two nodes race to be introduced to each
482       // other. The losing pipe will be silently closed and introduction should
483       // not be affected.
484       DVLOG(1) << "Ignoring duplicate peer name " << name;
485       return;
486     }
487 
488     auto result = peers_.insert(std::make_pair(name, channel));
489     DCHECK(result.second);
490 
491     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
492 
493     auto it = pending_peer_messages_.find(name);
494     if (it != pending_peer_messages_.end()) {
495       std::swap(pending_messages, it->second);
496       pending_peer_messages_.erase(it);
497     }
498   }
499 
500   if (start_channel)
501     channel->Start();
502 
503   // Flush any queued message we need to deliver to this node.
504   while (!pending_messages.empty()) {
505     channel->SendChannelMessage(std::move(pending_messages.front()));
506     pending_messages.pop();
507   }
508 }
509 
DropPeer(const ports::NodeName & name,NodeChannel * channel)510 void NodeController::DropPeer(const ports::NodeName& name,
511                               NodeChannel* channel) {
512   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
513 
514   {
515     base::AutoLock lock(peers_lock_);
516     auto it = peers_.find(name);
517 
518     if (it != peers_.end()) {
519       ports::NodeName peer = it->first;
520       peers_.erase(it);
521       DVLOG(1) << "Dropped peer " << peer;
522     }
523 
524     pending_peer_messages_.erase(name);
525     pending_invitations_.erase(name);
526   }
527 
528   std::vector<ports::PortRef> ports_to_close;
529   {
530     // Clean up any reserved ports.
531     base::AutoLock lock(reserved_ports_lock_);
532     auto it = reserved_ports_.find(name);
533     if (it != reserved_ports_.end()) {
534       for (auto& entry : it->second)
535         ports_to_close.emplace_back(entry.second);
536       reserved_ports_.erase(it);
537     }
538   }
539 
540   bool is_inviter;
541   {
542     base::AutoLock lock(inviter_lock_);
543     is_inviter = (name == inviter_name_ ||
544                   (channel && channel == bootstrap_inviter_channel_));
545   }
546 
547   // If the error comes from the inviter channel, we also need to cancel any
548   // port merge requests, so that errors can be propagated to the message
549   // pipes.
550   if (is_inviter)
551     CancelPendingPortMerges();
552 
553   auto connection_it = pending_isolated_connections_.find(name);
554   if (connection_it != pending_isolated_connections_.end()) {
555     IsolatedConnection& connection = connection_it->second;
556     ports_to_close.push_back(connection.local_port);
557     if (!connection.name.empty())
558       named_isolated_connections_.erase(connection.name);
559     pending_isolated_connections_.erase(connection_it);
560   }
561 
562   for (const auto& port : ports_to_close)
563     node_->ClosePort(port);
564 
565   node_->LostConnectionToNode(name);
566   AttemptShutdownIfRequested();
567 }
568 
SendPeerEvent(const ports::NodeName & name,ports::ScopedEvent event)569 void NodeController::SendPeerEvent(const ports::NodeName& name,
570                                    ports::ScopedEvent event) {
571   Channel::MessagePtr event_message = SerializeEventMessage(std::move(event));
572   if (!event_message)
573     return;
574   scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
575 #if defined(OS_WIN)
576   if (event_message->has_handles()) {
577     // If we're sending a message with handles we aren't the destination
578     // node's inviter or broker (i.e. we don't know its process handle), ask
579     // the broker to relay for us.
580     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
581     if (!peer || !peer->HasRemoteProcessHandle()) {
582       if (!GetConfiguration().is_broker_process && broker) {
583         broker->RelayEventMessage(name, std::move(event_message));
584       } else {
585         base::AutoLock lock(broker_lock_);
586         pending_relay_messages_[name].emplace(std::move(event_message));
587       }
588       return;
589     }
590   }
591 #elif defined(OS_MACOSX) && !defined(OS_IOS)
592   if (event_message->has_mach_ports()) {
593     // Messages containing Mach ports are always routed through the broker, even
594     // if the broker process is the intended recipient.
595     bool use_broker = false;
596     if (!GetConfiguration().is_broker_process) {
597       base::AutoLock lock(inviter_lock_);
598       use_broker = (bootstrap_inviter_channel_ ||
599                     inviter_name_ != ports::kInvalidNodeName);
600     }
601 
602     if (use_broker) {
603       scoped_refptr<NodeChannel> broker = GetBrokerChannel();
604       if (broker) {
605         broker->RelayEventMessage(name, std::move(event_message));
606       } else {
607         base::AutoLock lock(broker_lock_);
608         pending_relay_messages_[name].emplace(std::move(event_message));
609       }
610       return;
611     }
612   }
613 #endif  // defined(OS_WIN)
614 
615   if (peer) {
616     peer->SendChannelMessage(std::move(event_message));
617     return;
618   }
619 
620   // If we don't know who the peer is and we are the broker, we can only assume
621   // the peer is invalid, i.e., it's either a junk name or has already been
622   // disconnected.
623   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
624   if (!broker) {
625     DVLOG(1) << "Dropping message for unknown peer: " << name;
626     return;
627   }
628 
629   // If we aren't the broker, assume we just need to be introduced and queue
630   // until that can be either confirmed or denied by the broker.
631   bool needs_introduction = false;
632   {
633     base::AutoLock lock(peers_lock_);
634     // We may have been introduced on another thread by the time we get here.
635     // Double-check to be safe.
636     auto it = peers_.find(name);
637     if (it == peers_.end()) {
638       auto& queue = pending_peer_messages_[name];
639       needs_introduction = queue.empty();
640       queue.emplace(std::move(event_message));
641     } else {
642       peer = it->second;
643     }
644   }
645   if (needs_introduction)
646     broker->RequestIntroduction(name);
647   else if (peer)
648     peer->SendChannelMessage(std::move(event_message));
649 }
650 
DropAllPeers()651 void NodeController::DropAllPeers() {
652   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
653 
654   std::vector<scoped_refptr<NodeChannel>> all_peers;
655   {
656     base::AutoLock lock(inviter_lock_);
657     if (bootstrap_inviter_channel_) {
658       // |bootstrap_inviter_channel_| isn't null'd here becuase we rely on its
659       // existence to determine whether or not this is the root node. Once
660       // bootstrap_inviter_channel_->ShutDown() has been called,
661       // |bootstrap_inviter_channel_| is essentially a dead object and it
662       // doesn't matter if it's deleted now or when |this| is deleted. Note:
663       // |bootstrap_inviter_channel_| is only modified on the IO thread.
664       all_peers.push_back(bootstrap_inviter_channel_);
665     }
666   }
667 
668   {
669     base::AutoLock lock(peers_lock_);
670     for (const auto& peer : peers_)
671       all_peers.push_back(peer.second);
672     for (const auto& peer : pending_invitations_)
673       all_peers.push_back(peer.second);
674     peers_.clear();
675     pending_invitations_.clear();
676     pending_peer_messages_.clear();
677     pending_isolated_connections_.clear();
678     named_isolated_connections_.clear();
679   }
680 
681   for (const auto& peer : all_peers)
682     peer->ShutDown();
683 
684   if (destroy_on_io_thread_shutdown_)
685     delete this;
686 }
687 
ForwardEvent(const ports::NodeName & node,ports::ScopedEvent event)688 void NodeController::ForwardEvent(const ports::NodeName& node,
689                                   ports::ScopedEvent event) {
690   DCHECK(event);
691   if (node == name_)
692     node_->AcceptEvent(std::move(event));
693   else
694     SendPeerEvent(node, std::move(event));
695 
696   AttemptShutdownIfRequested();
697 }
698 
BroadcastEvent(ports::ScopedEvent event)699 void NodeController::BroadcastEvent(ports::ScopedEvent event) {
700   Channel::MessagePtr channel_message = SerializeEventMessage(std::move(event));
701   DCHECK(channel_message && !channel_message->has_handles());
702 
703   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
704   if (broker)
705     broker->Broadcast(std::move(channel_message));
706   else
707     OnBroadcast(name_, std::move(channel_message));
708 }
709 
PortStatusChanged(const ports::PortRef & port)710 void NodeController::PortStatusChanged(const ports::PortRef& port) {
711   scoped_refptr<ports::UserData> user_data;
712   node_->GetUserData(port, &user_data);
713 
714   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
715   if (observer) {
716     observer->OnPortStatusChanged();
717   } else {
718     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
719              << "doesn't have an observer.";
720   }
721 }
722 
OnAcceptInvitee(const ports::NodeName & from_node,const ports::NodeName & inviter_name,const ports::NodeName & token)723 void NodeController::OnAcceptInvitee(const ports::NodeName& from_node,
724                                      const ports::NodeName& inviter_name,
725                                      const ports::NodeName& token) {
726   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
727 
728   scoped_refptr<NodeChannel> inviter;
729   {
730     base::AutoLock lock(inviter_lock_);
731     if (bootstrap_inviter_channel_ &&
732         inviter_name_ == ports::kInvalidNodeName) {
733       inviter_name_ = inviter_name;
734       inviter = bootstrap_inviter_channel_;
735     }
736   }
737 
738   if (!inviter) {
739     DLOG(ERROR) << "Unexpected AcceptInvitee message from " << from_node;
740     DropPeer(from_node, nullptr);
741     return;
742   }
743 
744   inviter->SetRemoteNodeName(inviter_name);
745   inviter->AcceptInvitation(token, name_);
746 
747   // NOTE: The invitee does not actually add its inviter as a peer until
748   // receiving an AcceptBrokerClient message from the broker. The inviter will
749   // request that said message be sent upon receiving AcceptInvitation.
750 
751   DVLOG(1) << "Broker client " << name_ << " accepting invitation from "
752            << inviter_name;
753 }
754 
OnAcceptInvitation(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & invitee_name)755 void NodeController::OnAcceptInvitation(const ports::NodeName& from_node,
756                                         const ports::NodeName& token,
757                                         const ports::NodeName& invitee_name) {
758   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
759 
760   auto it = pending_invitations_.find(from_node);
761   if (it == pending_invitations_.end() || token != from_node) {
762     DLOG(ERROR) << "Received unexpected AcceptInvitation message from "
763                 << from_node;
764     DropPeer(from_node, nullptr);
765     return;
766   }
767 
768   {
769     base::AutoLock lock(reserved_ports_lock_);
770     auto it = reserved_ports_.find(from_node);
771     if (it != reserved_ports_.end()) {
772       // Swap the temporary node name's reserved ports into an entry keyed by
773       // the real node name.
774       auto result =
775           reserved_ports_.emplace(invitee_name, std::move(it->second));
776       DCHECK(result.second);
777       reserved_ports_.erase(it);
778     }
779   }
780 
781   scoped_refptr<NodeChannel> channel = it->second;
782   pending_invitations_.erase(it);
783 
784   DCHECK(channel);
785 
786   DVLOG(1) << "Node " << name_ << " accepted invitee " << invitee_name;
787 
788   AddPeer(invitee_name, channel, false /* start_channel */);
789 
790   // TODO(rockot): We could simplify invitee initialization if we could
791   // synchronously get a new async broker channel from the broker. For now we do
792   // it asynchronously since it's only used to facilitate handle passing, not
793   // handle creation.
794   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
795   if (broker) {
796     // Inform the broker of this new client.
797     broker->AddBrokerClient(invitee_name, channel->CloneRemoteProcessHandle());
798   } else {
799     // If we have no broker, either we need to wait for one, or we *are* the
800     // broker.
801     scoped_refptr<NodeChannel> inviter = GetInviterChannel();
802     if (!inviter) {
803       base::AutoLock lock(inviter_lock_);
804       inviter = bootstrap_inviter_channel_;
805     }
806 
807     if (!inviter) {
808       // Yes, we're the broker. We can initialize the client directly.
809       channel->AcceptBrokerClient(name_, PlatformHandle());
810     } else {
811       // We aren't the broker, so wait for a broker connection.
812       base::AutoLock lock(broker_lock_);
813       pending_broker_clients_.push(invitee_name);
814     }
815   }
816 }
817 
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)818 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
819                                        const ports::NodeName& client_name,
820                                        base::ProcessHandle process_handle) {
821   ScopedProcessHandle scoped_process_handle(process_handle);
822 
823   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
824   if (!sender) {
825     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
826     return;
827   }
828 
829   if (GetPeerChannel(client_name)) {
830     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
831     DropPeer(from_node, nullptr);
832     return;
833   }
834 
835   PlatformChannel broker_channel;
836   ConnectionParams connection_params(broker_channel.TakeLocalEndpoint());
837   scoped_refptr<NodeChannel> client =
838       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
839                           ProcessErrorCallback());
840 
841 #if defined(OS_WIN)
842   // The broker must have a working handle to the client process in order to
843   // properly copy other handles to and from the client.
844   if (!scoped_process_handle.is_valid()) {
845     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
846     return;
847   }
848 #endif
849   client->SetRemoteProcessHandle(std::move(scoped_process_handle));
850 
851   AddPeer(client_name, client, true /* start_channel */);
852 
853   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
854            << " from peer " << from_node;
855 
856   sender->BrokerClientAdded(
857       client_name, broker_channel.TakeRemoteEndpoint().TakePlatformHandle());
858 }
859 
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,PlatformHandle broker_channel)860 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
861                                          const ports::NodeName& client_name,
862                                          PlatformHandle broker_channel) {
863   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
864   if (!client) {
865     DLOG(ERROR) << "BrokerClientAdded for unknown client " << client_name;
866     return;
867   }
868 
869   // This should have come from our own broker.
870   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
871     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
872     return;
873   }
874 
875   DVLOG(1) << "Client " << client_name << " accepted by broker " << from_node;
876 
877   client->AcceptBrokerClient(from_node, std::move(broker_channel));
878 }
879 
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,PlatformHandle broker_channel)880 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
881                                           const ports::NodeName& broker_name,
882                                           PlatformHandle broker_channel) {
883   DCHECK(!GetConfiguration().is_broker_process);
884 
885   // This node should already have an inviter in bootstrap mode.
886   ports::NodeName inviter_name;
887   scoped_refptr<NodeChannel> inviter;
888   {
889     base::AutoLock lock(inviter_lock_);
890     inviter_name = inviter_name_;
891     inviter = bootstrap_inviter_channel_;
892     bootstrap_inviter_channel_ = nullptr;
893   }
894   DCHECK(inviter_name == from_node);
895   DCHECK(inviter);
896 
897   base::queue<ports::NodeName> pending_broker_clients;
898   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
899       pending_relay_messages;
900   {
901     base::AutoLock lock(broker_lock_);
902     broker_name_ = broker_name;
903     std::swap(pending_broker_clients, pending_broker_clients_);
904     std::swap(pending_relay_messages, pending_relay_messages_);
905   }
906   DCHECK(broker_name != ports::kInvalidNodeName);
907 
908   // It's now possible to add both the broker and the inviter as peers.
909   // Note that the broker and inviter may be the same node.
910   scoped_refptr<NodeChannel> broker;
911   if (broker_name == inviter_name) {
912     DCHECK(!broker_channel.is_valid());
913     broker = inviter;
914   } else {
915     DCHECK(broker_channel.is_valid());
916     broker = NodeChannel::Create(
917         this,
918         ConnectionParams(PlatformChannelEndpoint(std::move(broker_channel))),
919         io_task_runner_, ProcessErrorCallback());
920     AddPeer(broker_name, broker, true /* start_channel */);
921   }
922 
923   AddPeer(inviter_name, inviter, false /* start_channel */);
924 
925   {
926     // Complete any port merge requests we have waiting for the inviter.
927     base::AutoLock lock(pending_port_merges_lock_);
928     for (const auto& request : pending_port_merges_)
929       inviter->RequestPortMerge(request.second.name(), request.first);
930     pending_port_merges_.clear();
931   }
932 
933   // Feed the broker any pending invitees of our own.
934   while (!pending_broker_clients.empty()) {
935     const ports::NodeName& invitee_name = pending_broker_clients.front();
936     auto it = pending_invitations_.find(invitee_name);
937     // If for any reason we don't have a pending invitation for the invitee,
938     // there's nothing left to do: we've already swapped the relevant state into
939     // the stack.
940     if (it != pending_invitations_.end()) {
941       broker->AddBrokerClient(invitee_name,
942                               it->second->CloneRemoteProcessHandle());
943     }
944     pending_broker_clients.pop();
945   }
946 
947 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
948   // Have the broker relay any messages we have waiting.
949   for (auto& entry : pending_relay_messages) {
950     const ports::NodeName& destination = entry.first;
951     auto& message_queue = entry.second;
952     while (!message_queue.empty()) {
953       broker->RelayEventMessage(destination, std::move(message_queue.front()));
954       message_queue.pop();
955     }
956   }
957 #endif
958 
959   DVLOG(1) << "Client " << name_ << " accepted by broker " << broker_name;
960 }
961 
OnEventMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)962 void NodeController::OnEventMessage(const ports::NodeName& from_node,
963                                     Channel::MessagePtr channel_message) {
964   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
965 
966   auto event = DeserializeEventMessage(from_node, std::move(channel_message));
967   if (!event) {
968     // We silently ignore unparseable events, as they may come from a process
969     // running a newer version of Mojo.
970     DVLOG(1) << "Ignoring invalid or unknown event from " << from_node;
971     return;
972   }
973 
974   node_->AcceptEvent(std::move(event));
975 
976   AttemptShutdownIfRequested();
977 }
978 
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & name)979 void NodeController::OnRequestPortMerge(
980     const ports::NodeName& from_node,
981     const ports::PortName& connector_port_name,
982     const std::string& name) {
983   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
984 
985   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name
986            << " and port " << connector_port_name << "@" << from_node;
987 
988   ports::PortRef local_port;
989   {
990     base::AutoLock lock(reserved_ports_lock_);
991     auto it = reserved_ports_.find(from_node);
992     // TODO(https://crbug.com/822034): We should send a notification back to the
993     // requestor so they can clean up their dangling port in this failure case.
994     // This requires changes to the internal protocol, which can't be made yet.
995     // Until this is done, pipes from |MojoExtractMessagePipeFromInvitation()|
996     // will never break if the given name was invalid.
997     if (it == reserved_ports_.end()) {
998       DVLOG(1) << "Ignoring port merge request from node " << from_node << ". "
999                << "No ports reserved for that node.";
1000       return;
1001     }
1002 
1003     PortMap& port_map = it->second;
1004     auto port_it = port_map.find(name);
1005     if (port_it == port_map.end()) {
1006       DVLOG(1) << "Ignoring request to connect to port for unknown name "
1007                << name << " from node " << from_node;
1008       return;
1009     }
1010     local_port = port_it->second;
1011     port_map.erase(port_it);
1012     if (port_map.empty())
1013       reserved_ports_.erase(it);
1014   }
1015 
1016   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1017   if (rv != ports::OK)
1018     DLOG(ERROR) << "MergePorts failed: " << rv;
1019 }
1020 
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1021 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1022                                            const ports::NodeName& name) {
1023   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1024 
1025   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1026   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1027     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1028                 << from_node;
1029     DropPeer(from_node, nullptr);
1030     return;
1031   }
1032 
1033   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1034   if (!new_friend) {
1035     // We don't know who they're talking about!
1036     requestor->Introduce(name, PlatformHandle());
1037   } else {
1038     PlatformChannel new_channel;
1039     requestor->Introduce(name,
1040                          new_channel.TakeLocalEndpoint().TakePlatformHandle());
1041     new_friend->Introduce(
1042         from_node, new_channel.TakeRemoteEndpoint().TakePlatformHandle());
1043   }
1044 }
1045 
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,PlatformHandle channel_handle)1046 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1047                                  const ports::NodeName& name,
1048                                  PlatformHandle channel_handle) {
1049   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1050 
1051   if (!channel_handle.is_valid()) {
1052     node_->LostConnectionToNode(name);
1053 
1054     DVLOG(1) << "Could not be introduced to peer " << name;
1055     base::AutoLock lock(peers_lock_);
1056     pending_peer_messages_.erase(name);
1057     return;
1058   }
1059 
1060   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
1061       this,
1062       ConnectionParams(PlatformChannelEndpoint(std::move(channel_handle))),
1063       io_task_runner_, ProcessErrorCallback());
1064 
1065   DVLOG(1) << "Adding new peer " << name << " via broker introduction.";
1066   AddPeer(name, channel, true /* start_channel */);
1067 }
1068 
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1069 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1070                                  Channel::MessagePtr message) {
1071   DCHECK(!message->has_handles());
1072 
1073   auto event = DeserializeEventMessage(from_node, std::move(message));
1074   if (!event) {
1075     // We silently ignore unparseable events, as they may come from a process
1076     // running a newer version of Mojo.
1077     DVLOG(1) << "Ignoring request to broadcast invalid or unknown event from "
1078              << from_node;
1079     return;
1080   }
1081 
1082   base::AutoLock lock(peers_lock_);
1083   for (auto& iter : peers_) {
1084     // Clone and send the event to each known peer. Events which cannot be
1085     // cloned cannot be broadcast.
1086     ports::ScopedEvent clone = event->Clone();
1087     if (!clone) {
1088       DVLOG(1) << "Ignoring request to broadcast invalid event from "
1089                << from_node << " [type=" << static_cast<uint32_t>(event->type())
1090                << "]";
1091       return;
1092     }
1093 
1094     iter.second->SendChannelMessage(SerializeEventMessage(std::move(clone)));
1095   }
1096 }
1097 
1098 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayEventMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1099 void NodeController::OnRelayEventMessage(const ports::NodeName& from_node,
1100                                          base::ProcessHandle from_process,
1101                                          const ports::NodeName& destination,
1102                                          Channel::MessagePtr message) {
1103   // The broker should always know which process this came from.
1104   DCHECK(from_process != base::kNullProcessHandle);
1105   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1106 
1107   if (GetBrokerChannel()) {
1108     // Only the broker should be asked to relay a message.
1109     LOG(ERROR) << "Non-broker refusing to relay message.";
1110     DropPeer(from_node, nullptr);
1111     return;
1112   }
1113 
1114   if (destination == name_) {
1115     // Great, we can deliver this message locally.
1116     OnEventMessage(from_node, std::move(message));
1117     return;
1118   }
1119 
1120   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1121   if (peer)
1122     peer->EventMessageFromRelay(from_node, std::move(message));
1123   else
1124     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1125 }
1126 
OnEventMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1127 void NodeController::OnEventMessageFromRelay(const ports::NodeName& from_node,
1128                                              const ports::NodeName& source_node,
1129                                              Channel::MessagePtr message) {
1130   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1131     LOG(ERROR) << "Refusing relayed message from non-broker node.";
1132     DropPeer(from_node, nullptr);
1133     return;
1134   }
1135 
1136   OnEventMessage(source_node, std::move(message));
1137 }
1138 #endif
1139 
OnAcceptPeer(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & peer_name,const ports::PortName & port_name)1140 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1141                                   const ports::NodeName& token,
1142                                   const ports::NodeName& peer_name,
1143                                   const ports::PortName& port_name) {
1144   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1145 
1146   auto it = pending_isolated_connections_.find(from_node);
1147   if (it == pending_isolated_connections_.end()) {
1148     DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1149     DropPeer(from_node, nullptr);
1150     return;
1151   }
1152 
1153   IsolatedConnection& connection = it->second;
1154   scoped_refptr<NodeChannel> channel = std::move(connection.channel);
1155   ports::PortRef local_port = connection.local_port;
1156   if (!connection.name.empty())
1157     named_isolated_connections_[connection.name] = peer_name;
1158   pending_isolated_connections_.erase(it);
1159   DCHECK(channel);
1160 
1161   if (name_ != peer_name) {
1162     // It's possible (e.g. in tests) that we may "connect" to ourself, in which
1163     // case we skip this |AddPeer()| call and go straight to merging ports.
1164     // Note that we explicitly drop any prior connection to the same peer so
1165     // that new isolated connections can replace old ones.
1166     DropPeer(peer_name, nullptr);
1167     AddPeer(peer_name, channel, false /* start_channel */);
1168     DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1169   }
1170 
1171   // We need to choose one side to initiate the port merge. It doesn't matter
1172   // who does it as long as they don't both try. Simple solution: pick the one
1173   // with the "smaller" port name.
1174   if (local_port.name() < port_name)
1175     node()->MergePorts(local_port, peer_name, port_name);
1176 }
1177 
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1178 void NodeController::OnChannelError(const ports::NodeName& from_node,
1179                                     NodeChannel* channel) {
1180   if (io_task_runner_->RunsTasksInCurrentSequence()) {
1181     RequestContext request_context(RequestContext::Source::SYSTEM);
1182     DropPeer(from_node, channel);
1183   } else {
1184     io_task_runner_->PostTask(
1185         FROM_HERE,
1186         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1187                    from_node, base::RetainedRef(channel)));
1188   }
1189 }
1190 
1191 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1192 MachPortRelay* NodeController::GetMachPortRelay() {
1193   {
1194     base::AutoLock lock(inviter_lock_);
1195     // Return null if we're not the root.
1196     if (bootstrap_inviter_channel_ || inviter_name_ != ports::kInvalidNodeName)
1197       return nullptr;
1198   }
1199 
1200   base::AutoLock lock(mach_port_relay_lock_);
1201   return mach_port_relay_.get();
1202 }
1203 #endif
1204 
CancelPendingPortMerges()1205 void NodeController::CancelPendingPortMerges() {
1206   std::vector<ports::PortRef> ports_to_close;
1207 
1208   {
1209     base::AutoLock lock(pending_port_merges_lock_);
1210     reject_pending_merges_ = true;
1211     for (const auto& port : pending_port_merges_)
1212       ports_to_close.push_back(port.second);
1213     pending_port_merges_.clear();
1214   }
1215 
1216   for (const auto& port : ports_to_close)
1217     node_->ClosePort(port);
1218 }
1219 
DestroyOnIOThreadShutdown()1220 void NodeController::DestroyOnIOThreadShutdown() {
1221   destroy_on_io_thread_shutdown_ = true;
1222 }
1223 
AttemptShutdownIfRequested()1224 void NodeController::AttemptShutdownIfRequested() {
1225   if (!shutdown_callback_flag_)
1226     return;
1227 
1228   base::Closure callback;
1229   {
1230     base::AutoLock lock(shutdown_lock_);
1231     if (shutdown_callback_.is_null())
1232       return;
1233     if (!node_->CanShutdownCleanly(
1234             ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
1235       DVLOG(2) << "Unable to cleanly shut down node " << name_;
1236       return;
1237     }
1238 
1239     callback = shutdown_callback_;
1240     shutdown_callback_.Reset();
1241     shutdown_callback_flag_.Set(false);
1242   }
1243 
1244   DCHECK(!callback.is_null());
1245 
1246   callback.Run();
1247 }
1248 
1249 NodeController::IsolatedConnection::IsolatedConnection() = default;
1250 
1251 NodeController::IsolatedConnection::IsolatedConnection(
1252     const IsolatedConnection& other) = default;
1253 
1254 NodeController::IsolatedConnection::IsolatedConnection(
1255     IsolatedConnection&& other) = default;
1256 
IsolatedConnection(scoped_refptr<NodeChannel> channel,const ports::PortRef & local_port,base::StringPiece name)1257 NodeController::IsolatedConnection::IsolatedConnection(
1258     scoped_refptr<NodeChannel> channel,
1259     const ports::PortRef& local_port,
1260     base::StringPiece name)
1261     : channel(std::move(channel)), local_port(local_port), name(name) {}
1262 
1263 NodeController::IsolatedConnection::~IsolatedConnection() = default;
1264 
1265 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
1266 operator=(const IsolatedConnection& other) = default;
1267 
1268 NodeController::IsolatedConnection& NodeController::IsolatedConnection::
1269 operator=(IsolatedConnection&& other) = default;
1270 
1271 }  // namespace core
1272 }  // namespace mojo
1273