1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/data_pipe_consumer_dispatcher.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <algorithm>
11 #include <limits>
12 #include <utility>
13
14 #include "base/bind.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "mojo/core/core.h"
18 #include "mojo/core/data_pipe_control_message.h"
19 #include "mojo/core/node_controller.h"
20 #include "mojo/core/platform_handle_utils.h"
21 #include "mojo/core/request_context.h"
22 #include "mojo/core/user_message_impl.h"
23 #include "mojo/public/c/system/data_pipe.h"
24
25 namespace mojo {
26 namespace core {
27
28 namespace {
29
30 const uint8_t kFlagPeerClosed = 0x01;
31
32 #pragma pack(push, 1)
33
34 struct SerializedState {
35 MojoCreateDataPipeOptions options;
36 uint64_t pipe_id;
37 uint32_t read_offset;
38 uint32_t bytes_available;
39 uint8_t flags;
40 uint64_t buffer_guid_high;
41 uint64_t buffer_guid_low;
42 char padding[7];
43 };
44
45 static_assert(sizeof(SerializedState) % 8 == 0,
46 "Invalid SerializedState size.");
47
48 #pragma pack(pop)
49
50 } // namespace
51
52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
53 // reference to the dispatcher to ensure it lives as long as the observed port.
54 class DataPipeConsumerDispatcher::PortObserverThunk
55 : public NodeController::PortObserver {
56 public:
PortObserverThunk(scoped_refptr<DataPipeConsumerDispatcher> dispatcher)57 explicit PortObserverThunk(
58 scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
59 : dispatcher_(dispatcher) {}
60
61 private:
~PortObserverThunk()62 ~PortObserverThunk() override {}
63
64 // NodeController::PortObserver:
OnPortStatusChanged()65 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
66
67 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
68
69 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
70 };
71
72 // static
Create(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)73 scoped_refptr<DataPipeConsumerDispatcher> DataPipeConsumerDispatcher::Create(
74 NodeController* node_controller,
75 const ports::PortRef& control_port,
76 base::UnsafeSharedMemoryRegion shared_ring_buffer,
77 const MojoCreateDataPipeOptions& options,
78 uint64_t pipe_id) {
79 scoped_refptr<DataPipeConsumerDispatcher> consumer =
80 new DataPipeConsumerDispatcher(node_controller, control_port,
81 std::move(shared_ring_buffer), options,
82 pipe_id);
83 base::AutoLock lock(consumer->lock_);
84 if (!consumer->InitializeNoLock())
85 return nullptr;
86 return consumer;
87 }
88
GetType() const89 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
90 return Type::DATA_PIPE_CONSUMER;
91 }
92
Close()93 MojoResult DataPipeConsumerDispatcher::Close() {
94 base::AutoLock lock(lock_);
95 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
96 return CloseNoLock();
97 }
98
ReadData(const MojoReadDataOptions & options,void * elements,uint32_t * num_bytes)99 MojoResult DataPipeConsumerDispatcher::ReadData(
100 const MojoReadDataOptions& options,
101 void* elements,
102 uint32_t* num_bytes) {
103 base::AutoLock lock(lock_);
104
105 if (!shared_ring_buffer_.IsValid() || in_transit_)
106 return MOJO_RESULT_INVALID_ARGUMENT;
107
108 if (in_two_phase_read_)
109 return MOJO_RESULT_BUSY;
110
111 const bool had_new_data = new_data_available_;
112 new_data_available_ = false;
113
114 if ((options.flags & MOJO_READ_DATA_FLAG_QUERY)) {
115 if ((options.flags & MOJO_READ_DATA_FLAG_PEEK) ||
116 (options.flags & MOJO_READ_DATA_FLAG_DISCARD))
117 return MOJO_RESULT_INVALID_ARGUMENT;
118 DCHECK(!(options.flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
119 DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|";
120 *num_bytes = static_cast<uint32_t>(bytes_available_);
121
122 if (had_new_data)
123 watchers_.NotifyState(GetHandleSignalsStateNoLock());
124 return MOJO_RESULT_OK;
125 }
126
127 bool discard = false;
128 if ((options.flags & MOJO_READ_DATA_FLAG_DISCARD)) {
129 // These flags are mutally exclusive.
130 if (options.flags & MOJO_READ_DATA_FLAG_PEEK)
131 return MOJO_RESULT_INVALID_ARGUMENT;
132 DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|";
133 discard = true;
134 }
135
136 uint32_t max_num_bytes_to_read = *num_bytes;
137 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
138 return MOJO_RESULT_INVALID_ARGUMENT;
139
140 bool all_or_none = options.flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
141 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
142
143 if (min_num_bytes_to_read > bytes_available_) {
144 if (had_new_data)
145 watchers_.NotifyState(GetHandleSignalsStateNoLock());
146 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
147 : MOJO_RESULT_OUT_OF_RANGE;
148 }
149
150 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
151 if (bytes_to_read == 0) {
152 if (had_new_data)
153 watchers_.NotifyState(GetHandleSignalsStateNoLock());
154 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
155 : MOJO_RESULT_SHOULD_WAIT;
156 }
157
158 if (!discard) {
159 const uint8_t* data =
160 static_cast<const uint8_t*>(ring_buffer_mapping_.memory());
161 CHECK(data);
162
163 uint8_t* destination = static_cast<uint8_t*>(elements);
164 CHECK(destination);
165
166 DCHECK_LE(read_offset_, options_.capacity_num_bytes);
167 uint32_t tail_bytes_to_copy =
168 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
169 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
170 if (tail_bytes_to_copy > 0)
171 memcpy(destination, data + read_offset_, tail_bytes_to_copy);
172 if (head_bytes_to_copy > 0)
173 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
174 }
175 *num_bytes = bytes_to_read;
176
177 bool peek = !!(options.flags & MOJO_READ_DATA_FLAG_PEEK);
178 if (discard || !peek) {
179 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
180 bytes_available_ -= bytes_to_read;
181
182 base::AutoUnlock unlock(lock_);
183 NotifyRead(bytes_to_read);
184 }
185
186 // We may have just read the last available data and thus changed the signals
187 // state.
188 watchers_.NotifyState(GetHandleSignalsStateNoLock());
189
190 return MOJO_RESULT_OK;
191 }
192
BeginReadData(const void ** buffer,uint32_t * buffer_num_bytes)193 MojoResult DataPipeConsumerDispatcher::BeginReadData(
194 const void** buffer,
195 uint32_t* buffer_num_bytes) {
196 base::AutoLock lock(lock_);
197 if (!shared_ring_buffer_.IsValid() || in_transit_)
198 return MOJO_RESULT_INVALID_ARGUMENT;
199
200 if (in_two_phase_read_)
201 return MOJO_RESULT_BUSY;
202
203 const bool had_new_data = new_data_available_;
204 new_data_available_ = false;
205
206 if (bytes_available_ == 0) {
207 if (had_new_data)
208 watchers_.NotifyState(GetHandleSignalsStateNoLock());
209 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
210 : MOJO_RESULT_SHOULD_WAIT;
211 }
212
213 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
214 uint32_t bytes_to_read =
215 std::min(bytes_available_, options_.capacity_num_bytes - read_offset_);
216
217 CHECK(ring_buffer_mapping_.IsValid());
218 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
219 CHECK(data);
220
221 in_two_phase_read_ = true;
222 *buffer = data + read_offset_;
223 *buffer_num_bytes = bytes_to_read;
224 two_phase_max_bytes_read_ = bytes_to_read;
225
226 if (had_new_data)
227 watchers_.NotifyState(GetHandleSignalsStateNoLock());
228
229 return MOJO_RESULT_OK;
230 }
231
EndReadData(uint32_t num_bytes_read)232 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
233 base::AutoLock lock(lock_);
234 if (!in_two_phase_read_)
235 return MOJO_RESULT_FAILED_PRECONDITION;
236
237 if (in_transit_)
238 return MOJO_RESULT_INVALID_ARGUMENT;
239
240 CHECK(shared_ring_buffer_.IsValid());
241
242 MojoResult rv;
243 if (num_bytes_read > two_phase_max_bytes_read_ ||
244 num_bytes_read % options_.element_num_bytes != 0) {
245 rv = MOJO_RESULT_INVALID_ARGUMENT;
246 } else {
247 rv = MOJO_RESULT_OK;
248 read_offset_ =
249 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
250
251 DCHECK_GE(bytes_available_, num_bytes_read);
252 bytes_available_ -= num_bytes_read;
253
254 base::AutoUnlock unlock(lock_);
255 NotifyRead(num_bytes_read);
256 }
257
258 in_two_phase_read_ = false;
259 two_phase_max_bytes_read_ = 0;
260
261 watchers_.NotifyState(GetHandleSignalsStateNoLock());
262
263 return rv;
264 }
265
GetHandleSignalsState() const266 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
267 base::AutoLock lock(lock_);
268 return GetHandleSignalsStateNoLock();
269 }
270
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)271 MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
272 const scoped_refptr<WatcherDispatcher>& watcher,
273 uintptr_t context) {
274 base::AutoLock lock(lock_);
275 if (is_closed_ || in_transit_)
276 return MOJO_RESULT_INVALID_ARGUMENT;
277 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
278 }
279
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)280 MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
281 WatcherDispatcher* watcher,
282 uintptr_t context) {
283 base::AutoLock lock(lock_);
284 if (is_closed_ || in_transit_)
285 return MOJO_RESULT_INVALID_ARGUMENT;
286 return watchers_.Remove(watcher, context);
287 }
288
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)289 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
290 uint32_t* num_ports,
291 uint32_t* num_handles) {
292 base::AutoLock lock(lock_);
293 DCHECK(in_transit_);
294 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
295 *num_ports = 1;
296 *num_handles = 1;
297 }
298
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)299 bool DataPipeConsumerDispatcher::EndSerialize(
300 void* destination,
301 ports::PortName* ports,
302 PlatformHandle* platform_handles) {
303 SerializedState* state = static_cast<SerializedState*>(destination);
304 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
305 memset(state->padding, 0, sizeof(state->padding));
306
307 base::AutoLock lock(lock_);
308 DCHECK(in_transit_);
309 state->pipe_id = pipe_id_;
310 state->read_offset = read_offset_;
311 state->bytes_available = bytes_available_;
312 state->flags = peer_closed_ ? kFlagPeerClosed : 0;
313
314 auto region_handle =
315 base::UnsafeSharedMemoryRegion::TakeHandleForSerialization(
316 std::move(shared_ring_buffer_));
317 const base::UnguessableToken& guid = region_handle.GetGUID();
318 state->buffer_guid_high = guid.GetHighForSerialization();
319 state->buffer_guid_low = guid.GetLowForSerialization();
320
321 ports[0] = control_port_.name();
322
323 PlatformHandle handle;
324 PlatformHandle ignored_handle;
325 ExtractPlatformHandlesFromSharedMemoryRegionHandle(
326 region_handle.PassPlatformHandle(), &handle, &ignored_handle);
327 if (!handle.is_valid() || ignored_handle.is_valid())
328 return false;
329
330 platform_handles[0] = std::move(handle);
331 return true;
332 }
333
BeginTransit()334 bool DataPipeConsumerDispatcher::BeginTransit() {
335 base::AutoLock lock(lock_);
336 if (in_transit_)
337 return false;
338 in_transit_ = !in_two_phase_read_;
339 return in_transit_;
340 }
341
CompleteTransitAndClose()342 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
343 node_controller_->SetPortObserver(control_port_, nullptr);
344
345 base::AutoLock lock(lock_);
346 DCHECK(in_transit_);
347 in_transit_ = false;
348 transferred_ = true;
349 CloseNoLock();
350 }
351
CancelTransit()352 void DataPipeConsumerDispatcher::CancelTransit() {
353 base::AutoLock lock(lock_);
354 DCHECK(in_transit_);
355 in_transit_ = false;
356 UpdateSignalsStateNoLock();
357 }
358
359 // static
360 scoped_refptr<DataPipeConsumerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)361 DataPipeConsumerDispatcher::Deserialize(const void* data,
362 size_t num_bytes,
363 const ports::PortName* ports,
364 size_t num_ports,
365 PlatformHandle* handles,
366 size_t num_handles) {
367 if (num_ports != 1 || num_handles != 1 ||
368 num_bytes != sizeof(SerializedState)) {
369 return nullptr;
370 }
371
372 const SerializedState* state = static_cast<const SerializedState*>(data);
373 if (!state->options.capacity_num_bytes || !state->options.element_num_bytes ||
374 state->options.capacity_num_bytes < state->options.element_num_bytes) {
375 return nullptr;
376 }
377
378 NodeController* node_controller = Core::Get()->GetNodeController();
379 ports::PortRef port;
380 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
381 return nullptr;
382
383 auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles(
384 std::move(handles[0]), PlatformHandle());
385 auto region = base::subtle::PlatformSharedMemoryRegion::Take(
386 std::move(region_handle),
387 base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe,
388 state->options.capacity_num_bytes,
389 base::UnguessableToken::Deserialize(state->buffer_guid_high,
390 state->buffer_guid_low));
391 auto ring_buffer =
392 base::UnsafeSharedMemoryRegion::Deserialize(std::move(region));
393 if (!ring_buffer.IsValid()) {
394 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
395 return nullptr;
396 }
397
398 scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
399 new DataPipeConsumerDispatcher(node_controller, port,
400 std::move(ring_buffer), state->options,
401 state->pipe_id);
402
403 {
404 base::AutoLock lock(dispatcher->lock_);
405 dispatcher->read_offset_ = state->read_offset;
406 dispatcher->bytes_available_ = state->bytes_available;
407 dispatcher->new_data_available_ = state->bytes_available > 0;
408 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
409 if (!dispatcher->InitializeNoLock())
410 return nullptr;
411 dispatcher->UpdateSignalsStateNoLock();
412 }
413
414 return dispatcher;
415 }
416
DataPipeConsumerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)417 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
418 NodeController* node_controller,
419 const ports::PortRef& control_port,
420 base::UnsafeSharedMemoryRegion shared_ring_buffer,
421 const MojoCreateDataPipeOptions& options,
422 uint64_t pipe_id)
423 : options_(options),
424 node_controller_(node_controller),
425 control_port_(control_port),
426 pipe_id_(pipe_id),
427 watchers_(this),
428 shared_ring_buffer_(std::move(shared_ring_buffer)) {}
429
~DataPipeConsumerDispatcher()430 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
431 DCHECK(is_closed_ && !shared_ring_buffer_.IsValid() &&
432 !ring_buffer_mapping_.IsValid() && !in_transit_);
433 }
434
InitializeNoLock()435 bool DataPipeConsumerDispatcher::InitializeNoLock() {
436 lock_.AssertAcquired();
437 if (!shared_ring_buffer_.IsValid())
438 return false;
439
440 DCHECK(!ring_buffer_mapping_.IsValid());
441 ring_buffer_mapping_ = shared_ring_buffer_.Map();
442 if (!ring_buffer_mapping_.IsValid()) {
443 DLOG(ERROR) << "Failed to map shared buffer.";
444 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
445 return false;
446 }
447
448 base::AutoUnlock unlock(lock_);
449 node_controller_->SetPortObserver(
450 control_port_, base::MakeRefCounted<PortObserverThunk>(this));
451
452 return true;
453 }
454
CloseNoLock()455 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
456 lock_.AssertAcquired();
457 if (is_closed_ || in_transit_)
458 return MOJO_RESULT_INVALID_ARGUMENT;
459 is_closed_ = true;
460 ring_buffer_mapping_ = base::WritableSharedMemoryMapping();
461 shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
462
463 watchers_.NotifyClosed();
464 if (!transferred_) {
465 base::AutoUnlock unlock(lock_);
466 node_controller_->ClosePort(control_port_);
467 }
468
469 return MOJO_RESULT_OK;
470 }
471
GetHandleSignalsStateNoLock() const472 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock()
473 const {
474 lock_.AssertAcquired();
475
476 HandleSignalsState rv;
477 if (shared_ring_buffer_.IsValid() && bytes_available_) {
478 if (!in_two_phase_read_) {
479 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
480 if (new_data_available_)
481 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
482 }
483 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
484 } else if (!peer_closed_ && shared_ring_buffer_.IsValid()) {
485 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
486 }
487
488 if (shared_ring_buffer_.IsValid()) {
489 if (new_data_available_ || !peer_closed_)
490 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE;
491 }
492
493 if (peer_closed_) {
494 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
495 } else {
496 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
497 if (peer_remote_)
498 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
499 }
500 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
501
502 return rv;
503 }
504
NotifyRead(uint32_t num_bytes)505 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
506 DVLOG(1) << "Data pipe consumer " << pipe_id_
507 << " notifying peer: " << num_bytes
508 << " bytes read. [control_port=" << control_port_.name() << "]";
509
510 SendDataPipeControlMessage(node_controller_, control_port_,
511 DataPipeCommand::DATA_WAS_READ, num_bytes);
512 }
513
OnPortStatusChanged()514 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
515 DCHECK(RequestContext::current());
516
517 base::AutoLock lock(lock_);
518
519 // We stop observing the control port as soon it's transferred, but this can
520 // race with events which are raised right before that happens. This is fine
521 // to ignore.
522 if (transferred_)
523 return;
524
525 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
526
527 UpdateSignalsStateNoLock();
528 }
529
UpdateSignalsStateNoLock()530 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
531 lock_.AssertAcquired();
532
533 const bool was_peer_closed = peer_closed_;
534 const bool was_peer_remote = peer_remote_;
535 size_t previous_bytes_available = bytes_available_;
536
537 ports::PortStatus port_status;
538 int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
539 peer_remote_ = rv == ports::OK && port_status.peer_remote;
540 if (rv != ports::OK || !port_status.receiving_messages) {
541 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
542 << " [control_port=" << control_port_.name() << "]";
543 peer_closed_ = true;
544 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
545 std::unique_ptr<ports::UserMessageEvent> message_event;
546 do {
547 int rv = node_controller_->node()->GetMessage(control_port_,
548 &message_event, nullptr);
549 if (rv != ports::OK)
550 peer_closed_ = true;
551 if (message_event) {
552 auto* message = message_event->GetMessage<UserMessageImpl>();
553 if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
554 peer_closed_ = true;
555 break;
556 }
557
558 const DataPipeControlMessage* m =
559 static_cast<const DataPipeControlMessage*>(message->user_payload());
560
561 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
562 DLOG(ERROR) << "Unexpected control message from producer.";
563 peer_closed_ = true;
564 break;
565 }
566
567 if (static_cast<size_t>(bytes_available_) + m->num_bytes >
568 options_.capacity_num_bytes) {
569 DLOG(ERROR) << "Producer claims to have written too many bytes.";
570 peer_closed_ = true;
571 break;
572 }
573
574 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
575 << m->num_bytes << " bytes were written. [control_port="
576 << control_port_.name() << "]";
577
578 bytes_available_ += m->num_bytes;
579 }
580 } while (message_event);
581 }
582
583 bool has_new_data = bytes_available_ != previous_bytes_available;
584 if (has_new_data)
585 new_data_available_ = true;
586
587 if (peer_closed_ != was_peer_closed || has_new_data ||
588 peer_remote_ != was_peer_remote) {
589 watchers_.NotifyState(GetHandleSignalsStateNoLock());
590 }
591 }
592
593 } // namespace core
594 } // namespace mojo
595