• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifdef UNSAFE_BUFFERS_BUILD
6 // TODO(crbug.com/351564777): Remove this and convert code to safer constructs.
7 #pragma allow_unsafe_buffers
8 #endif
9 
10 #include "ipc/ipc_message_pipe_reader.h"
11 
12 #include <stdint.h>
13 
14 #include <utility>
15 
16 #include "base/containers/span.h"
17 #include "base/functional/bind.h"
18 #include "base/functional/callback_helpers.h"
19 #include "base/location.h"
20 #include "base/logging.h"
21 #include "base/memory/raw_ref.h"
22 #include "base/task/sequenced_task_runner.h"
23 #include "base/task/single_thread_task_runner.h"
24 #include "base/trace_event/trace_event.h"
25 #include "ipc/ipc_channel_mojo.h"
26 #include "mojo/public/cpp/bindings/message.h"
27 #include "mojo/public/cpp/bindings/thread_safe_proxy.h"
28 
29 namespace IPC {
30 namespace internal {
31 
32 namespace {
33 
34 class ThreadSafeProxy : public mojo::ThreadSafeProxy {
35  public:
36   using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
37 
ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,Forwarder forwarder,mojo::AssociatedGroupController & group_controller)38   ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,
39                   Forwarder forwarder,
40                   mojo::AssociatedGroupController& group_controller)
41       : task_runner_(std::move(task_runner)),
42         forwarder_(std::move(forwarder)),
43         group_controller_(group_controller) {}
44 
45   // mojo::ThreadSafeProxy:
SendMessage(mojo::Message & message)46   void SendMessage(mojo::Message& message) override {
47     message.SerializeHandles(&*group_controller_);
48     task_runner_->PostTask(FROM_HERE,
49                            base::BindOnce(forwarder_, std::move(message)));
50   }
51 
SendMessageWithResponder(mojo::Message & message,std::unique_ptr<mojo::MessageReceiver> responder)52   void SendMessageWithResponder(
53       mojo::Message& message,
54       std::unique_ptr<mojo::MessageReceiver> responder) override {
55     // We don't bother supporting this because it's not used in practice.
56     NOTREACHED();
57   }
58 
59  private:
60   ~ThreadSafeProxy() override = default;
61 
62   const scoped_refptr<base::SequencedTaskRunner> task_runner_;
63   const Forwarder forwarder_;
64   const raw_ref<mojo::AssociatedGroupController> group_controller_;
65 };
66 
67 }  // namespace
68 
MessagePipeReader(mojo::MessagePipeHandle pipe,mojo::PendingAssociatedRemote<mojom::Channel> sender,mojo::PendingAssociatedReceiver<mojom::Channel> receiver,scoped_refptr<base::SequencedTaskRunner> task_runner,MessagePipeReader::Delegate * delegate)69 MessagePipeReader::MessagePipeReader(
70     mojo::MessagePipeHandle pipe,
71     mojo::PendingAssociatedRemote<mojom::Channel> sender,
72     mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
73     scoped_refptr<base::SequencedTaskRunner> task_runner,
74     MessagePipeReader::Delegate* delegate)
75     : delegate_(delegate),
76       sender_(std::move(sender), task_runner),
77       receiver_(this, std::move(receiver), task_runner) {
78   thread_safe_sender_ =
79       std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
80           base::MakeRefCounted<ThreadSafeProxy>(
81               task_runner,
82               base::BindRepeating(&MessagePipeReader::ForwardMessage,
83                                   weak_ptr_factory_.GetWeakPtr()),
84               *sender_.internal_state()->associated_group()->GetController()));
85 
86   thread_checker_.DetachFromThread();
87 }
88 
~MessagePipeReader()89 MessagePipeReader::~MessagePipeReader() {
90   DCHECK(thread_checker_.CalledOnValidThread());
91   // The pipe should be closed before deletion.
92 }
93 
FinishInitializationOnIOThread(base::ProcessId self_pid)94 void MessagePipeReader::FinishInitializationOnIOThread(
95     base::ProcessId self_pid) {
96   sender_.set_disconnect_handler(
97       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
98                      MOJO_RESULT_FAILED_PRECONDITION));
99   receiver_.set_disconnect_handler(
100       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
101                      MOJO_RESULT_FAILED_PRECONDITION));
102 
103   sender_->SetPeerPid(self_pid);
104 }
105 
Close()106 void MessagePipeReader::Close() {
107   DCHECK(thread_checker_.CalledOnValidThread());
108   sender_.reset();
109   if (receiver_.is_bound())
110     receiver_.reset();
111 }
112 
Send(std::unique_ptr<Message> message)113 bool MessagePipeReader::Send(std::unique_ptr<Message> message) {
114   CHECK(message->IsValid());
115   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",
116                          message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);
117   std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles;
118   MojoResult result = MOJO_RESULT_OK;
119   result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
120   if (result != MOJO_RESULT_OK)
121     return false;
122 
123   if (!sender_)
124     return false;
125 
126   base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),
127                                   message->size());
128   sender_->Receive(MessageView(bytes, std::move(handles)));
129   DVLOG(4) << "Send " << message->type() << ": " << message->size();
130   return true;
131 }
132 
GetRemoteInterface(mojo::GenericPendingAssociatedReceiver receiver)133 void MessagePipeReader::GetRemoteInterface(
134     mojo::GenericPendingAssociatedReceiver receiver) {
135   if (!sender_.is_bound())
136     return;
137   sender_->GetAssociatedInterface(std::move(receiver));
138 }
139 
SetPeerPid(int32_t peer_pid)140 void MessagePipeReader::SetPeerPid(int32_t peer_pid) {
141   delegate_->OnPeerPidReceived(peer_pid);
142 }
143 
Receive(MessageView message_view)144 void MessagePipeReader::Receive(MessageView message_view) {
145   if (message_view.bytes().empty()) {
146     delegate_->OnBrokenDataReceived();
147     return;
148   }
149   Message message(reinterpret_cast<const char*>(message_view.bytes().data()),
150                   message_view.bytes().size());
151   if (!message.IsValid()) {
152     delegate_->OnBrokenDataReceived();
153     return;
154   }
155 
156   DVLOG(4) << "Receive " << message.type() << ": " << message.size();
157   MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
158       message_view.TakeHandles(), &message);
159   if (write_result != MOJO_RESULT_OK) {
160     OnPipeError(write_result);
161     return;
162   }
163 
164   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",
165                          message.flags(), TRACE_EVENT_FLAG_FLOW_IN);
166   delegate_->OnMessageReceived(message);
167 }
168 
GetAssociatedInterface(mojo::GenericPendingAssociatedReceiver receiver)169 void MessagePipeReader::GetAssociatedInterface(
170     mojo::GenericPendingAssociatedReceiver receiver) {
171   DCHECK(thread_checker_.CalledOnValidThread());
172   if (delegate_)
173     delegate_->OnAssociatedInterfaceRequest(std::move(receiver));
174 }
175 
OnPipeError(MojoResult error)176 void MessagePipeReader::OnPipeError(MojoResult error) {
177   DCHECK(thread_checker_.CalledOnValidThread());
178 
179   Close();
180 
181   // NOTE: The delegate call below may delete |this|.
182   if (delegate_)
183     delegate_->OnPipeError();
184 }
185 
ForwardMessage(mojo::Message message)186 void MessagePipeReader::ForwardMessage(mojo::Message message) {
187   sender_.internal_state()->ForwardMessage(std::move(message));
188 }
189 
190 }  // namespace internal
191 }  // namespace IPC
192