• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "mojo/public/cpp/environment/environment.h"
13 
14 namespace IPC {
15 namespace internal {
16 
MessagePipeReader(mojo::ScopedMessagePipeHandle handle)17 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
18     : pipe_wait_id_(0),
19       pipe_(handle.Pass()) {
20   StartWaiting();
21 }
22 
~MessagePipeReader()23 MessagePipeReader::~MessagePipeReader() {
24   CHECK(!IsValid());
25 }
26 
Close()27 void MessagePipeReader::Close() {
28   StopWaiting();
29   pipe_.reset();
30   OnPipeClosed();
31 }
32 
CloseWithError(MojoResult error)33 void MessagePipeReader::CloseWithError(MojoResult error) {
34   OnPipeError(error);
35   Close();
36 }
37 
38 // static
InvokePipeIsReady(void * closure,MojoResult result)39 void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
40   reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
41 }
42 
StartWaiting()43 void MessagePipeReader::StartWaiting() {
44   DCHECK(pipe_.is_valid());
45   DCHECK(!pipe_wait_id_);
46   // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
47   // MessagePipe.
48   //
49   // TODO(morrita): Should we re-set the signal when we get new
50   // message to send?
51   pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
52       pipe_.get().value(),
53       MOJO_HANDLE_SIGNAL_READABLE,
54       MOJO_DEADLINE_INDEFINITE,
55       &InvokePipeIsReady,
56       this);
57 }
58 
StopWaiting()59 void MessagePipeReader::StopWaiting() {
60   if (!pipe_wait_id_)
61     return;
62   mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
63   pipe_wait_id_ = 0;
64 }
65 
PipeIsReady(MojoResult wait_result)66 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
67   pipe_wait_id_ = 0;
68 
69   if (wait_result != MOJO_RESULT_OK) {
70     if (wait_result != MOJO_RESULT_ABORTED) {
71       // FAILED_PRECONDITION happens every time the peer is dead so
72       // it isn't worth polluting the log message.
73       DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
74           << "Pipe got error from the waiter. Closing: "
75           << wait_result;
76       OnPipeError(wait_result);
77     }
78 
79     Close();
80     return;
81   }
82 
83   while (pipe_.is_valid()) {
84     MojoResult read_result = ReadMessageBytes();
85     if (read_result == MOJO_RESULT_SHOULD_WAIT)
86       break;
87     if (read_result != MOJO_RESULT_OK) {
88       // FAILED_PRECONDITION means that all the received messages
89       // got consumed and the peer is already closed.
90       if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
91         DLOG(WARNING)
92             << "Pipe got error from ReadMessage(). Closing: " << read_result;
93         OnPipeError(read_result);
94       }
95 
96       Close();
97       break;
98     }
99 
100     OnMessageReceived();
101   }
102 
103   if (pipe_.is_valid())
104     StartWaiting();
105 }
106 
ReadMessageBytes()107 MojoResult MessagePipeReader::ReadMessageBytes() {
108   DCHECK(handle_buffer_.empty());
109 
110   uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
111   uint32_t num_handles = 0;
112   MojoResult result = MojoReadMessage(pipe_.get().value(),
113                                       num_bytes ? &data_buffer_[0] : NULL,
114                                       &num_bytes,
115                                       NULL,
116                                       &num_handles,
117                                       MOJO_READ_MESSAGE_FLAG_NONE);
118   data_buffer_.resize(num_bytes);
119   handle_buffer_.resize(num_handles);
120   if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
121     // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
122     // it needs more bufer. So we re-read it with resized buffers.
123     result = MojoReadMessage(pipe_.get().value(),
124                              num_bytes ? &data_buffer_[0] : NULL,
125                              &num_bytes,
126                              num_handles ? &handle_buffer_[0] : NULL,
127                              &num_handles,
128                              MOJO_READ_MESSAGE_FLAG_NONE);
129   }
130 
131   DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
132   DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
133   return result;
134 }
135 
operator ()(MessagePipeReader * ptr) const136 void MessagePipeReader::DelayedDeleter::operator()(
137     MessagePipeReader* ptr) const {
138   ptr->Close();
139   base::MessageLoopProxy::current()->PostTask(
140       FROM_HERE, base::Bind(&DeleteNow, ptr));
141 }
142 
143 }  // namespace internal
144 }  // namespace IPC
145