1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/local_message_pipe_endpoint.h"
6
7 #include <string.h>
8
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
12
13 namespace mojo {
14 namespace system {
15
LocalMessagePipeEndpoint()16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
17 : is_open_(true),
18 is_peer_open_(true) {
19 }
20
~LocalMessagePipeEndpoint()21 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
22 DCHECK(!is_open_);
23 DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open.
24 }
25
GetType() const26 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
27 return kTypeLocal;
28 }
29
OnPeerClose()30 bool LocalMessagePipeEndpoint::OnPeerClose() {
31 DCHECK(is_open_);
32 DCHECK(is_peer_open_);
33
34 HandleSignalsState old_state = GetHandleSignalsState();
35 is_peer_open_ = false;
36 HandleSignalsState new_state = GetHandleSignalsState();
37
38 if (!new_state.equals(old_state))
39 waiter_list_.AwakeWaitersForStateChange(new_state);
40
41 return true;
42 }
43
EnqueueMessage(scoped_ptr<MessageInTransit> message)44 void LocalMessagePipeEndpoint::EnqueueMessage(
45 scoped_ptr<MessageInTransit> message) {
46 DCHECK(is_open_);
47 DCHECK(is_peer_open_);
48
49 bool was_empty = message_queue_.IsEmpty();
50 message_queue_.AddMessage(message.Pass());
51 if (was_empty)
52 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
53 }
54
Close()55 void LocalMessagePipeEndpoint::Close() {
56 DCHECK(is_open_);
57 is_open_ = false;
58 message_queue_.Clear();
59 }
60
CancelAllWaiters()61 void LocalMessagePipeEndpoint::CancelAllWaiters() {
62 DCHECK(is_open_);
63 waiter_list_.CancelAllWaiters();
64 }
65
ReadMessage(void * bytes,uint32_t * num_bytes,DispatcherVector * dispatchers,uint32_t * num_dispatchers,MojoReadMessageFlags flags)66 MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes,
67 uint32_t* num_bytes,
68 DispatcherVector* dispatchers,
69 uint32_t* num_dispatchers,
70 MojoReadMessageFlags flags) {
71 DCHECK(is_open_);
72 DCHECK(!dispatchers || dispatchers->empty());
73
74 const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
75 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
76
77 if (message_queue_.IsEmpty()) {
78 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT :
79 MOJO_RESULT_FAILED_PRECONDITION;
80 }
81
82 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
83 // and release the lock immediately.
84 bool enough_space = true;
85 MessageInTransit* message = message_queue_.PeekMessage();
86 if (num_bytes)
87 *num_bytes = message->num_bytes();
88 if (message->num_bytes() <= max_bytes)
89 memcpy(bytes, message->bytes(), message->num_bytes());
90 else
91 enough_space = false;
92
93 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
94 if (num_dispatchers)
95 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
96 if (enough_space) {
97 if (queued_dispatchers->empty()) {
98 // Nothing to do.
99 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
100 DCHECK(dispatchers);
101 dispatchers->swap(*queued_dispatchers);
102 } else {
103 enough_space = false;
104 }
105 }
106 } else {
107 if (num_dispatchers)
108 *num_dispatchers = 0;
109 }
110
111 message = NULL;
112
113 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
114 message_queue_.DiscardMessage();
115
116 // Now it's empty, thus no longer readable.
117 if (message_queue_.IsEmpty()) {
118 // It's currently not possible to wait for non-readability, but we should
119 // do the state change anyway.
120 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
121 }
122 }
123
124 if (!enough_space)
125 return MOJO_RESULT_RESOURCE_EXHAUSTED;
126
127 return MOJO_RESULT_OK;
128 }
129
AddWaiter(Waiter * waiter,MojoHandleSignals signals,uint32_t context)130 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
131 MojoHandleSignals signals,
132 uint32_t context) {
133 DCHECK(is_open_);
134
135 HandleSignalsState state = GetHandleSignalsState();
136 if (state.satisfies(signals))
137 return MOJO_RESULT_ALREADY_EXISTS;
138 if (!state.can_satisfy(signals))
139 return MOJO_RESULT_FAILED_PRECONDITION;
140
141 waiter_list_.AddWaiter(waiter, signals, context);
142 return MOJO_RESULT_OK;
143 }
144
RemoveWaiter(Waiter * waiter)145 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
146 DCHECK(is_open_);
147 waiter_list_.RemoveWaiter(waiter);
148 }
149
GetHandleSignalsState()150 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() {
151 HandleSignalsState rv;
152 if (!message_queue_.IsEmpty()) {
153 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
154 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
155 }
156 if (is_peer_open_) {
157 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
158 rv.satisfiable_signals |=
159 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
160 }
161 return rv;
162 }
163
164 } // namespace system
165 } // namespace mojo
166