1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/data_pipe_producer_dispatcher.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "mojo/core/configuration.h"
16 #include "mojo/core/core.h"
17 #include "mojo/core/data_pipe_control_message.h"
18 #include "mojo/core/node_controller.h"
19 #include "mojo/core/platform_handle_utils.h"
20 #include "mojo/core/request_context.h"
21 #include "mojo/core/user_message_impl.h"
22 #include "mojo/public/c/system/data_pipe.h"
23
24 namespace mojo {
25 namespace core {
26
27 namespace {
28
29 const uint8_t kFlagPeerClosed = 0x01;
30
31 #pragma pack(push, 1)
32
33 struct SerializedState {
34 MojoCreateDataPipeOptions options;
35 uint64_t pipe_id;
36 uint32_t write_offset;
37 uint32_t available_capacity;
38 uint8_t flags;
39 uint64_t buffer_guid_high;
40 uint64_t buffer_guid_low;
41 char padding[7];
42 };
43
44 static_assert(sizeof(SerializedState) % 8 == 0,
45 "Invalid SerializedState size.");
46
47 #pragma pack(pop)
48
49 } // namespace
50
51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
52 // reference to the dispatcher to ensure it lives as long as the observed port.
53 class DataPipeProducerDispatcher::PortObserverThunk
54 : public NodeController::PortObserver {
55 public:
PortObserverThunk(scoped_refptr<DataPipeProducerDispatcher> dispatcher)56 explicit PortObserverThunk(
57 scoped_refptr<DataPipeProducerDispatcher> dispatcher)
58 : dispatcher_(dispatcher) {}
59
60 private:
~PortObserverThunk()61 ~PortObserverThunk() override {}
62
63 // NodeController::PortObserver:
OnPortStatusChanged()64 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
65
66 scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
67
68 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
69 };
70
71 // static
Create(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)72 scoped_refptr<DataPipeProducerDispatcher> DataPipeProducerDispatcher::Create(
73 NodeController* node_controller,
74 const ports::PortRef& control_port,
75 base::UnsafeSharedMemoryRegion shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options,
77 uint64_t pipe_id) {
78 scoped_refptr<DataPipeProducerDispatcher> producer =
79 new DataPipeProducerDispatcher(node_controller, control_port,
80 std::move(shared_ring_buffer), options,
81 pipe_id);
82 base::AutoLock lock(producer->lock_);
83 if (!producer->InitializeNoLock())
84 return nullptr;
85 return producer;
86 }
87
GetType() const88 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
89 return Type::DATA_PIPE_PRODUCER;
90 }
91
Close()92 MojoResult DataPipeProducerDispatcher::Close() {
93 base::AutoLock lock(lock_);
94 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
95 return CloseNoLock();
96 }
97
WriteData(const void * elements,uint32_t * num_bytes,const MojoWriteDataOptions & options)98 MojoResult DataPipeProducerDispatcher::WriteData(
99 const void* elements,
100 uint32_t* num_bytes,
101 const MojoWriteDataOptions& options) {
102 base::AutoLock lock(lock_);
103 if (!shared_ring_buffer_.IsValid() || in_transit_)
104 return MOJO_RESULT_INVALID_ARGUMENT;
105
106 if (in_two_phase_write_)
107 return MOJO_RESULT_BUSY;
108
109 if (peer_closed_)
110 return MOJO_RESULT_FAILED_PRECONDITION;
111
112 if (*num_bytes % options_.element_num_bytes != 0)
113 return MOJO_RESULT_INVALID_ARGUMENT;
114 if (*num_bytes == 0)
115 return MOJO_RESULT_OK; // Nothing to do.
116
117 if ((options.flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
118 (*num_bytes > available_capacity_)) {
119 // Don't return "should wait" since you can't wait for a specified amount of
120 // data.
121 return MOJO_RESULT_OUT_OF_RANGE;
122 }
123
124 DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
125 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
126 if (num_bytes_to_write == 0)
127 return MOJO_RESULT_SHOULD_WAIT;
128
129 *num_bytes = num_bytes_to_write;
130
131 CHECK(ring_buffer_mapping_.IsValid());
132 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
133 CHECK(data);
134
135 const uint8_t* source = static_cast<const uint8_t*>(elements);
136 CHECK(source);
137
138 DCHECK_LE(write_offset_, options_.capacity_num_bytes);
139 uint32_t tail_bytes_to_write =
140 std::min(options_.capacity_num_bytes - write_offset_, num_bytes_to_write);
141 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
142
143 DCHECK_GT(tail_bytes_to_write, 0u);
144 memcpy(data + write_offset_, source, tail_bytes_to_write);
145 if (head_bytes_to_write > 0)
146 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
147
148 DCHECK_LE(num_bytes_to_write, available_capacity_);
149 available_capacity_ -= num_bytes_to_write;
150 write_offset_ =
151 (write_offset_ + num_bytes_to_write) % options_.capacity_num_bytes;
152
153 watchers_.NotifyState(GetHandleSignalsStateNoLock());
154
155 base::AutoUnlock unlock(lock_);
156 NotifyWrite(num_bytes_to_write);
157
158 return MOJO_RESULT_OK;
159 }
160
BeginWriteData(void ** buffer,uint32_t * buffer_num_bytes)161 MojoResult DataPipeProducerDispatcher::BeginWriteData(
162 void** buffer,
163 uint32_t* buffer_num_bytes) {
164 base::AutoLock lock(lock_);
165 if (!shared_ring_buffer_.IsValid() || in_transit_)
166 return MOJO_RESULT_INVALID_ARGUMENT;
167
168 if (in_two_phase_write_)
169 return MOJO_RESULT_BUSY;
170 if (peer_closed_)
171 return MOJO_RESULT_FAILED_PRECONDITION;
172
173 if (available_capacity_ == 0) {
174 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
175 : MOJO_RESULT_SHOULD_WAIT;
176 }
177
178 in_two_phase_write_ = true;
179 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
180 available_capacity_);
181 DCHECK_GT(*buffer_num_bytes, 0u);
182
183 CHECK(ring_buffer_mapping_.IsValid());
184 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
185 *buffer = data + write_offset_;
186
187 return MOJO_RESULT_OK;
188 }
189
EndWriteData(uint32_t num_bytes_written)190 MojoResult DataPipeProducerDispatcher::EndWriteData(
191 uint32_t num_bytes_written) {
192 base::AutoLock lock(lock_);
193 if (is_closed_ || in_transit_)
194 return MOJO_RESULT_INVALID_ARGUMENT;
195
196 if (!in_two_phase_write_)
197 return MOJO_RESULT_FAILED_PRECONDITION;
198
199 // Note: Allow successful completion of the two-phase write even if the other
200 // side has been closed.
201 MojoResult rv = MOJO_RESULT_OK;
202 if (num_bytes_written > available_capacity_ ||
203 num_bytes_written % options_.element_num_bytes != 0 ||
204 write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
205 rv = MOJO_RESULT_INVALID_ARGUMENT;
206 } else {
207 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
208 available_capacity_ -= num_bytes_written;
209 write_offset_ =
210 (write_offset_ + num_bytes_written) % options_.capacity_num_bytes;
211
212 base::AutoUnlock unlock(lock_);
213 NotifyWrite(num_bytes_written);
214 }
215
216 in_two_phase_write_ = false;
217
218 // If we're now writable, we *became* writable (since we weren't writable
219 // during the two-phase write), so notify watchers.
220 watchers_.NotifyState(GetHandleSignalsStateNoLock());
221
222 return rv;
223 }
224
GetHandleSignalsState() const225 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
226 base::AutoLock lock(lock_);
227 return GetHandleSignalsStateNoLock();
228 }
229
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)230 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
231 const scoped_refptr<WatcherDispatcher>& watcher,
232 uintptr_t context) {
233 base::AutoLock lock(lock_);
234 if (is_closed_ || in_transit_)
235 return MOJO_RESULT_INVALID_ARGUMENT;
236 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
237 }
238
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)239 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
240 WatcherDispatcher* watcher,
241 uintptr_t context) {
242 base::AutoLock lock(lock_);
243 if (is_closed_ || in_transit_)
244 return MOJO_RESULT_INVALID_ARGUMENT;
245 return watchers_.Remove(watcher, context);
246 }
247
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)248 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
249 uint32_t* num_ports,
250 uint32_t* num_handles) {
251 base::AutoLock lock(lock_);
252 DCHECK(in_transit_);
253 *num_bytes = sizeof(SerializedState);
254 *num_ports = 1;
255 *num_handles = 1;
256 }
257
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)258 bool DataPipeProducerDispatcher::EndSerialize(
259 void* destination,
260 ports::PortName* ports,
261 PlatformHandle* platform_handles) {
262 SerializedState* state = static_cast<SerializedState*>(destination);
263 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
264 memset(state->padding, 0, sizeof(state->padding));
265
266 base::AutoLock lock(lock_);
267 DCHECK(in_transit_);
268 state->pipe_id = pipe_id_;
269 state->write_offset = write_offset_;
270 state->available_capacity = available_capacity_;
271 state->flags = peer_closed_ ? kFlagPeerClosed : 0;
272
273 auto region_handle =
274 base::UnsafeSharedMemoryRegion::TakeHandleForSerialization(
275 std::move(shared_ring_buffer_));
276 const base::UnguessableToken& guid = region_handle.GetGUID();
277 state->buffer_guid_high = guid.GetHighForSerialization();
278 state->buffer_guid_low = guid.GetLowForSerialization();
279
280 ports[0] = control_port_.name();
281
282 PlatformHandle handle;
283 PlatformHandle ignored_handle;
284 ExtractPlatformHandlesFromSharedMemoryRegionHandle(
285 region_handle.PassPlatformHandle(), &handle, &ignored_handle);
286 if (!handle.is_valid() || ignored_handle.is_valid())
287 return false;
288
289 platform_handles[0] = std::move(handle);
290 return true;
291 }
292
BeginTransit()293 bool DataPipeProducerDispatcher::BeginTransit() {
294 base::AutoLock lock(lock_);
295 if (in_transit_)
296 return false;
297 in_transit_ = !in_two_phase_write_;
298 return in_transit_;
299 }
300
CompleteTransitAndClose()301 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
302 node_controller_->SetPortObserver(control_port_, nullptr);
303
304 base::AutoLock lock(lock_);
305 DCHECK(in_transit_);
306 transferred_ = true;
307 in_transit_ = false;
308 CloseNoLock();
309 }
310
CancelTransit()311 void DataPipeProducerDispatcher::CancelTransit() {
312 base::AutoLock lock(lock_);
313 DCHECK(in_transit_);
314 in_transit_ = false;
315
316 HandleSignalsState state = GetHandleSignalsStateNoLock();
317 watchers_.NotifyState(state);
318 }
319
320 // static
321 scoped_refptr<DataPipeProducerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)322 DataPipeProducerDispatcher::Deserialize(const void* data,
323 size_t num_bytes,
324 const ports::PortName* ports,
325 size_t num_ports,
326 PlatformHandle* handles,
327 size_t num_handles) {
328 if (num_ports != 1 || num_handles != 1 ||
329 num_bytes != sizeof(SerializedState)) {
330 return nullptr;
331 }
332
333 const SerializedState* state = static_cast<const SerializedState*>(data);
334 if (!state->options.capacity_num_bytes || !state->options.element_num_bytes ||
335 state->options.capacity_num_bytes < state->options.element_num_bytes) {
336 return nullptr;
337 }
338
339 NodeController* node_controller = Core::Get()->GetNodeController();
340 ports::PortRef port;
341 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
342 return nullptr;
343
344 auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles(
345 std::move(handles[0]), PlatformHandle());
346 auto region = base::subtle::PlatformSharedMemoryRegion::Take(
347 std::move(region_handle),
348 base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe,
349 state->options.capacity_num_bytes,
350 base::UnguessableToken::Deserialize(state->buffer_guid_high,
351 state->buffer_guid_low));
352 auto ring_buffer =
353 base::UnsafeSharedMemoryRegion::Deserialize(std::move(region));
354 if (!ring_buffer.IsValid()) {
355 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
356 return nullptr;
357 }
358
359 scoped_refptr<DataPipeProducerDispatcher> dispatcher =
360 new DataPipeProducerDispatcher(node_controller, port,
361 std::move(ring_buffer), state->options,
362 state->pipe_id);
363
364 {
365 base::AutoLock lock(dispatcher->lock_);
366 dispatcher->write_offset_ = state->write_offset;
367 dispatcher->available_capacity_ = state->available_capacity;
368 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
369 if (!dispatcher->InitializeNoLock())
370 return nullptr;
371 dispatcher->UpdateSignalsStateNoLock();
372 }
373
374 return dispatcher;
375 }
376
DataPipeProducerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)377 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
378 NodeController* node_controller,
379 const ports::PortRef& control_port,
380 base::UnsafeSharedMemoryRegion shared_ring_buffer,
381 const MojoCreateDataPipeOptions& options,
382 uint64_t pipe_id)
383 : options_(options),
384 node_controller_(node_controller),
385 control_port_(control_port),
386 pipe_id_(pipe_id),
387 watchers_(this),
388 shared_ring_buffer_(std::move(shared_ring_buffer)),
389 available_capacity_(options_.capacity_num_bytes) {}
390
~DataPipeProducerDispatcher()391 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
392 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_.IsValid() &&
393 !ring_buffer_mapping_.IsValid());
394 }
395
InitializeNoLock()396 bool DataPipeProducerDispatcher::InitializeNoLock() {
397 lock_.AssertAcquired();
398 if (!shared_ring_buffer_.IsValid())
399 return false;
400
401 DCHECK(!ring_buffer_mapping_.IsValid());
402 ring_buffer_mapping_ = shared_ring_buffer_.Map();
403 if (!ring_buffer_mapping_.IsValid()) {
404 DLOG(ERROR) << "Failed to map shared buffer.";
405 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
406 return false;
407 }
408
409 base::AutoUnlock unlock(lock_);
410 node_controller_->SetPortObserver(
411 control_port_, base::MakeRefCounted<PortObserverThunk>(this));
412
413 return true;
414 }
415
CloseNoLock()416 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
417 lock_.AssertAcquired();
418 if (is_closed_ || in_transit_)
419 return MOJO_RESULT_INVALID_ARGUMENT;
420 is_closed_ = true;
421 ring_buffer_mapping_ = base::WritableSharedMemoryMapping();
422 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
423
424 watchers_.NotifyClosed();
425 if (!transferred_) {
426 base::AutoUnlock unlock(lock_);
427 node_controller_->ClosePort(control_port_);
428 }
429
430 return MOJO_RESULT_OK;
431 }
432
GetHandleSignalsStateNoLock() const433 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
434 const {
435 lock_.AssertAcquired();
436 HandleSignalsState rv;
437 if (!peer_closed_) {
438 if (!in_two_phase_write_ && shared_ring_buffer_.IsValid() &&
439 available_capacity_ > 0)
440 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
441 if (peer_remote_)
442 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
443 rv.satisfiable_signals |=
444 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_REMOTE;
445 } else {
446 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
447 }
448 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
449 return rv;
450 }
451
NotifyWrite(uint32_t num_bytes)452 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
453 DVLOG(1) << "Data pipe producer " << pipe_id_
454 << " notifying peer: " << num_bytes
455 << " bytes written. [control_port=" << control_port_.name() << "]";
456
457 SendDataPipeControlMessage(node_controller_, control_port_,
458 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
459 }
460
OnPortStatusChanged()461 void DataPipeProducerDispatcher::OnPortStatusChanged() {
462 DCHECK(RequestContext::current());
463
464 base::AutoLock lock(lock_);
465
466 // We stop observing the control port as soon it's transferred, but this can
467 // race with events which are raised right before that happens. This is fine
468 // to ignore.
469 if (transferred_)
470 return;
471
472 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
473
474 UpdateSignalsStateNoLock();
475 }
476
UpdateSignalsStateNoLock()477 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
478 lock_.AssertAcquired();
479
480 const bool was_peer_closed = peer_closed_;
481 const bool was_peer_remote = peer_remote_;
482 size_t previous_capacity = available_capacity_;
483
484 ports::PortStatus port_status;
485 int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
486 peer_remote_ = rv == ports::OK && port_status.peer_remote;
487 if (rv != ports::OK || !port_status.receiving_messages) {
488 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
489 << " [control_port=" << control_port_.name() << "]";
490 peer_closed_ = true;
491 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
492 std::unique_ptr<ports::UserMessageEvent> message_event;
493 do {
494 int rv = node_controller_->node()->GetMessage(control_port_,
495 &message_event, nullptr);
496 if (rv != ports::OK)
497 peer_closed_ = true;
498 if (message_event) {
499 auto* message = message_event->GetMessage<UserMessageImpl>();
500 if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
501 peer_closed_ = true;
502 break;
503 }
504
505 const DataPipeControlMessage* m =
506 static_cast<const DataPipeControlMessage*>(message->user_payload());
507
508 if (m->command != DataPipeCommand::DATA_WAS_READ) {
509 DLOG(ERROR) << "Unexpected message from consumer.";
510 peer_closed_ = true;
511 break;
512 }
513
514 if (static_cast<size_t>(available_capacity_) + m->num_bytes >
515 options_.capacity_num_bytes) {
516 DLOG(ERROR) << "Consumer claims to have read too many bytes.";
517 break;
518 }
519
520 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
521 << m->num_bytes
522 << " bytes were read. [control_port=" << control_port_.name()
523 << "]";
524
525 available_capacity_ += m->num_bytes;
526 }
527 } while (message_event);
528 }
529
530 if (peer_closed_ != was_peer_closed ||
531 available_capacity_ != previous_capacity ||
532 was_peer_remote != peer_remote_) {
533 watchers_.NotifyState(GetHandleSignalsStateNoLock());
534 }
535 }
536
537 } // namespace core
538 } // namespace mojo
539