1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "mojo/public/cpp/environment/environment.h"
13
14 namespace IPC {
15 namespace internal {
16
MessagePipeReader(mojo::ScopedMessagePipeHandle handle)17 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
18 : pipe_wait_id_(0),
19 pipe_(handle.Pass()) {
20 StartWaiting();
21 }
22
~MessagePipeReader()23 MessagePipeReader::~MessagePipeReader() {
24 CHECK(!IsValid());
25 }
26
Close()27 void MessagePipeReader::Close() {
28 StopWaiting();
29 pipe_.reset();
30 OnPipeClosed();
31 }
32
CloseWithError(MojoResult error)33 void MessagePipeReader::CloseWithError(MojoResult error) {
34 OnPipeError(error);
35 Close();
36 }
37
38 // static
InvokePipeIsReady(void * closure,MojoResult result)39 void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
40 reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
41 }
42
StartWaiting()43 void MessagePipeReader::StartWaiting() {
44 DCHECK(pipe_.is_valid());
45 DCHECK(!pipe_wait_id_);
46 // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
47 // MessagePipe.
48 //
49 // TODO(morrita): Should we re-set the signal when we get new
50 // message to send?
51 pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
52 pipe_.get().value(),
53 MOJO_HANDLE_SIGNAL_READABLE,
54 MOJO_DEADLINE_INDEFINITE,
55 &InvokePipeIsReady,
56 this);
57 }
58
StopWaiting()59 void MessagePipeReader::StopWaiting() {
60 if (!pipe_wait_id_)
61 return;
62 mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
63 pipe_wait_id_ = 0;
64 }
65
PipeIsReady(MojoResult wait_result)66 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
67 pipe_wait_id_ = 0;
68
69 if (wait_result != MOJO_RESULT_OK) {
70 if (wait_result != MOJO_RESULT_ABORTED) {
71 // FAILED_PRECONDITION happens every time the peer is dead so
72 // it isn't worth polluting the log message.
73 DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
74 << "Pipe got error from the waiter. Closing: "
75 << wait_result;
76 OnPipeError(wait_result);
77 }
78
79 Close();
80 return;
81 }
82
83 while (pipe_.is_valid()) {
84 MojoResult read_result = ReadMessageBytes();
85 if (read_result == MOJO_RESULT_SHOULD_WAIT)
86 break;
87 if (read_result != MOJO_RESULT_OK) {
88 // FAILED_PRECONDITION means that all the received messages
89 // got consumed and the peer is already closed.
90 if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
91 DLOG(WARNING)
92 << "Pipe got error from ReadMessage(). Closing: " << read_result;
93 OnPipeError(read_result);
94 }
95
96 Close();
97 break;
98 }
99
100 OnMessageReceived();
101 }
102
103 if (pipe_.is_valid())
104 StartWaiting();
105 }
106
ReadMessageBytes()107 MojoResult MessagePipeReader::ReadMessageBytes() {
108 DCHECK(handle_buffer_.empty());
109
110 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
111 uint32_t num_handles = 0;
112 MojoResult result = MojoReadMessage(pipe_.get().value(),
113 num_bytes ? &data_buffer_[0] : NULL,
114 &num_bytes,
115 NULL,
116 &num_handles,
117 MOJO_READ_MESSAGE_FLAG_NONE);
118 data_buffer_.resize(num_bytes);
119 handle_buffer_.resize(num_handles);
120 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
121 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
122 // it needs more bufer. So we re-read it with resized buffers.
123 result = MojoReadMessage(pipe_.get().value(),
124 num_bytes ? &data_buffer_[0] : NULL,
125 &num_bytes,
126 num_handles ? &handle_buffer_[0] : NULL,
127 &num_handles,
128 MOJO_READ_MESSAGE_FLAG_NONE);
129 }
130
131 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
132 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
133 return result;
134 }
135
operator ()(MessagePipeReader * ptr) const136 void MessagePipeReader::DelayedDeleter::operator()(
137 MessagePipeReader* ptr) const {
138 ptr->Close();
139 base::MessageLoopProxy::current()->PostTask(
140 FROM_HERE, base::Bind(&DeleteNow, ptr));
141 }
142
143 } // namespace internal
144 } // namespace IPC
145