1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_CORE_NODE_CONTROLLER_H_ 6 #define MOJO_CORE_NODE_CONTROLLER_H_ 7 8 #include <map> 9 #include <memory> 10 #include <string> 11 #include <unordered_map> 12 #include <unordered_set> 13 #include <utility> 14 #include <vector> 15 16 #include "base/callback.h" 17 #include "base/containers/hash_tables.h" 18 #include "base/containers/queue.h" 19 #include "base/macros.h" 20 #include "base/memory/ref_counted.h" 21 #include "base/memory/writable_shared_memory_region.h" 22 #include "base/task_runner.h" 23 #include "build/build_config.h" 24 #include "mojo/core/atomic_flag.h" 25 #include "mojo/core/node_channel.h" 26 #include "mojo/core/ports/event.h" 27 #include "mojo/core/ports/name.h" 28 #include "mojo/core/ports/node.h" 29 #include "mojo/core/ports/node_delegate.h" 30 #include "mojo/core/scoped_process_handle.h" 31 #include "mojo/core/system_impl_export.h" 32 #include "mojo/public/cpp/platform/platform_handle.h" 33 34 namespace base { 35 class PortProvider; 36 } 37 38 namespace mojo { 39 namespace core { 40 41 class Broker; 42 class Core; 43 class MachPortRelay; 44 45 // The owner of ports::Node which facilitates core EDK implementation. All 46 // public interface methods are safe to call from any thread. 47 class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate, 48 public NodeChannel::Delegate { 49 public: 50 class PortObserver : public ports::UserData { 51 public: 52 virtual void OnPortStatusChanged() = 0; 53 54 protected: ~PortObserver()55 ~PortObserver() override {} 56 }; 57 58 // |core| owns and out-lives us. 59 explicit NodeController(Core* core); 60 ~NodeController() override; 61 name()62 const ports::NodeName& name() const { return name_; } core()63 Core* core() const { return core_; } node()64 ports::Node* node() const { return node_.get(); } io_task_runner()65 scoped_refptr<base::TaskRunner> io_task_runner() const { 66 return io_task_runner_; 67 } 68 69 #if defined(OS_MACOSX) && !defined(OS_IOS) 70 // Create the relay used to transfer mach ports between processes. 71 void CreateMachPortRelay(base::PortProvider* port_provider); 72 #endif 73 74 // Called exactly once, shortly after construction, and before any other 75 // methods are called on this object. 76 void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner); 77 78 // Sends an invitation to a remote process (via |connection_params|) to join 79 // this process's graph of connected processes as a broker client. 80 void SendBrokerClientInvitation( 81 base::ProcessHandle target_process, 82 ConnectionParams connection_params, 83 const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports, 84 const ProcessErrorCallback& process_error_callback); 85 86 // Connects this node to the process which invited it to be a broker client. 87 void AcceptBrokerClientInvitation(ConnectionParams connection_params); 88 89 // Connects this node to a peer node. On success, |port| will be merged with 90 // the corresponding port in the peer node. 91 void ConnectIsolated(ConnectionParams connection_params, 92 const ports::PortRef& port, 93 base::StringPiece connection_name); 94 95 // Sets a port's observer. If |observer| is null the port's current observer 96 // is removed. 97 void SetPortObserver(const ports::PortRef& port, 98 scoped_refptr<PortObserver> observer); 99 100 // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as 101 // it ensures the port's observer has also been removed. 102 void ClosePort(const ports::PortRef& port); 103 104 // Sends a message on a port to its peer. 105 int SendUserMessage(const ports::PortRef& port_ref, 106 std::unique_ptr<ports::UserMessageEvent> message); 107 108 // Merges a local port |port| into a port reserved by |name| in the node which 109 // invited this node. 110 void MergePortIntoInviter(const std::string& name, 111 const ports::PortRef& port); 112 113 // Merges two local ports together. 114 int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1); 115 116 // Creates a new shared buffer for use in the current process. 117 base::WritableSharedMemoryRegion CreateSharedBuffer(size_t num_bytes); 118 119 // Request that the Node be shut down cleanly. This may take an arbitrarily 120 // long time to complete, at which point |callback| will be called. 121 // 122 // Note that while it is safe to continue using the NodeController's public 123 // interface after requesting shutdown, you do so at your own risk and there 124 // is NO guarantee that new messages will be sent or ports will complete 125 // transfer. 126 void RequestShutdown(const base::Closure& callback); 127 128 // Notifies the NodeController that we received a bad message from the given 129 // node. 130 void NotifyBadMessageFrom(const ports::NodeName& source_node, 131 const std::string& error); 132 133 private: 134 friend Core; 135 136 using NodeMap = 137 std::unordered_map<ports::NodeName, scoped_refptr<NodeChannel>>; 138 using OutgoingMessageQueue = base::queue<Channel::MessagePtr>; 139 using PortMap = std::map<std::string, ports::PortRef>; 140 141 struct IsolatedConnection { 142 IsolatedConnection(); 143 IsolatedConnection(const IsolatedConnection& other); 144 IsolatedConnection(IsolatedConnection&& other); 145 IsolatedConnection(scoped_refptr<NodeChannel> channel, 146 const ports::PortRef& local_port, 147 base::StringPiece name); 148 ~IsolatedConnection(); 149 150 IsolatedConnection& operator=(const IsolatedConnection& other); 151 IsolatedConnection& operator=(IsolatedConnection&& other); 152 153 // NOTE: |channel| is null once the connection is fully established. 154 scoped_refptr<NodeChannel> channel; 155 ports::PortRef local_port; 156 std::string name; 157 }; 158 159 void SendBrokerClientInvitationOnIOThread( 160 ScopedProcessHandle target_process, 161 ConnectionParams connection_params, 162 ports::NodeName token, 163 const ProcessErrorCallback& process_error_callback); 164 void AcceptBrokerClientInvitationOnIOThread( 165 ConnectionParams connection_params); 166 167 void ConnectIsolatedOnIOThread(ConnectionParams connection_params, 168 ports::PortRef port, 169 const std::string& connection_name); 170 171 scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); 172 scoped_refptr<NodeChannel> GetInviterChannel(); 173 scoped_refptr<NodeChannel> GetBrokerChannel(); 174 175 void AddPeer(const ports::NodeName& name, 176 scoped_refptr<NodeChannel> channel, 177 bool start_channel); 178 void DropPeer(const ports::NodeName& name, NodeChannel* channel); 179 void SendPeerEvent(const ports::NodeName& name, ports::ScopedEvent event); 180 void DropAllPeers(); 181 182 // ports::NodeDelegate: 183 void ForwardEvent(const ports::NodeName& node, 184 ports::ScopedEvent event) override; 185 void BroadcastEvent(ports::ScopedEvent event) override; 186 void PortStatusChanged(const ports::PortRef& port) override; 187 188 // NodeChannel::Delegate: 189 void OnAcceptInvitee(const ports::NodeName& from_node, 190 const ports::NodeName& inviter_name, 191 const ports::NodeName& token) override; 192 void OnAcceptInvitation(const ports::NodeName& from_node, 193 const ports::NodeName& token, 194 const ports::NodeName& invitee_name) override; 195 void OnAddBrokerClient(const ports::NodeName& from_node, 196 const ports::NodeName& client_name, 197 base::ProcessHandle process_handle) override; 198 void OnBrokerClientAdded(const ports::NodeName& from_node, 199 const ports::NodeName& client_name, 200 PlatformHandle broker_channel) override; 201 void OnAcceptBrokerClient(const ports::NodeName& from_node, 202 const ports::NodeName& broker_name, 203 PlatformHandle broker_channel) override; 204 void OnEventMessage(const ports::NodeName& from_node, 205 Channel::MessagePtr message) override; 206 void OnRequestPortMerge(const ports::NodeName& from_node, 207 const ports::PortName& connector_port_name, 208 const std::string& token) override; 209 void OnRequestIntroduction(const ports::NodeName& from_node, 210 const ports::NodeName& name) override; 211 void OnIntroduce(const ports::NodeName& from_node, 212 const ports::NodeName& name, 213 PlatformHandle channel_handle) override; 214 void OnBroadcast(const ports::NodeName& from_node, 215 Channel::MessagePtr message) override; 216 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 217 void OnRelayEventMessage(const ports::NodeName& from_node, 218 base::ProcessHandle from_process, 219 const ports::NodeName& destination, 220 Channel::MessagePtr message) override; 221 void OnEventMessageFromRelay(const ports::NodeName& from_node, 222 const ports::NodeName& source_node, 223 Channel::MessagePtr message) override; 224 #endif 225 void OnAcceptPeer(const ports::NodeName& from_node, 226 const ports::NodeName& token, 227 const ports::NodeName& peer_name, 228 const ports::PortName& port_name) override; 229 void OnChannelError(const ports::NodeName& from_node, 230 NodeChannel* channel) override; 231 232 #if defined(OS_MACOSX) && !defined(OS_IOS) 233 MachPortRelay* GetMachPortRelay(); 234 #endif 235 236 // Cancels all pending port merges. These are merges which are supposed to 237 // be requested from the inviter ASAP, and they may be cancelled if the 238 // connection to the inviter is broken or never established. 239 void CancelPendingPortMerges(); 240 241 // Marks this NodeController for destruction when the IO thread shuts down. 242 // This is used in case Core is torn down before the IO thread. Must only be 243 // called on the IO thread. 244 void DestroyOnIOThreadShutdown(); 245 246 // If there is a registered shutdown callback (meaning shutdown has been 247 // requested, this checks the Node's status to see if clean shutdown is 248 // possible. If so, shutdown is performed and the shutdown callback is run. 249 void AttemptShutdownIfRequested(); 250 251 // These are safe to access from any thread as long as the Node is alive. 252 Core* const core_; 253 const ports::NodeName name_; 254 const std::unique_ptr<ports::Node> node_; 255 scoped_refptr<base::TaskRunner> io_task_runner_; 256 257 // Guards |peers_| and |pending_peer_messages_|. 258 base::Lock peers_lock_; 259 260 // Channels to known peers, including inviter and invitees, if any. 261 NodeMap peers_; 262 263 // Outgoing message queues for peers we've heard of but can't yet talk to. 264 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 265 pending_peer_messages_; 266 267 // Guards |reserved_ports_|. 268 base::Lock reserved_ports_lock_; 269 270 // Ports reserved by name, per peer. 271 std::map<ports::NodeName, PortMap> reserved_ports_; 272 273 // Guards |pending_port_merges_| and |reject_pending_merges_|. 274 base::Lock pending_port_merges_lock_; 275 276 // A set of port merge requests awaiting inviter connection. 277 std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; 278 279 // Indicates that new merge requests should be rejected because the inviter 280 // has disconnected. 281 bool reject_pending_merges_ = false; 282 283 // Guards |inviter_name_| and |bootstrap_inviter_channel_|. 284 base::Lock inviter_lock_; 285 286 // The name of the node which invited us to join its network, if any. 287 ports::NodeName inviter_name_; 288 289 // A temporary reference to the inviter channel before we know their name. 290 scoped_refptr<NodeChannel> bootstrap_inviter_channel_; 291 292 // Guards |broker_name_|, |pending_broker_clients_|, and 293 // |pending_relay_messages_|. 294 base::Lock broker_lock_; 295 296 // The name of our broker node, if any. 297 ports::NodeName broker_name_; 298 299 // A queue of remote broker clients waiting to be connected to the broker. 300 base::queue<ports::NodeName> pending_broker_clients_; 301 302 // Messages waiting to be relayed by the broker once it's known. 303 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 304 pending_relay_messages_; 305 306 // Guards |shutdown_callback_|. 307 base::Lock shutdown_lock_; 308 309 // Set by RequestShutdown(). If this is non-null, the controller will 310 // begin polling the Node to see if clean shutdown is possible any time the 311 // Node's state is modified by the controller. 312 base::Closure shutdown_callback_; 313 // Flag to fast-path checking |shutdown_callback_|. 314 AtomicFlag shutdown_callback_flag_; 315 316 // All other fields below must only be accessed on the I/O thread, i.e., the 317 // thread on which core_->io_task_runner() runs tasks. 318 319 // Channels to invitees during handshake. 320 NodeMap pending_invitations_; 321 322 std::map<ports::NodeName, IsolatedConnection> pending_isolated_connections_; 323 std::map<std::string, ports::NodeName> named_isolated_connections_; 324 325 // Indicates whether this object should delete itself on IO thread shutdown. 326 // Must only be accessed from the IO thread. 327 bool destroy_on_io_thread_shutdown_ = false; 328 329 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) 330 // Broker for sync shared buffer creation on behalf of broker clients. 331 std::unique_ptr<Broker> broker_; 332 #endif 333 334 #if defined(OS_MACOSX) && !defined(OS_IOS) 335 base::Lock mach_port_relay_lock_; 336 // Relay for transferring mach ports to/from broker clients. 337 std::unique_ptr<MachPortRelay> mach_port_relay_; 338 #endif 339 340 DISALLOW_COPY_AND_ASSIGN(NodeController); 341 }; 342 343 } // namespace core 344 } // namespace mojo 345 346 #endif // MOJO_CORE_NODE_CONTROLLER_H_ 347