1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/profiling/memory/heapprofd_producer.h"
18
19 #include <signal.h>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23
24 #include <algorithm>
25 #include <cinttypes>
26 #include <functional>
27 #include <optional>
28 #include <string>
29
30 #include "perfetto/base/compiler.h"
31 #include "perfetto/base/logging.h"
32 #include "perfetto/ext/base/file_utils.h"
33 #include "perfetto/ext/base/string_splitter.h"
34 #include "perfetto/ext/base/string_utils.h"
35 #include "perfetto/ext/base/thread_task_runner.h"
36 #include "perfetto/ext/base/watchdog_posix.h"
37 #include "perfetto/ext/tracing/core/basic_types.h"
38 #include "perfetto/ext/tracing/core/trace_writer.h"
39 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40 #include "perfetto/tracing/core/data_source_config.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "perfetto/tracing/core/forward_decls.h"
43 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
44 #include "src/profiling/common/producer_support.h"
45 #include "src/profiling/common/profiler_guardrails.h"
46 #include "src/profiling/memory/shared_ring_buffer.h"
47 #include "src/profiling/memory/unwound_messages.h"
48 #include "src/profiling/memory/wire_protocol.h"
49
50 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
51 #include <sys/system_properties.h>
52 #endif
53
54 namespace perfetto {
55 namespace profiling {
56 namespace {
57 using ::perfetto::protos::pbzero::ProfilePacket;
58
59 constexpr char kHeapprofdDataSource[] = "android.heapprofd";
60 constexpr size_t kUnwinderThreads = 5;
61
62 constexpr uint32_t kInitialConnectionBackoffMs = 100;
63 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
64 constexpr uint32_t kGuardrailIntervalMs = 30 * 1000;
65
66 constexpr uint64_t kDefaultShmemSize = 8 * 1048576; // ~8 MB
67 constexpr uint64_t kMaxShmemSize = 500 * 1048576; // ~500 MB
68
69 // Constants specified by bionic, hardcoded here for simplicity.
70 constexpr int kProfilingSignal = __SIGRTMIN + 4;
71 constexpr int kHeapprofdSignalValue = 0;
72
MakeUnwindingWorkers(HeapprofdProducer * delegate,size_t n)73 std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
74 size_t n) {
75 std::vector<UnwindingWorker> ret;
76 for (size_t i = 0; i < n; ++i) {
77 ret.emplace_back(delegate,
78 base::ThreadTaskRunner::CreateAndStart("heapprofdunwind"));
79 }
80 return ret;
81 }
82
ConfigTargetsProcess(const HeapprofdConfig & cfg,const Process & proc,const std::vector<std::string> & normalized_cmdlines)83 bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
84 const Process& proc,
85 const std::vector<std::string>& normalized_cmdlines) {
86 if (cfg.all())
87 return true;
88
89 const auto& pids = cfg.pid();
90 if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
91 pids.cend()) {
92 return true;
93 }
94
95 if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
96 proc.cmdline) != normalized_cmdlines.cend()) {
97 return true;
98 }
99 return false;
100 }
101
IsFile(int fd,const char * fn)102 bool IsFile(int fd, const char* fn) {
103 struct stat fdstat;
104 struct stat fnstat;
105 if (fstat(fd, &fdstat) == -1) {
106 PERFETTO_PLOG("fstat");
107 return false;
108 }
109 if (lstat(fn, &fnstat) == -1) {
110 PERFETTO_PLOG("lstat");
111 return false;
112 }
113 return fdstat.st_ino == fnstat.st_ino;
114 }
115
116 protos::pbzero::ProfilePacket::ProcessHeapSamples::ClientError
ErrorStateToProto(SharedRingBuffer::ErrorState state)117 ErrorStateToProto(SharedRingBuffer::ErrorState state) {
118 switch (state) {
119 case (SharedRingBuffer::kNoError):
120 return protos::pbzero::ProfilePacket::ProcessHeapSamples::
121 CLIENT_ERROR_NONE;
122 case (SharedRingBuffer::kHitTimeout):
123 return protos::pbzero::ProfilePacket::ProcessHeapSamples::
124 CLIENT_ERROR_HIT_TIMEOUT;
125 case (SharedRingBuffer::kInvalidStackBounds):
126 return protos::pbzero::ProfilePacket::ProcessHeapSamples::
127 CLIENT_ERROR_INVALID_STACK_BOUNDS;
128 }
129 }
130
131 } // namespace
132
HeapprofdConfigToClientConfiguration(const HeapprofdConfig & heapprofd_config,ClientConfiguration * cli_config)133 bool HeapprofdConfigToClientConfiguration(
134 const HeapprofdConfig& heapprofd_config,
135 ClientConfiguration* cli_config) {
136 cli_config->default_interval = heapprofd_config.sampling_interval_bytes();
137 cli_config->block_client = heapprofd_config.block_client();
138 cli_config->disable_fork_teardown = heapprofd_config.disable_fork_teardown();
139 cli_config->disable_vfork_detection =
140 heapprofd_config.disable_vfork_detection();
141 cli_config->block_client_timeout_us =
142 heapprofd_config.block_client_timeout_us();
143 cli_config->all_heaps = heapprofd_config.all_heaps();
144 cli_config->adaptive_sampling_shmem_threshold =
145 heapprofd_config.adaptive_sampling_shmem_threshold();
146 cli_config->adaptive_sampling_max_sampling_interval_bytes =
147 heapprofd_config.adaptive_sampling_max_sampling_interval_bytes();
148 size_t n = 0;
149 const std::vector<std::string>& exclude_heaps =
150 heapprofd_config.exclude_heaps();
151 // heaps[i] and heaps_interval[i] represent that the heap named in heaps[i]
152 // should be sampled with sampling interval of heap_interval[i].
153 std::vector<std::string> heaps = heapprofd_config.heaps();
154 std::vector<uint64_t> heap_intervals =
155 heapprofd_config.heap_sampling_intervals();
156 if (heaps.empty() && !cli_config->all_heaps) {
157 heaps.push_back("libc.malloc");
158 }
159
160 if (heap_intervals.empty()) {
161 heap_intervals.assign(heaps.size(),
162 heapprofd_config.sampling_interval_bytes());
163 }
164 if (heap_intervals.size() != heaps.size()) {
165 PERFETTO_ELOG("heap_sampling_intervals and heaps length mismatch.");
166 return false;
167 }
168 if (std::find(heap_intervals.begin(), heap_intervals.end(), 0u) !=
169 heap_intervals.end()) {
170 PERFETTO_ELOG("zero sampling interval.");
171 return false;
172 }
173 if (!exclude_heaps.empty()) {
174 // For disabled heaps, we add explicit entries but with sampling interval
175 // 0. The consumer of the sampling intervals in ClientConfiguration,
176 // GetSamplingInterval in wire_protocol.h, uses 0 to signal a heap is
177 // disabled, either because it isn't enabled (all_heaps is not set, and the
178 // heap isn't named), or because we explicitely set it here.
179 heaps.insert(heaps.end(), exclude_heaps.cbegin(), exclude_heaps.cend());
180 heap_intervals.insert(heap_intervals.end(), exclude_heaps.size(), 0u);
181 }
182 if (heaps.size() > base::ArraySize(cli_config->heaps)) {
183 heaps.resize(base::ArraySize(cli_config->heaps));
184 PERFETTO_ELOG("Too many heaps requested. Truncating.");
185 }
186 for (size_t i = 0; i < heaps.size(); ++i) {
187 const std::string& heap = heaps[i];
188 const uint64_t interval = heap_intervals[i];
189 // -1 for the \0 byte.
190 if (heap.size() > HEAPPROFD_HEAP_NAME_SZ - 1) {
191 PERFETTO_ELOG("Invalid heap name %s (larger than %d)", heap.c_str(),
192 HEAPPROFD_HEAP_NAME_SZ - 1);
193 continue;
194 }
195 base::StringCopy(&cli_config->heaps[n].name[0], heap.c_str(),
196 sizeof(cli_config->heaps[n].name));
197 cli_config->heaps[n].interval = interval;
198 n++;
199 }
200 cli_config->num_heaps = n;
201 return true;
202 }
203
204 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
205 // thread.
HeapprofdProducer(HeapprofdMode mode,base::TaskRunner * task_runner,bool exit_when_done)206 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
207 base::TaskRunner* task_runner,
208 bool exit_when_done)
209 : task_runner_(task_runner),
210 mode_(mode),
211 exit_when_done_(exit_when_done),
212 unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
213 socket_delegate_(this),
214 weak_factory_(this) {
215 CheckDataSourceCpuTask();
216 CheckDataSourceMemoryTask();
217 }
218
219 HeapprofdProducer::~HeapprofdProducer() = default;
220
SetTargetProcess(pid_t target_pid,std::string target_cmdline)221 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
222 std::string target_cmdline) {
223 target_process_.pid = target_pid;
224 target_process_.cmdline = target_cmdline;
225 }
226
SetDataSourceCallback(std::function<void ()> fn)227 void HeapprofdProducer::SetDataSourceCallback(std::function<void()> fn) {
228 data_source_callback_ = fn;
229 }
230
AdoptSocket(base::ScopedFile fd)231 void HeapprofdProducer::AdoptSocket(base::ScopedFile fd) {
232 PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
233 auto socket = base::UnixSocket::AdoptConnected(
234 std::move(fd), &socket_delegate_, task_runner_, base::SockFamily::kUnix,
235 base::SockType::kStream);
236
237 HandleClientConnection(std::move(socket), target_process_);
238 }
239
OnConnect()240 void HeapprofdProducer::OnConnect() {
241 PERFETTO_DCHECK(state_ == kConnecting);
242 state_ = kConnected;
243 ResetConnectionBackoff();
244 PERFETTO_LOG("Connected to the service, mode [%s].",
245 mode_ == HeapprofdMode::kCentral ? "central" : "child");
246
247 DataSourceDescriptor desc;
248 desc.set_name(kHeapprofdDataSource);
249 desc.set_will_notify_on_stop(true);
250 endpoint_->RegisterDataSource(desc);
251 }
252
OnDisconnect()253 void HeapprofdProducer::OnDisconnect() {
254 PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
255 PERFETTO_LOG("Disconnected from tracing service");
256
257 // Do not attempt to reconnect if we're a process-private process, just quit.
258 if (exit_when_done_) {
259 TerminateProcess(/*exit_status=*/1); // does not return
260 }
261
262 // Central mode - attempt to reconnect.
263 auto weak_producer = weak_factory_.GetWeakPtr();
264 if (state_ == kConnected)
265 return task_runner_->PostTask([weak_producer] {
266 if (!weak_producer)
267 return;
268 weak_producer->Restart();
269 });
270
271 state_ = kNotConnected;
272 IncreaseConnectionBackoff();
273 task_runner_->PostDelayedTask(
274 [weak_producer] {
275 if (!weak_producer)
276 return;
277 weak_producer->ConnectService();
278 },
279 connection_backoff_ms_);
280 }
281
ConnectWithRetries(const char * socket_name)282 void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
283 PERFETTO_DCHECK(state_ == kNotStarted);
284 state_ = kNotConnected;
285
286 ResetConnectionBackoff();
287 producer_sock_name_ = socket_name;
288 ConnectService();
289 }
290
ConnectService()291 void HeapprofdProducer::ConnectService() {
292 SetProducerEndpoint(ProducerIPCClient::Connect(
293 producer_sock_name_, this, "android.heapprofd", task_runner_));
294 }
295
SetProducerEndpoint(std::unique_ptr<TracingService::ProducerEndpoint> endpoint)296 void HeapprofdProducer::SetProducerEndpoint(
297 std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
298 PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
299 state_ = kConnecting;
300 endpoint_ = std::move(endpoint);
301 }
302
IncreaseConnectionBackoff()303 void HeapprofdProducer::IncreaseConnectionBackoff() {
304 connection_backoff_ms_ *= 2;
305 if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
306 connection_backoff_ms_ = kMaxConnectionBackoffMs;
307 }
308
ResetConnectionBackoff()309 void HeapprofdProducer::ResetConnectionBackoff() {
310 connection_backoff_ms_ = kInitialConnectionBackoffMs;
311 }
312
Restart()313 void HeapprofdProducer::Restart() {
314 // We lost the connection with the tracing service. At this point we need
315 // to reset all the data sources. Trying to handle that manually is going to
316 // be error prone. What we do here is simply destroy the instance and
317 // recreate it again.
318
319 // Oneshot producer should not attempt restarts.
320 if (exit_when_done_)
321 PERFETTO_FATAL("Attempting to restart a one shot producer.");
322
323 HeapprofdMode mode = mode_;
324 base::TaskRunner* task_runner = task_runner_;
325 const char* socket_name = producer_sock_name_;
326 const bool exit_when_done = exit_when_done_;
327
328 // Invoke destructor and then the constructor again.
329 this->~HeapprofdProducer();
330 new (this) HeapprofdProducer(mode, task_runner, exit_when_done);
331
332 ConnectWithRetries(socket_name);
333 }
334
335 // TODO(rsavitski): would be cleaner to shut down the event loop instead
336 // (letting main exit). One test-friendly approach is to supply a shutdown
337 // callback in the constructor.
TerminateProcess(int exit_status)338 __attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
339 int exit_status) {
340 PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
341 PERFETTO_LOG("Shutting down child heapprofd (status %d).", exit_status);
342 exit(exit_status);
343 }
344
OnTracingSetup()345 void HeapprofdProducer::OnTracingSetup() {}
346
WriteRejectedConcurrentSession(BufferID buffer_id,pid_t pid)347 void HeapprofdProducer::WriteRejectedConcurrentSession(BufferID buffer_id,
348 pid_t pid) {
349 auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
350 auto trace_packet = trace_writer->NewTracePacket();
351 trace_packet->set_timestamp(
352 static_cast<uint64_t>(base::GetBootTimeNs().count()));
353 auto profile_packet = trace_packet->set_profile_packet();
354 auto process_dump = profile_packet->add_process_dumps();
355 process_dump->set_pid(static_cast<uint64_t>(pid));
356 process_dump->set_rejected_concurrent(true);
357 trace_packet->Finalize();
358 trace_writer->Flush();
359 }
360
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & ds_config)361 void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
362 const DataSourceConfig& ds_config) {
363 if (ds_config.session_initiator() ==
364 DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM) {
365 PERFETTO_LOG("Setting up datasource: statsd initiator.");
366 } else {
367 PERFETTO_LOG("Setting up datasource: non-statsd initiator.");
368 }
369 if (mode_ == HeapprofdMode::kChild && ds_config.enable_extra_guardrails()) {
370 PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
371 return;
372 }
373
374 HeapprofdConfig heapprofd_config;
375 heapprofd_config.ParseFromString(ds_config.heapprofd_config_raw());
376
377 if (heapprofd_config.all() && !heapprofd_config.pid().empty())
378 PERFETTO_ELOG("No point setting all and pid");
379 if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
380 PERFETTO_ELOG("No point setting all and process_cmdline");
381
382 if (ds_config.name() != kHeapprofdDataSource) {
383 PERFETTO_DLOG("Invalid data source name.");
384 return;
385 }
386
387 if (data_sources_.find(id) != data_sources_.end()) {
388 PERFETTO_DFATAL_OR_ELOG(
389 "Received duplicated data source instance id: %" PRIu64, id);
390 return;
391 }
392
393 std::optional<std::vector<std::string>> normalized_cmdlines =
394 NormalizeCmdlines(heapprofd_config.process_cmdline());
395 if (!normalized_cmdlines.has_value()) {
396 PERFETTO_ELOG("Rejecting data source due to invalid cmdline in config.");
397 return;
398 }
399
400 // Child mode is only interested in the first data source matching the
401 // already-connected process.
402 if (mode_ == HeapprofdMode::kChild) {
403 if (!ConfigTargetsProcess(heapprofd_config, target_process_,
404 normalized_cmdlines.value())) {
405 PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
406 return;
407 }
408
409 if (!data_sources_.empty()) {
410 PERFETTO_LOG("Child mode skipping concurrent data source.");
411
412 // Manually write one ProfilePacket about the rejected session.
413 auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
414 WriteRejectedConcurrentSession(buffer_id, target_process_.pid);
415 return;
416 }
417 }
418
419 std::optional<uint64_t> start_cputime_sec;
420 if (heapprofd_config.max_heapprofd_cpu_secs() > 0) {
421 start_cputime_sec = GetCputimeSecForCurrentProcess();
422
423 if (!start_cputime_sec) {
424 PERFETTO_ELOG("Failed to enforce CPU guardrail. Rejecting config.");
425 return;
426 }
427 }
428
429 auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
430 DataSource data_source(endpoint_->CreateTraceWriter(buffer_id));
431 data_source.id = id;
432 auto& cli_config = data_source.client_configuration;
433 if (!HeapprofdConfigToClientConfiguration(heapprofd_config, &cli_config))
434 return;
435 data_source.config = heapprofd_config;
436 data_source.ds_config = ds_config;
437 data_source.normalized_cmdlines = std::move(normalized_cmdlines.value());
438 data_source.stop_timeout_ms = ds_config.stop_timeout_ms()
439 ? ds_config.stop_timeout_ms()
440 : 5000 /* kDataSourceStopTimeoutMs */;
441 data_source.guardrail_config.cpu_start_secs = start_cputime_sec;
442 data_source.guardrail_config.memory_guardrail_kb =
443 heapprofd_config.max_heapprofd_memory_kb();
444 data_source.guardrail_config.cpu_guardrail_sec =
445 heapprofd_config.max_heapprofd_cpu_secs();
446
447 InterningOutputTracker::WriteFixedInterningsPacket(
448 data_source.trace_writer.get(),
449 protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
450 data_sources_.emplace(id, std::move(data_source));
451 PERFETTO_DLOG("Set up data source.");
452
453 if (mode_ == HeapprofdMode::kChild && data_source_callback_)
454 (*data_source_callback_)();
455 }
456
IsPidProfiled(pid_t pid)457 bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
458 return std::any_of(
459 data_sources_.cbegin(), data_sources_.cend(),
460 [pid](const std::pair<const DataSourceInstanceID, DataSource>& p) {
461 const DataSource& ds = p.second;
462 return ds.process_states.count(pid) > 0;
463 });
464 }
465
SetStartupProperties(DataSource * data_source)466 void HeapprofdProducer::SetStartupProperties(DataSource* data_source) {
467 const HeapprofdConfig& heapprofd_config = data_source->config;
468 if (heapprofd_config.all())
469 data_source->properties.emplace_back(properties_.SetAll());
470
471 for (std::string cmdline : data_source->normalized_cmdlines)
472 data_source->properties.emplace_back(
473 properties_.SetProperty(std::move(cmdline)));
474 }
475
SignalRunningProcesses(DataSource * data_source)476 void HeapprofdProducer::SignalRunningProcesses(DataSource* data_source) {
477 const HeapprofdConfig& heapprofd_config = data_source->config;
478
479 std::set<pid_t> pids;
480 if (heapprofd_config.all())
481 FindAllProfilablePids(&pids);
482 for (uint64_t pid : heapprofd_config.pid())
483 pids.emplace(static_cast<pid_t>(pid));
484
485 if (!data_source->normalized_cmdlines.empty())
486 FindPidsForCmdlines(data_source->normalized_cmdlines, &pids);
487
488 if (heapprofd_config.min_anonymous_memory_kb() > 0)
489 RemoveUnderAnonThreshold(heapprofd_config.min_anonymous_memory_kb(), &pids);
490
491 for (auto pid_it = pids.cbegin(); pid_it != pids.cend();) {
492 pid_t pid = *pid_it;
493 if (IsPidProfiled(pid)) {
494 PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
495 static_cast<intmax_t>(pid));
496 data_source->rejected_pids.emplace(pid);
497 pid_it = pids.erase(pid_it);
498 continue;
499 }
500
501 PERFETTO_DLOG("Sending signal: %d (si_value: %d) to pid: %d",
502 kProfilingSignal, kHeapprofdSignalValue, pid);
503 union sigval signal_value;
504 signal_value.sival_int = kHeapprofdSignalValue;
505 if (sigqueue(pid, kProfilingSignal, signal_value) != 0) {
506 PERFETTO_DPLOG("sigqueue");
507 }
508 ++pid_it;
509 }
510 data_source->signaled_pids = std::move(pids);
511 }
512
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)513 void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
514 const DataSourceConfig&) {
515 PERFETTO_DLOG("Starting data source %" PRIu64, id);
516
517 auto it = data_sources_.find(id);
518 if (it == data_sources_.end()) {
519 // This is expected in child heapprofd, where we reject uninteresting data
520 // sources in SetupDataSource.
521 if (mode_ == HeapprofdMode::kCentral) {
522 PERFETTO_DFATAL_OR_ELOG(
523 "Received invalid data source instance to start: %" PRIu64, id);
524 }
525 return;
526 }
527
528 DataSource& data_source = it->second;
529 if (data_source.started) {
530 PERFETTO_DFATAL_OR_ELOG(
531 "Trying to start already started data-source: %" PRIu64, id);
532 return;
533 }
534 const HeapprofdConfig& heapprofd_config = data_source.config;
535
536 // Central daemon - set system properties for any targets that start later,
537 // and signal already-running targets to start the profiling client.
538 if (mode_ == HeapprofdMode::kCentral) {
539 if (!heapprofd_config.no_startup())
540 SetStartupProperties(&data_source);
541 if (!heapprofd_config.no_running())
542 SignalRunningProcesses(&data_source);
543 }
544
545 const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
546 uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
547 if (dump_interval) {
548 data_source.dump_interval_ms = dump_interval;
549 auto weak_producer = weak_factory_.GetWeakPtr();
550 task_runner_->PostDelayedTask(
551 [weak_producer, id] {
552 if (!weak_producer)
553 return;
554 weak_producer->DoDrainAndContinuousDump(id);
555 },
556 continuous_dump_config.dump_phase_ms());
557 }
558 data_source.started = true;
559 PERFETTO_DLOG("Started DataSource");
560 }
561
UnwinderForPID(pid_t pid)562 UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
563 return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
564 }
565
StopDataSource(DataSourceInstanceID id)566 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
567 auto it = data_sources_.find(id);
568 if (it == data_sources_.end()) {
569 endpoint_->NotifyDataSourceStopped(id);
570 if (mode_ == HeapprofdMode::kCentral)
571 PERFETTO_DFATAL_OR_ELOG(
572 "Trying to stop non existing data source: %" PRIu64, id);
573 return;
574 }
575
576 PERFETTO_LOG("Stopping data source %" PRIu64, id);
577
578 DataSource& data_source = it->second;
579 data_source.was_stopped = true;
580 ShutdownDataSource(&data_source);
581 }
582
ShutdownDataSource(DataSource * data_source)583 void HeapprofdProducer::ShutdownDataSource(DataSource* data_source) {
584 data_source->shutting_down = true;
585 // If no processes connected, or all of them have already disconnected
586 // (and have been dumped) and no PIDs have been rejected,
587 // MaybeFinishDataSource can tear down the data source.
588 if (MaybeFinishDataSource(data_source))
589 return;
590
591 if (!data_source->rejected_pids.empty()) {
592 auto trace_packet = data_source->trace_writer->NewTracePacket();
593 ProfilePacket* profile_packet = trace_packet->set_profile_packet();
594 for (pid_t rejected_pid : data_source->rejected_pids) {
595 ProfilePacket::ProcessHeapSamples* proto =
596 profile_packet->add_process_dumps();
597 proto->set_pid(static_cast<uint64_t>(rejected_pid));
598 proto->set_rejected_concurrent(true);
599 }
600 trace_packet->Finalize();
601 data_source->rejected_pids.clear();
602 if (MaybeFinishDataSource(data_source))
603 return;
604 }
605
606 for (const auto& pid_and_process_state : data_source->process_states) {
607 pid_t pid = pid_and_process_state.first;
608 UnwinderForPID(pid).PostDisconnectSocket(pid);
609 }
610
611 auto id = data_source->id;
612 auto weak_producer = weak_factory_.GetWeakPtr();
613 task_runner_->PostDelayedTask(
614 [weak_producer, id] {
615 if (!weak_producer)
616 return;
617 auto ds_it = weak_producer->data_sources_.find(id);
618 if (ds_it != weak_producer->data_sources_.end()) {
619 PERFETTO_ELOG("Final dump timed out.");
620 DataSource& ds = ds_it->second;
621
622 for (const auto& pid_and_process_state : ds.process_states) {
623 pid_t pid = pid_and_process_state.first;
624 weak_producer->UnwinderForPID(pid).PostPurgeProcess(pid);
625 }
626 // Do not dump any stragglers, just trigger the Flush and tear down
627 // the data source.
628 ds.process_states.clear();
629 ds.rejected_pids.clear();
630 PERFETTO_CHECK(weak_producer->MaybeFinishDataSource(&ds));
631 }
632 },
633 data_source->stop_timeout_ms);
634 }
635
DoDrainAndContinuousDump(DataSourceInstanceID id)636 void HeapprofdProducer::DoDrainAndContinuousDump(DataSourceInstanceID id) {
637 auto it = data_sources_.find(id);
638 if (it == data_sources_.end())
639 return;
640 DataSource& data_source = it->second;
641 PERFETTO_DCHECK(data_source.pending_free_drains == 0);
642
643 for (auto& [pid, process_state] : data_source.process_states) {
644 UnwinderForPID(pid).PostDrainFree(data_source.id, pid);
645 data_source.pending_free_drains++;
646 }
647
648 // In case there are no pending free drains, dump immediately.
649 DoContinuousDump(&data_source);
650 }
651
DoContinuousDump(DataSource * ds)652 void HeapprofdProducer::DoContinuousDump(DataSource* ds) {
653 if (ds->pending_free_drains != 0) {
654 return;
655 }
656
657 DumpProcessesInDataSource(ds);
658 auto id = ds->id;
659 auto weak_producer = weak_factory_.GetWeakPtr();
660 task_runner_->PostDelayedTask(
661 [weak_producer, id] {
662 if (!weak_producer)
663 return;
664 weak_producer->DoDrainAndContinuousDump(id);
665 },
666 ds->dump_interval_ms);
667 }
668
PostDrainDone(UnwindingWorker *,DataSourceInstanceID ds_id)669 void HeapprofdProducer::PostDrainDone(UnwindingWorker*,
670 DataSourceInstanceID ds_id) {
671 auto weak_this = weak_factory_.GetWeakPtr();
672 task_runner_->PostTask([weak_this, ds_id] {
673 if (weak_this)
674 weak_this->DrainDone(ds_id);
675 });
676 }
677
DrainDone(DataSourceInstanceID ds_id)678 void HeapprofdProducer::DrainDone(DataSourceInstanceID ds_id) {
679 auto it = data_sources_.find(ds_id);
680 if (it == data_sources_.end()) {
681 return;
682 }
683 DataSource& data_source = it->second;
684 data_source.pending_free_drains--;
685 DoContinuousDump(&data_source);
686 }
687
688 // static
SetStats(protos::pbzero::ProfilePacket::ProcessStats * stats,const ProcessState & process_state)689 void HeapprofdProducer::SetStats(
690 protos::pbzero::ProfilePacket::ProcessStats* stats,
691 const ProcessState& process_state) {
692 stats->set_unwinding_errors(process_state.unwinding_errors);
693 stats->set_heap_samples(process_state.heap_samples);
694 stats->set_map_reparses(process_state.map_reparses);
695 stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
696 stats->set_client_spinlock_blocked_us(
697 process_state.client_spinlock_blocked_us);
698 auto* unwinding_hist = stats->set_unwinding_time_us();
699 for (const auto& p : process_state.unwinding_time_us.GetData()) {
700 auto* bucket = unwinding_hist->add_buckets();
701 if (p.first == LogHistogram::kMaxBucket)
702 bucket->set_max_bucket(true);
703 else
704 bucket->set_upper_limit(p.first);
705 bucket->set_count(p.second);
706 }
707 }
708
DumpProcessState(DataSource * data_source,pid_t pid,ProcessState * process_state)709 void HeapprofdProducer::DumpProcessState(DataSource* data_source,
710 pid_t pid,
711 ProcessState* process_state) {
712 for (auto& heap_id_and_heap_info : process_state->heap_infos) {
713 ProcessState::HeapInfo& heap_info = heap_id_and_heap_info.second;
714
715 bool from_startup = data_source->signaled_pids.find(pid) ==
716 data_source->signaled_pids.cend();
717
718 auto new_heapsamples = [pid, from_startup, process_state, data_source,
719 &heap_info](
720 ProfilePacket::ProcessHeapSamples* proto) {
721 proto->set_pid(static_cast<uint64_t>(pid));
722 proto->set_timestamp(heap_info.heap_tracker.dump_timestamp());
723 proto->set_from_startup(from_startup);
724 proto->set_disconnected(process_state->disconnected);
725 proto->set_buffer_overran(process_state->error_state ==
726 SharedRingBuffer::kHitTimeout);
727 proto->set_client_error(ErrorStateToProto(process_state->error_state));
728 proto->set_buffer_corrupted(process_state->buffer_corrupted);
729 proto->set_hit_guardrail(data_source->hit_guardrail);
730 if (!heap_info.heap_name.empty())
731 proto->set_heap_name(heap_info.heap_name.c_str());
732 proto->set_sampling_interval_bytes(heap_info.sampling_interval);
733 proto->set_orig_sampling_interval_bytes(heap_info.orig_sampling_interval);
734 auto* stats = proto->set_stats();
735 SetStats(stats, *process_state);
736 };
737
738 DumpState dump_state(data_source->trace_writer.get(),
739 std::move(new_heapsamples),
740 &data_source->intern_state);
741
742 heap_info.heap_tracker.GetCallstackAllocations(
743 [&dump_state,
744 &data_source](const HeapTracker::CallstackAllocations& alloc) {
745 dump_state.WriteAllocation(alloc, data_source->config.dump_at_max());
746 });
747 dump_state.DumpCallstacks(&callsites_);
748 }
749 }
750
DumpProcessesInDataSource(DataSource * ds)751 void HeapprofdProducer::DumpProcessesInDataSource(DataSource* ds) {
752 for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
753 ds->process_states) {
754 pid_t pid = pid_and_process_state.first;
755 ProcessState& process_state = pid_and_process_state.second;
756 DumpProcessState(ds, pid, &process_state);
757 }
758 }
759
DumpAll()760 void HeapprofdProducer::DumpAll() {
761 PERFETTO_LOG("Received signal. Dumping all data sources.");
762 for (auto& id_and_data_source : data_sources_)
763 DumpProcessesInDataSource(&id_and_data_source.second);
764 }
765
Flush(FlushRequestID flush_id,const DataSourceInstanceID * ids,size_t num_ids)766 void HeapprofdProducer::Flush(FlushRequestID flush_id,
767 const DataSourceInstanceID* ids,
768 size_t num_ids) {
769 size_t& flush_in_progress = flushes_in_progress_[flush_id];
770 PERFETTO_DCHECK(flush_in_progress == 0);
771 flush_in_progress = num_ids;
772 for (size_t i = 0; i < num_ids; ++i) {
773 auto it = data_sources_.find(ids[i]);
774 if (it == data_sources_.end()) {
775 PERFETTO_DFATAL_OR_ELOG("Trying to flush unknown data-source %" PRIu64,
776 ids[i]);
777 flush_in_progress--;
778 continue;
779 }
780 DataSource& data_source = it->second;
781 auto weak_producer = weak_factory_.GetWeakPtr();
782
783 auto callback = [weak_producer, flush_id] {
784 if (weak_producer)
785 // Reposting because this task runner could be on a different thread
786 // than the IPC task runner.
787 return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
788 if (weak_producer)
789 return weak_producer->FinishDataSourceFlush(flush_id);
790 });
791 };
792 data_source.trace_writer->Flush(std::move(callback));
793 }
794 if (flush_in_progress == 0) {
795 endpoint_->NotifyFlushComplete(flush_id);
796 flushes_in_progress_.erase(flush_id);
797 }
798 }
799
FinishDataSourceFlush(FlushRequestID flush_id)800 void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
801 auto it = flushes_in_progress_.find(flush_id);
802 if (it == flushes_in_progress_.end()) {
803 PERFETTO_DFATAL_OR_ELOG("FinishDataSourceFlush id invalid: %" PRIu64,
804 flush_id);
805 return;
806 }
807 size_t& flush_in_progress = it->second;
808 if (--flush_in_progress == 0) {
809 endpoint_->NotifyFlushComplete(flush_id);
810 flushes_in_progress_.erase(flush_id);
811 }
812 }
813
OnDisconnect(base::UnixSocket * self)814 void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
815 auto it = producer_->pending_processes_.find(self->peer_pid_linux());
816 if (it == producer_->pending_processes_.end()) {
817 PERFETTO_DFATAL_OR_ELOG("Unexpected disconnect.");
818 return;
819 }
820
821 if (self == it->second.sock.get())
822 producer_->pending_processes_.erase(it);
823 }
824
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> new_connection)825 void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
826 base::UnixSocket*,
827 std::unique_ptr<base::UnixSocket> new_connection) {
828 Process peer_process;
829 peer_process.pid = new_connection->peer_pid_linux();
830 if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
831 PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
832
833 producer_->HandleClientConnection(std::move(new_connection), peer_process);
834 }
835
OnDataAvailable(base::UnixSocket * self)836 void HeapprofdProducer::SocketDelegate::OnDataAvailable(
837 base::UnixSocket* self) {
838 auto it = producer_->pending_processes_.find(self->peer_pid_linux());
839 if (it == producer_->pending_processes_.end()) {
840 PERFETTO_DFATAL_OR_ELOG("Unexpected data.");
841 return;
842 }
843
844 PendingProcess& pending_process = it->second;
845
846 base::ScopedFile fds[kHandshakeSize];
847 char buf[1];
848 self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
849
850 static_assert(kHandshakeSize == 2, "change if and else if below.");
851 if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
852 auto ds_it =
853 producer_->data_sources_.find(pending_process.data_source_instance_id);
854 if (ds_it == producer_->data_sources_.end()) {
855 producer_->pending_processes_.erase(it);
856 return;
857 }
858 DataSource& data_source = ds_it->second;
859
860 if (data_source.shutting_down) {
861 producer_->pending_processes_.erase(it);
862 PERFETTO_LOG("Got handshake for DS that is shutting down. Rejecting.");
863 return;
864 }
865
866 std::string maps_file =
867 "/proc/" + std::to_string(self->peer_pid_linux()) + "/maps";
868 if (!IsFile(*fds[kHandshakeMaps], maps_file.c_str())) {
869 producer_->pending_processes_.erase(it);
870 PERFETTO_ELOG("Received invalid maps FD.");
871 return;
872 }
873
874 std::string mem_file =
875 "/proc/" + std::to_string(self->peer_pid_linux()) + "/mem";
876 if (!IsFile(*fds[kHandshakeMem], mem_file.c_str())) {
877 producer_->pending_processes_.erase(it);
878 PERFETTO_ELOG("Received invalid mem FD.");
879 return;
880 }
881
882 data_source.process_states.emplace(
883 std::piecewise_construct, std::forward_as_tuple(self->peer_pid_linux()),
884 std::forward_as_tuple(&producer_->callsites_,
885 data_source.config.dump_at_max()));
886
887 PERFETTO_DLOG("%d: Received FDs.", self->peer_pid_linux());
888 int raw_fd = pending_process.shmem.fd();
889 // TODO(fmayer): Full buffer could deadlock us here.
890 if (!self->Send(&data_source.client_configuration,
891 sizeof(data_source.client_configuration), &raw_fd, 1)) {
892 // If Send fails, the socket will have been Shutdown, and the raw socket
893 // closed.
894 producer_->pending_processes_.erase(it);
895 return;
896 }
897
898 UnwindingWorker::HandoffData handoff_data;
899 handoff_data.data_source_instance_id =
900 pending_process.data_source_instance_id;
901 handoff_data.sock = self->ReleaseSocket();
902 handoff_data.maps_fd = std::move(fds[kHandshakeMaps]);
903 handoff_data.mem_fd = std::move(fds[kHandshakeMem]);
904 handoff_data.shmem = std::move(pending_process.shmem);
905 handoff_data.client_config = data_source.client_configuration;
906 handoff_data.stream_allocations = data_source.config.stream_allocations();
907
908 producer_->UnwinderForPID(self->peer_pid_linux())
909 .PostHandoffSocket(std::move(handoff_data));
910 producer_->pending_processes_.erase(it);
911 } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
912 PERFETTO_DFATAL_OR_ELOG("%d: Received partial FDs.",
913 self->peer_pid_linux());
914 producer_->pending_processes_.erase(it);
915 } else {
916 PERFETTO_ELOG("%d: Received no FDs.", self->peer_pid_linux());
917 }
918 }
919
GetDataSourceForProcess(const Process & proc)920 HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
921 const Process& proc) {
922 for (auto& ds_id_and_datasource : data_sources_) {
923 DataSource& ds = ds_id_and_datasource.second;
924 if (ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
925 return &ds;
926 }
927 return nullptr;
928 }
929
RecordOtherSourcesAsRejected(DataSource * active_ds,const Process & proc)930 void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
931 const Process& proc) {
932 for (auto& ds_id_and_datasource : data_sources_) {
933 DataSource& ds = ds_id_and_datasource.second;
934 if (&ds != active_ds &&
935 ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
936 ds.rejected_pids.emplace(proc.pid);
937 }
938 }
939
HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,Process process)940 void HeapprofdProducer::HandleClientConnection(
941 std::unique_ptr<base::UnixSocket> new_connection,
942 Process process) {
943 DataSource* data_source = GetDataSourceForProcess(process);
944 if (!data_source) {
945 PERFETTO_LOG("No data source found.");
946 return;
947 }
948 RecordOtherSourcesAsRejected(data_source, process);
949
950 // In fork mode, right now we check whether the target is not profileable
951 // in the client, because we cannot read packages.list there.
952 if (mode_ == HeapprofdMode::kCentral &&
953 !CanProfile(data_source->ds_config, new_connection->peer_uid_posix(),
954 data_source->config.target_installed_by())) {
955 PERFETTO_ELOG("%d (%s) is not profileable.", process.pid,
956 process.cmdline.c_str());
957 return;
958 }
959
960 uint64_t shmem_size = data_source->config.shmem_size_bytes();
961 if (!shmem_size)
962 shmem_size = kDefaultShmemSize;
963 if (shmem_size > kMaxShmemSize) {
964 PERFETTO_LOG("Specified shared memory size of %" PRIu64
965 " exceeds maximum size of %" PRIu64 ". Reducing.",
966 shmem_size, kMaxShmemSize);
967 shmem_size = kMaxShmemSize;
968 }
969
970 auto shmem = SharedRingBuffer::Create(static_cast<size_t>(shmem_size));
971 if (!shmem || !shmem->is_valid()) {
972 PERFETTO_LOG("Failed to create shared memory.");
973 return;
974 }
975
976 pid_t peer_pid = new_connection->peer_pid_linux();
977 if (peer_pid != process.pid) {
978 PERFETTO_DFATAL_OR_ELOG("Invalid PID connected.");
979 return;
980 }
981
982 PendingProcess pending_process;
983 pending_process.sock = std::move(new_connection);
984 pending_process.data_source_instance_id = data_source->id;
985 pending_process.shmem = std::move(*shmem);
986 pending_processes_.emplace(peer_pid, std::move(pending_process));
987 }
988
PostAllocRecord(UnwindingWorker * worker,std::unique_ptr<AllocRecord> alloc_rec)989 void HeapprofdProducer::PostAllocRecord(
990 UnwindingWorker* worker,
991 std::unique_ptr<AllocRecord> alloc_rec) {
992 // Once we can use C++14, this should be std::moved into the lambda instead.
993 auto* raw_alloc_rec = alloc_rec.release();
994 auto weak_this = weak_factory_.GetWeakPtr();
995 task_runner_->PostTask([weak_this, raw_alloc_rec, worker] {
996 std::unique_ptr<AllocRecord> unique_alloc_ref =
997 std::unique_ptr<AllocRecord>(raw_alloc_rec);
998 if (weak_this) {
999 weak_this->HandleAllocRecord(unique_alloc_ref.get());
1000 worker->ReturnAllocRecord(std::move(unique_alloc_ref));
1001 }
1002 });
1003 }
1004
PostFreeRecord(UnwindingWorker *,std::vector<FreeRecord> free_recs)1005 void HeapprofdProducer::PostFreeRecord(UnwindingWorker*,
1006 std::vector<FreeRecord> free_recs) {
1007 // Once we can use C++14, this should be std::moved into the lambda instead.
1008 std::vector<FreeRecord>* raw_free_recs =
1009 new std::vector<FreeRecord>(std::move(free_recs));
1010 auto weak_this = weak_factory_.GetWeakPtr();
1011 task_runner_->PostTask([weak_this, raw_free_recs] {
1012 if (weak_this) {
1013 for (FreeRecord& free_rec : *raw_free_recs)
1014 weak_this->HandleFreeRecord(std::move(free_rec));
1015 }
1016 delete raw_free_recs;
1017 });
1018 }
1019
PostHeapNameRecord(UnwindingWorker *,HeapNameRecord rec)1020 void HeapprofdProducer::PostHeapNameRecord(UnwindingWorker*,
1021 HeapNameRecord rec) {
1022 auto weak_this = weak_factory_.GetWeakPtr();
1023 task_runner_->PostTask([weak_this, rec] {
1024 if (weak_this)
1025 weak_this->HandleHeapNameRecord(rec);
1026 });
1027 }
1028
PostSocketDisconnected(UnwindingWorker *,DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1029 void HeapprofdProducer::PostSocketDisconnected(UnwindingWorker*,
1030 DataSourceInstanceID ds_id,
1031 pid_t pid,
1032 SharedRingBuffer::Stats stats) {
1033 auto weak_this = weak_factory_.GetWeakPtr();
1034 task_runner_->PostTask([weak_this, ds_id, pid, stats] {
1035 if (weak_this)
1036 weak_this->HandleSocketDisconnected(ds_id, pid, stats);
1037 });
1038 }
1039
HandleAllocRecord(AllocRecord * alloc_rec)1040 void HeapprofdProducer::HandleAllocRecord(AllocRecord* alloc_rec) {
1041 const AllocMetadata& alloc_metadata = alloc_rec->alloc_metadata;
1042 auto it = data_sources_.find(alloc_rec->data_source_instance_id);
1043 if (it == data_sources_.end()) {
1044 PERFETTO_LOG("Invalid data source in alloc record.");
1045 return;
1046 }
1047
1048 DataSource& ds = it->second;
1049 auto process_state_it = ds.process_states.find(alloc_rec->pid);
1050 if (process_state_it == ds.process_states.end()) {
1051 PERFETTO_LOG("Invalid PID in alloc record.");
1052 return;
1053 }
1054
1055 if (ds.config.stream_allocations()) {
1056 auto packet = ds.trace_writer->NewTracePacket();
1057 auto* streaming_alloc = packet->set_streaming_allocation();
1058 streaming_alloc->add_address(alloc_metadata.alloc_address);
1059 streaming_alloc->add_size(alloc_metadata.alloc_size);
1060 streaming_alloc->add_sample_size(alloc_metadata.sample_size);
1061 streaming_alloc->add_clock_monotonic_coarse_timestamp(
1062 alloc_metadata.clock_monotonic_coarse_timestamp);
1063 streaming_alloc->add_heap_id(alloc_metadata.heap_id);
1064 streaming_alloc->add_sequence_number(alloc_metadata.sequence_number);
1065 return;
1066 }
1067
1068 const auto& prefixes = ds.config.skip_symbol_prefix();
1069 if (!prefixes.empty()) {
1070 for (unwindstack::FrameData& frame_data : alloc_rec->frames) {
1071 if (frame_data.map_info == nullptr) {
1072 continue;
1073 }
1074 const std::string& map = frame_data.map_info->name();
1075 if (std::find_if(prefixes.cbegin(), prefixes.cend(),
1076 [&map](const std::string& prefix) {
1077 return base::StartsWith(map, prefix);
1078 }) != prefixes.cend()) {
1079 frame_data.function_name = "FILTERED";
1080 }
1081 }
1082 }
1083
1084 ProcessState& process_state = process_state_it->second;
1085 HeapTracker& heap_tracker =
1086 process_state.GetHeapTracker(alloc_rec->alloc_metadata.heap_id);
1087
1088 if (alloc_rec->error)
1089 process_state.unwinding_errors++;
1090 if (alloc_rec->reparsed_map)
1091 process_state.map_reparses++;
1092 process_state.heap_samples++;
1093 process_state.unwinding_time_us.Add(alloc_rec->unwinding_time_us);
1094 process_state.total_unwinding_time_us += alloc_rec->unwinding_time_us;
1095
1096 // abspc may no longer refer to the same functions, as we had to reparse
1097 // maps. Reset the cache.
1098 if (alloc_rec->reparsed_map)
1099 heap_tracker.ClearFrameCache();
1100
1101 heap_tracker.RecordMalloc(
1102 alloc_rec->frames, alloc_rec->build_ids, alloc_metadata.alloc_address,
1103 alloc_metadata.sample_size, alloc_metadata.alloc_size,
1104 alloc_metadata.sequence_number,
1105 alloc_metadata.clock_monotonic_coarse_timestamp);
1106 }
1107
HandleFreeRecord(FreeRecord free_rec)1108 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
1109 auto it = data_sources_.find(free_rec.data_source_instance_id);
1110 if (it == data_sources_.end()) {
1111 PERFETTO_LOG("Invalid data source in free record.");
1112 return;
1113 }
1114
1115 DataSource& ds = it->second;
1116 auto process_state_it = ds.process_states.find(free_rec.pid);
1117 if (process_state_it == ds.process_states.end()) {
1118 PERFETTO_LOG("Invalid PID in free record.");
1119 return;
1120 }
1121
1122 if (ds.config.stream_allocations()) {
1123 auto packet = ds.trace_writer->NewTracePacket();
1124 auto* streaming_free = packet->set_streaming_free();
1125 streaming_free->add_address(free_rec.entry.addr);
1126 streaming_free->add_heap_id(free_rec.entry.heap_id);
1127 streaming_free->add_sequence_number(free_rec.entry.sequence_number);
1128 return;
1129 }
1130
1131 ProcessState& process_state = process_state_it->second;
1132
1133 const FreeEntry& entry = free_rec.entry;
1134 HeapTracker& heap_tracker = process_state.GetHeapTracker(entry.heap_id);
1135 heap_tracker.RecordFree(entry.addr, entry.sequence_number, 0);
1136 }
1137
HandleHeapNameRecord(HeapNameRecord rec)1138 void HeapprofdProducer::HandleHeapNameRecord(HeapNameRecord rec) {
1139 auto it = data_sources_.find(rec.data_source_instance_id);
1140 if (it == data_sources_.end()) {
1141 PERFETTO_LOG("Invalid data source in free record.");
1142 return;
1143 }
1144
1145 DataSource& ds = it->second;
1146 auto process_state_it = ds.process_states.find(rec.pid);
1147 if (process_state_it == ds.process_states.end()) {
1148 PERFETTO_LOG("Invalid PID in free record.");
1149 return;
1150 }
1151
1152 ProcessState& process_state = process_state_it->second;
1153 const HeapName& entry = rec.entry;
1154 if (entry.heap_name[0] != '\0') {
1155 std::string heap_name = entry.heap_name;
1156 if (entry.heap_id == 0) {
1157 PERFETTO_ELOG("Invalid zero heap ID.");
1158 return;
1159 }
1160 ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1161 if (!hi.heap_name.empty() && hi.heap_name != heap_name) {
1162 PERFETTO_ELOG("Overriding heap name %s with %s", hi.heap_name.c_str(),
1163 heap_name.c_str());
1164 }
1165 hi.heap_name = entry.heap_name;
1166 }
1167 if (entry.sample_interval != 0) {
1168 ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1169 if (!hi.sampling_interval)
1170 hi.orig_sampling_interval = entry.sample_interval;
1171 hi.sampling_interval = entry.sample_interval;
1172 }
1173 }
1174
TerminateWhenDone()1175 void HeapprofdProducer::TerminateWhenDone() {
1176 if (data_sources_.empty())
1177 TerminateProcess(0);
1178 exit_when_done_ = true;
1179 }
1180
MaybeFinishDataSource(DataSource * ds)1181 bool HeapprofdProducer::MaybeFinishDataSource(DataSource* ds) {
1182 if (!ds->process_states.empty() || !ds->rejected_pids.empty() ||
1183 !ds->shutting_down) {
1184 return false;
1185 }
1186
1187 bool was_stopped = ds->was_stopped;
1188 DataSourceInstanceID ds_id = ds->id;
1189 auto weak_producer = weak_factory_.GetWeakPtr();
1190 bool exit_when_done = exit_when_done_;
1191 ds->trace_writer->Flush([weak_producer, exit_when_done, ds_id, was_stopped] {
1192 if (!weak_producer)
1193 return;
1194
1195 if (was_stopped)
1196 weak_producer->endpoint_->NotifyDataSourceStopped(ds_id);
1197 weak_producer->data_sources_.erase(ds_id);
1198
1199 if (exit_when_done) {
1200 // Post this as a task to allow NotifyDataSourceStopped to post tasks.
1201 weak_producer->task_runner_->PostTask([weak_producer] {
1202 if (!weak_producer)
1203 return;
1204 weak_producer->TerminateProcess(
1205 /*exit_status=*/0); // does not return
1206 });
1207 }
1208 });
1209 return true;
1210 }
1211
HandleSocketDisconnected(DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1212 void HeapprofdProducer::HandleSocketDisconnected(
1213 DataSourceInstanceID ds_id,
1214 pid_t pid,
1215 SharedRingBuffer::Stats stats) {
1216 auto it = data_sources_.find(ds_id);
1217 if (it == data_sources_.end())
1218 return;
1219 DataSource& ds = it->second;
1220
1221 auto process_state_it = ds.process_states.find(pid);
1222 if (process_state_it == ds.process_states.end()) {
1223 PERFETTO_ELOG("Unexpected disconnect from %d", pid);
1224 return;
1225 }
1226
1227 PERFETTO_LOG("%d disconnected from heapprofd (ds shutting down: %d).", pid,
1228 ds.shutting_down);
1229
1230 ProcessState& process_state = process_state_it->second;
1231 process_state.disconnected = !ds.shutting_down;
1232 process_state.error_state = stats.error_state;
1233 process_state.client_spinlock_blocked_us = stats.client_spinlock_blocked_us;
1234 process_state.buffer_corrupted =
1235 stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
1236
1237 DumpProcessState(&ds, pid, &process_state);
1238 ds.process_states.erase(pid);
1239 MaybeFinishDataSource(&ds);
1240 }
1241
CheckDataSourceCpuTask()1242 void HeapprofdProducer::CheckDataSourceCpuTask() {
1243 auto weak_producer = weak_factory_.GetWeakPtr();
1244 task_runner_->PostDelayedTask(
1245 [weak_producer] {
1246 if (!weak_producer)
1247 return;
1248 weak_producer->CheckDataSourceCpuTask();
1249 },
1250 kGuardrailIntervalMs);
1251
1252 ProfilerCpuGuardrails gr;
1253 for (auto& p : data_sources_) {
1254 DataSource& ds = p.second;
1255 if (gr.IsOverCpuThreshold(ds.guardrail_config)) {
1256 ds.hit_guardrail = true;
1257 PERFETTO_LOG("Data source %" PRIu64 " hit CPU guardrail. Shutting down.",
1258 ds.id);
1259 ShutdownDataSource(&ds);
1260 }
1261 }
1262 }
1263
CheckDataSourceMemoryTask()1264 void HeapprofdProducer::CheckDataSourceMemoryTask() {
1265 auto weak_producer = weak_factory_.GetWeakPtr();
1266 task_runner_->PostDelayedTask(
1267 [weak_producer] {
1268 if (!weak_producer)
1269 return;
1270 weak_producer->CheckDataSourceMemoryTask();
1271 },
1272 kGuardrailIntervalMs);
1273 ProfilerMemoryGuardrails gr;
1274 for (auto& p : data_sources_) {
1275 DataSource& ds = p.second;
1276 if (gr.IsOverMemoryThreshold(ds.guardrail_config)) {
1277 ds.hit_guardrail = true;
1278 PERFETTO_LOG("Data source %" PRIu64
1279 " hit memory guardrail. Shutting down.",
1280 ds.id);
1281 ShutdownDataSource(&ds);
1282 }
1283 }
1284 }
1285
1286 } // namespace profiling
1287 } // namespace perfetto
1288