• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/local_message_pipe_endpoint.h"
6 
7 #include <string.h>
8 
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
12 
13 namespace mojo {
14 namespace system {
15 
MessageQueueEntry()16 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
17     : message(NULL) {
18 }
19 
20 // See comment in header file.
MessageQueueEntry(const MessageQueueEntry & other)21 LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
22     const MessageQueueEntry& other)
23     : message(NULL) {
24   DCHECK(!other.message);
25   DCHECK(other.dispatchers.empty());
26 }
27 
~MessageQueueEntry()28 LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
29   if (message)
30     message->Destroy();
31   // Close all the dispatchers.
32   for (size_t i = 0; i < dispatchers.size(); i++) {
33     // Note: Taking the |Dispatcher| locks is okay, since no one else should
34     // have a reference to the dispatchers (and the locks shouldn't be held).
35     DCHECK(dispatchers[i]->HasOneRef());
36     dispatchers[i]->Close();
37   }
38 }
39 
LocalMessagePipeEndpoint()40 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
41     : is_open_(true),
42       is_peer_open_(true) {
43 }
44 
~LocalMessagePipeEndpoint()45 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
46   DCHECK(!is_open_);
47 }
48 
Close()49 void LocalMessagePipeEndpoint::Close() {
50   DCHECK(is_open_);
51   is_open_ = false;
52   message_queue_.clear();
53 }
54 
OnPeerClose()55 bool LocalMessagePipeEndpoint::OnPeerClose() {
56   DCHECK(is_open_);
57   DCHECK(is_peer_open_);
58 
59   MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
60   MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
61   is_peer_open_ = false;
62   MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
63   MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
64 
65   if (new_satisfied_flags != old_satisfied_flags ||
66       new_satisfiable_flags != old_satisfiable_flags) {
67     waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
68                                             new_satisfiable_flags);
69   }
70 
71   return true;
72 }
73 
CanEnqueueMessage(const MessageInTransit *,const std::vector<Dispatcher * > *)74 MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage(
75     const MessageInTransit* /*message*/,
76     const std::vector<Dispatcher*>* /*dispatchers*/) {
77   return MOJO_RESULT_OK;
78 }
79 
EnqueueMessage(MessageInTransit * message,std::vector<scoped_refptr<Dispatcher>> * dispatchers)80 void LocalMessagePipeEndpoint::EnqueueMessage(
81     MessageInTransit* message,
82     std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
83   DCHECK(is_open_);
84   DCHECK(is_peer_open_);
85 
86   bool was_empty = message_queue_.empty();
87   message_queue_.push_back(MessageQueueEntry());
88   message_queue_.back().message = message;
89   if (dispatchers) {
90 #ifndef NDEBUG
91     // It's important that we're taking "ownership" of the dispatchers. In
92     // particular, they must not be in the global handle table (i.e., have live
93     // handles referring to them). If we need to destroy any queued messages, we
94     // need to know that any handles in them should be closed.
95     for (size_t i = 0; i < dispatchers->size(); i++)
96       DCHECK((*dispatchers)[i]->HasOneRef());
97 #endif
98     message_queue_.back().dispatchers.swap(*dispatchers);
99   }
100   if (was_empty) {
101     waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
102                                             SatisfiableFlags());
103   }
104 }
105 
CancelAllWaiters()106 void LocalMessagePipeEndpoint::CancelAllWaiters() {
107   DCHECK(is_open_);
108   waiter_list_.CancelAllWaiters();
109 }
110 
ReadMessage(void * bytes,uint32_t * num_bytes,std::vector<scoped_refptr<Dispatcher>> * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)111 MojoResult LocalMessagePipeEndpoint::ReadMessage(
112     void* bytes, uint32_t* num_bytes,
113     std::vector<scoped_refptr<Dispatcher> >* dispatchers,
114     uint32_t* num_dispatchers,
115     MojoReadMessageFlags flags) {
116   DCHECK(is_open_);
117   DCHECK(!dispatchers || dispatchers->empty());
118 
119   const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
120   const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
121 
122   if (message_queue_.empty()) {
123     return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
124                            MOJO_RESULT_FAILED_PRECONDITION;
125   }
126 
127   // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
128   // and release the lock immediately.
129   bool enough_space = true;
130   const MessageInTransit* queued_message = message_queue_.front().message;
131   if (num_bytes)
132     *num_bytes = queued_message->data_size();
133   if (queued_message->data_size() <= max_bytes)
134     memcpy(bytes, queued_message->data(), queued_message->data_size());
135   else
136     enough_space = false;
137 
138   std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
139       &message_queue_.front().dispatchers;
140   if (num_dispatchers)
141     *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
142   if (enough_space) {
143     if (queued_dispatchers->empty()) {
144       // Nothing to do.
145     } else if (queued_dispatchers->size() <= max_num_dispatchers) {
146       DCHECK(dispatchers);
147       dispatchers->swap(*queued_dispatchers);
148     } else {
149       enough_space = false;
150     }
151   }
152 
153   if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
154     message_queue_.pop_front();
155 
156     // Now it's empty, thus no longer readable.
157     if (message_queue_.empty()) {
158       // It's currently not possible to wait for non-readability, but we should
159       // do the state change anyway.
160       waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
161                                               SatisfiableFlags());
162     }
163   }
164 
165   if (!enough_space)
166     return MOJO_RESULT_RESOURCE_EXHAUSTED;
167 
168   return MOJO_RESULT_OK;
169 }
170 
AddWaiter(Waiter * waiter,MojoWaitFlags flags,MojoResult wake_result)171 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
172                                                MojoWaitFlags flags,
173                                                MojoResult wake_result) {
174   DCHECK(is_open_);
175 
176   if ((flags & SatisfiedFlags()))
177     return MOJO_RESULT_ALREADY_EXISTS;
178   if (!(flags & SatisfiableFlags()))
179     return MOJO_RESULT_FAILED_PRECONDITION;
180 
181   waiter_list_.AddWaiter(waiter, flags, wake_result);
182   return MOJO_RESULT_OK;
183 }
184 
RemoveWaiter(Waiter * waiter)185 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
186   DCHECK(is_open_);
187   waiter_list_.RemoveWaiter(waiter);
188 }
189 
SatisfiedFlags()190 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
191   MojoWaitFlags satisfied_flags = 0;
192   if (!message_queue_.empty())
193     satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
194   if (is_peer_open_)
195     satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
196   return satisfied_flags;
197 }
198 
SatisfiableFlags()199 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
200   MojoWaitFlags satisfiable_flags = 0;
201   if (!message_queue_.empty() || is_peer_open_)
202     satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
203   if (is_peer_open_)
204     satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
205   return satisfiable_flags;
206 }
207 
208 }  // namespace system
209 }  // namespace mojo
210