1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 7 8 #include <stdint.h> 9 10 #include <deque> 11 #include <map> 12 #include <memory> 13 #include <string> 14 15 #include "base/logging.h" 16 #include "base/macros.h" 17 #include "base/memory/ref_counted.h" 18 #include "base/memory/weak_ptr.h" 19 #include "base/single_thread_task_runner.h" 20 #include "base/synchronization/lock.h" 21 #include "base/threading/thread_checker.h" 22 #include "mojo/public/cpp/bindings/associated_group_controller.h" 23 #include "mojo/public/cpp/bindings/connector.h" 24 #include "mojo/public/cpp/bindings/interface_id.h" 25 #include "mojo/public/cpp/bindings/message_header_validator.h" 26 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" 27 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" 28 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" 29 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" 30 31 namespace base { 32 class SingleThreadTaskRunner; 33 } 34 35 namespace mojo { 36 37 class AssociatedGroup; 38 39 namespace internal { 40 41 // MultiplexRouter supports routing messages for multiple interfaces over a 42 // single message pipe. 43 // 44 // It is created on the thread where the master interface of the message pipe 45 // lives. Although it is ref-counted, it is guarateed to be destructed on the 46 // same thread. 47 // Some public methods are only allowed to be called on the creating thread; 48 // while the others are safe to call from any threads. Please see the method 49 // comments for more details. 50 class MultiplexRouter 51 : public MessageReceiver, 52 public AssociatedGroupController, 53 public PipeControlMessageHandlerDelegate { 54 public: 55 // If |set_interface_id_namespace_bit| is true, the interface IDs generated by 56 // this router will have the highest bit set. 57 MultiplexRouter(bool set_interface_id_namespace_bit, 58 ScopedMessagePipeHandle message_pipe, 59 scoped_refptr<base::SingleThreadTaskRunner> runner); 60 61 // Sets the master interface name for this router. Only used when reporting 62 // message header or control message validation errors. 63 void SetMasterInterfaceName(const std::string& name); 64 65 // --------------------------------------------------------------------------- 66 // The following public methods are safe to call from any threads. 67 68 // AssociatedGroupController implementation: 69 void CreateEndpointHandlePair( 70 ScopedInterfaceEndpointHandle* local_endpoint, 71 ScopedInterfaceEndpointHandle* remote_endpoint) override; 72 ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( 73 InterfaceId id) override; 74 void CloseEndpointHandle(InterfaceId id, bool is_local) override; 75 InterfaceEndpointController* AttachEndpointClient( 76 const ScopedInterfaceEndpointHandle& handle, 77 InterfaceEndpointClient* endpoint_client, 78 scoped_refptr<base::SingleThreadTaskRunner> runner) override; 79 void DetachEndpointClient( 80 const ScopedInterfaceEndpointHandle& handle) override; 81 void RaiseError() override; 82 83 // --------------------------------------------------------------------------- 84 // The following public methods are called on the creating thread. 85 86 // Please note that this method shouldn't be called unless it results from an 87 // explicit request of the user of bindings (e.g., the user sets an 88 // InterfacePtr to null or closes a Binding). 89 void CloseMessagePipe(); 90 91 // Extracts the underlying message pipe. PassMessagePipe()92 ScopedMessagePipeHandle PassMessagePipe() { 93 DCHECK(thread_checker_.CalledOnValidThread()); 94 DCHECK(!HasAssociatedEndpoints()); 95 return connector_.PassMessagePipe(); 96 } 97 98 // Blocks the current thread until the first incoming message, or |deadline|. WaitForIncomingMessage(MojoDeadline deadline)99 bool WaitForIncomingMessage(MojoDeadline deadline) { 100 DCHECK(thread_checker_.CalledOnValidThread()); 101 return connector_.WaitForIncomingMessage(deadline); 102 } 103 104 // See Binding for details of pause/resume. PauseIncomingMethodCallProcessing()105 void PauseIncomingMethodCallProcessing() { 106 DCHECK(thread_checker_.CalledOnValidThread()); 107 connector_.PauseIncomingMethodCallProcessing(); 108 } ResumeIncomingMethodCallProcessing()109 void ResumeIncomingMethodCallProcessing() { 110 DCHECK(thread_checker_.CalledOnValidThread()); 111 connector_.ResumeIncomingMethodCallProcessing(); 112 } 113 114 // Whether there are any associated interfaces running currently. 115 bool HasAssociatedEndpoints() const; 116 117 // Sets this object to testing mode. 118 // In testing mode, the object doesn't disconnect the underlying message pipe 119 // when it receives unexpected or invalid messages. 120 void EnableTestingMode(); 121 122 // Is the router bound to a message pipe handle? is_valid()123 bool is_valid() const { 124 DCHECK(thread_checker_.CalledOnValidThread()); 125 return connector_.is_valid(); 126 } 127 128 // TODO(yzshen): consider removing this getter. handle()129 MessagePipeHandle handle() const { 130 DCHECK(thread_checker_.CalledOnValidThread()); 131 return connector_.handle(); 132 } 133 134 private: 135 class InterfaceEndpoint; 136 struct Task; 137 138 ~MultiplexRouter() override; 139 140 // MessageReceiver implementation: 141 bool Accept(Message* message) override; 142 143 // PipeControlMessageHandlerDelegate implementation: 144 bool OnPeerAssociatedEndpointClosed(InterfaceId id) override; 145 bool OnAssociatedEndpointClosedBeforeSent(InterfaceId id) override; 146 147 void OnPipeConnectionError(); 148 149 // Specifies whether we are allowed to directly call into 150 // InterfaceEndpointClient (given that we are already on the same thread as 151 // the client). 152 enum ClientCallBehavior { 153 // Don't call any InterfaceEndpointClient methods directly. 154 NO_DIRECT_CLIENT_CALLS, 155 // Only call InterfaceEndpointClient::HandleIncomingMessage directly to 156 // handle sync messages. 157 ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, 158 // Allow to call any InterfaceEndpointClient methods directly. 159 ALLOW_DIRECT_CLIENT_CALLS 160 }; 161 162 // Processes enqueued tasks (incoming messages and error notifications). 163 // |current_task_runner| is only used when |client_call_behavior| is 164 // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task 165 // runner to make client calls for async messages or connection error 166 // notifications. 167 // 168 // Note: Because calling into InterfaceEndpointClient may lead to destruction 169 // of this object, if direct calls are allowed, the caller needs to hold on to 170 // a ref outside of |lock_| before calling this method. 171 void ProcessTasks(ClientCallBehavior client_call_behavior, 172 base::SingleThreadTaskRunner* current_task_runner); 173 174 // Processes the first queued sync message for the endpoint corresponding to 175 // |id|; returns whether there are more sync messages for that endpoint in the 176 // queue. 177 // 178 // This method is only used by enpoints during sync watching. Therefore, not 179 // all sync messages are handled by it. 180 bool ProcessFirstSyncMessageForEndpoint(InterfaceId id); 181 182 // Returns true to indicate that |task|/|message| has been processed. 183 bool ProcessNotifyErrorTask( 184 Task* task, 185 ClientCallBehavior client_call_behavior, 186 base::SingleThreadTaskRunner* current_task_runner); 187 bool ProcessIncomingMessage( 188 Message* message, 189 ClientCallBehavior client_call_behavior, 190 base::SingleThreadTaskRunner* current_task_runner); 191 192 void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner); 193 void LockAndCallProcessTasks(); 194 195 // Updates the state of |endpoint|. If both the endpoint and its peer have 196 // been closed, removes it from |endpoints_|. 197 // NOTE: The method may invalidate |endpoint|. 198 enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED }; 199 void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint, 200 EndpointStateUpdateType type); 201 202 void RaiseErrorInNonTestingMode(); 203 204 InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted); 205 206 // Whether to set the namespace bit when generating interface IDs. Please see 207 // comments of kInterfaceIdNamespaceMask. 208 const bool set_interface_id_namespace_bit_; 209 210 MessageHeaderValidator header_validator_; 211 Connector connector_; 212 213 base::ThreadChecker thread_checker_; 214 215 // Protects the following members. 216 mutable base::Lock lock_; 217 PipeControlMessageHandler control_message_handler_; 218 PipeControlMessageProxy control_message_proxy_; 219 220 std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_; 221 uint32_t next_interface_id_value_; 222 223 std::deque<std::unique_ptr<Task>> tasks_; 224 // It refers to tasks in |tasks_| and doesn't own any of them. 225 std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_; 226 227 bool posted_to_process_tasks_; 228 scoped_refptr<base::SingleThreadTaskRunner> posted_to_task_runner_; 229 230 bool encountered_error_; 231 232 bool testing_mode_; 233 234 DISALLOW_COPY_AND_ASSIGN(MultiplexRouter); 235 }; 236 237 } // namespace internal 238 } // namespace mojo 239 240 #endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 241