• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_message_pipe_reader.h"
6 
7 #include <stdint.h>
8 
9 #include <utility>
10 
11 #include "base/containers/span.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/logging.h"
16 #include "base/memory/raw_ref.h"
17 #include "base/task/sequenced_task_runner.h"
18 #include "base/task/single_thread_task_runner.h"
19 #include "base/trace_event/trace_event.h"
20 #include "ipc/ipc_channel_mojo.h"
21 #include "mojo/public/cpp/bindings/message.h"
22 #include "mojo/public/cpp/bindings/thread_safe_proxy.h"
23 
24 namespace IPC {
25 namespace internal {
26 
27 namespace {
28 
29 class ThreadSafeProxy : public mojo::ThreadSafeProxy {
30  public:
31   using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
32 
ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,Forwarder forwarder,mojo::AssociatedGroupController & group_controller)33   ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,
34                   Forwarder forwarder,
35                   mojo::AssociatedGroupController& group_controller)
36       : task_runner_(std::move(task_runner)),
37         forwarder_(std::move(forwarder)),
38         group_controller_(group_controller) {}
39 
40   // mojo::ThreadSafeProxy:
SendMessage(mojo::Message & message)41   void SendMessage(mojo::Message& message) override {
42     message.SerializeHandles(&*group_controller_);
43     task_runner_->PostTask(FROM_HERE,
44                            base::BindOnce(forwarder_, std::move(message)));
45   }
46 
SendMessageWithResponder(mojo::Message & message,std::unique_ptr<mojo::MessageReceiver> responder)47   void SendMessageWithResponder(
48       mojo::Message& message,
49       std::unique_ptr<mojo::MessageReceiver> responder) override {
50     // We don't bother supporting this because it's not used in practice.
51     NOTREACHED();
52   }
53 
54  private:
55   ~ThreadSafeProxy() override = default;
56 
57   const scoped_refptr<base::SequencedTaskRunner> task_runner_;
58   const Forwarder forwarder_;
59   const raw_ref<mojo::AssociatedGroupController> group_controller_;
60 };
61 
62 }  // namespace
63 
MessagePipeReader(mojo::MessagePipeHandle pipe,mojo::PendingAssociatedRemote<mojom::Channel> sender,mojo::PendingAssociatedReceiver<mojom::Channel> receiver,scoped_refptr<base::SequencedTaskRunner> task_runner,MessagePipeReader::Delegate * delegate)64 MessagePipeReader::MessagePipeReader(
65     mojo::MessagePipeHandle pipe,
66     mojo::PendingAssociatedRemote<mojom::Channel> sender,
67     mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
68     scoped_refptr<base::SequencedTaskRunner> task_runner,
69     MessagePipeReader::Delegate* delegate)
70     : delegate_(delegate),
71       sender_(std::move(sender), task_runner),
72       receiver_(this, std::move(receiver), task_runner) {
73   thread_safe_sender_ =
74       std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
75           base::MakeRefCounted<ThreadSafeProxy>(
76               task_runner,
77               base::BindRepeating(&MessagePipeReader::ForwardMessage,
78                                   weak_ptr_factory_.GetWeakPtr()),
79               *sender_.internal_state()->associated_group()->GetController()));
80 
81   thread_checker_.DetachFromThread();
82 }
83 
~MessagePipeReader()84 MessagePipeReader::~MessagePipeReader() {
85   DCHECK(thread_checker_.CalledOnValidThread());
86   // The pipe should be closed before deletion.
87 }
88 
FinishInitializationOnIOThread(base::ProcessId self_pid)89 void MessagePipeReader::FinishInitializationOnIOThread(
90     base::ProcessId self_pid) {
91   sender_.set_disconnect_handler(
92       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
93                      MOJO_RESULT_FAILED_PRECONDITION));
94   receiver_.set_disconnect_handler(
95       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
96                      MOJO_RESULT_FAILED_PRECONDITION));
97 
98   sender_->SetPeerPid(self_pid);
99 }
100 
Close()101 void MessagePipeReader::Close() {
102   DCHECK(thread_checker_.CalledOnValidThread());
103   sender_.reset();
104   if (receiver_.is_bound())
105     receiver_.reset();
106 }
107 
Send(std::unique_ptr<Message> message)108 bool MessagePipeReader::Send(std::unique_ptr<Message> message) {
109   CHECK(message->IsValid());
110   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",
111                          message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);
112   absl::optional<std::vector<mojo::native::SerializedHandlePtr>> handles;
113   MojoResult result = MOJO_RESULT_OK;
114   result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
115   if (result != MOJO_RESULT_OK)
116     return false;
117 
118   if (!sender_)
119     return false;
120 
121   base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),
122                                   message->size());
123   sender_->Receive(MessageView(bytes, std::move(handles)));
124   DVLOG(4) << "Send " << message->type() << ": " << message->size();
125   return true;
126 }
127 
GetRemoteInterface(mojo::GenericPendingAssociatedReceiver receiver)128 void MessagePipeReader::GetRemoteInterface(
129     mojo::GenericPendingAssociatedReceiver receiver) {
130   if (!sender_.is_bound())
131     return;
132   sender_->GetAssociatedInterface(std::move(receiver));
133 }
134 
SetPeerPid(int32_t peer_pid)135 void MessagePipeReader::SetPeerPid(int32_t peer_pid) {
136   delegate_->OnPeerPidReceived(peer_pid);
137 }
138 
Receive(MessageView message_view)139 void MessagePipeReader::Receive(MessageView message_view) {
140   if (message_view.bytes().empty()) {
141     delegate_->OnBrokenDataReceived();
142     return;
143   }
144   Message message(reinterpret_cast<const char*>(message_view.bytes().data()),
145                   message_view.bytes().size());
146   if (!message.IsValid()) {
147     delegate_->OnBrokenDataReceived();
148     return;
149   }
150 
151   DVLOG(4) << "Receive " << message.type() << ": " << message.size();
152   MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
153       message_view.TakeHandles(), &message);
154   if (write_result != MOJO_RESULT_OK) {
155     OnPipeError(write_result);
156     return;
157   }
158 
159   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",
160                          message.flags(), TRACE_EVENT_FLAG_FLOW_IN);
161   delegate_->OnMessageReceived(message);
162 }
163 
GetAssociatedInterface(mojo::GenericPendingAssociatedReceiver receiver)164 void MessagePipeReader::GetAssociatedInterface(
165     mojo::GenericPendingAssociatedReceiver receiver) {
166   DCHECK(thread_checker_.CalledOnValidThread());
167   if (delegate_)
168     delegate_->OnAssociatedInterfaceRequest(std::move(receiver));
169 }
170 
OnPipeError(MojoResult error)171 void MessagePipeReader::OnPipeError(MojoResult error) {
172   DCHECK(thread_checker_.CalledOnValidThread());
173 
174   Close();
175 
176   // NOTE: The delegate call below may delete |this|.
177   if (delegate_)
178     delegate_->OnPipeError();
179 }
180 
ForwardMessage(mojo::Message message)181 void MessagePipeReader::ForwardMessage(mojo::Message message) {
182   sender_.internal_state()->ForwardMessage(std::move(message));
183 }
184 
185 }  // namespace internal
186 }  // namespace IPC
187