• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 
7 #include <limits>
8 #include <memory>
9 
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/edk/embedder/embedder_internal.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/message_for_transit.h"
16 #include "mojo/edk/system/node_controller.h"
17 #include "mojo/edk/system/ports/message_filter.h"
18 #include "mojo/edk/system/ports_message.h"
19 #include "mojo/edk/system/request_context.h"
20 
21 namespace mojo {
22 namespace edk {
23 
24 namespace {
25 
26 using DispatcherHeader = MessageForTransit::DispatcherHeader;
27 using MessageHeader = MessageForTransit::MessageHeader;
28 
#pragma pack(push, 1)

// Fixed-layout record describing one serialized message pipe endpoint. It is
// filled in by MessagePipeDispatcher::EndSerialize() and read back by
// MessagePipeDispatcher::Deserialize(). Packing is forced to 1 so that the
// layout is exactly the fields below with no compiler-inserted gaps.
struct SerializedState {
  // Identifier of the pipe this endpoint belongs to.
  uint64_t pipe_id;

  // Endpoint index, narrowed from the dispatcher's |endpoint_| int.
  int8_t endpoint;

  // Explicit filler; EndSerialize() zeroes it.
  char padding[7];
};

// The record must occupy a whole number of 8-byte units.
static_assert(sizeof(SerializedState) % 8 == 0,
              "Invalid SerializedState size.");

#pragma pack(pop)
41 
42 }  // namespace
43 
44 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
45 // reference to the MPD to ensure it lives as long as the observed port.
46 class MessagePipeDispatcher::PortObserverThunk
47     : public NodeController::PortObserver {
48  public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)49   explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
50       : dispatcher_(dispatcher) {}
51 
52  private:
~PortObserverThunk()53   ~PortObserverThunk() override {}
54 
55   // NodeController::PortObserver:
OnPortStatusChanged()56   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
57 
58   scoped_refptr<MessagePipeDispatcher> dispatcher_;
59 
60   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
61 };
62 
63 // A MessageFilter used by ReadMessage to determine whether a message should
64 // actually be consumed yet.
65 class ReadMessageFilter : public ports::MessageFilter {
66  public:
67   // Creates a new ReadMessageFilter which captures and potentially modifies
68   // various (unowned) local state within MessagePipeDispatcher::ReadMessage.
ReadMessageFilter(bool read_any_size,bool may_discard,uint32_t * num_bytes,uint32_t * num_handles,bool * no_space,bool * invalid_message)69   ReadMessageFilter(bool read_any_size,
70                     bool may_discard,
71                     uint32_t* num_bytes,
72                     uint32_t* num_handles,
73                     bool* no_space,
74                     bool* invalid_message)
75       : read_any_size_(read_any_size),
76         may_discard_(may_discard),
77         num_bytes_(num_bytes),
78         num_handles_(num_handles),
79         no_space_(no_space),
80         invalid_message_(invalid_message) {}
81 
~ReadMessageFilter()82   ~ReadMessageFilter() override {}
83 
84   // ports::MessageFilter:
Match(const ports::Message & m)85   bool Match(const ports::Message& m) override {
86     const PortsMessage& message = static_cast<const PortsMessage&>(m);
87     if (message.num_payload_bytes() < sizeof(MessageHeader)) {
88       *invalid_message_ = true;
89       return true;
90     }
91 
92     const MessageHeader* header =
93         static_cast<const MessageHeader*>(message.payload_bytes());
94     if (header->header_size > message.num_payload_bytes()) {
95       *invalid_message_ = true;
96       return true;
97     }
98 
99     uint32_t bytes_to_read = 0;
100     uint32_t bytes_available =
101         static_cast<uint32_t>(message.num_payload_bytes()) -
102         header->header_size;
103     if (num_bytes_) {
104       bytes_to_read = std::min(*num_bytes_, bytes_available);
105       *num_bytes_ = bytes_available;
106     }
107 
108     uint32_t handles_to_read = 0;
109     uint32_t handles_available = header->num_dispatchers;
110     if (num_handles_) {
111       handles_to_read = std::min(*num_handles_, handles_available);
112       *num_handles_ = handles_available;
113     }
114 
115     if (handles_to_read < handles_available ||
116         (!read_any_size_ && bytes_to_read < bytes_available)) {
117       *no_space_ = true;
118       return may_discard_;
119     }
120 
121     return true;
122   }
123 
124  private:
125   const bool read_any_size_;
126   const bool may_discard_;
127   uint32_t* const num_bytes_;
128   uint32_t* const num_handles_;
129   bool* const no_space_;
130   bool* const invalid_message_;
131 
132   DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter);
133 };
134 
135 #if DCHECK_IS_ON()
136 
137 // A MessageFilter which never matches a message. Used to peek at the size of
138 // the next available message on a port, for debug logging only.
139 class PeekSizeMessageFilter : public ports::MessageFilter {
140  public:
PeekSizeMessageFilter()141   PeekSizeMessageFilter() {}
~PeekSizeMessageFilter()142   ~PeekSizeMessageFilter() override {}
143 
144   // ports::MessageFilter:
Match(const ports::Message & message)145   bool Match(const ports::Message& message) override {
146     message_size_ = message.num_payload_bytes();
147     return false;
148   }
149 
message_size() const150   size_t message_size() const { return message_size_; }
151 
152  private:
153   size_t message_size_ = 0;
154 
155   DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
156 };
157 
158 #endif  // DCHECK_IS_ON()
159 
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
161                                              const ports::PortRef& port,
162                                              uint64_t pipe_id,
163                                              int endpoint)
164     : node_controller_(node_controller),
165       port_(port),
166       pipe_id_(pipe_id),
167       endpoint_(endpoint) {
168   DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
169            << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
170 
171   node_controller_->SetPortObserver(
172       port_,
173       make_scoped_refptr(new PortObserverThunk(this)));
174 }
175 
Fuse(MessagePipeDispatcher * other)176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
177   node_controller_->SetPortObserver(port_, nullptr);
178   node_controller_->SetPortObserver(other->port_, nullptr);
179 
180   ports::PortRef port0;
181   {
182     base::AutoLock lock(signal_lock_);
183     port0 = port_;
184     port_closed_.Set(true);
185     awakables_.CancelAll();
186   }
187 
188   ports::PortRef port1;
189   {
190     base::AutoLock lock(other->signal_lock_);
191     port1 = other->port_;
192     other->port_closed_.Set(true);
193     other->awakables_.CancelAll();
194   }
195 
196   // Both ports are always closed by this call.
197   int rv = node_controller_->MergeLocalPorts(port0, port1);
198   return rv == ports::OK;
199 }
200 
GetType() const201 Dispatcher::Type MessagePipeDispatcher::GetType() const {
202   return Type::MESSAGE_PIPE;
203 }
204 
Close()205 MojoResult MessagePipeDispatcher::Close() {
206   base::AutoLock lock(signal_lock_);
207   DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
208            << " [port=" << port_.name() << "]";
209   return CloseNoLock();
210 }
211 
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
213                                         const Watcher::WatchCallback& callback,
214                                         uintptr_t context) {
215   base::AutoLock lock(signal_lock_);
216 
217   if (port_closed_ || in_transit_)
218     return MOJO_RESULT_INVALID_ARGUMENT;
219 
220   return awakables_.AddWatcher(
221       signals, callback, context, GetHandleSignalsStateNoLock());
222 }
223 
CancelWatch(uintptr_t context)224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
225   base::AutoLock lock(signal_lock_);
226 
227   if (port_closed_ || in_transit_)
228     return MOJO_RESULT_INVALID_ARGUMENT;
229 
230   return awakables_.RemoveWatcher(context);
231 }
232 
WriteMessage(std::unique_ptr<MessageForTransit> message,MojoWriteMessageFlags flags)233 MojoResult MessagePipeDispatcher::WriteMessage(
234     std::unique_ptr<MessageForTransit> message,
235     MojoWriteMessageFlags flags) {
236   if (port_closed_ || in_transit_)
237     return MOJO_RESULT_INVALID_ARGUMENT;
238 
239   size_t num_bytes = message->num_bytes();
240   int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
241 
242   DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
243            << " [port=" << port_.name() << "; rv=" << rv
244            << "; num_bytes=" << num_bytes << "]";
245 
246   if (rv != ports::OK) {
247     if (rv == ports::ERROR_PORT_UNKNOWN ||
248         rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
249         rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
250       return MOJO_RESULT_INVALID_ARGUMENT;
251     } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
252       return MOJO_RESULT_FAILED_PRECONDITION;
253     }
254 
255     NOTREACHED();
256     return MOJO_RESULT_UNKNOWN;
257   }
258 
259   return MOJO_RESULT_OK;
260 }
261 
ReadMessage(std::unique_ptr<MessageForTransit> * message,uint32_t * num_bytes,MojoHandle * handles,uint32_t * num_handles,MojoReadMessageFlags flags,bool read_any_size)262 MojoResult MessagePipeDispatcher::ReadMessage(
263     std::unique_ptr<MessageForTransit>* message,
264     uint32_t* num_bytes,
265     MojoHandle* handles,
266     uint32_t* num_handles,
267     MojoReadMessageFlags flags,
268     bool read_any_size) {
269   // We can't read from a port that's closed or in transit!
270   if (port_closed_ || in_transit_)
271     return MOJO_RESULT_INVALID_ARGUMENT;
272 
273   bool no_space = false;
274   bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
275   bool invalid_message = false;
276 
277   // Grab a message if the provided handles buffer is large enough. If the input
278   // |num_bytes| is provided and |read_any_size| is false, we also ensure
279   // that it specifies a size at least as large as the next available payload.
280   //
281   // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
282   // This flag exists to support both new and old API behavior.
283 
284   ports::ScopedMessage ports_message;
285   ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles,
286                            &no_space, &invalid_message);
287   int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter);
288 
289   if (invalid_message)
290     return MOJO_RESULT_UNKNOWN;
291 
292   if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
293     if (rv == ports::ERROR_PORT_UNKNOWN ||
294         rv == ports::ERROR_PORT_STATE_UNEXPECTED)
295       return MOJO_RESULT_INVALID_ARGUMENT;
296 
297     NOTREACHED();
298     return MOJO_RESULT_UNKNOWN;  // TODO: Add a better error code here?
299   }
300 
301   if (no_space) {
302     // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
303     // sufficient to hold this message's data. The message will still be in
304     // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
305     return MOJO_RESULT_RESOURCE_EXHAUSTED;
306   }
307 
308   if (!ports_message) {
309     // No message was available in queue.
310 
311     if (rv == ports::OK)
312       return MOJO_RESULT_SHOULD_WAIT;
313 
314     // Peer is closed and there are no more messages to read.
315     DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
316     return MOJO_RESULT_FAILED_PRECONDITION;
317   }
318 
319   // Alright! We have a message and the caller has provided sufficient storage
320   // in which to receive it.
321 
322   std::unique_ptr<PortsMessage> msg(
323       static_cast<PortsMessage*>(ports_message.release()));
324 
325   const MessageHeader* header =
326       static_cast<const MessageHeader*>(msg->payload_bytes());
327   const DispatcherHeader* dispatcher_headers =
328       reinterpret_cast<const DispatcherHeader*>(header + 1);
329 
330   if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
331     return MOJO_RESULT_UNKNOWN;
332 
333   // Deserialize dispatchers.
334   if (header->num_dispatchers > 0) {
335     CHECK(handles);
336     std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
337     size_t data_payload_index = sizeof(MessageHeader) +
338         header->num_dispatchers * sizeof(DispatcherHeader);
339     if (data_payload_index > header->header_size)
340       return MOJO_RESULT_UNKNOWN;
341     const char* dispatcher_data = reinterpret_cast<const char*>(
342         dispatcher_headers + header->num_dispatchers);
343     size_t port_index = 0;
344     size_t platform_handle_index = 0;
345     ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles();
346     const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0;
347     for (size_t i = 0; i < header->num_dispatchers; ++i) {
348       const DispatcherHeader& dh = dispatcher_headers[i];
349       Type type = static_cast<Type>(dh.type);
350 
351       size_t next_payload_index = data_payload_index + dh.num_bytes;
352       if (msg->num_payload_bytes() < next_payload_index ||
353           next_payload_index < data_payload_index) {
354         return MOJO_RESULT_UNKNOWN;
355       }
356 
357       size_t next_port_index = port_index + dh.num_ports;
358       if (msg->num_ports() < next_port_index || next_port_index < port_index)
359         return MOJO_RESULT_UNKNOWN;
360 
361       size_t next_platform_handle_index =
362           platform_handle_index + dh.num_platform_handles;
363       if (num_msg_handles < next_platform_handle_index ||
364           next_platform_handle_index < platform_handle_index) {
365         return MOJO_RESULT_UNKNOWN;
366       }
367 
368       PlatformHandle* out_handles =
369           num_msg_handles ? msg_handles->data() + platform_handle_index
370                           : nullptr;
371       dispatchers[i].dispatcher = Dispatcher::Deserialize(
372           type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
373           dh.num_ports, out_handles, dh.num_platform_handles);
374       if (!dispatchers[i].dispatcher)
375         return MOJO_RESULT_UNKNOWN;
376 
377       dispatcher_data += dh.num_bytes;
378       data_payload_index = next_payload_index;
379       port_index = next_port_index;
380       platform_handle_index = next_platform_handle_index;
381     }
382 
383     if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
384                                                              handles))
385       return MOJO_RESULT_UNKNOWN;
386   }
387 
388   CHECK(msg);
389   *message = MessageForTransit::WrapPortsMessage(std::move(msg));
390   return MOJO_RESULT_OK;
391 }
392 
393 HandleSignalsState
GetHandleSignalsState() const394 MessagePipeDispatcher::GetHandleSignalsState() const {
395   base::AutoLock lock(signal_lock_);
396   return GetHandleSignalsStateNoLock();
397 }
398 
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)399 MojoResult MessagePipeDispatcher::AddAwakable(
400     Awakable* awakable,
401     MojoHandleSignals signals,
402     uintptr_t context,
403     HandleSignalsState* signals_state) {
404   base::AutoLock lock(signal_lock_);
405 
406   if (port_closed_ || in_transit_) {
407     if (signals_state)
408       *signals_state = HandleSignalsState();
409     return MOJO_RESULT_INVALID_ARGUMENT;
410   }
411 
412   HandleSignalsState state = GetHandleSignalsStateNoLock();
413 
414   DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
415            << endpoint_ << " [awakable=" << awakable << "; port="
416            << port_.name() << "; signals=" << signals << "; satisfied="
417            << state.satisfied_signals << "; satisfiable="
418            << state.satisfiable_signals << "]";
419 
420   if (state.satisfies(signals)) {
421     if (signals_state)
422       *signals_state = state;
423     DVLOG(2) << "Signals already set for " << port_.name();
424     return MOJO_RESULT_ALREADY_EXISTS;
425   }
426   if (!state.can_satisfy(signals)) {
427     if (signals_state)
428       *signals_state = state;
429     DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
430     return MOJO_RESULT_FAILED_PRECONDITION;
431   }
432 
433   DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
434            << endpoint_ << " [awakable=" << awakable << "; port="
435            << port_.name() << "; signals=" << signals << "]";
436 
437   awakables_.Add(awakable, signals, context);
438   return MOJO_RESULT_OK;
439 }
440 
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)441 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
442                                            HandleSignalsState* signals_state) {
443   base::AutoLock lock(signal_lock_);
444   if (port_closed_ || in_transit_) {
445     if (signals_state)
446       *signals_state = HandleSignalsState();
447   } else if (signals_state) {
448     *signals_state = GetHandleSignalsStateNoLock();
449   }
450 
451   DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
452            << endpoint_ << " [awakable=" << awakable << "; port="
453            << port_.name() << "]";
454 
455   awakables_.Remove(awakable);
456 }
457 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)458 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
459                                            uint32_t* num_ports,
460                                            uint32_t* num_handles) {
461   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
462   *num_ports = 1;
463   *num_handles = 0;
464 }
465 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)466 bool MessagePipeDispatcher::EndSerialize(void* destination,
467                                          ports::PortName* ports,
468                                          PlatformHandle* handles) {
469   SerializedState* state = static_cast<SerializedState*>(destination);
470   state->pipe_id = pipe_id_;
471   state->endpoint = static_cast<int8_t>(endpoint_);
472   memset(state->padding, 0, sizeof(state->padding));
473   ports[0] = port_.name();
474   return true;
475 }
476 
BeginTransit()477 bool MessagePipeDispatcher::BeginTransit() {
478   base::AutoLock lock(signal_lock_);
479   if (in_transit_ || port_closed_)
480     return false;
481   in_transit_.Set(true);
482   return in_transit_;
483 }
484 
CompleteTransitAndClose()485 void MessagePipeDispatcher::CompleteTransitAndClose() {
486   node_controller_->SetPortObserver(port_, nullptr);
487 
488   base::AutoLock lock(signal_lock_);
489   port_transferred_ = true;
490   in_transit_.Set(false);
491   CloseNoLock();
492 }
493 
CancelTransit()494 void MessagePipeDispatcher::CancelTransit() {
495   base::AutoLock lock(signal_lock_);
496   in_transit_.Set(false);
497 
498   // Something may have happened while we were waiting for potential transit.
499   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
500 }
501 
502 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
504     const void* data,
505     size_t num_bytes,
506     const ports::PortName* ports,
507     size_t num_ports,
508     PlatformHandle* handles,
509     size_t num_handles) {
510   if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
511     return nullptr;
512 
513   const SerializedState* state = static_cast<const SerializedState*>(data);
514 
515   ports::PortRef port;
516   CHECK_EQ(
517       ports::OK,
518       internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
519 
520   return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
521                                    state->pipe_id, state->endpoint);
522 }
523 
~MessagePipeDispatcher()524 MessagePipeDispatcher::~MessagePipeDispatcher() {
525   DCHECK(port_closed_ && !in_transit_);
526 }
527 
CloseNoLock()528 MojoResult MessagePipeDispatcher::CloseNoLock() {
529   signal_lock_.AssertAcquired();
530   if (port_closed_ || in_transit_)
531     return MOJO_RESULT_INVALID_ARGUMENT;
532 
533   port_closed_.Set(true);
534   awakables_.CancelAll();
535 
536   if (!port_transferred_) {
537     base::AutoUnlock unlock(signal_lock_);
538     node_controller_->ClosePort(port_);
539   }
540 
541   return MOJO_RESULT_OK;
542 }
543 
GetHandleSignalsStateNoLock() const544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
545   HandleSignalsState rv;
546 
547   ports::PortStatus port_status;
548   if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
549     CHECK(in_transit_ || port_transferred_ || port_closed_);
550     return HandleSignalsState();
551   }
552 
553   if (port_status.has_messages) {
554     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
555     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
556   }
557   if (port_status.receiving_messages)
558     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
559   if (!port_status.peer_closed) {
560     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
561     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
562     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
563   } else {
564     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
565   }
566   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
567   return rv;
568 }
569 
OnPortStatusChanged()570 void MessagePipeDispatcher::OnPortStatusChanged() {
571   DCHECK(RequestContext::current());
572 
573   base::AutoLock lock(signal_lock_);
574 
575   // We stop observing our port as soon as it's transferred, but this can race
576   // with events which are raised right before that happens. This is fine to
577   // ignore.
578   if (port_transferred_)
579     return;
580 
581 #if DCHECK_IS_ON()
582   ports::PortStatus port_status;
583   if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
584     if (port_status.has_messages) {
585       ports::ScopedMessage unused;
586       PeekSizeMessageFilter filter;
587       node_controller_->node()->GetMessage(port_, &unused, &filter);
588       DVLOG(4) << "New message detected on message pipe " << pipe_id_
589                << " endpoint " << endpoint_ << " [port=" << port_.name()
590                << "; size=" << filter.message_size() << "]";
591     }
592     if (port_status.peer_closed) {
593       DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
594                << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
595     }
596   }
597 #endif
598 
599   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
600 }
601 
602 }  // namespace edk
603 }  // namespace mojo
604