1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/profiling/memory/heapprofd_producer.h"
18
19 #include <inttypes.h>
20 #include <signal.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24
25 #include "perfetto/base/file_utils.h"
26 #include "perfetto/base/string_utils.h"
27 #include "perfetto/base/thread_task_runner.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 #include "perfetto/tracing/core/trace_writer.h"
31 #include "perfetto/tracing/ipc/producer_ipc_client.h"
32
33 namespace perfetto {
34 namespace profiling {
35 namespace {
36 using ::perfetto::protos::pbzero::ProfilePacket;
37
// Name of the single data source this producer registers and serves.
constexpr char kHeapprofdDataSource[] = "android.heapprofd";
// Number of unwinding worker threads; processes are assigned to them by pid.
constexpr size_t kUnwinderThreads = 5;
// Signal sent to already-running target processes to start profiling.
constexpr int kHeapprofdSignal = 36;

// Delay before reconnecting to the tracing service: starts at 100 ms,
// doubles on every failed attempt and saturates at 30 s.
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30000;

// Period of the child-mode check that a data source is still active.
constexpr uint32_t kChildModeWatchdogPeriodMs = 10000;

// Per-process shared ring buffer size: default when the config leaves it
// unset, and the upper bound applied to any configured value.
constexpr uint64_t kDefaultShmemSize = uint64_t{8} << 20;    // 8 MiB
constexpr uint64_t kMaxShmemSize = uint64_t{500} << 20;      // 500 MiB
49
MakeClientConfiguration(const DataSourceConfig & cfg)50 ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
51 ClientConfiguration client_config;
52 client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
53 client_config.block_client = cfg.heapprofd_config().block_client();
54 return client_config;
55 }
56
MakeUnwindingWorkers(HeapprofdProducer * delegate,size_t n)57 std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
58 size_t n) {
59 std::vector<UnwindingWorker> ret;
60 for (size_t i = 0; i < n; ++i) {
61 ret.emplace_back(delegate, base::ThreadTaskRunner::CreateAndStart());
62 }
63 return ret;
64 }
65
ConfigTargetsProcess(const HeapprofdConfig & cfg,const Process & proc,const std::vector<std::string> & normalized_cmdlines)66 bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
67 const Process& proc,
68 const std::vector<std::string>& normalized_cmdlines) {
69 if (cfg.all())
70 return true;
71
72 const auto& pids = cfg.pid();
73 if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
74 pids.cend()) {
75 return true;
76 }
77
78 if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
79 proc.cmdline) != normalized_cmdlines.cend()) {
80 return true;
81 }
82 return false;
83 }
84
// Returns the number of significant bits in |value|, i.e. the smallest n such
// that value < pow(2, n). Returns 0 for value == 0.
// Examples: 1 -> 1, 2 -> 2, 3 -> 2, 4 -> 3.
// NOTE: despite the function's name, this is NOT "the largest n such that
// pow(2, n) < value". LogHistogram relies on exactly this behavior: bucket i
// covers [2^(i-1), 2^i) and reports 2^i as its upper limit.
size_t Log2LessThan(uint64_t value) {
  size_t bits = 0;
  for (; value != 0; value >>= 1)
    ++bits;
  return bits;
}
94
95 } // namespace
96
97 const uint64_t LogHistogram::kMaxBucket = 0;
98
GetData()99 std::vector<std::pair<uint64_t, uint64_t>> LogHistogram::GetData() {
100 std::vector<std::pair<uint64_t, uint64_t>> data;
101 data.reserve(kBuckets);
102 for (size_t i = 0; i < kBuckets; ++i) {
103 if (i == kBuckets - 1)
104 data.emplace_back(kMaxBucket, values_[i]);
105 else
106 data.emplace_back(1 << i, values_[i]);
107 }
108 return data;
109 }
110
GetBucket(uint64_t value)111 size_t LogHistogram::GetBucket(uint64_t value) {
112 if (value == 0)
113 return 0;
114
115 size_t hibit = Log2LessThan(value);
116 if (hibit >= kBuckets)
117 return kBuckets - 1;
118 return hibit;
119 }
120
121 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
122 // thread.
HeapprofdProducer(HeapprofdMode mode,base::TaskRunner * task_runner)123 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
124 base::TaskRunner* task_runner)
125 : task_runner_(task_runner),
126 mode_(mode),
127 unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
128 socket_delegate_(this),
129 weak_factory_(this) {
130 if (mode == HeapprofdMode::kCentral) {
131 listening_socket_ = MakeListeningSocket();
132 }
133 }
134
~HeapprofdProducer()135 HeapprofdProducer::~HeapprofdProducer() {
136 // We only borrowed this from the environment variable.
137 // UnixSocket always owns the socket, so we need to manually release it
138 // here.
139 if (mode_ == HeapprofdMode::kCentral && bool(listening_socket_))
140 listening_socket_->ReleaseSocket().ReleaseFd().release();
141 }
142
MakeListeningSocket()143 std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeListeningSocket() {
144 const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
145 if (sock_fd == nullptr) {
146 unlink(kHeapprofdSocketFile);
147 return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_delegate_,
148 task_runner_);
149 }
150 char* end;
151 int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
152 if (*end != '\0')
153 PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
154 kHeapprofdSocketEnvVar);
155 return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_delegate_,
156 task_runner_);
157 }
158
SetTargetProcess(pid_t target_pid,std::string target_cmdline,base::ScopedFile inherited_socket)159 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
160 std::string target_cmdline,
161 base::ScopedFile inherited_socket) {
162 target_process_.pid = target_pid;
163 target_process_.cmdline = target_cmdline;
164 inherited_fd_ = std::move(inherited_socket);
165 }
166
AdoptTargetProcessSocket()167 void HeapprofdProducer::AdoptTargetProcessSocket() {
168 PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
169 auto socket = base::UnixSocket::AdoptConnected(
170 std::move(inherited_fd_), &socket_delegate_, task_runner_,
171 base::SockType::kStream);
172
173 HandleClientConnection(std::move(socket), target_process_);
174 }
175
OnConnect()176 void HeapprofdProducer::OnConnect() {
177 PERFETTO_DCHECK(state_ == kConnecting);
178 state_ = kConnected;
179 ResetConnectionBackoff();
180 PERFETTO_LOG("Connected to the service, mode [%s].",
181 mode_ == HeapprofdMode::kCentral ? "central" : "child");
182
183 DataSourceDescriptor desc;
184 desc.set_name(kHeapprofdDataSource);
185 endpoint_->RegisterDataSource(desc);
186 }
187
OnDisconnect()188 void HeapprofdProducer::OnDisconnect() {
189 PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
190 PERFETTO_LOG("Disconnected from tracing service");
191
192 // Do not attempt to reconnect if we're a process-private process, just quit.
193 if (mode_ == HeapprofdMode::kChild) {
194 TerminateProcess(/*exit_status=*/1); // does not return
195 }
196
197 // Central mode - attempt to reconnect.
198 auto weak_producer = weak_factory_.GetWeakPtr();
199 if (state_ == kConnected)
200 return task_runner_->PostTask([weak_producer] {
201 if (!weak_producer)
202 return;
203 weak_producer->Restart();
204 });
205
206 state_ = kNotConnected;
207 IncreaseConnectionBackoff();
208 task_runner_->PostDelayedTask(
209 [weak_producer] {
210 if (!weak_producer)
211 return;
212 weak_producer->ConnectService();
213 },
214 connection_backoff_ms_);
215 }
216
ConnectWithRetries(const char * socket_name)217 void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
218 PERFETTO_DCHECK(state_ == kNotStarted);
219 state_ = kNotConnected;
220
221 ResetConnectionBackoff();
222 producer_sock_name_ = socket_name;
223 ConnectService();
224 }
225
ConnectService()226 void HeapprofdProducer::ConnectService() {
227 SetProducerEndpoint(ProducerIPCClient::Connect(
228 producer_sock_name_, this, "android.heapprofd", task_runner_));
229 }
230
SetProducerEndpoint(std::unique_ptr<TracingService::ProducerEndpoint> endpoint)231 void HeapprofdProducer::SetProducerEndpoint(
232 std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
233 PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
234 state_ = kConnecting;
235 endpoint_ = std::move(endpoint);
236 }
237
IncreaseConnectionBackoff()238 void HeapprofdProducer::IncreaseConnectionBackoff() {
239 connection_backoff_ms_ *= 2;
240 if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
241 connection_backoff_ms_ = kMaxConnectionBackoffMs;
242 }
243
ResetConnectionBackoff()244 void HeapprofdProducer::ResetConnectionBackoff() {
245 connection_backoff_ms_ = kInitialConnectionBackoffMs;
246 }
247
Restart()248 void HeapprofdProducer::Restart() {
249 // We lost the connection with the tracing service. At this point we need
250 // to reset all the data sources. Trying to handle that manually is going to
251 // be error prone. What we do here is simply destroy the instance and
252 // recreate it again.
253
254 // Child mode producer should not attempt restarts. Note that this also means
255 // the rest of this method doesn't have to handle child-specific state.
256 if (mode_ == HeapprofdMode::kChild)
257 PERFETTO_FATAL("Attempting to restart a child mode producer.");
258
259 HeapprofdMode mode = mode_;
260 base::TaskRunner* task_runner = task_runner_;
261 const char* socket_name = producer_sock_name_;
262
263 // Invoke destructor and then the constructor again.
264 this->~HeapprofdProducer();
265 new (this) HeapprofdProducer(mode, task_runner);
266
267 ConnectWithRetries(socket_name);
268 }
269
ScheduleActiveDataSourceWatchdog()270 void HeapprofdProducer::ScheduleActiveDataSourceWatchdog() {
271 PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
272
273 // Post the first check after a delay, to let the freshly forked heapprofd
274 // to receive the active data sources from traced. The checks will reschedule
275 // themselves from that point onwards.
276 auto weak_producer = weak_factory_.GetWeakPtr();
277 task_runner_->PostDelayedTask(
278 [weak_producer]() {
279 if (!weak_producer)
280 return;
281 weak_producer->ActiveDataSourceWatchdogCheck();
282 },
283 kChildModeWatchdogPeriodMs);
284 }
285
ActiveDataSourceWatchdogCheck()286 void HeapprofdProducer::ActiveDataSourceWatchdogCheck() {
287 PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
288
289 // Fork mode heapprofd should be working on exactly one data source matching
290 // its target process.
291 if (data_sources_.empty()) {
292 PERFETTO_LOG(
293 "Child heapprofd exiting as it never received a data source for the "
294 "target process, or somehow lost/finished the task without exiting.");
295 TerminateProcess(/*exit_status=*/1);
296 } else {
297 // reschedule check.
298 auto weak_producer = weak_factory_.GetWeakPtr();
299 task_runner_->PostDelayedTask(
300 [weak_producer]() {
301 if (!weak_producer)
302 return;
303 weak_producer->ActiveDataSourceWatchdogCheck();
304 },
305 kChildModeWatchdogPeriodMs);
306 }
307 }
308
309 // TODO(rsavitski): would be cleaner to shut down the event loop instead
310 // (letting main exit). One test-friendly approach is to supply a shutdown
311 // callback in the constructor.
TerminateProcess(int exit_status)312 __attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
313 int exit_status) {
314 PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
315 exit(exit_status);
316 }
317
OnTracingSetup()318 void HeapprofdProducer::OnTracingSetup() {}
319
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)320 void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
321 const DataSourceConfig& cfg) {
322 PERFETTO_DLOG("Setting up data source.");
323 if (mode_ == HeapprofdMode::kChild && cfg.enable_extra_guardrails()) {
324 PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
325 return;
326 }
327
328 const HeapprofdConfig& heapprofd_config = cfg.heapprofd_config();
329 if (heapprofd_config.all() && !heapprofd_config.pid().empty())
330 PERFETTO_ELOG("No point setting all and pid");
331 if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
332 PERFETTO_ELOG("No point setting all and process_cmdline");
333
334 if (cfg.name() != kHeapprofdDataSource) {
335 PERFETTO_DLOG("Invalid data source name.");
336 return;
337 }
338
339 auto it = data_sources_.find(id);
340 if (it != data_sources_.end()) {
341 PERFETTO_DFATAL("Received duplicated data source instance id: %" PRIu64,
342 id);
343 return;
344 }
345
346 std::vector<std::string> normalized_cmdlines =
347 NormalizeCmdlines(heapprofd_config.process_cmdline());
348
349 // Child mode is only interested in the first data source matching the
350 // already-connected process.
351 if (mode_ == HeapprofdMode::kChild) {
352 if (!ConfigTargetsProcess(heapprofd_config, target_process_,
353 normalized_cmdlines)) {
354 PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
355 return;
356 }
357
358 if (!data_sources_.empty()) {
359 PERFETTO_LOG("Child mode skipping concurrent data source.");
360
361 // Manually write one ProfilePacket about the rejected session.
362 auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
363 auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
364 auto trace_packet = trace_writer->NewTracePacket();
365 trace_packet->set_timestamp(
366 static_cast<uint64_t>(base::GetBootTimeNs().count()));
367 auto profile_packet = trace_packet->set_profile_packet();
368 auto process_dump = profile_packet->add_process_dumps();
369 process_dump->set_pid(static_cast<uint64_t>(target_process_.pid));
370 process_dump->set_rejected_concurrent(true);
371 trace_packet->Finalize();
372 trace_writer->Flush();
373 return;
374 }
375 }
376
377 DataSource data_source;
378 data_source.id = id;
379 data_source.client_configuration = MakeClientConfiguration(cfg);
380 data_source.config = heapprofd_config;
381 auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
382 data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
383 data_source.normalized_cmdlines = std::move(normalized_cmdlines);
384
385 data_sources_.emplace(id, std::move(data_source));
386 PERFETTO_DLOG("Set up data source.");
387
388 if (mode_ == HeapprofdMode::kChild)
389 AdoptTargetProcessSocket();
390 }
391
IsPidProfiled(pid_t pid)392 bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
393 for (const auto& pair : data_sources_) {
394 const DataSource& ds = pair.second;
395 if (ds.process_states.find(pid) != ds.process_states.cend())
396 return true;
397 }
398 return false;
399 }
400
StartDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)401 void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
402 const DataSourceConfig& cfg) {
403 PERFETTO_DLOG("Start DataSource");
404 const HeapprofdConfig& heapprofd_config = cfg.heapprofd_config();
405
406 auto it = data_sources_.find(id);
407 if (it == data_sources_.end()) {
408 // This is expected in child heapprofd, where we reject uninteresting data
409 // sources in SetupDataSource.
410 if (mode_ == HeapprofdMode::kCentral) {
411 PERFETTO_DFATAL(
412 "Received invalid data source instance to start: %" PRIu64, id);
413 }
414 return;
415 }
416 DataSource& data_source = it->second;
417
418 // Central daemon - set system properties for any targets that start later,
419 // and signal already-running targets to start the profiling client.
420 if (mode_ == HeapprofdMode::kCentral) {
421 if (heapprofd_config.all())
422 data_source.properties.emplace_back(properties_.SetAll());
423
424 for (std::string cmdline : data_source.normalized_cmdlines)
425 data_source.properties.emplace_back(
426 properties_.SetProperty(std::move(cmdline)));
427
428 std::set<pid_t> pids;
429 if (heapprofd_config.all())
430 FindAllProfilablePids(&pids);
431 for (uint64_t pid : heapprofd_config.pid())
432 pids.emplace(static_cast<pid_t>(pid));
433
434 if (!data_source.normalized_cmdlines.empty())
435 FindPidsForCmdlines(data_source.normalized_cmdlines, &pids);
436
437 for (auto pid_it = pids.cbegin(); pid_it != pids.cend();) {
438 pid_t pid = *pid_it;
439 if (IsPidProfiled(pid)) {
440 PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
441 static_cast<intmax_t>(pid));
442 data_source.rejected_pids.emplace(pid);
443 pid_it = pids.erase(pid_it);
444 continue;
445 }
446
447 PERFETTO_DLOG("Sending %d to %d", kHeapprofdSignal, pid);
448 if (kill(pid, kHeapprofdSignal) != 0) {
449 PERFETTO_DPLOG("kill");
450 }
451 ++pid_it;
452 }
453 data_source.signaled_pids = std::move(pids);
454 }
455
456 const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
457 uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
458 if (dump_interval) {
459 auto weak_producer = weak_factory_.GetWeakPtr();
460 task_runner_->PostDelayedTask(
461 [weak_producer, id, dump_interval] {
462 if (!weak_producer)
463 return;
464 weak_producer->DoContinuousDump(id, dump_interval);
465 },
466 continuous_dump_config.dump_phase_ms());
467 }
468 PERFETTO_DLOG("Started DataSource");
469 }
470
UnwinderForPID(pid_t pid)471 UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
472 return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
473 }
474
StopDataSource(DataSourceInstanceID id)475 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
476 auto it = data_sources_.find(id);
477 if (it == data_sources_.end()) {
478 if (mode_ == HeapprofdMode::kCentral)
479 PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
480 return;
481 }
482
483 DataSource& data_source = it->second;
484 for (const auto& pid_and_process_state : data_source.process_states) {
485 pid_t pid = pid_and_process_state.first;
486 UnwinderForPID(pid).PostDisconnectSocket(pid);
487 }
488
489 data_sources_.erase(it);
490
491 if (mode_ == HeapprofdMode::kChild)
492 TerminateProcess(/*exit_status=*/0); // does not return
493 }
494
DoContinuousDump(DataSourceInstanceID id,uint32_t dump_interval)495 void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
496 uint32_t dump_interval) {
497 if (!Dump(id, 0 /* flush_id */, false /* is_flush */))
498 return;
499 auto weak_producer = weak_factory_.GetWeakPtr();
500 task_runner_->PostDelayedTask(
501 [weak_producer, id, dump_interval] {
502 if (!weak_producer)
503 return;
504 weak_producer->DoContinuousDump(id, dump_interval);
505 },
506 dump_interval);
507 }
508
Dump(DataSourceInstanceID id,FlushRequestID flush_id,bool has_flush_id)509 bool HeapprofdProducer::Dump(DataSourceInstanceID id,
510 FlushRequestID flush_id,
511 bool has_flush_id) {
512 auto it = data_sources_.find(id);
513 if (it == data_sources_.end()) {
514 PERFETTO_LOG(
515 "Data source not found (harmless if using continuous_dump_config).");
516 return false;
517 }
518 DataSource& data_source = it->second;
519
520 DumpState dump_state(data_source.trace_writer.get(),
521 &data_source.next_index_);
522
523 for (pid_t rejected_pid : data_source.rejected_pids) {
524 ProfilePacket::ProcessHeapSamples* proto =
525 dump_state.current_profile_packet->add_process_dumps();
526 proto->set_pid(static_cast<uint64_t>(rejected_pid));
527 proto->set_rejected_concurrent(true);
528 }
529
530 for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
531 data_source.process_states) {
532 pid_t pid = pid_and_process_state.first;
533 ProcessState& process_state = pid_and_process_state.second;
534 HeapTracker& heap_tracker = process_state.heap_tracker;
535 bool from_startup =
536 data_source.signaled_pids.find(pid) == data_source.signaled_pids.cend();
537 auto new_heapsamples = [pid, from_startup, &process_state](
538 ProfilePacket::ProcessHeapSamples* proto) {
539 proto->set_pid(static_cast<uint64_t>(pid));
540 proto->set_from_startup(from_startup);
541 proto->set_disconnected(process_state.disconnected);
542 proto->set_buffer_overran(process_state.buffer_overran);
543 proto->set_buffer_corrupted(process_state.buffer_corrupted);
544 auto* stats = proto->set_stats();
545 stats->set_unwinding_errors(process_state.unwinding_errors);
546 stats->set_heap_samples(process_state.heap_samples);
547 stats->set_map_reparses(process_state.map_reparses);
548 stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
549 auto* unwinding_hist = stats->set_unwinding_time_us();
550 for (const auto& p : process_state.unwinding_time_us.GetData()) {
551 auto* bucket = unwinding_hist->add_buckets();
552 if (p.first == LogHistogram::kMaxBucket)
553 bucket->set_max_bucket(true);
554 else
555 bucket->set_upper_limit(p.first);
556 bucket->set_count(p.second);
557 }
558 };
559 heap_tracker.Dump(std::move(new_heapsamples), &dump_state);
560 }
561
562 for (GlobalCallstackTrie::Node* node : dump_state.callstacks_to_dump) {
563 // There need to be two separate loops over built_callstack because
564 // protozero cannot interleave different messages.
565 auto built_callstack = callsites_.BuildCallstack(node);
566 for (const Interned<Frame>& frame : built_callstack)
567 dump_state.WriteFrame(frame);
568 ProfilePacket::Callstack* callstack =
569 dump_state.current_profile_packet->add_callstacks();
570 callstack->set_id(node->id());
571 for (const Interned<Frame>& frame : built_callstack)
572 callstack->add_frame_ids(frame.id());
573 }
574
575 dump_state.current_trace_packet->Finalize();
576 if (has_flush_id) {
577 auto weak_producer = weak_factory_.GetWeakPtr();
578 auto callback = [weak_producer, flush_id] {
579 if (weak_producer)
580 return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
581 if (weak_producer)
582 return weak_producer->FinishDataSourceFlush(flush_id);
583 });
584 };
585 data_source.trace_writer->Flush(std::move(callback));
586 }
587 return true;
588 }
589
DumpAll()590 void HeapprofdProducer::DumpAll() {
591 for (const auto& id_and_data_source : data_sources_) {
592 if (!Dump(id_and_data_source.first, 0 /* flush_id */, false /* is_flush */))
593 PERFETTO_DLOG("Failed to dump %" PRIu64, id_and_data_source.first);
594 }
595 }
596
Flush(FlushRequestID flush_id,const DataSourceInstanceID * ids,size_t num_ids)597 void HeapprofdProducer::Flush(FlushRequestID flush_id,
598 const DataSourceInstanceID* ids,
599 size_t num_ids) {
600 if (num_ids == 0)
601 return;
602
603 size_t& flush_in_progress = flushes_in_progress_[flush_id];
604 PERFETTO_DCHECK(flush_in_progress == 0);
605 flush_in_progress = num_ids;
606 for (size_t i = 0; i < num_ids; ++i)
607 Dump(ids[i], flush_id, true);
608 }
609
FinishDataSourceFlush(FlushRequestID flush_id)610 void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
611 auto it = flushes_in_progress_.find(flush_id);
612 if (it == flushes_in_progress_.end()) {
613 PERFETTO_DFATAL("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
614 return;
615 }
616 size_t& flush_in_progress = it->second;
617 if (--flush_in_progress == 0) {
618 endpoint_->NotifyFlushComplete(flush_id);
619 flushes_in_progress_.erase(flush_id);
620 }
621 }
622
OnDisconnect(base::UnixSocket * self)623 void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
624 auto it = producer_->pending_processes_.find(self->peer_pid());
625 if (it == producer_->pending_processes_.end()) {
626 PERFETTO_DFATAL("Unexpected disconnect.");
627 return;
628 }
629
630 if (self == it->second.sock.get())
631 producer_->pending_processes_.erase(it);
632 }
633
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> new_connection)634 void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
635 base::UnixSocket*,
636 std::unique_ptr<base::UnixSocket> new_connection) {
637 Process peer_process;
638 peer_process.pid = new_connection->peer_pid();
639 if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
640 PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
641
642 producer_->HandleClientConnection(std::move(new_connection), peer_process);
643 }
644
OnDataAvailable(base::UnixSocket * self)645 void HeapprofdProducer::SocketDelegate::OnDataAvailable(
646 base::UnixSocket* self) {
647 auto it = producer_->pending_processes_.find(self->peer_pid());
648 if (it == producer_->pending_processes_.end()) {
649 PERFETTO_DFATAL("Unexpected data.");
650 return;
651 }
652
653 PendingProcess& pending_process = it->second;
654
655 base::ScopedFile fds[kHandshakeSize];
656 char buf[1];
657 self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
658
659 static_assert(kHandshakeSize == 2, "change if below.");
660 if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
661 auto ds_it =
662 producer_->data_sources_.find(pending_process.data_source_instance_id);
663 if (ds_it == producer_->data_sources_.end()) {
664 producer_->pending_processes_.erase(it);
665 return;
666 }
667
668 DataSource& data_source = ds_it->second;
669 data_source.process_states.emplace(self->peer_pid(),
670 &producer_->callsites_);
671
672 PERFETTO_DLOG("%d: Received FDs.", self->peer_pid());
673 int raw_fd = pending_process.shmem.fd();
674 // TODO(fmayer): Full buffer could deadlock us here.
675 self->Send(&data_source.client_configuration,
676 sizeof(data_source.client_configuration), &raw_fd, 1,
677 base::UnixSocket::BlockingMode::kBlocking);
678
679 UnwindingWorker::HandoffData handoff_data;
680 handoff_data.data_source_instance_id =
681 pending_process.data_source_instance_id;
682 handoff_data.sock = self->ReleaseSocket();
683 for (size_t i = 0; i < kHandshakeSize; ++i)
684 handoff_data.fds[i] = std::move(fds[i]);
685 handoff_data.shmem = std::move(pending_process.shmem);
686 handoff_data.client_config = data_source.client_configuration;
687
688 producer_->UnwinderForPID(self->peer_pid())
689 .PostHandoffSocket(std::move(handoff_data));
690 producer_->pending_processes_.erase(it);
691 } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
692 PERFETTO_DFATAL("%d: Received partial FDs.", self->peer_pid());
693 producer_->pending_processes_.erase(it);
694 } else {
695 PERFETTO_DLOG("%d: Received no FDs.", self->peer_pid());
696 }
697 }
698
GetDataSourceForProcess(const Process & proc)699 HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
700 const Process& proc) {
701 for (auto& ds_id_and_datasource : data_sources_) {
702 DataSource& ds = ds_id_and_datasource.second;
703 if (ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
704 return &ds;
705 }
706 return nullptr;
707 }
708
RecordOtherSourcesAsRejected(DataSource * active_ds,const Process & proc)709 void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
710 const Process& proc) {
711 for (auto& ds_id_and_datasource : data_sources_) {
712 DataSource& ds = ds_id_and_datasource.second;
713 if (&ds != active_ds &&
714 ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
715 ds.rejected_pids.emplace(proc.pid);
716 }
717 }
718
HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,Process process)719 void HeapprofdProducer::HandleClientConnection(
720 std::unique_ptr<base::UnixSocket> new_connection,
721 Process process) {
722 DataSource* data_source = GetDataSourceForProcess(process);
723 if (!data_source) {
724 PERFETTO_LOG("No data source found.");
725 return;
726 }
727 RecordOtherSourcesAsRejected(data_source, process);
728
729 uint64_t shmem_size = data_source->config.shmem_size_bytes();
730 if (!shmem_size)
731 shmem_size = kDefaultShmemSize;
732 if (shmem_size > kMaxShmemSize)
733 shmem_size = kMaxShmemSize;
734
735 auto shmem = SharedRingBuffer::Create(shmem_size);
736 if (!shmem || !shmem->is_valid()) {
737 PERFETTO_LOG("Failed to create shared memory.");
738 return;
739 }
740
741 pid_t peer_pid = new_connection->peer_pid();
742 if (peer_pid != process.pid) {
743 PERFETTO_DFATAL("Invalid PID connected.");
744 return;
745 }
746
747 PendingProcess pending_process;
748 pending_process.sock = std::move(new_connection);
749 pending_process.data_source_instance_id = data_source->id;
750 pending_process.shmem = std::move(*shmem);
751 pending_processes_.emplace(peer_pid, std::move(pending_process));
752 }
753
PostAllocRecord(AllocRecord alloc_rec)754 void HeapprofdProducer::PostAllocRecord(AllocRecord alloc_rec) {
755 // Once we can use C++14, this should be std::moved into the lambda instead.
756 AllocRecord* raw_alloc_rec = new AllocRecord(std::move(alloc_rec));
757 auto weak_this = weak_factory_.GetWeakPtr();
758 task_runner_->PostTask([weak_this, raw_alloc_rec] {
759 if (weak_this)
760 weak_this->HandleAllocRecord(std::move(*raw_alloc_rec));
761 delete raw_alloc_rec;
762 });
763 }
764
PostFreeRecord(FreeRecord free_rec)765 void HeapprofdProducer::PostFreeRecord(FreeRecord free_rec) {
766 // Once we can use C++14, this should be std::moved into the lambda instead.
767 FreeRecord* raw_free_rec = new FreeRecord(std::move(free_rec));
768 auto weak_this = weak_factory_.GetWeakPtr();
769 task_runner_->PostTask([weak_this, raw_free_rec] {
770 if (weak_this)
771 weak_this->HandleFreeRecord(std::move(*raw_free_rec));
772 delete raw_free_rec;
773 });
774 }
775
PostSocketDisconnected(DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)776 void HeapprofdProducer::PostSocketDisconnected(DataSourceInstanceID ds_id,
777 pid_t pid,
778 SharedRingBuffer::Stats stats) {
779 auto weak_this = weak_factory_.GetWeakPtr();
780 task_runner_->PostTask([weak_this, ds_id, pid, stats] {
781 if (weak_this)
782 weak_this->HandleSocketDisconnected(ds_id, pid, stats);
783 });
784 }
785
HandleAllocRecord(AllocRecord alloc_rec)786 void HeapprofdProducer::HandleAllocRecord(AllocRecord alloc_rec) {
787 const AllocMetadata& alloc_metadata = alloc_rec.alloc_metadata;
788 auto it = data_sources_.find(alloc_rec.data_source_instance_id);
789 if (it == data_sources_.end()) {
790 PERFETTO_LOG("Invalid data source in alloc record.");
791 return;
792 }
793
794 DataSource& ds = it->second;
795 auto process_state_it = ds.process_states.find(alloc_rec.pid);
796 if (process_state_it == ds.process_states.end()) {
797 PERFETTO_LOG("Invalid PID in alloc record.");
798 return;
799 }
800
801 const auto& prefixes = ds.config.skip_symbol_prefix();
802 if (!prefixes.empty()) {
803 for (FrameData& frame_data : alloc_rec.frames) {
804 const std::string& map = frame_data.frame.map_name;
805 if (std::find_if(prefixes.cbegin(), prefixes.cend(),
806 [&map](const std::string& prefix) {
807 return base::StartsWith(map, prefix);
808 }) != prefixes.cend()) {
809 frame_data.frame.function_name = "FILTERED";
810 }
811 }
812 }
813
814 ProcessState& process_state = process_state_it->second;
815 HeapTracker& heap_tracker = process_state.heap_tracker;
816
817 if (alloc_rec.error)
818 process_state.unwinding_errors++;
819 if (alloc_rec.reparsed_map)
820 process_state.map_reparses++;
821 process_state.heap_samples++;
822 process_state.unwinding_time_us.Add(alloc_rec.unwinding_time_us);
823 process_state.total_unwinding_time_us += alloc_rec.unwinding_time_us;
824
825 heap_tracker.RecordMalloc(alloc_rec.frames, alloc_metadata.alloc_address,
826 alloc_metadata.total_size,
827 alloc_metadata.sequence_number,
828 alloc_metadata.clock_monotonic_coarse_timestamp);
829 }
830
HandleFreeRecord(FreeRecord free_rec)831 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
832 const FreeBatch& free_batch = free_rec.free_batch;
833 auto it = data_sources_.find(free_rec.data_source_instance_id);
834 if (it == data_sources_.end()) {
835 PERFETTO_LOG("Invalid data source in free record.");
836 return;
837 }
838
839 DataSource& ds = it->second;
840 auto process_state_it = ds.process_states.find(free_rec.pid);
841 if (process_state_it == ds.process_states.end()) {
842 PERFETTO_LOG("Invalid PID in free record.");
843 return;
844 }
845
846 ProcessState& process_state = process_state_it->second;
847 HeapTracker& heap_tracker = process_state.heap_tracker;
848
849 const FreeBatchEntry* entries = free_batch.entries;
850 uint64_t num_entries = free_batch.num_entries;
851 if (num_entries > kFreeBatchSize) {
852 PERFETTO_DFATAL("Malformed free page.");
853 return;
854 }
855 for (size_t i = 0; i < num_entries; ++i) {
856 const FreeBatchEntry& entry = entries[i];
857 heap_tracker.RecordFree(entry.addr, entry.sequence_number,
858 free_batch.clock_monotonic_coarse_timestamp);
859 }
860 }
861
HandleSocketDisconnected(DataSourceInstanceID id,pid_t pid,SharedRingBuffer::Stats stats)862 void HeapprofdProducer::HandleSocketDisconnected(
863 DataSourceInstanceID id,
864 pid_t pid,
865 SharedRingBuffer::Stats stats) {
866 auto it = data_sources_.find(id);
867 if (it == data_sources_.end())
868 return;
869 DataSource& ds = it->second;
870
871 auto process_state_it = ds.process_states.find(pid);
872 if (process_state_it == ds.process_states.end())
873 return;
874 ProcessState& process_state = process_state_it->second;
875 process_state.disconnected = true;
876 process_state.buffer_overran = stats.num_writes_overflow > 0;
877 process_state.buffer_corrupted =
878 stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
879
880 // TODO(fmayer): Dump on process disconnect rather than data source
881 // destruction. This prevents us needing to hold onto the bookkeeping data
882 // after the process disconnected.
883 }
884
885 } // namespace profiling
886 } // namespace perfetto
887