1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <algorithm>
11 #include <limits>
12 #include <utility>
13
14 #include "base/bind.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/message_loop/message_loop.h"
18 #include "mojo/edk/embedder/embedder_internal.h"
19 #include "mojo/edk/embedder/platform_shared_buffer.h"
20 #include "mojo/edk/system/core.h"
21 #include "mojo/edk/system/data_pipe_control_message.h"
22 #include "mojo/edk/system/node_controller.h"
23 #include "mojo/edk/system/ports_message.h"
24 #include "mojo/edk/system/request_context.h"
25 #include "mojo/public/c/system/data_pipe.h"
26
27 namespace mojo {
28 namespace edk {
29
30 namespace {
31
// Bit stored in SerializedState::flags when the dispatcher had already seen
// its peer close at the time it was serialized.
constexpr uint8_t kFlagPeerClosed = 0x01;
33
34 #pragma pack(push, 1)
35
36 struct SerializedState {
37 MojoCreateDataPipeOptions options;
38 uint64_t pipe_id;
39 uint32_t read_offset;
40 uint32_t bytes_available;
41 uint8_t flags;
42 char padding[7];
43 };
44
45 static_assert(sizeof(SerializedState) % 8 == 0,
46 "Invalid SerializedState size.");
47
48 #pragma pack(pop)
49
50 } // namespace
51
52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
53 // reference to the dispatcher to ensure it lives as long as the observed port.
54 class DataPipeConsumerDispatcher::PortObserverThunk
55 : public NodeController::PortObserver {
56 public:
PortObserverThunk(scoped_refptr<DataPipeConsumerDispatcher> dispatcher)57 explicit PortObserverThunk(
58 scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
59 : dispatcher_(dispatcher) {}
60
61 private:
~PortObserverThunk()62 ~PortObserverThunk() override {}
63
64 // NodeController::PortObserver:
OnPortStatusChanged()65 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
66
67 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
68
69 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
70 };
71
DataPipeConsumerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,const MojoCreateDataPipeOptions & options,bool initialized,uint64_t pipe_id)72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
73 NodeController* node_controller,
74 const ports::PortRef& control_port,
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options,
77 bool initialized,
78 uint64_t pipe_id)
79 : options_(options),
80 node_controller_(node_controller),
81 control_port_(control_port),
82 pipe_id_(pipe_id),
83 shared_ring_buffer_(shared_ring_buffer) {
84 if (initialized) {
85 base::AutoLock lock(lock_);
86 InitializeNoLock();
87 }
88 }
89
GetType() const90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
91 return Type::DATA_PIPE_CONSUMER;
92 }
93
Close()94 MojoResult DataPipeConsumerDispatcher::Close() {
95 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
97 return CloseNoLock();
98 }
99
100
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)101 MojoResult DataPipeConsumerDispatcher::Watch(
102 MojoHandleSignals signals,
103 const Watcher::WatchCallback& callback,
104 uintptr_t context) {
105 base::AutoLock lock(lock_);
106
107 if (is_closed_ || in_transit_)
108 return MOJO_RESULT_INVALID_ARGUMENT;
109
110 return awakable_list_.AddWatcher(
111 signals, callback, context, GetHandleSignalsStateNoLock());
112 }
113
CancelWatch(uintptr_t context)114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115 base::AutoLock lock(lock_);
116
117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT;
119
120 return awakable_list_.RemoveWatcher(context);
121 }
122
ReadData(void * elements,uint32_t * num_bytes,MojoReadDataFlags flags)123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124 uint32_t* num_bytes,
125 MojoReadDataFlags flags) {
126 base::AutoLock lock(lock_);
127 new_data_available_ = false;
128
129 if (!shared_ring_buffer_ || in_transit_)
130 return MOJO_RESULT_INVALID_ARGUMENT;
131
132 if (in_two_phase_read_)
133 return MOJO_RESULT_BUSY;
134
135 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
136 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
137 (flags & MOJO_READ_DATA_FLAG_DISCARD))
138 return MOJO_RESULT_INVALID_ARGUMENT;
139 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
140 DVLOG_IF(2, elements)
141 << "Query mode: ignoring non-null |elements|";
142 *num_bytes = static_cast<uint32_t>(bytes_available_);
143 return MOJO_RESULT_OK;
144 }
145
146 bool discard = false;
147 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
148 // These flags are mutally exclusive.
149 if (flags & MOJO_READ_DATA_FLAG_PEEK)
150 return MOJO_RESULT_INVALID_ARGUMENT;
151 DVLOG_IF(2, elements)
152 << "Discard mode: ignoring non-null |elements|";
153 discard = true;
154 }
155
156 uint32_t max_num_bytes_to_read = *num_bytes;
157 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
158 return MOJO_RESULT_INVALID_ARGUMENT;
159
160 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
161 uint32_t min_num_bytes_to_read =
162 all_or_none ? max_num_bytes_to_read : 0;
163
164 if (min_num_bytes_to_read > bytes_available_) {
165 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
166 : MOJO_RESULT_OUT_OF_RANGE;
167 }
168
169 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
170 if (bytes_to_read == 0) {
171 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
172 : MOJO_RESULT_SHOULD_WAIT;
173 }
174
175 if (!discard) {
176 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
177 CHECK(data);
178
179 uint8_t* destination = static_cast<uint8_t*>(elements);
180 CHECK(destination);
181
182 DCHECK_LE(read_offset_, options_.capacity_num_bytes);
183 uint32_t tail_bytes_to_copy =
184 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
185 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
186 if (tail_bytes_to_copy > 0)
187 memcpy(destination, data + read_offset_, tail_bytes_to_copy);
188 if (head_bytes_to_copy > 0)
189 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
190 }
191 *num_bytes = bytes_to_read;
192
193 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
194 if (discard || !peek) {
195 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
196 bytes_available_ -= bytes_to_read;
197
198 base::AutoUnlock unlock(lock_);
199 NotifyRead(bytes_to_read);
200 }
201
202 return MOJO_RESULT_OK;
203 }
204
BeginReadData(const void ** buffer,uint32_t * buffer_num_bytes,MojoReadDataFlags flags)205 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
206 uint32_t* buffer_num_bytes,
207 MojoReadDataFlags flags) {
208 base::AutoLock lock(lock_);
209 new_data_available_ = false;
210 if (!shared_ring_buffer_ || in_transit_)
211 return MOJO_RESULT_INVALID_ARGUMENT;
212
213 if (in_two_phase_read_)
214 return MOJO_RESULT_BUSY;
215
216 // These flags may not be used in two-phase mode.
217 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
218 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
219 (flags & MOJO_READ_DATA_FLAG_PEEK))
220 return MOJO_RESULT_INVALID_ARGUMENT;
221
222 if (bytes_available_ == 0) {
223 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
224 : MOJO_RESULT_SHOULD_WAIT;
225 }
226
227 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
228 uint32_t bytes_to_read = std::min(bytes_available_,
229 options_.capacity_num_bytes - read_offset_);
230
231 CHECK(ring_buffer_mapping_);
232 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
233 CHECK(data);
234
235 in_two_phase_read_ = true;
236 *buffer = data + read_offset_;
237 *buffer_num_bytes = bytes_to_read;
238 two_phase_max_bytes_read_ = bytes_to_read;
239
240 return MOJO_RESULT_OK;
241 }
242
EndReadData(uint32_t num_bytes_read)243 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
244 base::AutoLock lock(lock_);
245 if (!in_two_phase_read_)
246 return MOJO_RESULT_FAILED_PRECONDITION;
247
248 if (in_transit_)
249 return MOJO_RESULT_INVALID_ARGUMENT;
250
251 CHECK(shared_ring_buffer_);
252
253 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
254 MojoResult rv;
255 if (num_bytes_read > two_phase_max_bytes_read_ ||
256 num_bytes_read % options_.element_num_bytes != 0) {
257 rv = MOJO_RESULT_INVALID_ARGUMENT;
258 } else {
259 rv = MOJO_RESULT_OK;
260 read_offset_ =
261 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
262
263 DCHECK_GE(bytes_available_, num_bytes_read);
264 bytes_available_ -= num_bytes_read;
265
266 base::AutoUnlock unlock(lock_);
267 NotifyRead(num_bytes_read);
268 }
269
270 in_two_phase_read_ = false;
271 two_phase_max_bytes_read_ = 0;
272
273 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
274 if (!new_state.equals(old_state))
275 awakable_list_.AwakeForStateChange(new_state);
276
277 return rv;
278 }
279
GetHandleSignalsState() const280 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
281 base::AutoLock lock(lock_);
282 return GetHandleSignalsStateNoLock();
283 }
284
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)285 MojoResult DataPipeConsumerDispatcher::AddAwakable(
286 Awakable* awakable,
287 MojoHandleSignals signals,
288 uintptr_t context,
289 HandleSignalsState* signals_state) {
290 base::AutoLock lock(lock_);
291 if (!shared_ring_buffer_ || in_transit_) {
292 if (signals_state)
293 *signals_state = HandleSignalsState();
294 return MOJO_RESULT_INVALID_ARGUMENT;
295 }
296 UpdateSignalsStateNoLock();
297 HandleSignalsState state = GetHandleSignalsStateNoLock();
298 if (state.satisfies(signals)) {
299 if (signals_state)
300 *signals_state = state;
301 return MOJO_RESULT_ALREADY_EXISTS;
302 }
303 if (!state.can_satisfy(signals)) {
304 if (signals_state)
305 *signals_state = state;
306 return MOJO_RESULT_FAILED_PRECONDITION;
307 }
308
309 awakable_list_.Add(awakable, signals, context);
310 return MOJO_RESULT_OK;
311 }
312
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)313 void DataPipeConsumerDispatcher::RemoveAwakable(
314 Awakable* awakable,
315 HandleSignalsState* signals_state) {
316 base::AutoLock lock(lock_);
317 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
318 *signals_state = HandleSignalsState();
319 else if (signals_state)
320 *signals_state = GetHandleSignalsStateNoLock();
321 awakable_list_.Remove(awakable);
322 }
323
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)324 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
325 uint32_t* num_ports,
326 uint32_t* num_handles) {
327 base::AutoLock lock(lock_);
328 DCHECK(in_transit_);
329 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
330 *num_ports = 1;
331 *num_handles = 1;
332 }
333
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)334 bool DataPipeConsumerDispatcher::EndSerialize(
335 void* destination,
336 ports::PortName* ports,
337 PlatformHandle* platform_handles) {
338 SerializedState* state = static_cast<SerializedState*>(destination);
339 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
340 memset(state->padding, 0, sizeof(state->padding));
341
342 base::AutoLock lock(lock_);
343 DCHECK(in_transit_);
344 state->pipe_id = pipe_id_;
345 state->read_offset = read_offset_;
346 state->bytes_available = bytes_available_;
347 state->flags = peer_closed_ ? kFlagPeerClosed : 0;
348
349 ports[0] = control_port_.name();
350
351 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
352 platform_handles[0] = buffer_handle_for_transit_.get();
353
354 return true;
355 }
356
BeginTransit()357 bool DataPipeConsumerDispatcher::BeginTransit() {
358 base::AutoLock lock(lock_);
359 if (in_transit_)
360 return false;
361 in_transit_ = !in_two_phase_read_;
362 return in_transit_;
363 }
364
CompleteTransitAndClose()365 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
366 node_controller_->SetPortObserver(control_port_, nullptr);
367
368 base::AutoLock lock(lock_);
369 DCHECK(in_transit_);
370 in_transit_ = false;
371 transferred_ = true;
372 ignore_result(buffer_handle_for_transit_.release());
373 CloseNoLock();
374 }
375
CancelTransit()376 void DataPipeConsumerDispatcher::CancelTransit() {
377 base::AutoLock lock(lock_);
378 DCHECK(in_transit_);
379 in_transit_ = false;
380 buffer_handle_for_transit_.reset();
381 UpdateSignalsStateNoLock();
382 }
383
384 // static
385 scoped_refptr<DataPipeConsumerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)386 DataPipeConsumerDispatcher::Deserialize(const void* data,
387 size_t num_bytes,
388 const ports::PortName* ports,
389 size_t num_ports,
390 PlatformHandle* handles,
391 size_t num_handles) {
392 if (num_ports != 1 || num_handles != 1 ||
393 num_bytes != sizeof(SerializedState)) {
394 return nullptr;
395 }
396
397 const SerializedState* state = static_cast<const SerializedState*>(data);
398
399 NodeController* node_controller = internal::g_core->GetNodeController();
400 ports::PortRef port;
401 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
402 return nullptr;
403
404 PlatformHandle buffer_handle;
405 std::swap(buffer_handle, handles[0]);
406 scoped_refptr<PlatformSharedBuffer> ring_buffer =
407 PlatformSharedBuffer::CreateFromPlatformHandle(
408 state->options.capacity_num_bytes,
409 false /* read_only */,
410 ScopedPlatformHandle(buffer_handle));
411 if (!ring_buffer) {
412 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
413 return nullptr;
414 }
415
416 scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
417 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
418 state->options, false /* initialized */,
419 state->pipe_id);
420
421 {
422 base::AutoLock lock(dispatcher->lock_);
423 dispatcher->read_offset_ = state->read_offset;
424 dispatcher->bytes_available_ = state->bytes_available;
425 dispatcher->new_data_available_ = state->bytes_available > 0;
426 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
427 dispatcher->InitializeNoLock();
428 dispatcher->UpdateSignalsStateNoLock();
429 }
430
431 return dispatcher;
432 }
433
~DataPipeConsumerDispatcher()434 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
435 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
436 !in_transit_);
437 }
438
InitializeNoLock()439 void DataPipeConsumerDispatcher::InitializeNoLock() {
440 lock_.AssertAcquired();
441
442 if (shared_ring_buffer_) {
443 DCHECK(!ring_buffer_mapping_);
444 ring_buffer_mapping_ =
445 shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
446 if (!ring_buffer_mapping_) {
447 DLOG(ERROR) << "Failed to map shared buffer.";
448 shared_ring_buffer_ = nullptr;
449 }
450 }
451
452 base::AutoUnlock unlock(lock_);
453 node_controller_->SetPortObserver(
454 control_port_,
455 make_scoped_refptr(new PortObserverThunk(this)));
456 }
457
CloseNoLock()458 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
459 lock_.AssertAcquired();
460 if (is_closed_ || in_transit_)
461 return MOJO_RESULT_INVALID_ARGUMENT;
462 is_closed_ = true;
463 ring_buffer_mapping_.reset();
464 shared_ring_buffer_ = nullptr;
465
466 awakable_list_.CancelAll();
467 if (!transferred_) {
468 base::AutoUnlock unlock(lock_);
469 node_controller_->ClosePort(control_port_);
470 }
471
472 return MOJO_RESULT_OK;
473 }
474
475 HandleSignalsState
GetHandleSignalsStateNoLock() const476 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
477 lock_.AssertAcquired();
478
479 HandleSignalsState rv;
480 if (shared_ring_buffer_ && bytes_available_) {
481 if (!in_two_phase_read_) {
482 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
483 if (new_data_available_)
484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
485 }
486 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
487 } else if (!peer_closed_ && shared_ring_buffer_) {
488 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
489 }
490
491 if (shared_ring_buffer_) {
492 if (new_data_available_ || !peer_closed_)
493 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
494 }
495
496 if (peer_closed_)
497 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
498 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
499
500 return rv;
501 }
502
NotifyRead(uint32_t num_bytes)503 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
504 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
505 << num_bytes << " bytes read. [control_port="
506 << control_port_.name() << "]";
507
508 SendDataPipeControlMessage(node_controller_, control_port_,
509 DataPipeCommand::DATA_WAS_READ, num_bytes);
510 }
511
OnPortStatusChanged()512 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
513 DCHECK(RequestContext::current());
514
515 base::AutoLock lock(lock_);
516
517 // We stop observing the control port as soon it's transferred, but this can
518 // race with events which are raised right before that happens. This is fine
519 // to ignore.
520 if (transferred_)
521 return;
522
523 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
524
525 UpdateSignalsStateNoLock();
526 }
527
UpdateSignalsStateNoLock()528 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
529 lock_.AssertAcquired();
530
531 bool was_peer_closed = peer_closed_;
532 size_t previous_bytes_available = bytes_available_;
533
534 ports::PortStatus port_status;
535 int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
536 if (rv != ports::OK || !port_status.receiving_messages) {
537 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
538 << " [control_port=" << control_port_.name() << "]";
539 peer_closed_ = true;
540 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
541 ports::ScopedMessage message;
542 do {
543 int rv = node_controller_->node()->GetMessage(
544 control_port_, &message, nullptr);
545 if (rv != ports::OK)
546 peer_closed_ = true;
547 if (message) {
548 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
549 peer_closed_ = true;
550 break;
551 }
552
553 const DataPipeControlMessage* m =
554 static_cast<const DataPipeControlMessage*>(
555 message->payload_bytes());
556
557 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
558 DLOG(ERROR) << "Unexpected control message from producer.";
559 peer_closed_ = true;
560 break;
561 }
562
563 if (static_cast<size_t>(bytes_available_) + m->num_bytes >
564 options_.capacity_num_bytes) {
565 DLOG(ERROR) << "Producer claims to have written too many bytes.";
566 peer_closed_ = true;
567 break;
568 }
569
570 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
571 << m->num_bytes << " bytes were written. [control_port="
572 << control_port_.name() << "]";
573
574 bytes_available_ += m->num_bytes;
575 }
576 } while (message);
577 }
578
579 bool has_new_data = bytes_available_ != previous_bytes_available;
580 if (has_new_data)
581 new_data_available_ = true;
582
583 if (peer_closed_ != was_peer_closed || has_new_data) {
584 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
585 }
586 }
587
588 } // namespace edk
589 } // namespace mojo
590