• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/local_message_pipe_endpoint.h"
6 
7 #include <string.h>
8 
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
12 
13 namespace mojo {
14 namespace system {
15 
LocalMessagePipeEndpoint()16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
17     : is_open_(true),
18       is_peer_open_(true) {
19 }
20 
~LocalMessagePipeEndpoint()21 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
22   DCHECK(!is_open_);
23   DCHECK(message_queue_.IsEmpty());  // Should be implied by not being open.
24 }
25 
GetType() const26 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
27   return kTypeLocal;
28 }
29 
OnPeerClose()30 bool LocalMessagePipeEndpoint::OnPeerClose() {
31   DCHECK(is_open_);
32   DCHECK(is_peer_open_);
33 
34   HandleSignalsState old_state = GetHandleSignalsState();
35   is_peer_open_ = false;
36   HandleSignalsState new_state = GetHandleSignalsState();
37 
38   if (!new_state.equals(old_state))
39     waiter_list_.AwakeWaitersForStateChange(new_state);
40 
41   return true;
42 }
43 
EnqueueMessage(scoped_ptr<MessageInTransit> message)44 void LocalMessagePipeEndpoint::EnqueueMessage(
45     scoped_ptr<MessageInTransit> message) {
46   DCHECK(is_open_);
47   DCHECK(is_peer_open_);
48 
49   bool was_empty = message_queue_.IsEmpty();
50   message_queue_.AddMessage(message.Pass());
51   if (was_empty)
52     waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
53 }
54 
Close()55 void LocalMessagePipeEndpoint::Close() {
56   DCHECK(is_open_);
57   is_open_ = false;
58   message_queue_.Clear();
59 }
60 
CancelAllWaiters()61 void LocalMessagePipeEndpoint::CancelAllWaiters() {
62   DCHECK(is_open_);
63   waiter_list_.CancelAllWaiters();
64 }
65 
ReadMessage(void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)66 MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes,
67                                                  uint32_t* num_bytes,
68                                                  DispatcherVector* dispatchers,
69                                                  uint32_t* num_dispatchers,
70                                                  MojoReadMessageFlags flags) {
71   DCHECK(is_open_);
72   DCHECK(!dispatchers || dispatchers->empty());
73 
74   const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
75   const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
76 
77   if (message_queue_.IsEmpty()) {
78     return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT :
79                            MOJO_RESULT_FAILED_PRECONDITION;
80   }
81 
82   // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
83   // and release the lock immediately.
84   bool enough_space = true;
85   MessageInTransit* message = message_queue_.PeekMessage();
86   if (num_bytes)
87     *num_bytes = message->num_bytes();
88   if (message->num_bytes() <= max_bytes)
89     memcpy(bytes, message->bytes(), message->num_bytes());
90   else
91     enough_space = false;
92 
93   if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
94     if (num_dispatchers)
95       *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
96     if (enough_space) {
97       if (queued_dispatchers->empty()) {
98         // Nothing to do.
99       } else if (queued_dispatchers->size() <= max_num_dispatchers) {
100         DCHECK(dispatchers);
101         dispatchers->swap(*queued_dispatchers);
102       } else {
103         enough_space = false;
104       }
105     }
106   } else {
107     if (num_dispatchers)
108       *num_dispatchers = 0;
109   }
110 
111   message = NULL;
112 
113   if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
114     message_queue_.DiscardMessage();
115 
116     // Now it's empty, thus no longer readable.
117     if (message_queue_.IsEmpty()) {
118       // It's currently not possible to wait for non-readability, but we should
119       // do the state change anyway.
120       waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
121     }
122   }
123 
124   if (!enough_space)
125     return MOJO_RESULT_RESOURCE_EXHAUSTED;
126 
127   return MOJO_RESULT_OK;
128 }
129 
AddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)130 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
131                                                MojoHandleSignals signals,
132                                                uint32_t context) {
133   DCHECK(is_open_);
134 
135   HandleSignalsState state = GetHandleSignalsState();
136   if (state.satisfies(signals))
137     return MOJO_RESULT_ALREADY_EXISTS;
138   if (!state.can_satisfy(signals))
139     return MOJO_RESULT_FAILED_PRECONDITION;
140 
141   waiter_list_.AddWaiter(waiter, signals, context);
142   return MOJO_RESULT_OK;
143 }
144 
RemoveWaiter(Waiter * waiter)145 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
146   DCHECK(is_open_);
147   waiter_list_.RemoveWaiter(waiter);
148 }
149 
GetHandleSignalsState()150 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() {
151   HandleSignalsState rv;
152   if (!message_queue_.IsEmpty()) {
153     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
154     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
155   }
156   if (is_peer_open_) {
157     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
158     rv.satisfiable_signals |=
159         MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
160   }
161   return rv;
162 }
163 
164 }  // namespace system
165 }  // namespace mojo
166