1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 7 8 #include <memory> 9 #include <queue> 10 #include <unordered_map> 11 #include <unordered_set> 12 #include <utility> 13 #include <vector> 14 15 #include "base/callback.h" 16 #include "base/containers/hash_tables.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/task_runner.h" 20 #include "mojo/edk/embedder/platform_handle_vector.h" 21 #include "mojo/edk/embedder/platform_shared_buffer.h" 22 #include "mojo/edk/embedder/scoped_platform_handle.h" 23 #include "mojo/edk/system/atomic_flag.h" 24 #include "mojo/edk/system/node_channel.h" 25 #include "mojo/edk/system/ports/name.h" 26 #include "mojo/edk/system/ports/node.h" 27 #include "mojo/edk/system/ports/node_delegate.h" 28 29 namespace base { 30 class PortProvider; 31 } 32 33 namespace mojo { 34 namespace edk { 35 36 class Broker; 37 class Core; 38 class MachPortRelay; 39 class PortsMessage; 40 41 // The owner of ports::Node which facilitates core EDK implementation. All 42 // public interface methods are safe to call from any thread. 43 class NodeController : public ports::NodeDelegate, 44 public NodeChannel::Delegate { 45 public: 46 class PortObserver : public ports::UserData { 47 public: 48 virtual void OnPortStatusChanged() = 0; 49 50 protected: ~PortObserver()51 ~PortObserver() override {} 52 }; 53 54 // |core| owns and out-lives us. 55 explicit NodeController(Core* core); 56 ~NodeController() override; 57 name()58 const ports::NodeName& name() const { return name_; } core()59 Core* core() const { return core_; } node()60 ports::Node* node() const { return node_.get(); } io_task_runner()61 scoped_refptr<base::TaskRunner> io_task_runner() const { 62 return io_task_runner_; 63 } 64 65 #if defined(OS_MACOSX) && !defined(OS_IOS) 66 // Create the relay used to transfer mach ports between processes. 67 void CreateMachPortRelay(base::PortProvider* port_provider); 68 #endif 69 70 // Called exactly once, shortly after construction, and before any other 71 // methods are called on this object. 72 void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner); 73 74 // Connects this node to a child node. This node will initiate a handshake. 75 void ConnectToChild(base::ProcessHandle process_handle, 76 ScopedPlatformHandle platform_handle, 77 const std::string& child_token, 78 const ProcessErrorCallback& process_error_callback); 79 80 // Closes all reserved ports which associated with the child process 81 // |child_token|. 82 void CloseChildPorts(const std::string& child_token); 83 84 // Connects this node to a parent node. The parent node will initiate a 85 // handshake. 86 void ConnectToParent(ScopedPlatformHandle platform_handle); 87 88 // Sets a port's observer. If |observer| is null the port's current observer 89 // is removed. 90 void SetPortObserver(const ports::PortRef& port, 91 const scoped_refptr<PortObserver>& observer); 92 93 // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as 94 // it ensures the port's observer has also been removed. 95 void ClosePort(const ports::PortRef& port); 96 97 // Sends a message on a port to its peer. 98 int SendMessage(const ports::PortRef& port_ref, 99 std::unique_ptr<PortsMessage> message); 100 101 // Reserves a local port |port| associated with |token|. A peer holding a copy 102 // of |token| can merge one of its own ports into this one. 103 void ReservePort(const std::string& token, const ports::PortRef& port, 104 const std::string& child_token); 105 106 // Merges a local port |port| into a port reserved by |token| in the parent. 107 void MergePortIntoParent(const std::string& token, 108 const ports::PortRef& port); 109 110 // Merges two local ports together. 111 int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1); 112 113 // Creates a new shared buffer for use in the current process. 114 scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); 115 116 // Request that the Node be shut down cleanly. This may take an arbitrarily 117 // long time to complete, at which point |callback| will be called. 118 // 119 // Note that while it is safe to continue using the NodeController's public 120 // interface after requesting shutdown, you do so at your own risk and there 121 // is NO guarantee that new messages will be sent or ports will complete 122 // transfer. 123 void RequestShutdown(const base::Closure& callback); 124 125 // Notifies the NodeController that we received a bad message from the given 126 // node. 127 void NotifyBadMessageFrom(const ports::NodeName& source_node, 128 const std::string& error); 129 130 private: 131 friend Core; 132 133 using NodeMap = std::unordered_map<ports::NodeName, 134 scoped_refptr<NodeChannel>>; 135 using OutgoingMessageQueue = std::queue<Channel::MessagePtr>; 136 137 struct ReservedPort { 138 ports::PortRef port; 139 const std::string child_token; 140 }; 141 142 void ConnectToChildOnIOThread( 143 base::ProcessHandle process_handle, 144 ScopedPlatformHandle platform_handle, 145 ports::NodeName token, 146 const ProcessErrorCallback& process_error_callback); 147 void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle); 148 149 scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); 150 scoped_refptr<NodeChannel> GetParentChannel(); 151 scoped_refptr<NodeChannel> GetBrokerChannel(); 152 153 void AddPeer(const ports::NodeName& name, 154 scoped_refptr<NodeChannel> channel, 155 bool start_channel); 156 void DropPeer(const ports::NodeName& name, NodeChannel* channel); 157 void SendPeerMessage(const ports::NodeName& name, 158 ports::ScopedMessage message); 159 void AcceptIncomingMessages(); 160 void ProcessIncomingMessages(); 161 void DropAllPeers(); 162 163 // ports::NodeDelegate: 164 void GenerateRandomPortName(ports::PortName* port_name) override; 165 void AllocMessage(size_t num_header_bytes, 166 ports::ScopedMessage* message) override; 167 void ForwardMessage(const ports::NodeName& node, 168 ports::ScopedMessage message) override; 169 void BroadcastMessage(ports::ScopedMessage message) override; 170 void PortStatusChanged(const ports::PortRef& port) override; 171 172 // NodeChannel::Delegate: 173 void OnAcceptChild(const ports::NodeName& from_node, 174 const ports::NodeName& parent_name, 175 const ports::NodeName& token) override; 176 void OnAcceptParent(const ports::NodeName& from_node, 177 const ports::NodeName& token, 178 const ports::NodeName& child_name) override; 179 void OnAddBrokerClient(const ports::NodeName& from_node, 180 const ports::NodeName& client_name, 181 base::ProcessHandle process_handle) override; 182 void OnBrokerClientAdded(const ports::NodeName& from_node, 183 const ports::NodeName& client_name, 184 ScopedPlatformHandle broker_channel) override; 185 void OnAcceptBrokerClient(const ports::NodeName& from_node, 186 const ports::NodeName& broker_name, 187 ScopedPlatformHandle broker_channel) override; 188 void OnPortsMessage(const ports::NodeName& from_node, 189 Channel::MessagePtr message) override; 190 void OnRequestPortMerge(const ports::NodeName& from_node, 191 const ports::PortName& connector_port_name, 192 const std::string& token) override; 193 void OnRequestIntroduction(const ports::NodeName& from_node, 194 const ports::NodeName& name) override; 195 void OnIntroduce(const ports::NodeName& from_node, 196 const ports::NodeName& name, 197 ScopedPlatformHandle channel_handle) override; 198 void OnBroadcast(const ports::NodeName& from_node, 199 Channel::MessagePtr message) override; 200 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 201 void OnRelayPortsMessage(const ports::NodeName& from_node, 202 base::ProcessHandle from_process, 203 const ports::NodeName& destination, 204 Channel::MessagePtr message) override; 205 void OnPortsMessageFromRelay(const ports::NodeName& from_node, 206 const ports::NodeName& source_node, 207 Channel::MessagePtr message) override; 208 #endif 209 void OnChannelError(const ports::NodeName& from_node, 210 NodeChannel* channel) override; 211 #if defined(OS_MACOSX) && !defined(OS_IOS) 212 MachPortRelay* GetMachPortRelay() override; 213 #endif 214 215 // Marks this NodeController for destruction when the IO thread shuts down. 216 // This is used in case Core is torn down before the IO thread. Must only be 217 // called on the IO thread. 218 void DestroyOnIOThreadShutdown(); 219 220 // If there is a registered shutdown callback (meaning shutdown has been 221 // requested, this checks the Node's status to see if clean shutdown is 222 // possible. If so, shutdown is performed and the shutdown callback is run. 223 void AttemptShutdownIfRequested(); 224 225 // These are safe to access from any thread as long as the Node is alive. 226 Core* const core_; 227 const ports::NodeName name_; 228 const std::unique_ptr<ports::Node> node_; 229 scoped_refptr<base::TaskRunner> io_task_runner_; 230 231 // Guards |peers_| and |pending_peer_messages_|. 232 base::Lock peers_lock_; 233 234 // Channels to known peers, including parent and children, if any. 235 NodeMap peers_; 236 237 // Outgoing message queues for peers we've heard of but can't yet talk to. 238 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 239 pending_peer_messages_; 240 241 // Guards |reserved_ports_| and |pending_child_tokens_|. 242 base::Lock reserved_ports_lock_; 243 244 // Ports reserved by token. Key is the port token. 245 base::hash_map<std::string, ReservedPort> reserved_ports_; 246 // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't 247 // have one yet :( 248 std::unordered_map<ports::NodeName, std::string> pending_child_tokens_; 249 250 // Guards |pending_port_merges_| and |reject_pending_merges_|. 251 base::Lock pending_port_merges_lock_; 252 253 // A set of port merge requests awaiting parent connection. 254 std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; 255 256 // Indicates that new merge requests should be rejected because the parent has 257 // disconnected. 258 bool reject_pending_merges_ = false; 259 260 // Guards |parent_name_| and |bootstrap_parent_channel_|. 261 base::Lock parent_lock_; 262 263 // The name of our parent node, if any. 264 ports::NodeName parent_name_; 265 266 // A temporary reference to the parent channel before we know their name. 267 scoped_refptr<NodeChannel> bootstrap_parent_channel_; 268 269 // Guards |broker_name_|, |pending_broker_clients_|, and 270 // |pending_relay_messages_|. 271 base::Lock broker_lock_; 272 273 // The name of our broker node, if any. 274 ports::NodeName broker_name_; 275 276 // A queue of pending child names waiting to be connected to a broker. 277 std::queue<ports::NodeName> pending_broker_clients_; 278 279 // Messages waiting to be relayed by the broker once it's known. 280 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 281 pending_relay_messages_; 282 283 // Guards |incoming_messages_| and |incoming_messages_task_posted_|. 284 base::Lock messages_lock_; 285 std::queue<ports::ScopedMessage> incoming_messages_; 286 // Ensures that there is only one incoming messages task posted to the IO 287 // thread. 288 bool incoming_messages_task_posted_ = false; 289 290 // Guards |shutdown_callback_|. 291 base::Lock shutdown_lock_; 292 293 // Set by RequestShutdown(). If this is non-null, the controller will 294 // begin polling the Node to see if clean shutdown is possible any time the 295 // Node's state is modified by the controller. 296 base::Closure shutdown_callback_; 297 // Flag to fast-path checking |shutdown_callback_|. 298 AtomicFlag shutdown_callback_flag_; 299 300 // All other fields below must only be accessed on the I/O thread, i.e., the 301 // thread on which core_->io_task_runner() runs tasks. 302 303 // Channels to children during handshake. 304 NodeMap pending_children_; 305 306 // Indicates whether this object should delete itself on IO thread shutdown. 307 // Must only be accessed from the IO thread. 308 bool destroy_on_io_thread_shutdown_ = false; 309 310 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 311 // Broker for sync shared buffer creation (non-Mac posix-only) in children. 312 std::unique_ptr<Broker> broker_; 313 #endif 314 315 #if defined(OS_MACOSX) && !defined(OS_IOS) 316 base::Lock mach_port_relay_lock_; 317 // Relay for transferring mach ports to/from children. 318 std::unique_ptr<MachPortRelay> mach_port_relay_; 319 #endif 320 321 DISALLOW_COPY_AND_ASSIGN(NodeController); 322 }; 323 324 } // namespace edk 325 } // namespace mojo 326 327 #endif // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ 328