1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/local_message_pipe_endpoint.h"
6
7 #include <string.h>
8
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
12
13 namespace mojo {
14 namespace system {
15
MessageQueueEntry()16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
17 : message(NULL) {
18 }
19
20 // See comment in header file.
MessageQueueEntry(const MessageQueueEntry & other)21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
22 const MessageQueueEntry& other)
23 : message(NULL) {
24 DCHECK(!other.message);
25 DCHECK(other.dispatchers.empty());
26 }
27
~MessageQueueEntry()28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
29 if (message)
30 message->Destroy();
31 // Close all the dispatchers.
32 for (size_t i = 0; i < dispatchers.size(); i++) {
33 // Note: Taking the |Dispatcher| locks is okay, since no one else should
34 // have a reference to the dispatchers (and the locks shouldn't be held).
35 DCHECK(dispatchers[i]->HasOneRef());
36 dispatchers[i]->Close();
37 }
38 }
39
LocalMessagePipeEndpoint()40 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
41 : is_open_(true),
42 is_peer_open_(true) {
43 }
44
~LocalMessagePipeEndpoint()45 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
46 DCHECK(!is_open_);
47 }
48
Close()49 void LocalMessagePipeEndpoint::Close() {
50 DCHECK(is_open_);
51 is_open_ = false;
52 message_queue_.clear();
53 }
54
OnPeerClose()55 bool LocalMessagePipeEndpoint::OnPeerClose() {
56 DCHECK(is_open_);
57 DCHECK(is_peer_open_);
58
59 MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
60 MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
61 is_peer_open_ = false;
62 MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
63 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
64
65 if (new_satisfied_flags != old_satisfied_flags ||
66 new_satisfiable_flags != old_satisfiable_flags) {
67 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
68 new_satisfiable_flags);
69 }
70
71 return true;
72 }
73
CanEnqueueMessage(const MessageInTransit *,const std::vector<Dispatcher * > *)74 MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage(
75 const MessageInTransit* /*message*/,
76 const std::vector<Dispatcher*>* /*dispatchers*/) {
77 return MOJO_RESULT_OK;
78 }
79
EnqueueMessage(MessageInTransit * message,std::vector<scoped_refptr<Dispatcher>> * dispatchers)80 void LocalMessagePipeEndpoint::EnqueueMessage(
81 MessageInTransit* message,
82 std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
83 DCHECK(is_open_);
84 DCHECK(is_peer_open_);
85
86 bool was_empty = message_queue_.empty();
87 message_queue_.push_back(MessageQueueEntry());
88 message_queue_.back().message = message;
89 if (dispatchers) {
90 #ifndef NDEBUG
91 // It's important that we're taking "ownership" of the dispatchers. In
92 // particular, they must not be in the global handle table (i.e., have live
93 // handles referring to them). If we need to destroy any queued messages, we
94 // need to know that any handles in them should be closed.
95 for (size_t i = 0; i < dispatchers->size(); i++)
96 DCHECK((*dispatchers)[i]->HasOneRef());
97 #endif
98 message_queue_.back().dispatchers.swap(*dispatchers);
99 }
100 if (was_empty) {
101 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
102 SatisfiableFlags());
103 }
104 }
105
CancelAllWaiters()106 void LocalMessagePipeEndpoint::CancelAllWaiters() {
107 DCHECK(is_open_);
108 waiter_list_.CancelAllWaiters();
109 }
110
ReadMessage(void * bytes,uint32_t * num_bytes,std::vector<scoped_refptr<Dispatcher>> * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)111 MojoResult LocalMessagePipeEndpoint::ReadMessage(
112 void* bytes, uint32_t* num_bytes,
113 std::vector<scoped_refptr<Dispatcher> >* dispatchers,
114 uint32_t* num_dispatchers,
115 MojoReadMessageFlags flags) {
116 DCHECK(is_open_);
117 DCHECK(!dispatchers || dispatchers->empty());
118
119 const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
120 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
121
122 if (message_queue_.empty()) {
123 return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
124 MOJO_RESULT_FAILED_PRECONDITION;
125 }
126
127 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
128 // and release the lock immediately.
129 bool enough_space = true;
130 const MessageInTransit* queued_message = message_queue_.front().message;
131 if (num_bytes)
132 *num_bytes = queued_message->data_size();
133 if (queued_message->data_size() <= max_bytes)
134 memcpy(bytes, queued_message->data(), queued_message->data_size());
135 else
136 enough_space = false;
137
138 std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
139 &message_queue_.front().dispatchers;
140 if (num_dispatchers)
141 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
142 if (enough_space) {
143 if (queued_dispatchers->empty()) {
144 // Nothing to do.
145 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
146 DCHECK(dispatchers);
147 dispatchers->swap(*queued_dispatchers);
148 } else {
149 enough_space = false;
150 }
151 }
152
153 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
154 message_queue_.pop_front();
155
156 // Now it's empty, thus no longer readable.
157 if (message_queue_.empty()) {
158 // It's currently not possible to wait for non-readability, but we should
159 // do the state change anyway.
160 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
161 SatisfiableFlags());
162 }
163 }
164
165 if (!enough_space)
166 return MOJO_RESULT_RESOURCE_EXHAUSTED;
167
168 return MOJO_RESULT_OK;
169 }
170
AddWaiter(Waiter * waiter,MojoWaitFlags flags,MojoResult wake_result)171 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
172 MojoWaitFlags flags,
173 MojoResult wake_result) {
174 DCHECK(is_open_);
175
176 if ((flags & SatisfiedFlags()))
177 return MOJO_RESULT_ALREADY_EXISTS;
178 if (!(flags & SatisfiableFlags()))
179 return MOJO_RESULT_FAILED_PRECONDITION;
180
181 waiter_list_.AddWaiter(waiter, flags, wake_result);
182 return MOJO_RESULT_OK;
183 }
184
RemoveWaiter(Waiter * waiter)185 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
186 DCHECK(is_open_);
187 waiter_list_.RemoveWaiter(waiter);
188 }
189
SatisfiedFlags()190 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
191 MojoWaitFlags satisfied_flags = 0;
192 if (!message_queue_.empty())
193 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
194 if (is_peer_open_)
195 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
196 return satisfied_flags;
197 }
198
SatisfiableFlags()199 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
200 MojoWaitFlags satisfiable_flags = 0;
201 if (!message_queue_.empty() || is_peer_open_)
202 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
203 if (is_peer_open_)
204 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
205 return satisfiable_flags;
206 }
207
208 } // namespace system
209 } // namespace mojo
210