1 // Copyright 2015 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_ 6 #define MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_ 7 8 #include <stdint.h> 9 10 #include <memory> 11 #include <queue> 12 13 #include "base/macros.h" 14 #include "base/optional.h" 15 #include "mojo/core/atomic_flag.h" 16 #include "mojo/core/dispatcher.h" 17 #include "mojo/core/ports/port_ref.h" 18 #include "mojo/core/watcher_set.h" 19 20 namespace mojo { 21 namespace core { 22 23 class NodeController; 24 25 class MessagePipeDispatcher : public Dispatcher { 26 public: 27 // Constructs a MessagePipeDispatcher permanently tied to a specific port. 28 // |connected| must indicate the state of the port at construction time; if 29 // the port is initialized with a peer, |connected| must be true. Otherwise it 30 // must be false. 31 // 32 // A MessagePipeDispatcher may not be transferred while in a disconnected 33 // state, and one can never return to a disconnected once connected. 34 // 35 // |pipe_id| is a unique identifier which can be used to track pipe endpoints 36 // as they're passed around. |endpoint| is either 0 or 1 and again is only 37 // used for tracking pipes (one side is always 0, the other is always 1.) 38 MessagePipeDispatcher(NodeController* node_controller, 39 const ports::PortRef& port, 40 uint64_t pipe_id, 41 int endpoint); 42 43 // Fuses this pipe with |other|. Returns |true| on success or |false| on 44 // failure. Regardless of the return value, both dispatchers are closed by 45 // this call. 46 bool Fuse(MessagePipeDispatcher* other); 47 48 // Dispatcher: 49 Type GetType() const override; 50 MojoResult Close() override; 51 MojoResult WriteMessage( 52 std::unique_ptr<ports::UserMessageEvent> message) override; 53 MojoResult ReadMessage( 54 std::unique_ptr<ports::UserMessageEvent>* message) override; 55 MojoResult SetQuota(MojoQuotaType type, uint64_t limit) override; 56 MojoResult QueryQuota(MojoQuotaType type, 57 uint64_t* limit, 58 uint64_t* usage) override; 59 HandleSignalsState GetHandleSignalsState() const override; 60 MojoResult AddWatcherRef(const scoped_refptr<WatcherDispatcher>& watcher, 61 uintptr_t context) override; 62 MojoResult RemoveWatcherRef(WatcherDispatcher* watcher, 63 uintptr_t context) override; 64 void StartSerialize(uint32_t* num_bytes, 65 uint32_t* num_ports, 66 uint32_t* num_handles) override; 67 bool EndSerialize(void* destination, 68 ports::PortName* ports, 69 PlatformHandle* handles) override; 70 bool BeginTransit() override; 71 void CompleteTransitAndClose() override; 72 void CancelTransit() override; 73 74 static scoped_refptr<Dispatcher> Deserialize(const void* data, 75 size_t num_bytes, 76 const ports::PortName* ports, 77 size_t num_ports, 78 PlatformHandle* handles, 79 size_t num_handles); 80 81 private: 82 class PortObserverThunk; 83 friend class PortObserverThunk; 84 85 ~MessagePipeDispatcher() override; 86 87 MojoResult CloseNoLock(); 88 HandleSignalsState GetHandleSignalsStateNoLock() const; 89 void OnPortStatusChanged(); 90 91 // These are safe to access from any thread without locking. 92 NodeController* const node_controller_; 93 const ports::PortRef port_; 94 const uint64_t pipe_id_; 95 const int endpoint_; 96 97 // Guards access to all the fields below. 98 mutable base::Lock signal_lock_; 99 100 // This is not the same is |port_transferred_|. It's only held true between 101 // BeginTransit() and Complete/CancelTransit(). 102 AtomicFlag in_transit_; 103 104 bool port_transferred_ = false; 105 AtomicFlag port_closed_; 106 WatcherSet watchers_; 107 base::Optional<uint64_t> receive_queue_length_limit_; 108 base::Optional<uint64_t> receive_queue_memory_size_limit_; 109 110 DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher); 111 }; 112 113 } // namespace core 114 } // namespace mojo 115 116 #endif // MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_ 117