1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/profiling/memory/heapprofd_producer.h"
18
19 #include "perfetto/base/proc_utils.h"
20 #include "perfetto/ext/base/thread_task_runner.h"
21 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
22 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
23 #include "protos/perfetto/common/data_source_descriptor.gen.h"
24 #include "protos/perfetto/config/trace_config.gen.h"
25 #include "src/base/test/test_task_runner.h"
26 #include "src/base/test/tmp_dir_tree.h"
27 #include "src/profiling/memory/client.h"
28 #include "src/profiling/memory/unhooked_allocator.h"
29 #include "src/tracing/test/mock_consumer.h"
30 #include "test/gtest_and_gmock.h"
31
32 namespace perfetto {
33 namespace profiling {
34 namespace {
35
36 using ::testing::NiceMock;
37 using ::testing::NotNull;
38
39 class TracingServiceThread {
40 public:
TracingServiceThread(const std::string & producer_socket,const std::string & consumer_socket)41 TracingServiceThread(const std::string& producer_socket,
42 const std::string& consumer_socket)
43 : runner_(base::ThreadTaskRunner::CreateAndStart("perfetto.svc")),
44 producer_socket_(producer_socket),
45 consumer_socket_(consumer_socket) {
46 runner_.PostTaskAndWaitForTesting([this]() {
47 svc_ = ServiceIPCHost::CreateInstance(&runner_);
48 bool res =
49 svc_->Start(producer_socket_.c_str(), consumer_socket_.c_str());
50 if (!res) {
51 PERFETTO_FATAL("Failed to start service listening on %s and %s",
52 producer_socket_.c_str(), consumer_socket_.c_str());
53 }
54 });
55 }
56
~TracingServiceThread()57 ~TracingServiceThread() {
58 runner_.PostTaskAndWaitForTesting([this]() { svc_.reset(); });
59 }
60
producer_socket() const61 const std::string& producer_socket() const { return producer_socket_; }
consumer_socket() const62 const std::string& consumer_socket() const { return consumer_socket_; }
63
64 private:
65 base::ThreadTaskRunner runner_;
66
67 std::string producer_socket_;
68 std::string consumer_socket_;
69 std::unique_ptr<ServiceIPCHost> svc_;
70 };
71
72 class HeapprofdThread {
73 public:
HeapprofdThread(const std::string & producer_socket,const std::string & heapprofd_socket)74 HeapprofdThread(const std::string& producer_socket,
75 const std::string& heapprofd_socket)
76 : runner_(base::ThreadTaskRunner::CreateAndStart("heapprofd.svc")),
77 producer_socket_(producer_socket),
78 heapprofd_socket_(heapprofd_socket) {
79 runner_.PostTaskAndWaitForTesting([this]() {
80 heapprofd_.reset(new HeapprofdProducer(HeapprofdMode::kCentral, &runner_,
81 /* exit_when_done= */ false));
82
83 heapprofd_->ConnectWithRetries(producer_socket_.c_str());
84 listen_sock_ = base::UnixSocket::Listen(
85 heapprofd_socket_.c_str(), &heapprofd_->socket_delegate(), &runner_,
86 base::SockFamily::kUnix, base::SockType::kStream);
87 EXPECT_THAT(listen_sock_, NotNull());
88 });
89 }
90
Sync()91 void Sync() {
92 runner_.PostTaskAndWaitForTesting([]() {});
93 }
94
~HeapprofdThread()95 ~HeapprofdThread() {
96 runner_.PostTaskAndWaitForTesting([this]() {
97 listen_sock_.reset();
98 heapprofd_.reset();
99 });
100 }
101
producer_socket() const102 const std::string& producer_socket() const { return producer_socket_; }
heapprofd_socket() const103 const std::string& heapprofd_socket() const { return heapprofd_socket_; }
104
105 private:
106 base::ThreadTaskRunner runner_;
107
108 std::string producer_socket_;
109 std::string heapprofd_socket_;
110 std::unique_ptr<HeapprofdProducer> heapprofd_;
111 std::unique_ptr<base::UnixSocket> listen_sock_;
112 };
113
114 class TraceConsumer {
115 public:
TraceConsumer(base::TestTaskRunner * runner,std::string socket)116 explicit TraceConsumer(base::TestTaskRunner* runner, std::string socket)
117 : socket_(std::move(socket)), consumer_(runner) {
118 consumer_.Connect(
119 ConsumerIPCClient::Connect(socket_.c_str(), &consumer_, runner));
120 }
121
consumer()122 NiceMock<MockConsumer>& consumer() { return consumer_; }
123
124 private:
125 // consumer_ refers to socket_.
126 const std::string socket_;
127 NiceMock<MockConsumer> consumer_;
128 };
129
MakeTraceConfig()130 TraceConfig MakeTraceConfig() {
131 TraceConfig trace_config;
132 trace_config.add_buffers()->set_size_kb(10 * 1024);
133
134 auto* ds_config = trace_config.add_data_sources()->mutable_config();
135 ds_config->set_name("android.heapprofd");
136 ds_config->set_target_buffer(0);
137
138 protos::gen::HeapprofdConfig heapprofd_config;
139 heapprofd_config.set_sampling_interval_bytes(1);
140 heapprofd_config.add_pid(static_cast<uint64_t>(base::GetProcessId()));
141 heapprofd_config.set_all_heaps(true);
142 heapprofd_config.set_no_startup(true);
143 heapprofd_config.set_no_running(true);
144 ds_config->set_heapprofd_config_raw(heapprofd_config.SerializeAsString());
145 return trace_config;
146 }
147
WaitFor(std::function<bool ()> predicate,long long timeout_ms=40000)148 bool WaitFor(std::function<bool()> predicate, long long timeout_ms = 40000) {
149 long long deadline_ms = base::GetWallTimeMs().count() + timeout_ms;
150 while (base::GetWallTimeMs().count() < deadline_ms) {
151 if (predicate())
152 return true;
153 base::SleepMicroseconds(100 * 1000); // 0.1 s.
154 }
155 return false;
156 }
157
WaitForDsRegistered(MockConsumer * mock_consumer,const std::string & ds_name)158 bool WaitForDsRegistered(MockConsumer* mock_consumer,
159 const std::string& ds_name) {
160 return WaitFor([mock_consumer, &ds_name]() {
161 auto dss = mock_consumer->QueryServiceState().data_sources();
162 return std::any_of(dss.begin(), dss.end(),
163 [&](const TracingServiceState::DataSource& ds) {
164 return ds.ds_descriptor().name() == ds_name;
165 });
166 });
167 }
168
169 class HeapprofdProducerIntegrationTest : public testing::Test {
170 protected:
171 static constexpr char kProducerSock[] = "producer.sock";
172 static constexpr char kConsumerSock[] = "consumer.sock";
173 static constexpr char kHeapprofdSock[] = "heapprofd.sock";
174
ProducerSockPath() const175 std::string ProducerSockPath() const {
176 return tmpdir_.AbsolutePath(kProducerSock);
177 }
178
ConsumerSockPath() const179 std::string ConsumerSockPath() const {
180 return tmpdir_.AbsolutePath(kConsumerSock);
181 }
182
HeapprofdSockPath() const183 std::string HeapprofdSockPath() const {
184 return tmpdir_.AbsolutePath(kHeapprofdSock);
185 }
186
SetUp()187 void SetUp() override {
188 tmpdir_.TrackFile(kProducerSock);
189 tmpdir_.TrackFile(kConsumerSock);
190 StartTracingService();
191
192 tmpdir_.TrackFile(kHeapprofdSock);
193 heapprofd_service_.emplace(ProducerSockPath(), HeapprofdSockPath());
194 }
195
StartTracingService()196 void StartTracingService() {
197 tracing_service_.emplace(ProducerSockPath(), ConsumerSockPath());
198 }
199
200 // Waits for the heapprofd data source to be registered and starts a trace
201 // with it.
StartHeapprofdTrace(TraceConfig cfg)202 std::unique_ptr<TraceConsumer> StartHeapprofdTrace(TraceConfig cfg) {
203 auto trace_consumer =
204 std::make_unique<TraceConsumer>(&task_runner_, ConsumerSockPath());
205
206 if (WaitForDsRegistered(&trace_consumer->consumer(), "android.heapprofd") ==
207 false) {
208 ADD_FAILURE();
209 return nullptr;
210 }
211
212 trace_consumer->consumer().ObserveEvents(
213 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
214 trace_consumer->consumer().EnableTracing(cfg);
215 trace_consumer->consumer().WaitForObservableEvents();
216
217 return trace_consumer;
218 }
219
CreateHeapprofdClient() const220 std::shared_ptr<Client> CreateHeapprofdClient() const {
221 std::optional<base::UnixSocketRaw> client_sock =
222 perfetto::profiling::Client::ConnectToHeapprofd(HeapprofdSockPath());
223 if (!client_sock.has_value()) {
224 return nullptr;
225 }
226
227 return perfetto::profiling::Client::CreateAndHandshake(
228 std::move(client_sock.value()),
229 UnhookedAllocator<perfetto::profiling::Client>(malloc, free));
230 }
231
232 base::TmpDirTree tmpdir_;
233 base::TestTaskRunner task_runner_;
234 std::optional<TracingServiceThread> tracing_service_;
235 std::optional<HeapprofdThread> heapprofd_service_;
236 };
237
TEST_F(HeapprofdProducerIntegrationTest,Restart)238 TEST_F(HeapprofdProducerIntegrationTest, Restart) {
239 std::unique_ptr<TraceConsumer> consumer =
240 StartHeapprofdTrace(MakeTraceConfig());
241 ASSERT_THAT(consumer, NotNull());
242
243 std::shared_ptr<Client> client = CreateHeapprofdClient();
244 ASSERT_THAT(client, NotNull());
245
246 // Shutdown tracing service. This should cause HeapprofdProducer::Restart() to
247 // be executed on the heapprofd thread.
248 tracing_service_.reset();
249 // Wait for the effects of the tracing service disconnect to propagate to the
250 // heapprofd thread.
251 heapprofd_service_->Sync();
252
253 consumer->consumer().ForceDisconnect();
254 consumer.reset();
255
256 task_runner_.RunUntilIdle();
257
258 // Start tracing service again. Heapprofd should reconnect.
259 ASSERT_EQ(remove(ProducerSockPath().c_str()), 0);
260 ASSERT_EQ(remove(ConsumerSockPath().c_str()), 0);
261 StartTracingService();
262
263 consumer = StartHeapprofdTrace(MakeTraceConfig());
264 ASSERT_THAT(consumer, NotNull());
265
266 consumer->consumer().ForceDisconnect();
267 consumer.reset();
268 }
269
270 } // namespace
271 } // namespace profiling
272 } // namespace perfetto
273