1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/message_pipe_dispatcher.h"
6
7 #include <limits>
8 #include <memory>
9
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/core/core.h"
14 #include "mojo/core/node_controller.h"
15 #include "mojo/core/ports/event.h"
16 #include "mojo/core/ports/message_filter.h"
17 #include "mojo/core/request_context.h"
18 #include "mojo/core/user_message_impl.h"
19
20 namespace mojo {
21 namespace core {
22
23 namespace {
24
25 #pragma pack(push, 1)
26
27 struct SerializedState {
28 uint64_t pipe_id;
29 int8_t endpoint;
30 char padding[7];
31 };
32
33 static_assert(sizeof(SerializedState) % 8 == 0,
34 "Invalid SerializedState size.");
35
36 #pragma pack(pop)
37
38 } // namespace
39
40 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
41 // reference to the MPD to ensure it lives as long as the observed port.
42 class MessagePipeDispatcher::PortObserverThunk
43 : public NodeController::PortObserver {
44 public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)45 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
46 : dispatcher_(dispatcher) {}
47
48 private:
~PortObserverThunk()49 ~PortObserverThunk() override {}
50
51 // NodeController::PortObserver:
OnPortStatusChanged()52 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
53
54 scoped_refptr<MessagePipeDispatcher> dispatcher_;
55
56 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
57 };
58
59 #if DCHECK_IS_ON()
60
61 // A MessageFilter which never matches a message. Used to peek at the size of
62 // the next available message on a port, for debug logging only.
63 class PeekSizeMessageFilter : public ports::MessageFilter {
64 public:
PeekSizeMessageFilter()65 PeekSizeMessageFilter() {}
~PeekSizeMessageFilter()66 ~PeekSizeMessageFilter() override {}
67
68 // ports::MessageFilter:
Match(const ports::UserMessageEvent & message_event)69 bool Match(const ports::UserMessageEvent& message_event) override {
70 const auto* message = message_event.GetMessage<UserMessageImpl>();
71 if (message->IsSerialized())
72 message_size_ = message->user_payload_size();
73 return false;
74 }
75
message_size() const76 size_t message_size() const { return message_size_; }
77
78 private:
79 size_t message_size_ = 0;
80
81 DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
82 };
83
84 #endif // DCHECK_IS_ON()
85
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)86 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
87 const ports::PortRef& port,
88 uint64_t pipe_id,
89 int endpoint)
90 : node_controller_(node_controller),
91 port_(port),
92 pipe_id_(pipe_id),
93 endpoint_(endpoint),
94 watchers_(this) {
95 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
96 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
97
98 node_controller_->SetPortObserver(
99 port_, base::MakeRefCounted<PortObserverThunk>(this));
100 }
101
Fuse(MessagePipeDispatcher * other)102 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
103 node_controller_->SetPortObserver(port_, nullptr);
104 node_controller_->SetPortObserver(other->port_, nullptr);
105
106 ports::PortRef port0;
107 {
108 base::AutoLock lock(signal_lock_);
109 port0 = port_;
110 port_closed_.Set(true);
111 watchers_.NotifyClosed();
112 }
113
114 ports::PortRef port1;
115 {
116 base::AutoLock lock(other->signal_lock_);
117 port1 = other->port_;
118 other->port_closed_.Set(true);
119 other->watchers_.NotifyClosed();
120 }
121
122 // Both ports are always closed by this call.
123 int rv = node_controller_->MergeLocalPorts(port0, port1);
124 return rv == ports::OK;
125 }
126
GetType() const127 Dispatcher::Type MessagePipeDispatcher::GetType() const {
128 return Type::MESSAGE_PIPE;
129 }
130
Close()131 MojoResult MessagePipeDispatcher::Close() {
132 base::AutoLock lock(signal_lock_);
133 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
134 << " [port=" << port_.name() << "]";
135 return CloseNoLock();
136 }
137
WriteMessage(std::unique_ptr<ports::UserMessageEvent> message)138 MojoResult MessagePipeDispatcher::WriteMessage(
139 std::unique_ptr<ports::UserMessageEvent> message) {
140 if (port_closed_ || in_transit_)
141 return MOJO_RESULT_INVALID_ARGUMENT;
142
143 int rv = node_controller_->SendUserMessage(port_, std::move(message));
144
145 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
146 << " [port=" << port_.name() << "; rv=" << rv << "]";
147
148 if (rv != ports::OK) {
149 if (rv == ports::ERROR_PORT_UNKNOWN ||
150 rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
151 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
152 return MOJO_RESULT_INVALID_ARGUMENT;
153 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
154 return MOJO_RESULT_FAILED_PRECONDITION;
155 }
156
157 NOTREACHED();
158 return MOJO_RESULT_UNKNOWN;
159 }
160
161 return MOJO_RESULT_OK;
162 }
163
ReadMessage(std::unique_ptr<ports::UserMessageEvent> * message)164 MojoResult MessagePipeDispatcher::ReadMessage(
165 std::unique_ptr<ports::UserMessageEvent>* message) {
166 // We can't read from a port that's closed or in transit!
167 if (port_closed_ || in_transit_)
168 return MOJO_RESULT_INVALID_ARGUMENT;
169
170 int rv = node_controller_->node()->GetMessage(port_, message, nullptr);
171 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
172 if (rv == ports::ERROR_PORT_UNKNOWN ||
173 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
174 return MOJO_RESULT_INVALID_ARGUMENT;
175
176 NOTREACHED();
177 return MOJO_RESULT_UNKNOWN;
178 }
179
180 if (!*message) {
181 // No message was available in queue.
182 if (rv == ports::OK)
183 return MOJO_RESULT_SHOULD_WAIT;
184 // Peer is closed and there are no more messages to read.
185 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
186 return MOJO_RESULT_FAILED_PRECONDITION;
187 }
188
189 // We may need to update anyone watching our signals in case we just read the
190 // last available message.
191 base::AutoLock lock(signal_lock_);
192 watchers_.NotifyState(GetHandleSignalsStateNoLock());
193 return MOJO_RESULT_OK;
194 }
195
SetQuota(MojoQuotaType type,uint64_t limit)196 MojoResult MessagePipeDispatcher::SetQuota(MojoQuotaType type, uint64_t limit) {
197 switch (type) {
198 case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
199 if (limit == MOJO_QUOTA_LIMIT_NONE)
200 receive_queue_length_limit_.reset();
201 else
202 receive_queue_length_limit_ = limit;
203 break;
204
205 case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
206 if (limit == MOJO_QUOTA_LIMIT_NONE)
207 receive_queue_memory_size_limit_.reset();
208 else
209 receive_queue_memory_size_limit_ = limit;
210 break;
211
212 default:
213 return MOJO_RESULT_INVALID_ARGUMENT;
214 }
215
216 return MOJO_RESULT_OK;
217 }
218
QueryQuota(MojoQuotaType type,uint64_t * limit,uint64_t * usage)219 MojoResult MessagePipeDispatcher::QueryQuota(MojoQuotaType type,
220 uint64_t* limit,
221 uint64_t* usage) {
222 ports::PortStatus port_status;
223 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
224 CHECK(in_transit_ || port_transferred_ || port_closed_);
225 return MOJO_RESULT_INVALID_ARGUMENT;
226 }
227
228 switch (type) {
229 case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
230 *limit = receive_queue_length_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
231 *usage = port_status.queued_message_count;
232 break;
233
234 case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
235 *limit = receive_queue_memory_size_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
236 *usage = port_status.queued_num_bytes;
237 break;
238
239 default:
240 return MOJO_RESULT_INVALID_ARGUMENT;
241 }
242
243 return MOJO_RESULT_OK;
244 }
245
GetHandleSignalsState() const246 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsState() const {
247 base::AutoLock lock(signal_lock_);
248 return GetHandleSignalsStateNoLock();
249 }
250
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)251 MojoResult MessagePipeDispatcher::AddWatcherRef(
252 const scoped_refptr<WatcherDispatcher>& watcher,
253 uintptr_t context) {
254 base::AutoLock lock(signal_lock_);
255 if (port_closed_ || in_transit_)
256 return MOJO_RESULT_INVALID_ARGUMENT;
257 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
258 }
259
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)260 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
261 uintptr_t context) {
262 base::AutoLock lock(signal_lock_);
263 if (port_closed_ || in_transit_)
264 return MOJO_RESULT_INVALID_ARGUMENT;
265 return watchers_.Remove(watcher, context);
266 }
267
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)268 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
269 uint32_t* num_ports,
270 uint32_t* num_handles) {
271 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
272 *num_ports = 1;
273 *num_handles = 0;
274 }
275
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)276 bool MessagePipeDispatcher::EndSerialize(void* destination,
277 ports::PortName* ports,
278 PlatformHandle* handles) {
279 SerializedState* state = static_cast<SerializedState*>(destination);
280 state->pipe_id = pipe_id_;
281 state->endpoint = static_cast<int8_t>(endpoint_);
282 memset(state->padding, 0, sizeof(state->padding));
283 ports[0] = port_.name();
284 return true;
285 }
286
BeginTransit()287 bool MessagePipeDispatcher::BeginTransit() {
288 base::AutoLock lock(signal_lock_);
289 if (in_transit_ || port_closed_)
290 return false;
291 in_transit_.Set(true);
292 return in_transit_;
293 }
294
CompleteTransitAndClose()295 void MessagePipeDispatcher::CompleteTransitAndClose() {
296 node_controller_->SetPortObserver(port_, nullptr);
297
298 base::AutoLock lock(signal_lock_);
299 port_transferred_ = true;
300 in_transit_.Set(false);
301 CloseNoLock();
302 }
303
CancelTransit()304 void MessagePipeDispatcher::CancelTransit() {
305 base::AutoLock lock(signal_lock_);
306 in_transit_.Set(false);
307
308 // Something may have happened while we were waiting for potential transit.
309 watchers_.NotifyState(GetHandleSignalsStateNoLock());
310 }
311
312 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)313 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
314 const void* data,
315 size_t num_bytes,
316 const ports::PortName* ports,
317 size_t num_ports,
318 PlatformHandle* handles,
319 size_t num_handles) {
320 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
321 return nullptr;
322
323 const SerializedState* state = static_cast<const SerializedState*>(data);
324
325 ports::Node* node = Core::Get()->GetNodeController()->node();
326 ports::PortRef port;
327 if (node->GetPort(ports[0], &port) != ports::OK)
328 return nullptr;
329
330 ports::PortStatus status;
331 if (node->GetStatus(port, &status) != ports::OK)
332 return nullptr;
333
334 return new MessagePipeDispatcher(Core::Get()->GetNodeController(), port,
335 state->pipe_id, state->endpoint);
336 }
337
338 MessagePipeDispatcher::~MessagePipeDispatcher() = default;
339
CloseNoLock()340 MojoResult MessagePipeDispatcher::CloseNoLock() {
341 signal_lock_.AssertAcquired();
342 if (port_closed_ || in_transit_)
343 return MOJO_RESULT_INVALID_ARGUMENT;
344
345 port_closed_.Set(true);
346 watchers_.NotifyClosed();
347
348 if (!port_transferred_) {
349 base::AutoUnlock unlock(signal_lock_);
350 node_controller_->ClosePort(port_);
351 }
352
353 return MOJO_RESULT_OK;
354 }
355
GetHandleSignalsStateNoLock() const356 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
357 HandleSignalsState rv;
358
359 ports::PortStatus port_status;
360 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
361 CHECK(in_transit_ || port_transferred_ || port_closed_);
362 return HandleSignalsState();
363 }
364
365 if (port_status.has_messages) {
366 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
367 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
368 }
369 if (port_status.receiving_messages)
370 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
371 if (!port_status.peer_closed) {
372 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
373 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
374 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
375 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
376 if (port_status.peer_remote)
377 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
378 } else {
379 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
380 }
381 if (receive_queue_length_limit_ &&
382 port_status.queued_message_count > *receive_queue_length_limit_) {
383 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
384 } else if (receive_queue_memory_size_limit_ &&
385 port_status.queued_num_bytes > *receive_queue_memory_size_limit_) {
386 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
387 }
388 rv.satisfiable_signals |=
389 MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
390 return rv;
391 }
392
OnPortStatusChanged()393 void MessagePipeDispatcher::OnPortStatusChanged() {
394 DCHECK(RequestContext::current());
395
396 base::AutoLock lock(signal_lock_);
397
398 // We stop observing our port as soon as it's transferred, but this can race
399 // with events which are raised right before that happens. This is fine to
400 // ignore.
401 if (port_transferred_)
402 return;
403
404 #if DCHECK_IS_ON()
405 ports::PortStatus port_status;
406 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
407 if (port_status.has_messages) {
408 std::unique_ptr<ports::UserMessageEvent> unused;
409 PeekSizeMessageFilter filter;
410 node_controller_->node()->GetMessage(port_, &unused, &filter);
411 DVLOG(4) << "New message detected on message pipe " << pipe_id_
412 << " endpoint " << endpoint_ << " [port=" << port_.name()
413 << "; size=" << filter.message_size() << "]";
414 }
415 if (port_status.peer_closed) {
416 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
417 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
418 }
419 }
420 #endif
421
422 watchers_.NotifyState(GetHandleSignalsStateNoLock());
423 }
424
425 } // namespace core
426 } // namespace mojo
427