1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 7 8 #include <stdint.h> 9 10 #include <deque> 11 #include <map> 12 #include <memory> 13 #include <string> 14 15 #include "base/compiler_specific.h" 16 #include "base/logging.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/memory/weak_ptr.h" 20 #include "base/optional.h" 21 #include "base/single_thread_task_runner.h" 22 #include "base/synchronization/lock.h" 23 #include "base/threading/thread_checker.h" 24 #include "mojo/public/cpp/bindings/associated_group_controller.h" 25 #include "mojo/public/cpp/bindings/bindings_export.h" 26 #include "mojo/public/cpp/bindings/connector.h" 27 #include "mojo/public/cpp/bindings/filter_chain.h" 28 #include "mojo/public/cpp/bindings/interface_id.h" 29 #include "mojo/public/cpp/bindings/message_header_validator.h" 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" 31 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" 32 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" 33 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" 34 35 namespace base { 36 class SingleThreadTaskRunner; 37 } 38 39 namespace mojo { 40 41 namespace internal { 42 43 // MultiplexRouter supports routing messages for multiple interfaces over a 44 // single message pipe. 45 // 46 // It is created on the thread where the master interface of the message pipe 47 // lives. Although it is ref-counted, it is guarateed to be destructed on the 48 // same thread. 49 // Some public methods are only allowed to be called on the creating thread; 50 // while the others are safe to call from any threads. Please see the method 51 // comments for more details. 52 // 53 // NOTE: CloseMessagePipe() or PassMessagePipe() MUST be called on |runner|'s 54 // thread before this object is destroyed. 55 class MOJO_CPP_BINDINGS_EXPORT MultiplexRouter NON_EXPORTED_BASE(public MessageReceiver)56 : NON_EXPORTED_BASE(public MessageReceiver), 57 public AssociatedGroupController, 58 NON_EXPORTED_BASE(public PipeControlMessageHandlerDelegate) { 59 public: 60 enum Config { 61 // There is only the master interface running on this router. Please note 62 // that because of interface versioning, the other side of the message pipe 63 // may use a newer master interface definition which passes associated 64 // interfaces. In that case, this router may still receive pipe control 65 // messages or messages targetting associated interfaces. 66 SINGLE_INTERFACE, 67 // Similar to the mode above, there is only the master interface running on 68 // this router. Besides, the master interface has sync methods. 69 SINGLE_INTERFACE_WITH_SYNC_METHODS, 70 // There may be associated interfaces running on this router. 71 MULTI_INTERFACE 72 }; 73 74 // If |set_interface_id_namespace_bit| is true, the interface IDs generated by 75 // this router will have the highest bit set. 76 MultiplexRouter(ScopedMessagePipeHandle message_pipe, 77 Config config, 78 bool set_interface_id_namespace_bit, 79 scoped_refptr<base::SingleThreadTaskRunner> runner); 80 81 // Sets the master interface name for this router. Only used when reporting 82 // message header or control message validation errors. 83 // |name| must be a string literal. 84 void SetMasterInterfaceName(const char* name); 85 86 // --------------------------------------------------------------------------- 87 // The following public methods are safe to call from any threads. 88 89 // AssociatedGroupController implementation: 90 InterfaceId AssociateInterface( 91 ScopedInterfaceEndpointHandle handle_to_send) override; 92 ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( 93 InterfaceId id) override; 94 void CloseEndpointHandle( 95 InterfaceId id, 96 const base::Optional<DisconnectReason>& reason) override; 97 InterfaceEndpointController* AttachEndpointClient( 98 const ScopedInterfaceEndpointHandle& handle, 99 InterfaceEndpointClient* endpoint_client, 100 scoped_refptr<base::SingleThreadTaskRunner> runner) override; 101 void DetachEndpointClient( 102 const ScopedInterfaceEndpointHandle& handle) override; 103 void RaiseError() override; 104 105 // --------------------------------------------------------------------------- 106 // The following public methods are called on the creating thread. 107 108 // Please note that this method shouldn't be called unless it results from an 109 // explicit request of the user of bindings (e.g., the user sets an 110 // InterfacePtr to null or closes a Binding). 111 void CloseMessagePipe(); 112 113 // Extracts the underlying message pipe. 114 ScopedMessagePipeHandle PassMessagePipe() { 115 DCHECK(thread_checker_.CalledOnValidThread()); 116 DCHECK(!HasAssociatedEndpoints()); 117 return connector_.PassMessagePipe(); 118 } 119 120 // Blocks the current thread until the first incoming message, or |deadline|. 121 bool WaitForIncomingMessage(MojoDeadline deadline) { 122 DCHECK(thread_checker_.CalledOnValidThread()); 123 return connector_.WaitForIncomingMessage(deadline); 124 } 125 126 // See Binding for details of pause/resume. 127 void PauseIncomingMethodCallProcessing(); 128 void ResumeIncomingMethodCallProcessing(); 129 130 // Whether there are any associated interfaces running currently. 131 bool HasAssociatedEndpoints() const; 132 133 // Sets this object to testing mode. 134 // In testing mode, the object doesn't disconnect the underlying message pipe 135 // when it receives unexpected or invalid messages. 136 void EnableTestingMode(); 137 138 // Is the router bound to a message pipe handle? 139 bool is_valid() const { 140 DCHECK(thread_checker_.CalledOnValidThread()); 141 return connector_.is_valid(); 142 } 143 144 // TODO(yzshen): consider removing this getter. 145 MessagePipeHandle handle() const { 146 DCHECK(thread_checker_.CalledOnValidThread()); 147 return connector_.handle(); 148 } 149 150 bool SimulateReceivingMessageForTesting(Message* message) { 151 return filters_.Accept(message); 152 } 153 154 private: 155 class InterfaceEndpoint; 156 class MessageWrapper; 157 struct Task; 158 159 ~MultiplexRouter() override; 160 161 // MessageReceiver implementation: 162 bool Accept(Message* message) override; 163 164 // PipeControlMessageHandlerDelegate implementation: 165 bool OnPeerAssociatedEndpointClosed( 166 InterfaceId id, 167 const base::Optional<DisconnectReason>& reason) override; 168 169 void OnPipeConnectionError(); 170 171 // Specifies whether we are allowed to directly call into 172 // InterfaceEndpointClient (given that we are already on the same thread as 173 // the client). 174 enum ClientCallBehavior { 175 // Don't call any InterfaceEndpointClient methods directly. 176 NO_DIRECT_CLIENT_CALLS, 177 // Only call InterfaceEndpointClient::HandleIncomingMessage directly to 178 // handle sync messages. 179 ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, 180 // Allow to call any InterfaceEndpointClient methods directly. 181 ALLOW_DIRECT_CLIENT_CALLS 182 }; 183 184 // Processes enqueued tasks (incoming messages and error notifications). 185 // |current_task_runner| is only used when |client_call_behavior| is 186 // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task 187 // runner to make client calls for async messages or connection error 188 // notifications. 189 // 190 // Note: Because calling into InterfaceEndpointClient may lead to destruction 191 // of this object, if direct calls are allowed, the caller needs to hold on to 192 // a ref outside of |lock_| before calling this method. 193 void ProcessTasks(ClientCallBehavior client_call_behavior, 194 base::SingleThreadTaskRunner* current_task_runner); 195 196 // Processes the first queued sync message for the endpoint corresponding to 197 // |id|; returns whether there are more sync messages for that endpoint in the 198 // queue. 199 // 200 // This method is only used by enpoints during sync watching. Therefore, not 201 // all sync messages are handled by it. 202 bool ProcessFirstSyncMessageForEndpoint(InterfaceId id); 203 204 // Returns true to indicate that |task|/|message| has been processed. 205 bool ProcessNotifyErrorTask( 206 Task* task, 207 ClientCallBehavior client_call_behavior, 208 base::SingleThreadTaskRunner* current_task_runner); 209 bool ProcessIncomingMessage( 210 Message* message, 211 ClientCallBehavior client_call_behavior, 212 base::SingleThreadTaskRunner* current_task_runner); 213 214 void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner); 215 void LockAndCallProcessTasks(); 216 217 // Updates the state of |endpoint|. If both the endpoint and its peer have 218 // been closed, removes it from |endpoints_|. 219 // NOTE: The method may invalidate |endpoint|. 220 enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED }; 221 void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint, 222 EndpointStateUpdateType type); 223 224 void RaiseErrorInNonTestingMode(); 225 226 InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted); 227 InterfaceEndpoint* FindEndpoint(InterfaceId id); 228 229 void AssertLockAcquired(); 230 231 // Whether to set the namespace bit when generating interface IDs. Please see 232 // comments of kInterfaceIdNamespaceMask. 233 const bool set_interface_id_namespace_bit_; 234 235 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 236 237 // Owned by |filters_| below. 238 MessageHeaderValidator* header_validator_; 239 240 FilterChain filters_; 241 Connector connector_; 242 243 base::ThreadChecker thread_checker_; 244 245 // Protects the following members. 246 // Not set in Config::SINGLE_INTERFACE* mode. 247 mutable base::Optional<base::Lock> lock_; 248 PipeControlMessageHandler control_message_handler_; 249 250 // NOTE: It is unsafe to call into this object while holding |lock_|. 251 PipeControlMessageProxy control_message_proxy_; 252 253 std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_; 254 uint32_t next_interface_id_value_; 255 256 std::deque<std::unique_ptr<Task>> tasks_; 257 // It refers to tasks in |tasks_| and doesn't own any of them. 258 std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_; 259 260 bool posted_to_process_tasks_; 261 scoped_refptr<base::SingleThreadTaskRunner> posted_to_task_runner_; 262 263 bool encountered_error_; 264 265 bool paused_; 266 267 bool testing_mode_; 268 269 DISALLOW_COPY_AND_ASSIGN(MultiplexRouter); 270 }; 271 272 } // namespace internal 273 } // namespace mojo 274 275 #endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 276