1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6
7 #include <limits>
8 #include <memory>
9
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/edk/embedder/embedder_internal.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/message_for_transit.h"
16 #include "mojo/edk/system/node_controller.h"
17 #include "mojo/edk/system/ports/message_filter.h"
18 #include "mojo/edk/system/ports_message.h"
19 #include "mojo/edk/system/request_context.h"
20
21 namespace mojo {
22 namespace edk {
23
24 namespace {
25
26 using DispatcherHeader = MessageForTransit::DispatcherHeader;
27 using MessageHeader = MessageForTransit::MessageHeader;
28
#pragma pack(push, 1)

// Fixed-layout record describing one serialized message pipe endpoint. It is
// filled in by MessagePipeDispatcher::EndSerialize() and read back by
// MessagePipeDispatcher::Deserialize(). Packed with 1-byte alignment so the
// layout contains no implicit padding.
struct SerializedState {
  // Copied from the dispatcher's |pipe_id_|.
  uint64_t pipe_id;

  // The dispatcher's |endpoint_|, narrowed to 8 bits.
  int8_t endpoint;

  // Explicit filler (zeroed by EndSerialize()) rounding the struct up to a
  // multiple of 8 bytes.
  char padding[7];
};

#pragma pack(pop)

static_assert((sizeof(SerializedState) % 8) == 0,
              "Invalid SerializedState size.");
41
42 } // namespace
43
44 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
45 // reference to the MPD to ensure it lives as long as the observed port.
46 class MessagePipeDispatcher::PortObserverThunk
47 : public NodeController::PortObserver {
48 public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)49 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
50 : dispatcher_(dispatcher) {}
51
52 private:
~PortObserverThunk()53 ~PortObserverThunk() override {}
54
55 // NodeController::PortObserver:
OnPortStatusChanged()56 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
57
58 scoped_refptr<MessagePipeDispatcher> dispatcher_;
59
60 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
61 };
62
63 // A MessageFilter used by ReadMessage to determine whether a message should
64 // actually be consumed yet.
65 class ReadMessageFilter : public ports::MessageFilter {
66 public:
67 // Creates a new ReadMessageFilter which captures and potentially modifies
68 // various (unowned) local state within MessagePipeDispatcher::ReadMessage.
ReadMessageFilter(bool read_any_size,bool may_discard,uint32_t * num_bytes,uint32_t * num_handles,bool * no_space,bool * invalid_message)69 ReadMessageFilter(bool read_any_size,
70 bool may_discard,
71 uint32_t* num_bytes,
72 uint32_t* num_handles,
73 bool* no_space,
74 bool* invalid_message)
75 : read_any_size_(read_any_size),
76 may_discard_(may_discard),
77 num_bytes_(num_bytes),
78 num_handles_(num_handles),
79 no_space_(no_space),
80 invalid_message_(invalid_message) {}
81
~ReadMessageFilter()82 ~ReadMessageFilter() override {}
83
84 // ports::MessageFilter:
Match(const ports::Message & m)85 bool Match(const ports::Message& m) override {
86 const PortsMessage& message = static_cast<const PortsMessage&>(m);
87 if (message.num_payload_bytes() < sizeof(MessageHeader)) {
88 *invalid_message_ = true;
89 return true;
90 }
91
92 const MessageHeader* header =
93 static_cast<const MessageHeader*>(message.payload_bytes());
94 if (header->header_size > message.num_payload_bytes()) {
95 *invalid_message_ = true;
96 return true;
97 }
98
99 uint32_t bytes_to_read = 0;
100 uint32_t bytes_available =
101 static_cast<uint32_t>(message.num_payload_bytes()) -
102 header->header_size;
103 if (num_bytes_) {
104 bytes_to_read = std::min(*num_bytes_, bytes_available);
105 *num_bytes_ = bytes_available;
106 }
107
108 uint32_t handles_to_read = 0;
109 uint32_t handles_available = header->num_dispatchers;
110 if (num_handles_) {
111 handles_to_read = std::min(*num_handles_, handles_available);
112 *num_handles_ = handles_available;
113 }
114
115 if (handles_to_read < handles_available ||
116 (!read_any_size_ && bytes_to_read < bytes_available)) {
117 *no_space_ = true;
118 return may_discard_;
119 }
120
121 return true;
122 }
123
124 private:
125 const bool read_any_size_;
126 const bool may_discard_;
127 uint32_t* const num_bytes_;
128 uint32_t* const num_handles_;
129 bool* const no_space_;
130 bool* const invalid_message_;
131
132 DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter);
133 };
134
135 #if DCHECK_IS_ON()
136
137 // A MessageFilter which never matches a message. Used to peek at the size of
138 // the next available message on a port, for debug logging only.
139 class PeekSizeMessageFilter : public ports::MessageFilter {
140 public:
PeekSizeMessageFilter()141 PeekSizeMessageFilter() {}
~PeekSizeMessageFilter()142 ~PeekSizeMessageFilter() override {}
143
144 // ports::MessageFilter:
Match(const ports::Message & message)145 bool Match(const ports::Message& message) override {
146 message_size_ = message.num_payload_bytes();
147 return false;
148 }
149
message_size() const150 size_t message_size() const { return message_size_; }
151
152 private:
153 size_t message_size_ = 0;
154
155 DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
156 };
157
158 #endif // DCHECK_IS_ON()
159
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
161 const ports::PortRef& port,
162 uint64_t pipe_id,
163 int endpoint)
164 : node_controller_(node_controller),
165 port_(port),
166 pipe_id_(pipe_id),
167 endpoint_(endpoint) {
168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
170
171 node_controller_->SetPortObserver(
172 port_,
173 make_scoped_refptr(new PortObserverThunk(this)));
174 }
175
Fuse(MessagePipeDispatcher * other)176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
177 node_controller_->SetPortObserver(port_, nullptr);
178 node_controller_->SetPortObserver(other->port_, nullptr);
179
180 ports::PortRef port0;
181 {
182 base::AutoLock lock(signal_lock_);
183 port0 = port_;
184 port_closed_.Set(true);
185 awakables_.CancelAll();
186 }
187
188 ports::PortRef port1;
189 {
190 base::AutoLock lock(other->signal_lock_);
191 port1 = other->port_;
192 other->port_closed_.Set(true);
193 other->awakables_.CancelAll();
194 }
195
196 // Both ports are always closed by this call.
197 int rv = node_controller_->MergeLocalPorts(port0, port1);
198 return rv == ports::OK;
199 }
200
GetType() const201 Dispatcher::Type MessagePipeDispatcher::GetType() const {
202 return Type::MESSAGE_PIPE;
203 }
204
Close()205 MojoResult MessagePipeDispatcher::Close() {
206 base::AutoLock lock(signal_lock_);
207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
208 << " [port=" << port_.name() << "]";
209 return CloseNoLock();
210 }
211
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
213 const Watcher::WatchCallback& callback,
214 uintptr_t context) {
215 base::AutoLock lock(signal_lock_);
216
217 if (port_closed_ || in_transit_)
218 return MOJO_RESULT_INVALID_ARGUMENT;
219
220 return awakables_.AddWatcher(
221 signals, callback, context, GetHandleSignalsStateNoLock());
222 }
223
CancelWatch(uintptr_t context)224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
225 base::AutoLock lock(signal_lock_);
226
227 if (port_closed_ || in_transit_)
228 return MOJO_RESULT_INVALID_ARGUMENT;
229
230 return awakables_.RemoveWatcher(context);
231 }
232
WriteMessage(std::unique_ptr<MessageForTransit> message,MojoWriteMessageFlags flags)233 MojoResult MessagePipeDispatcher::WriteMessage(
234 std::unique_ptr<MessageForTransit> message,
235 MojoWriteMessageFlags flags) {
236 if (port_closed_ || in_transit_)
237 return MOJO_RESULT_INVALID_ARGUMENT;
238
239 size_t num_bytes = message->num_bytes();
240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
241
242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
243 << " [port=" << port_.name() << "; rv=" << rv
244 << "; num_bytes=" << num_bytes << "]";
245
246 if (rv != ports::OK) {
247 if (rv == ports::ERROR_PORT_UNKNOWN ||
248 rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
249 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
250 return MOJO_RESULT_INVALID_ARGUMENT;
251 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
252 return MOJO_RESULT_FAILED_PRECONDITION;
253 }
254
255 NOTREACHED();
256 return MOJO_RESULT_UNKNOWN;
257 }
258
259 return MOJO_RESULT_OK;
260 }
261
ReadMessage(std::unique_ptr<MessageForTransit> * message,uint32_t * num_bytes,MojoHandle * handles,uint32_t * num_handles,MojoReadMessageFlags flags,bool read_any_size)262 MojoResult MessagePipeDispatcher::ReadMessage(
263 std::unique_ptr<MessageForTransit>* message,
264 uint32_t* num_bytes,
265 MojoHandle* handles,
266 uint32_t* num_handles,
267 MojoReadMessageFlags flags,
268 bool read_any_size) {
269 // We can't read from a port that's closed or in transit!
270 if (port_closed_ || in_transit_)
271 return MOJO_RESULT_INVALID_ARGUMENT;
272
273 bool no_space = false;
274 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
275 bool invalid_message = false;
276
277 // Grab a message if the provided handles buffer is large enough. If the input
278 // |num_bytes| is provided and |read_any_size| is false, we also ensure
279 // that it specifies a size at least as large as the next available payload.
280 //
281 // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
282 // This flag exists to support both new and old API behavior.
283
284 ports::ScopedMessage ports_message;
285 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
286 &no_space, &invalid_message);
287 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
288
289 if (invalid_message)
290 return MOJO_RESULT_UNKNOWN;
291
292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
293 if (rv == ports::ERROR_PORT_UNKNOWN ||
294 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
295 return MOJO_RESULT_INVALID_ARGUMENT;
296
297 NOTREACHED();
298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
299 }
300
301 if (no_space) {
302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
303 // sufficient to hold this message's data. The message will still be in
304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
305 return MOJO_RESULT_RESOURCE_EXHAUSTED;
306 }
307
308 if (!ports_message) {
309 // No message was available in queue.
310
311 if (rv == ports::OK)
312 return MOJO_RESULT_SHOULD_WAIT;
313
314 // Peer is closed and there are no more messages to read.
315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
316 return MOJO_RESULT_FAILED_PRECONDITION;
317 }
318
319 // Alright! We have a message and the caller has provided sufficient storage
320 // in which to receive it.
321
322 std::unique_ptr<PortsMessage> msg(
323 static_cast<PortsMessage*>(ports_message.release()));
324
325 const MessageHeader* header =
326 static_cast<const MessageHeader*>(msg->payload_bytes());
327 const DispatcherHeader* dispatcher_headers =
328 reinterpret_cast<const DispatcherHeader*>(header + 1);
329
330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
331 return MOJO_RESULT_UNKNOWN;
332
333 // Deserialize dispatchers.
334 if (header->num_dispatchers > 0) {
335 CHECK(handles);
336 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
337 size_t data_payload_index = sizeof(MessageHeader) +
338 header->num_dispatchers * sizeof(DispatcherHeader);
339 if (data_payload_index > header->header_size)
340 return MOJO_RESULT_UNKNOWN;
341 const char* dispatcher_data = reinterpret_cast<const char*>(
342 dispatcher_headers + header->num_dispatchers);
343 size_t port_index = 0;
344 size_t platform_handle_index = 0;
345 ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles();
346 const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0;
347 for (size_t i = 0; i < header->num_dispatchers; ++i) {
348 const DispatcherHeader& dh = dispatcher_headers[i];
349 Type type = static_cast<Type>(dh.type);
350
351 size_t next_payload_index = data_payload_index + dh.num_bytes;
352 if (msg->num_payload_bytes() < next_payload_index ||
353 next_payload_index < data_payload_index) {
354 return MOJO_RESULT_UNKNOWN;
355 }
356
357 size_t next_port_index = port_index + dh.num_ports;
358 if (msg->num_ports() < next_port_index || next_port_index < port_index)
359 return MOJO_RESULT_UNKNOWN;
360
361 size_t next_platform_handle_index =
362 platform_handle_index + dh.num_platform_handles;
363 if (num_msg_handles < next_platform_handle_index ||
364 next_platform_handle_index < platform_handle_index) {
365 return MOJO_RESULT_UNKNOWN;
366 }
367
368 PlatformHandle* out_handles =
369 num_msg_handles ? msg_handles->data() + platform_handle_index
370 : nullptr;
371 dispatchers[i].dispatcher = Dispatcher::Deserialize(
372 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
373 dh.num_ports, out_handles, dh.num_platform_handles);
374 if (!dispatchers[i].dispatcher)
375 return MOJO_RESULT_UNKNOWN;
376
377 dispatcher_data += dh.num_bytes;
378 data_payload_index = next_payload_index;
379 port_index = next_port_index;
380 platform_handle_index = next_platform_handle_index;
381 }
382
383 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
384 handles))
385 return MOJO_RESULT_UNKNOWN;
386 }
387
388 CHECK(msg);
389 *message = MessageForTransit::WrapPortsMessage(std::move(msg));
390 return MOJO_RESULT_OK;
391 }
392
393 HandleSignalsState
GetHandleSignalsState() const394 MessagePipeDispatcher::GetHandleSignalsState() const {
395 base::AutoLock lock(signal_lock_);
396 return GetHandleSignalsStateNoLock();
397 }
398
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)399 MojoResult MessagePipeDispatcher::AddAwakable(
400 Awakable* awakable,
401 MojoHandleSignals signals,
402 uintptr_t context,
403 HandleSignalsState* signals_state) {
404 base::AutoLock lock(signal_lock_);
405
406 if (port_closed_ || in_transit_) {
407 if (signals_state)
408 *signals_state = HandleSignalsState();
409 return MOJO_RESULT_INVALID_ARGUMENT;
410 }
411
412 HandleSignalsState state = GetHandleSignalsStateNoLock();
413
414 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
415 << endpoint_ << " [awakable=" << awakable << "; port="
416 << port_.name() << "; signals=" << signals << "; satisfied="
417 << state.satisfied_signals << "; satisfiable="
418 << state.satisfiable_signals << "]";
419
420 if (state.satisfies(signals)) {
421 if (signals_state)
422 *signals_state = state;
423 DVLOG(2) << "Signals already set for " << port_.name();
424 return MOJO_RESULT_ALREADY_EXISTS;
425 }
426 if (!state.can_satisfy(signals)) {
427 if (signals_state)
428 *signals_state = state;
429 DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
430 return MOJO_RESULT_FAILED_PRECONDITION;
431 }
432
433 DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
434 << endpoint_ << " [awakable=" << awakable << "; port="
435 << port_.name() << "; signals=" << signals << "]";
436
437 awakables_.Add(awakable, signals, context);
438 return MOJO_RESULT_OK;
439 }
440
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)441 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
442 HandleSignalsState* signals_state) {
443 base::AutoLock lock(signal_lock_);
444 if (port_closed_ || in_transit_) {
445 if (signals_state)
446 *signals_state = HandleSignalsState();
447 } else if (signals_state) {
448 *signals_state = GetHandleSignalsStateNoLock();
449 }
450
451 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
452 << endpoint_ << " [awakable=" << awakable << "; port="
453 << port_.name() << "]";
454
455 awakables_.Remove(awakable);
456 }
457
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)458 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
459 uint32_t* num_ports,
460 uint32_t* num_handles) {
461 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
462 *num_ports = 1;
463 *num_handles = 0;
464 }
465
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)466 bool MessagePipeDispatcher::EndSerialize(void* destination,
467 ports::PortName* ports,
468 PlatformHandle* handles) {
469 SerializedState* state = static_cast<SerializedState*>(destination);
470 state->pipe_id = pipe_id_;
471 state->endpoint = static_cast<int8_t>(endpoint_);
472 memset(state->padding, 0, sizeof(state->padding));
473 ports[0] = port_.name();
474 return true;
475 }
476
BeginTransit()477 bool MessagePipeDispatcher::BeginTransit() {
478 base::AutoLock lock(signal_lock_);
479 if (in_transit_ || port_closed_)
480 return false;
481 in_transit_.Set(true);
482 return in_transit_;
483 }
484
CompleteTransitAndClose()485 void MessagePipeDispatcher::CompleteTransitAndClose() {
486 node_controller_->SetPortObserver(port_, nullptr);
487
488 base::AutoLock lock(signal_lock_);
489 port_transferred_ = true;
490 in_transit_.Set(false);
491 CloseNoLock();
492 }
493
CancelTransit()494 void MessagePipeDispatcher::CancelTransit() {
495 base::AutoLock lock(signal_lock_);
496 in_transit_.Set(false);
497
498 // Something may have happened while we were waiting for potential transit.
499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
500 }
501
502 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
504 const void* data,
505 size_t num_bytes,
506 const ports::PortName* ports,
507 size_t num_ports,
508 PlatformHandle* handles,
509 size_t num_handles) {
510 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
511 return nullptr;
512
513 const SerializedState* state = static_cast<const SerializedState*>(data);
514
515 ports::PortRef port;
516 CHECK_EQ(
517 ports::OK,
518 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
519
520 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
521 state->pipe_id, state->endpoint);
522 }
523
~MessagePipeDispatcher()524 MessagePipeDispatcher::~MessagePipeDispatcher() {
525 DCHECK(port_closed_ && !in_transit_);
526 }
527
CloseNoLock()528 MojoResult MessagePipeDispatcher::CloseNoLock() {
529 signal_lock_.AssertAcquired();
530 if (port_closed_ || in_transit_)
531 return MOJO_RESULT_INVALID_ARGUMENT;
532
533 port_closed_.Set(true);
534 awakables_.CancelAll();
535
536 if (!port_transferred_) {
537 base::AutoUnlock unlock(signal_lock_);
538 node_controller_->ClosePort(port_);
539 }
540
541 return MOJO_RESULT_OK;
542 }
543
GetHandleSignalsStateNoLock() const544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
545 HandleSignalsState rv;
546
547 ports::PortStatus port_status;
548 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
549 CHECK(in_transit_ || port_transferred_ || port_closed_);
550 return HandleSignalsState();
551 }
552
553 if (port_status.has_messages) {
554 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
555 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
556 }
557 if (port_status.receiving_messages)
558 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
559 if (!port_status.peer_closed) {
560 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
561 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
562 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
563 } else {
564 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
565 }
566 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
567 return rv;
568 }
569
OnPortStatusChanged()570 void MessagePipeDispatcher::OnPortStatusChanged() {
571 DCHECK(RequestContext::current());
572
573 base::AutoLock lock(signal_lock_);
574
575 // We stop observing our port as soon as it's transferred, but this can race
576 // with events which are raised right before that happens. This is fine to
577 // ignore.
578 if (port_transferred_)
579 return;
580
581 #if DCHECK_IS_ON()
582 ports::PortStatus port_status;
583 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
584 if (port_status.has_messages) {
585 ports::ScopedMessage unused;
586 PeekSizeMessageFilter filter;
587 node_controller_->node()->GetMessage(port_, &unused, &filter);
588 DVLOG(4) << "New message detected on message pipe " << pipe_id_
589 << " endpoint " << endpoint_ << " [port=" << port_.name()
590 << "; size=" << filter.message_size() << "]";
591 }
592 if (port_status.peer_closed) {
593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
595 }
596 }
597 #endif
598
599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
600 }
601
602 } // namespace edk
603 } // namespace mojo
604