• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/profiling/memory/heapprofd_producer.h"
18 
19 #include <signal.h>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23 
24 #include <algorithm>
25 #include <cinttypes>
26 #include <functional>
27 #include <optional>
28 #include <string>
29 
30 #include "perfetto/base/compiler.h"
31 #include "perfetto/base/logging.h"
32 #include "perfetto/ext/base/file_utils.h"
33 #include "perfetto/ext/base/string_splitter.h"
34 #include "perfetto/ext/base/string_utils.h"
35 #include "perfetto/ext/base/thread_task_runner.h"
36 #include "perfetto/ext/base/watchdog_posix.h"
37 #include "perfetto/ext/tracing/core/basic_types.h"
38 #include "perfetto/ext/tracing/core/trace_writer.h"
39 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40 #include "perfetto/tracing/core/data_source_config.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "perfetto/tracing/core/forward_decls.h"
43 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
44 #include "src/profiling/common/producer_support.h"
45 #include "src/profiling/common/profiler_guardrails.h"
46 #include "src/profiling/memory/shared_ring_buffer.h"
47 #include "src/profiling/memory/unwound_messages.h"
48 #include "src/profiling/memory/wire_protocol.h"
49 
50 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
51 #include <sys/system_properties.h>
52 #endif
53 
54 namespace perfetto {
55 namespace profiling {
56 namespace {
57 using ::perfetto::protos::pbzero::ProfilePacket;
58 
// Name of the data source registered with the tracing service.
constexpr char kHeapprofdDataSource[] = "android.heapprofd";
// Number of unwinding workers (one thread each) owned by the producer.
constexpr size_t kUnwinderThreads = 5;

// Reconnection backoff starts at the initial value, doubles on every failed
// attempt and is capped at the maximum. All values are in milliseconds.
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30000;
// NOTE(review): presumably the period of the guardrail checks; its use is not
// visible in this part of the file.
constexpr uint32_t kGuardrailIntervalMs = 30000;

// Shared memory sizes, in bytes.
constexpr uint64_t kDefaultShmemSize = 8ull << 20;  // 8 MiB
constexpr uint64_t kMaxShmemSize = 500ull << 20;    // 500 MiB

// Signal number and payload used to ask a running process to start profiling.
// The values are dictated by bionic and duplicated here for simplicity.
constexpr int kProfilingSignal = __SIGRTMIN + 4;
constexpr int kHeapprofdSignalValue = 0;
72 
MakeUnwindingWorkers(HeapprofdProducer * delegate,size_t n)73 std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
74                                                   size_t n) {
75   std::vector<UnwindingWorker> ret;
76   for (size_t i = 0; i < n; ++i) {
77     ret.emplace_back(delegate,
78                      base::ThreadTaskRunner::CreateAndStart("heapprofdunwind"));
79   }
80   return ret;
81 }
82 
ConfigTargetsProcess(const HeapprofdConfig & cfg,const Process & proc,const std::vector<std::string> & normalized_cmdlines)83 bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
84                           const Process& proc,
85                           const std::vector<std::string>& normalized_cmdlines) {
86   if (cfg.all())
87     return true;
88 
89   const auto& pids = cfg.pid();
90   if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
91       pids.cend()) {
92     return true;
93   }
94 
95   if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
96                 proc.cmdline) != normalized_cmdlines.cend()) {
97     return true;
98   }
99   return false;
100 }
101 
IsFile(int fd,const char * fn)102 bool IsFile(int fd, const char* fn) {
103   struct stat fdstat;
104   struct stat fnstat;
105   if (fstat(fd, &fdstat) == -1) {
106     PERFETTO_PLOG("fstat");
107     return false;
108   }
109   if (lstat(fn, &fnstat) == -1) {
110     PERFETTO_PLOG("lstat");
111     return false;
112   }
113   return fdstat.st_ino == fnstat.st_ino;
114 }
115 
116 protos::pbzero::ProfilePacket::ProcessHeapSamples::ClientError
ErrorStateToProto(SharedRingBuffer::ErrorState state)117 ErrorStateToProto(SharedRingBuffer::ErrorState state) {
118   switch (state) {
119     case (SharedRingBuffer::kNoError):
120       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
121           CLIENT_ERROR_NONE;
122     case (SharedRingBuffer::kHitTimeout):
123       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
124           CLIENT_ERROR_HIT_TIMEOUT;
125     case (SharedRingBuffer::kInvalidStackBounds):
126       return protos::pbzero::ProfilePacket::ProcessHeapSamples::
127           CLIENT_ERROR_INVALID_STACK_BOUNDS;
128   }
129 }
130 
131 }  // namespace
132 
HeapprofdConfigToClientConfiguration(const HeapprofdConfig & heapprofd_config,ClientConfiguration * cli_config)133 bool HeapprofdConfigToClientConfiguration(
134     const HeapprofdConfig& heapprofd_config,
135     ClientConfiguration* cli_config) {
136   cli_config->default_interval = heapprofd_config.sampling_interval_bytes();
137   cli_config->block_client = heapprofd_config.block_client();
138   cli_config->disable_fork_teardown = heapprofd_config.disable_fork_teardown();
139   cli_config->disable_vfork_detection =
140       heapprofd_config.disable_vfork_detection();
141   cli_config->block_client_timeout_us =
142       heapprofd_config.block_client_timeout_us();
143   cli_config->all_heaps = heapprofd_config.all_heaps();
144   cli_config->adaptive_sampling_shmem_threshold =
145       heapprofd_config.adaptive_sampling_shmem_threshold();
146   cli_config->adaptive_sampling_max_sampling_interval_bytes =
147       heapprofd_config.adaptive_sampling_max_sampling_interval_bytes();
148   size_t n = 0;
149   const std::vector<std::string>& exclude_heaps =
150       heapprofd_config.exclude_heaps();
151   // heaps[i] and heaps_interval[i] represent that the heap named in heaps[i]
152   // should be sampled with sampling interval of heap_interval[i].
153   std::vector<std::string> heaps = heapprofd_config.heaps();
154   std::vector<uint64_t> heap_intervals =
155       heapprofd_config.heap_sampling_intervals();
156   if (heaps.empty() && !cli_config->all_heaps) {
157     heaps.push_back("libc.malloc");
158   }
159 
160   if (heap_intervals.empty()) {
161     heap_intervals.assign(heaps.size(),
162                           heapprofd_config.sampling_interval_bytes());
163   }
164   if (heap_intervals.size() != heaps.size()) {
165     PERFETTO_ELOG("heap_sampling_intervals and heaps length mismatch.");
166     return false;
167   }
168   if (std::find(heap_intervals.begin(), heap_intervals.end(), 0u) !=
169       heap_intervals.end()) {
170     PERFETTO_ELOG("zero sampling interval.");
171     return false;
172   }
173   if (!exclude_heaps.empty()) {
174     // For disabled heaps, we add explicit entries but with sampling interval
175     // 0. The consumer of the sampling intervals in ClientConfiguration,
176     // GetSamplingInterval in wire_protocol.h, uses 0 to signal a heap is
177     // disabled, either because it isn't enabled (all_heaps is not set, and the
178     // heap isn't named), or because we explicitely set it here.
179     heaps.insert(heaps.end(), exclude_heaps.cbegin(), exclude_heaps.cend());
180     heap_intervals.insert(heap_intervals.end(), exclude_heaps.size(), 0u);
181   }
182   if (heaps.size() > base::ArraySize(cli_config->heaps)) {
183     heaps.resize(base::ArraySize(cli_config->heaps));
184     PERFETTO_ELOG("Too many heaps requested. Truncating.");
185   }
186   for (size_t i = 0; i < heaps.size(); ++i) {
187     const std::string& heap = heaps[i];
188     const uint64_t interval = heap_intervals[i];
189     // -1 for the \0 byte.
190     if (heap.size() > HEAPPROFD_HEAP_NAME_SZ - 1) {
191       PERFETTO_ELOG("Invalid heap name %s (larger than %d)", heap.c_str(),
192                     HEAPPROFD_HEAP_NAME_SZ - 1);
193       continue;
194     }
195     base::StringCopy(&cli_config->heaps[n].name[0], heap.c_str(),
196                      sizeof(cli_config->heaps[n].name));
197     cli_config->heaps[n].interval = interval;
198     n++;
199   }
200   cli_config->num_heaps = n;
201   return true;
202 }
203 
204 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
205 // thread.
HeapprofdProducer(HeapprofdMode mode,base::TaskRunner * task_runner,bool exit_when_done)206 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
207                                      base::TaskRunner* task_runner,
208                                      bool exit_when_done)
209     : task_runner_(task_runner),
210       mode_(mode),
211       exit_when_done_(exit_when_done),
212       socket_delegate_(this),
213       weak_factory_(this),
214       unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)) {
215   CheckDataSourceCpuTask();
216   CheckDataSourceMemoryTask();
217 }
218 
219 HeapprofdProducer::~HeapprofdProducer() = default;
220 
SetTargetProcess(pid_t target_pid,std::string target_cmdline)221 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
222                                          std::string target_cmdline) {
223   target_process_.pid = target_pid;
224   target_process_.cmdline = target_cmdline;
225 }
226 
SetDataSourceCallback(std::function<void ()> fn)227 void HeapprofdProducer::SetDataSourceCallback(std::function<void()> fn) {
228   data_source_callback_ = fn;
229 }
230 
AdoptSocket(base::ScopedFile fd)231 void HeapprofdProducer::AdoptSocket(base::ScopedFile fd) {
232   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
233   auto socket = base::UnixSocket::AdoptConnected(
234       std::move(fd), &socket_delegate_, task_runner_, base::SockFamily::kUnix,
235       base::SockType::kStream);
236 
237   HandleClientConnection(std::move(socket), target_process_);
238 }
239 
OnConnect()240 void HeapprofdProducer::OnConnect() {
241   PERFETTO_DCHECK(state_ == kConnecting);
242   state_ = kConnected;
243   ResetConnectionBackoff();
244   PERFETTO_LOG("Connected to the service, mode [%s].",
245                mode_ == HeapprofdMode::kCentral ? "central" : "child");
246 
247   DataSourceDescriptor desc;
248   desc.set_name(kHeapprofdDataSource);
249   desc.set_will_notify_on_stop(true);
250   endpoint_->RegisterDataSource(desc);
251 }
252 
OnDisconnect()253 void HeapprofdProducer::OnDisconnect() {
254   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
255   PERFETTO_LOG("Disconnected from tracing service");
256 
257   // Do not attempt to reconnect if we're a process-private process, just quit.
258   if (exit_when_done_) {
259     TerminateProcess(/*exit_status=*/1);  // does not return
260   }
261 
262   // Central mode - attempt to reconnect.
263   auto weak_producer = weak_factory_.GetWeakPtr();
264   if (state_ == kConnected)
265     return task_runner_->PostTask([weak_producer] {
266       if (!weak_producer)
267         return;
268       weak_producer->Restart();
269     });
270 
271   state_ = kNotConnected;
272   IncreaseConnectionBackoff();
273   task_runner_->PostDelayedTask(
274       [weak_producer] {
275         if (!weak_producer)
276           return;
277         weak_producer->ConnectService();
278       },
279       connection_backoff_ms_);
280 }
281 
ConnectWithRetries(const char * socket_name)282 void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
283   PERFETTO_DCHECK(state_ == kNotStarted);
284   state_ = kNotConnected;
285 
286   ResetConnectionBackoff();
287   producer_sock_name_ = socket_name;
288   ConnectService();
289 }
290 
ConnectService()291 void HeapprofdProducer::ConnectService() {
292   SetProducerEndpoint(ProducerIPCClient::Connect(
293       producer_sock_name_, this, "android.heapprofd", task_runner_));
294 }
295 
SetProducerEndpoint(std::unique_ptr<TracingService::ProducerEndpoint> endpoint)296 void HeapprofdProducer::SetProducerEndpoint(
297     std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
298   PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
299   state_ = kConnecting;
300   endpoint_ = std::move(endpoint);
301 }
302 
IncreaseConnectionBackoff()303 void HeapprofdProducer::IncreaseConnectionBackoff() {
304   connection_backoff_ms_ *= 2;
305   if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
306     connection_backoff_ms_ = kMaxConnectionBackoffMs;
307 }
308 
ResetConnectionBackoff()309 void HeapprofdProducer::ResetConnectionBackoff() {
310   connection_backoff_ms_ = kInitialConnectionBackoffMs;
311 }
312 
Restart()313 void HeapprofdProducer::Restart() {
314   // We lost the connection with the tracing service. At this point we need
315   // to reset all the data sources. Trying to handle that manually is going to
316   // be error prone. What we do here is simply destroy the instance and
317   // recreate it again.
318 
319   // Oneshot producer should not attempt restarts.
320   if (exit_when_done_)
321     PERFETTO_FATAL("Attempting to restart a one shot producer.");
322 
323   HeapprofdMode mode = mode_;
324   base::TaskRunner* task_runner = task_runner_;
325   const char* socket_name = producer_sock_name_;
326   const bool exit_when_done = exit_when_done_;
327 
328   // Invoke destructor and then the constructor again.
329   this->~HeapprofdProducer();
330   new (this) HeapprofdProducer(mode, task_runner, exit_when_done);
331 
332   ConnectWithRetries(socket_name);
333 }
334 
335 // TODO(rsavitski): would be cleaner to shut down the event loop instead
336 // (letting main exit). One test-friendly approach is to supply a shutdown
337 // callback in the constructor.
TerminateProcess(int exit_status)338 __attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
339     int exit_status) {
340   PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
341   PERFETTO_LOG("Shutting down child heapprofd (status %d).", exit_status);
342   exit(exit_status);
343 }
344 
OnTracingSetup()345 void HeapprofdProducer::OnTracingSetup() {}
346 
WriteRejectedConcurrentSession(BufferID buffer_id,pid_t pid)347 void HeapprofdProducer::WriteRejectedConcurrentSession(BufferID buffer_id,
348                                                        pid_t pid) {
349   auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
350   auto trace_packet = trace_writer->NewTracePacket();
351   trace_packet->set_timestamp(
352       static_cast<uint64_t>(base::GetBootTimeNs().count()));
353   auto profile_packet = trace_packet->set_profile_packet();
354   auto process_dump = profile_packet->add_process_dumps();
355   process_dump->set_pid(static_cast<uint64_t>(pid));
356   process_dump->set_rejected_concurrent(true);
357   trace_packet->Finalize();
358   trace_writer->Flush();
359 }
360 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & ds_config)361 void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
362                                         const DataSourceConfig& ds_config) {
363   if (ds_config.session_initiator() ==
364       DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM) {
365     PERFETTO_LOG("Setting up datasource: statsd initiator.");
366   } else {
367     PERFETTO_LOG("Setting up datasource: non-statsd initiator.");
368   }
369   if (mode_ == HeapprofdMode::kChild && ds_config.enable_extra_guardrails()) {
370     PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
371     return;
372   }
373 
374   HeapprofdConfig heapprofd_config;
375   heapprofd_config.ParseFromString(ds_config.heapprofd_config_raw());
376 
377   if (heapprofd_config.all() && !heapprofd_config.pid().empty())
378     PERFETTO_ELOG("No point setting all and pid");
379   if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
380     PERFETTO_ELOG("No point setting all and process_cmdline");
381 
382   if (ds_config.name() != kHeapprofdDataSource) {
383     PERFETTO_DLOG("Invalid data source name.");
384     return;
385   }
386 
387   if (data_sources_.find(id) != data_sources_.end()) {
388     PERFETTO_DFATAL_OR_ELOG(
389         "Received duplicated data source instance id: %" PRIu64, id);
390     return;
391   }
392 
393   std::optional<std::vector<std::string>> normalized_cmdlines =
394       NormalizeCmdlines(heapprofd_config.process_cmdline());
395   if (!normalized_cmdlines.has_value()) {
396     PERFETTO_ELOG("Rejecting data source due to invalid cmdline in config.");
397     return;
398   }
399 
400   // Child mode is only interested in the first data source matching the
401   // already-connected process.
402   if (mode_ == HeapprofdMode::kChild) {
403     if (!ConfigTargetsProcess(heapprofd_config, target_process_,
404                               normalized_cmdlines.value())) {
405       PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
406       return;
407     }
408 
409     if (!data_sources_.empty()) {
410       PERFETTO_LOG("Child mode skipping concurrent data source.");
411 
412       // Manually write one ProfilePacket about the rejected session.
413       auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
414       WriteRejectedConcurrentSession(buffer_id, target_process_.pid);
415       return;
416     }
417   }
418 
419   std::optional<uint64_t> start_cputime_sec;
420   if (heapprofd_config.max_heapprofd_cpu_secs() > 0) {
421     start_cputime_sec = GetCputimeSecForCurrentProcess();
422 
423     if (!start_cputime_sec) {
424       PERFETTO_ELOG("Failed to enforce CPU guardrail. Rejecting config.");
425       return;
426     }
427   }
428 
429   auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
430   DataSource data_source(endpoint_->CreateTraceWriter(buffer_id));
431   data_source.id = id;
432   auto& cli_config = data_source.client_configuration;
433   if (!HeapprofdConfigToClientConfiguration(heapprofd_config, &cli_config))
434     return;
435   data_source.config = heapprofd_config;
436   data_source.ds_config = ds_config;
437   data_source.normalized_cmdlines = std::move(normalized_cmdlines.value());
438   data_source.stop_timeout_ms = ds_config.stop_timeout_ms()
439                                     ? ds_config.stop_timeout_ms()
440                                     : 5000 /* kDataSourceStopTimeoutMs */;
441   data_source.guardrail_config.cpu_start_secs = start_cputime_sec;
442   data_source.guardrail_config.memory_guardrail_kb =
443       heapprofd_config.max_heapprofd_memory_kb();
444   data_source.guardrail_config.cpu_guardrail_sec =
445       heapprofd_config.max_heapprofd_cpu_secs();
446 
447   InterningOutputTracker::WriteFixedInterningsPacket(
448       data_source.trace_writer.get(),
449       protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
450   data_sources_.emplace(id, std::move(data_source));
451   PERFETTO_DLOG("Set up data source.");
452 
453   if (mode_ == HeapprofdMode::kChild && data_source_callback_)
454     (*data_source_callback_)();
455 }
456 
IsPidProfiled(pid_t pid)457 bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
458   return std::any_of(
459       data_sources_.cbegin(), data_sources_.cend(),
460       [pid](const std::pair<const DataSourceInstanceID, DataSource>& p) {
461         const DataSource& ds = p.second;
462         return ds.process_states.count(pid) > 0;
463       });
464 }
465 
SetStartupProperties(DataSource * data_source)466 void HeapprofdProducer::SetStartupProperties(DataSource* data_source) {
467   const HeapprofdConfig& heapprofd_config = data_source->config;
468   if (heapprofd_config.all())
469     data_source->properties.emplace_back(properties_.SetAll());
470 
471   for (std::string cmdline : data_source->normalized_cmdlines)
472     data_source->properties.emplace_back(
473         properties_.SetProperty(std::move(cmdline)));
474 }
475 
SignalRunningProcesses(DataSource * data_source)476 void HeapprofdProducer::SignalRunningProcesses(DataSource* data_source) {
477   const HeapprofdConfig& heapprofd_config = data_source->config;
478 
479   std::set<pid_t> pids;
480   if (heapprofd_config.all())
481     FindAllProfilablePids(&pids);
482   for (uint64_t pid : heapprofd_config.pid())
483     pids.emplace(static_cast<pid_t>(pid));
484 
485   if (!data_source->normalized_cmdlines.empty())
486     FindPidsForCmdlines(data_source->normalized_cmdlines, &pids);
487 
488   if (heapprofd_config.min_anonymous_memory_kb() > 0)
489     RemoveUnderAnonThreshold(heapprofd_config.min_anonymous_memory_kb(), &pids);
490 
491   for (auto pid_it = pids.cbegin(); pid_it != pids.cend();) {
492     pid_t pid = *pid_it;
493     if (IsPidProfiled(pid)) {
494       PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
495                    static_cast<intmax_t>(pid));
496       data_source->rejected_pids.emplace(pid);
497       pid_it = pids.erase(pid_it);
498       continue;
499     }
500 
501     PERFETTO_DLOG("Sending signal: %d (si_value: %d) to pid: %d",
502                   kProfilingSignal, kHeapprofdSignalValue, pid);
503     union sigval signal_value;
504     signal_value.sival_int = kHeapprofdSignalValue;
505     if (sigqueue(pid, kProfilingSignal, signal_value) != 0) {
506       PERFETTO_DPLOG("sigqueue");
507     }
508     ++pid_it;
509   }
510   data_source->signaled_pids = std::move(pids);
511 }
512 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)513 void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
514                                         const DataSourceConfig&) {
515   PERFETTO_DLOG("Starting data source %" PRIu64, id);
516 
517   auto it = data_sources_.find(id);
518   if (it == data_sources_.end()) {
519     // This is expected in child heapprofd, where we reject uninteresting data
520     // sources in SetupDataSource.
521     if (mode_ == HeapprofdMode::kCentral) {
522       PERFETTO_DFATAL_OR_ELOG(
523           "Received invalid data source instance to start: %" PRIu64, id);
524     }
525     return;
526   }
527 
528   DataSource& data_source = it->second;
529   if (data_source.started) {
530     PERFETTO_DFATAL_OR_ELOG(
531         "Trying to start already started data-source: %" PRIu64, id);
532     return;
533   }
534   const HeapprofdConfig& heapprofd_config = data_source.config;
535 
536   // Central daemon - set system properties for any targets that start later,
537   // and signal already-running targets to start the profiling client.
538   if (mode_ == HeapprofdMode::kCentral) {
539     if (!heapprofd_config.no_startup())
540       SetStartupProperties(&data_source);
541     if (!heapprofd_config.no_running())
542       SignalRunningProcesses(&data_source);
543   }
544 
545   const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
546   uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
547   if (dump_interval) {
548     data_source.dump_interval_ms = dump_interval;
549     auto weak_producer = weak_factory_.GetWeakPtr();
550     task_runner_->PostDelayedTask(
551         [weak_producer, id] {
552           if (!weak_producer)
553             return;
554           weak_producer->DoDrainAndContinuousDump(id);
555         },
556         continuous_dump_config.dump_phase_ms());
557   }
558   data_source.started = true;
559   PERFETTO_DLOG("Started DataSource");
560 }
561 
UnwinderForPID(pid_t pid)562 UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
563   return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
564 }
565 
StopDataSource(DataSourceInstanceID id)566 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
567   auto it = data_sources_.find(id);
568   if (it == data_sources_.end()) {
569     endpoint_->NotifyDataSourceStopped(id);
570     if (mode_ == HeapprofdMode::kCentral)
571       PERFETTO_DFATAL_OR_ELOG(
572           "Trying to stop non existing data source: %" PRIu64, id);
573     return;
574   }
575 
576   PERFETTO_LOG("Stopping data source %" PRIu64, id);
577 
578   DataSource& data_source = it->second;
579   data_source.was_stopped = true;
580   ShutdownDataSource(&data_source);
581 }
582 
ShutdownDataSource(DataSource * data_source)583 void HeapprofdProducer::ShutdownDataSource(DataSource* data_source) {
584   data_source->shutting_down = true;
585   // If no processes connected, or all of them have already disconnected
586   // (and have been dumped) and no PIDs have been rejected,
587   // MaybeFinishDataSource can tear down the data source.
588   if (MaybeFinishDataSource(data_source))
589     return;
590 
591   if (!data_source->rejected_pids.empty()) {
592     auto trace_packet = data_source->trace_writer->NewTracePacket();
593     ProfilePacket* profile_packet = trace_packet->set_profile_packet();
594     for (pid_t rejected_pid : data_source->rejected_pids) {
595       ProfilePacket::ProcessHeapSamples* proto =
596           profile_packet->add_process_dumps();
597       proto->set_pid(static_cast<uint64_t>(rejected_pid));
598       proto->set_rejected_concurrent(true);
599     }
600     trace_packet->Finalize();
601     data_source->rejected_pids.clear();
602     if (MaybeFinishDataSource(data_source))
603       return;
604   }
605 
606   for (const auto& pid_and_process_state : data_source->process_states) {
607     pid_t pid = pid_and_process_state.first;
608     UnwinderForPID(pid).PostDisconnectSocket(pid);
609   }
610 
611   auto id = data_source->id;
612   auto weak_producer = weak_factory_.GetWeakPtr();
613   task_runner_->PostDelayedTask(
614       [weak_producer, id] {
615         if (!weak_producer)
616           return;
617         auto ds_it = weak_producer->data_sources_.find(id);
618         if (ds_it != weak_producer->data_sources_.end()) {
619           PERFETTO_ELOG("Final dump timed out.");
620           DataSource& ds = ds_it->second;
621 
622           for (const auto& pid_and_process_state : ds.process_states) {
623             pid_t pid = pid_and_process_state.first;
624             weak_producer->UnwinderForPID(pid).PostPurgeProcess(pid);
625           }
626           // Do not dump any stragglers, just trigger the Flush and tear down
627           // the data source.
628           ds.process_states.clear();
629           ds.rejected_pids.clear();
630           PERFETTO_CHECK(weak_producer->MaybeFinishDataSource(&ds));
631         }
632       },
633       data_source->stop_timeout_ms);
634 }
635 
DoDrainAndContinuousDump(DataSourceInstanceID id)636 void HeapprofdProducer::DoDrainAndContinuousDump(DataSourceInstanceID id) {
637   auto it = data_sources_.find(id);
638   if (it == data_sources_.end())
639     return;
640   DataSource& data_source = it->second;
641   PERFETTO_DCHECK(data_source.pending_free_drains == 0);
642 
643   for (auto& [pid, process_state] : data_source.process_states) {
644     UnwinderForPID(pid).PostDrainFree(data_source.id, pid);
645     data_source.pending_free_drains++;
646   }
647 
648   // In case there are no pending free drains, dump immediately.
649   DoContinuousDump(&data_source);
650 }
651 
DoContinuousDump(DataSource * ds)652 void HeapprofdProducer::DoContinuousDump(DataSource* ds) {
653   if (ds->pending_free_drains != 0) {
654     return;
655   }
656 
657   DumpProcessesInDataSource(ds);
658   auto id = ds->id;
659   auto weak_producer = weak_factory_.GetWeakPtr();
660   task_runner_->PostDelayedTask(
661       [weak_producer, id] {
662         if (!weak_producer)
663           return;
664         weak_producer->DoDrainAndContinuousDump(id);
665       },
666       ds->dump_interval_ms);
667 }
668 
PostDrainDone(UnwindingWorker *,DataSourceInstanceID ds_id)669 void HeapprofdProducer::PostDrainDone(UnwindingWorker*,
670                                       DataSourceInstanceID ds_id) {
671   auto weak_this = weak_factory_.GetWeakPtr();
672   task_runner_->PostTask([weak_this, ds_id] {
673     if (weak_this)
674       weak_this->DrainDone(ds_id);
675   });
676 }
677 
DrainDone(DataSourceInstanceID ds_id)678 void HeapprofdProducer::DrainDone(DataSourceInstanceID ds_id) {
679   auto it = data_sources_.find(ds_id);
680   if (it == data_sources_.end()) {
681     return;
682   }
683   DataSource& data_source = it->second;
684   data_source.pending_free_drains--;
685   DoContinuousDump(&data_source);
686 }
687 
688 // static
SetStats(protos::pbzero::ProfilePacket::ProcessStats * stats,const ProcessState & process_state)689 void HeapprofdProducer::SetStats(
690     protos::pbzero::ProfilePacket::ProcessStats* stats,
691     const ProcessState& process_state) {
692   stats->set_unwinding_errors(process_state.unwinding_errors);
693   stats->set_heap_samples(process_state.heap_samples);
694   stats->set_map_reparses(process_state.map_reparses);
695   stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
696   stats->set_client_spinlock_blocked_us(
697       process_state.client_spinlock_blocked_us);
698   auto* unwinding_hist = stats->set_unwinding_time_us();
699   for (const auto& p : process_state.unwinding_time_us.GetData()) {
700     auto* bucket = unwinding_hist->add_buckets();
701     if (p.first == LogHistogram::kMaxBucket)
702       bucket->set_max_bucket(true);
703     else
704       bucket->set_upper_limit(p.first);
705     bucket->set_count(p.second);
706   }
707 }
708 
DumpProcessState(DataSource * data_source,pid_t pid,ProcessState * process_state)709 void HeapprofdProducer::DumpProcessState(DataSource* data_source,
710                                          pid_t pid,
711                                          ProcessState* process_state) {
712   for (auto& heap_id_and_heap_info : process_state->heap_infos) {
713     ProcessState::HeapInfo& heap_info = heap_id_and_heap_info.second;
714 
715     bool from_startup = data_source->signaled_pids.find(pid) ==
716                         data_source->signaled_pids.cend();
717 
718     auto new_heapsamples = [pid, from_startup, process_state, data_source,
719                             &heap_info](
720                                ProfilePacket::ProcessHeapSamples* proto) {
721       proto->set_pid(static_cast<uint64_t>(pid));
722       proto->set_timestamp(heap_info.heap_tracker.dump_timestamp());
723       proto->set_from_startup(from_startup);
724       proto->set_disconnected(process_state->disconnected);
725       proto->set_buffer_overran(process_state->error_state ==
726                                 SharedRingBuffer::kHitTimeout);
727       proto->set_client_error(ErrorStateToProto(process_state->error_state));
728       proto->set_buffer_corrupted(process_state->buffer_corrupted);
729       proto->set_hit_guardrail(data_source->hit_guardrail);
730       if (!heap_info.heap_name.empty())
731         proto->set_heap_name(heap_info.heap_name.c_str());
732       proto->set_sampling_interval_bytes(heap_info.sampling_interval);
733       proto->set_orig_sampling_interval_bytes(heap_info.orig_sampling_interval);
734       auto* stats = proto->set_stats();
735       SetStats(stats, *process_state);
736     };
737 
738     DumpState dump_state(data_source->trace_writer.get(),
739                          std::move(new_heapsamples),
740                          &data_source->intern_state);
741 
742     heap_info.heap_tracker.GetCallstackAllocations(
743         [&dump_state,
744          &data_source](const HeapTracker::CallstackAllocations& alloc) {
745           dump_state.WriteAllocation(alloc, data_source->config.dump_at_max());
746         });
747     dump_state.DumpCallstacks(&callsites_);
748   }
749 }
750 
DumpProcessesInDataSource(DataSource * ds)751 void HeapprofdProducer::DumpProcessesInDataSource(DataSource* ds) {
752   for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
753        ds->process_states) {
754     pid_t pid = pid_and_process_state.first;
755     ProcessState& process_state = pid_and_process_state.second;
756     DumpProcessState(ds, pid, &process_state);
757   }
758 }
759 
DumpAll()760 void HeapprofdProducer::DumpAll() {
761   PERFETTO_LOG("Received signal. Dumping all data sources.");
762   for (auto& id_and_data_source : data_sources_)
763     DumpProcessesInDataSource(&id_and_data_source.second);
764 }
765 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * ids,size_t num_ids,FlushFlags)766 void HeapprofdProducer::Flush(FlushRequestID flush_id,
767                               const DataSourceInstanceID* ids,
768                               size_t num_ids,
769                               FlushFlags) {
770   size_t& flush_in_progress = flushes_in_progress_[flush_id];
771   PERFETTO_DCHECK(flush_in_progress == 0);
772   flush_in_progress = num_ids;
773   for (size_t i = 0; i < num_ids; ++i) {
774     auto it = data_sources_.find(ids[i]);
775     if (it == data_sources_.end()) {
776       PERFETTO_DFATAL_OR_ELOG("Trying to flush unknown data-source %" PRIu64,
777                               ids[i]);
778       flush_in_progress--;
779       continue;
780     }
781     DataSource& data_source = it->second;
782     auto weak_producer = weak_factory_.GetWeakPtr();
783 
784     auto callback = [weak_producer, flush_id] {
785       if (weak_producer)
786         // Reposting because this task runner could be on a different thread
787         // than the IPC task runner.
788         return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
789           if (weak_producer)
790             return weak_producer->FinishDataSourceFlush(flush_id);
791         });
792     };
793     data_source.trace_writer->Flush(std::move(callback));
794   }
795   if (flush_in_progress == 0) {
796     endpoint_->NotifyFlushComplete(flush_id);
797     flushes_in_progress_.erase(flush_id);
798   }
799 }
800 
FinishDataSourceFlush(FlushRequestID flush_id)801 void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
802   auto it = flushes_in_progress_.find(flush_id);
803   if (it == flushes_in_progress_.end()) {
804     PERFETTO_DFATAL_OR_ELOG("FinishDataSourceFlush id invalid: %" PRIu64,
805                             flush_id);
806     return;
807   }
808   size_t& flush_in_progress = it->second;
809   if (--flush_in_progress == 0) {
810     endpoint_->NotifyFlushComplete(flush_id);
811     flushes_in_progress_.erase(flush_id);
812   }
813 }
814 
OnDisconnect(base::UnixSocket * self)815 void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
816   auto it = producer_->pending_processes_.find(self->peer_pid_linux());
817   if (it == producer_->pending_processes_.end()) {
818     PERFETTO_DFATAL_OR_ELOG("Unexpected disconnect.");
819     return;
820   }
821 
822   if (self == it->second.sock.get())
823     producer_->pending_processes_.erase(it);
824 }
825 
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> new_connection)826 void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
827     base::UnixSocket*,
828     std::unique_ptr<base::UnixSocket> new_connection) {
829   Process peer_process;
830   peer_process.pid = new_connection->peer_pid_linux();
831   if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
832     PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
833 
834   producer_->HandleClientConnection(std::move(new_connection), peer_process);
835 }
836 
OnDataAvailable(base::UnixSocket * self)837 void HeapprofdProducer::SocketDelegate::OnDataAvailable(
838     base::UnixSocket* self) {
839   auto it = producer_->pending_processes_.find(self->peer_pid_linux());
840   if (it == producer_->pending_processes_.end()) {
841     PERFETTO_DFATAL_OR_ELOG("Unexpected data.");
842     return;
843   }
844 
845   PendingProcess& pending_process = it->second;
846 
847   base::ScopedFile fds[kHandshakeSize];
848   char buf[1];
849   self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
850 
851   static_assert(kHandshakeSize == 2, "change if and else if below.");
852   if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
853     auto ds_it =
854         producer_->data_sources_.find(pending_process.data_source_instance_id);
855     if (ds_it == producer_->data_sources_.end()) {
856       producer_->pending_processes_.erase(it);
857       return;
858     }
859     DataSource& data_source = ds_it->second;
860 
861     if (data_source.shutting_down) {
862       producer_->pending_processes_.erase(it);
863       PERFETTO_LOG("Got handshake for DS that is shutting down. Rejecting.");
864       return;
865     }
866 
867     std::string maps_file =
868         "/proc/" + std::to_string(self->peer_pid_linux()) + "/maps";
869     if (!IsFile(*fds[kHandshakeMaps], maps_file.c_str())) {
870       producer_->pending_processes_.erase(it);
871       PERFETTO_ELOG("Received invalid maps FD.");
872       return;
873     }
874 
875     std::string mem_file =
876         "/proc/" + std::to_string(self->peer_pid_linux()) + "/mem";
877     if (!IsFile(*fds[kHandshakeMem], mem_file.c_str())) {
878       producer_->pending_processes_.erase(it);
879       PERFETTO_ELOG("Received invalid mem FD.");
880       return;
881     }
882 
883     data_source.process_states.emplace(
884         std::piecewise_construct, std::forward_as_tuple(self->peer_pid_linux()),
885         std::forward_as_tuple(&producer_->callsites_,
886                               data_source.config.dump_at_max()));
887 
888     PERFETTO_DLOG("%d: Received FDs.", self->peer_pid_linux());
889     int raw_fd = pending_process.shmem.fd();
890     // TODO(fmayer): Full buffer could deadlock us here.
891     if (!self->Send(&data_source.client_configuration,
892                     sizeof(data_source.client_configuration), &raw_fd, 1)) {
893       // If Send fails, the socket will have been Shutdown, and the raw socket
894       // closed.
895       producer_->pending_processes_.erase(it);
896       return;
897     }
898 
899     UnwindingWorker::HandoffData handoff_data;
900     handoff_data.data_source_instance_id =
901         pending_process.data_source_instance_id;
902     handoff_data.sock = self->ReleaseSocket();
903     handoff_data.maps_fd = std::move(fds[kHandshakeMaps]);
904     handoff_data.mem_fd = std::move(fds[kHandshakeMem]);
905     handoff_data.shmem = std::move(pending_process.shmem);
906     handoff_data.client_config = data_source.client_configuration;
907     handoff_data.stream_allocations = data_source.config.stream_allocations();
908 
909     producer_->UnwinderForPID(self->peer_pid_linux())
910         .PostHandoffSocket(std::move(handoff_data));
911     producer_->pending_processes_.erase(it);
912   } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
913     PERFETTO_DFATAL_OR_ELOG("%d: Received partial FDs.",
914                             self->peer_pid_linux());
915     producer_->pending_processes_.erase(it);
916   } else {
917     PERFETTO_ELOG("%d: Received no FDs.", self->peer_pid_linux());
918   }
919 }
920 
GetDataSourceForProcess(const Process & proc)921 HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
922     const Process& proc) {
923   for (auto& ds_id_and_datasource : data_sources_) {
924     DataSource& ds = ds_id_and_datasource.second;
925     if (ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
926       return &ds;
927   }
928   return nullptr;
929 }
930 
RecordOtherSourcesAsRejected(DataSource * active_ds,const Process & proc)931 void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
932                                                      const Process& proc) {
933   for (auto& ds_id_and_datasource : data_sources_) {
934     DataSource& ds = ds_id_and_datasource.second;
935     if (&ds != active_ds &&
936         ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
937       ds.rejected_pids.emplace(proc.pid);
938   }
939 }
940 
HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,Process process)941 void HeapprofdProducer::HandleClientConnection(
942     std::unique_ptr<base::UnixSocket> new_connection,
943     Process process) {
944   DataSource* data_source = GetDataSourceForProcess(process);
945   if (!data_source) {
946     PERFETTO_LOG("No data source found.");
947     return;
948   }
949   RecordOtherSourcesAsRejected(data_source, process);
950 
951   // In fork mode, right now we check whether the target is not profileable
952   // in the client, because we cannot read packages.list there.
953   if (mode_ == HeapprofdMode::kCentral &&
954       !CanProfile(data_source->ds_config, new_connection->peer_uid_posix(),
955                   data_source->config.target_installed_by())) {
956     PERFETTO_ELOG("%d (%s) is not profileable.", process.pid,
957                   process.cmdline.c_str());
958     return;
959   }
960 
961   uint64_t shmem_size = data_source->config.shmem_size_bytes();
962   if (!shmem_size)
963     shmem_size = kDefaultShmemSize;
964   if (shmem_size > kMaxShmemSize) {
965     PERFETTO_LOG("Specified shared memory size of %" PRIu64
966                  " exceeds maximum size of %" PRIu64 ". Reducing.",
967                  shmem_size, kMaxShmemSize);
968     shmem_size = kMaxShmemSize;
969   }
970 
971   auto shmem = SharedRingBuffer::Create(static_cast<size_t>(shmem_size));
972   if (!shmem || !shmem->is_valid()) {
973     PERFETTO_LOG("Failed to create shared memory.");
974     return;
975   }
976 
977   pid_t peer_pid = new_connection->peer_pid_linux();
978   if (peer_pid != process.pid) {
979     PERFETTO_DFATAL_OR_ELOG("Invalid PID connected.");
980     return;
981   }
982 
983   PendingProcess pending_process;
984   pending_process.sock = std::move(new_connection);
985   pending_process.data_source_instance_id = data_source->id;
986   pending_process.shmem = std::move(*shmem);
987   pending_processes_.emplace(peer_pid, std::move(pending_process));
988 }
989 
PostAllocRecord(UnwindingWorker * worker,std::unique_ptr<AllocRecord> alloc_rec)990 void HeapprofdProducer::PostAllocRecord(
991     UnwindingWorker* worker,
992     std::unique_ptr<AllocRecord> alloc_rec) {
993   // Once we can use C++14, this should be std::moved into the lambda instead.
994   auto* raw_alloc_rec = alloc_rec.release();
995   auto weak_this = weak_factory_.GetWeakPtr();
996   task_runner_->PostTask([weak_this, raw_alloc_rec, worker] {
997     std::unique_ptr<AllocRecord> unique_alloc_ref =
998         std::unique_ptr<AllocRecord>(raw_alloc_rec);
999     if (weak_this) {
1000       weak_this->HandleAllocRecord(unique_alloc_ref.get());
1001       worker->ReturnAllocRecord(std::move(unique_alloc_ref));
1002     }
1003   });
1004 }
1005 
PostFreeRecord(UnwindingWorker *,std::vector<FreeRecord> free_recs)1006 void HeapprofdProducer::PostFreeRecord(UnwindingWorker*,
1007                                        std::vector<FreeRecord> free_recs) {
1008   // Once we can use C++14, this should be std::moved into the lambda instead.
1009   std::vector<FreeRecord>* raw_free_recs =
1010       new std::vector<FreeRecord>(std::move(free_recs));
1011   auto weak_this = weak_factory_.GetWeakPtr();
1012   task_runner_->PostTask([weak_this, raw_free_recs] {
1013     if (weak_this) {
1014       for (FreeRecord& free_rec : *raw_free_recs)
1015         weak_this->HandleFreeRecord(std::move(free_rec));
1016     }
1017     delete raw_free_recs;
1018   });
1019 }
1020 
PostHeapNameRecord(UnwindingWorker *,HeapNameRecord rec)1021 void HeapprofdProducer::PostHeapNameRecord(UnwindingWorker*,
1022                                            HeapNameRecord rec) {
1023   auto weak_this = weak_factory_.GetWeakPtr();
1024   task_runner_->PostTask([weak_this, rec] {
1025     if (weak_this)
1026       weak_this->HandleHeapNameRecord(rec);
1027   });
1028 }
1029 
PostSocketDisconnected(UnwindingWorker *,DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1030 void HeapprofdProducer::PostSocketDisconnected(UnwindingWorker*,
1031                                                DataSourceInstanceID ds_id,
1032                                                pid_t pid,
1033                                                SharedRingBuffer::Stats stats) {
1034   auto weak_this = weak_factory_.GetWeakPtr();
1035   task_runner_->PostTask([weak_this, ds_id, pid, stats] {
1036     if (weak_this)
1037       weak_this->HandleSocketDisconnected(ds_id, pid, stats);
1038   });
1039 }
1040 
HandleAllocRecord(AllocRecord * alloc_rec)1041 void HeapprofdProducer::HandleAllocRecord(AllocRecord* alloc_rec) {
1042   const AllocMetadata& alloc_metadata = alloc_rec->alloc_metadata;
1043   auto it = data_sources_.find(alloc_rec->data_source_instance_id);
1044   if (it == data_sources_.end()) {
1045     PERFETTO_LOG("Invalid data source in alloc record.");
1046     return;
1047   }
1048 
1049   DataSource& ds = it->second;
1050   auto process_state_it = ds.process_states.find(alloc_rec->pid);
1051   if (process_state_it == ds.process_states.end()) {
1052     PERFETTO_LOG("Invalid PID in alloc record.");
1053     return;
1054   }
1055 
1056   if (ds.config.stream_allocations()) {
1057     auto packet = ds.trace_writer->NewTracePacket();
1058     auto* streaming_alloc = packet->set_streaming_allocation();
1059     streaming_alloc->add_address(alloc_metadata.alloc_address);
1060     streaming_alloc->add_size(alloc_metadata.alloc_size);
1061     streaming_alloc->add_sample_size(alloc_metadata.sample_size);
1062     streaming_alloc->add_clock_monotonic_coarse_timestamp(
1063         alloc_metadata.clock_monotonic_coarse_timestamp);
1064     streaming_alloc->add_heap_id(alloc_metadata.heap_id);
1065     streaming_alloc->add_sequence_number(alloc_metadata.sequence_number);
1066     return;
1067   }
1068 
1069   const auto& prefixes = ds.config.skip_symbol_prefix();
1070   if (!prefixes.empty()) {
1071     for (unwindstack::FrameData& frame_data : alloc_rec->frames) {
1072       if (frame_data.map_info == nullptr) {
1073         continue;
1074       }
1075       const std::string& map = frame_data.map_info->name();
1076       if (std::find_if(prefixes.cbegin(), prefixes.cend(),
1077                        [&map](const std::string& prefix) {
1078                          return base::StartsWith(map, prefix);
1079                        }) != prefixes.cend()) {
1080         frame_data.function_name = "FILTERED";
1081       }
1082     }
1083   }
1084 
1085   ProcessState& process_state = process_state_it->second;
1086   HeapTracker& heap_tracker =
1087       process_state.GetHeapTracker(alloc_rec->alloc_metadata.heap_id);
1088 
1089   if (alloc_rec->error)
1090     process_state.unwinding_errors++;
1091   if (alloc_rec->reparsed_map)
1092     process_state.map_reparses++;
1093   process_state.heap_samples++;
1094   process_state.unwinding_time_us.Add(alloc_rec->unwinding_time_us);
1095   process_state.total_unwinding_time_us += alloc_rec->unwinding_time_us;
1096 
1097   // abspc may no longer refer to the same functions, as we had to reparse
1098   // maps. Reset the cache.
1099   if (alloc_rec->reparsed_map)
1100     heap_tracker.ClearFrameCache();
1101 
1102   heap_tracker.RecordMalloc(
1103       alloc_rec->frames, alloc_rec->build_ids, alloc_metadata.alloc_address,
1104       alloc_metadata.sample_size, alloc_metadata.alloc_size,
1105       alloc_metadata.sequence_number,
1106       alloc_metadata.clock_monotonic_coarse_timestamp);
1107 }
1108 
HandleFreeRecord(FreeRecord free_rec)1109 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
1110   auto it = data_sources_.find(free_rec.data_source_instance_id);
1111   if (it == data_sources_.end()) {
1112     PERFETTO_LOG("Invalid data source in free record.");
1113     return;
1114   }
1115 
1116   DataSource& ds = it->second;
1117   auto process_state_it = ds.process_states.find(free_rec.pid);
1118   if (process_state_it == ds.process_states.end()) {
1119     PERFETTO_LOG("Invalid PID in free record.");
1120     return;
1121   }
1122 
1123   if (ds.config.stream_allocations()) {
1124     auto packet = ds.trace_writer->NewTracePacket();
1125     auto* streaming_free = packet->set_streaming_free();
1126     streaming_free->add_address(free_rec.entry.addr);
1127     streaming_free->add_heap_id(free_rec.entry.heap_id);
1128     streaming_free->add_sequence_number(free_rec.entry.sequence_number);
1129     return;
1130   }
1131 
1132   ProcessState& process_state = process_state_it->second;
1133 
1134   const FreeEntry& entry = free_rec.entry;
1135   HeapTracker& heap_tracker = process_state.GetHeapTracker(entry.heap_id);
1136   heap_tracker.RecordFree(entry.addr, entry.sequence_number, 0);
1137 }
1138 
HandleHeapNameRecord(HeapNameRecord rec)1139 void HeapprofdProducer::HandleHeapNameRecord(HeapNameRecord rec) {
1140   auto it = data_sources_.find(rec.data_source_instance_id);
1141   if (it == data_sources_.end()) {
1142     PERFETTO_LOG("Invalid data source in free record.");
1143     return;
1144   }
1145 
1146   DataSource& ds = it->second;
1147   auto process_state_it = ds.process_states.find(rec.pid);
1148   if (process_state_it == ds.process_states.end()) {
1149     PERFETTO_LOG("Invalid PID in free record.");
1150     return;
1151   }
1152 
1153   ProcessState& process_state = process_state_it->second;
1154   const HeapName& entry = rec.entry;
1155   if (entry.heap_name[0] != '\0') {
1156     std::string heap_name = entry.heap_name;
1157     if (entry.heap_id == 0) {
1158       PERFETTO_ELOG("Invalid zero heap ID.");
1159       return;
1160     }
1161     ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1162     if (!hi.heap_name.empty() && hi.heap_name != heap_name) {
1163       PERFETTO_ELOG("Overriding heap name %s with %s", hi.heap_name.c_str(),
1164                     heap_name.c_str());
1165     }
1166     hi.heap_name = entry.heap_name;
1167   }
1168   if (entry.sample_interval != 0) {
1169     ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
1170     if (!hi.sampling_interval)
1171       hi.orig_sampling_interval = entry.sample_interval;
1172     hi.sampling_interval = entry.sample_interval;
1173   }
1174 }
1175 
TerminateWhenDone()1176 void HeapprofdProducer::TerminateWhenDone() {
1177   if (data_sources_.empty())
1178     TerminateProcess(0);
1179   exit_when_done_ = true;
1180 }
1181 
MaybeFinishDataSource(DataSource * ds)1182 bool HeapprofdProducer::MaybeFinishDataSource(DataSource* ds) {
1183   if (!ds->process_states.empty() || !ds->rejected_pids.empty() ||
1184       !ds->shutting_down) {
1185     return false;
1186   }
1187 
1188   bool was_stopped = ds->was_stopped;
1189   DataSourceInstanceID ds_id = ds->id;
1190   auto weak_producer = weak_factory_.GetWeakPtr();
1191   bool exit_when_done = exit_when_done_;
1192   ds->trace_writer->Flush([weak_producer, exit_when_done, ds_id, was_stopped] {
1193     if (!weak_producer)
1194       return;
1195 
1196     if (was_stopped)
1197       weak_producer->endpoint_->NotifyDataSourceStopped(ds_id);
1198     weak_producer->data_sources_.erase(ds_id);
1199 
1200     if (exit_when_done) {
1201       // Post this as a task to allow NotifyDataSourceStopped to post tasks.
1202       weak_producer->task_runner_->PostTask([weak_producer] {
1203         if (!weak_producer)
1204           return;
1205         weak_producer->TerminateProcess(
1206             /*exit_status=*/0);  // does not return
1207       });
1208     }
1209   });
1210   return true;
1211 }
1212 
HandleSocketDisconnected(DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)1213 void HeapprofdProducer::HandleSocketDisconnected(
1214     DataSourceInstanceID ds_id,
1215     pid_t pid,
1216     SharedRingBuffer::Stats stats) {
1217   auto it = data_sources_.find(ds_id);
1218   if (it == data_sources_.end())
1219     return;
1220   DataSource& ds = it->second;
1221 
1222   auto process_state_it = ds.process_states.find(pid);
1223   if (process_state_it == ds.process_states.end()) {
1224     PERFETTO_ELOG("Unexpected disconnect from %d", pid);
1225     return;
1226   }
1227 
1228   PERFETTO_LOG("%d disconnected from heapprofd (ds shutting down: %d).", pid,
1229                ds.shutting_down);
1230 
1231   ProcessState& process_state = process_state_it->second;
1232   process_state.disconnected = !ds.shutting_down;
1233   process_state.error_state = stats.error_state;
1234   process_state.client_spinlock_blocked_us = stats.client_spinlock_blocked_us;
1235   process_state.buffer_corrupted =
1236       stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
1237 
1238   DumpProcessState(&ds, pid, &process_state);
1239   ds.process_states.erase(pid);
1240   MaybeFinishDataSource(&ds);
1241 }
1242 
CheckDataSourceCpuTask()1243 void HeapprofdProducer::CheckDataSourceCpuTask() {
1244   auto weak_producer = weak_factory_.GetWeakPtr();
1245   task_runner_->PostDelayedTask(
1246       [weak_producer] {
1247         if (!weak_producer)
1248           return;
1249         weak_producer->CheckDataSourceCpuTask();
1250       },
1251       kGuardrailIntervalMs);
1252 
1253   ProfilerCpuGuardrails gr;
1254   for (auto& p : data_sources_) {
1255     DataSource& ds = p.second;
1256     if (gr.IsOverCpuThreshold(ds.guardrail_config)) {
1257       ds.hit_guardrail = true;
1258       PERFETTO_LOG("Data source %" PRIu64 " hit CPU guardrail. Shutting down.",
1259                    ds.id);
1260       ShutdownDataSource(&ds);
1261     }
1262   }
1263 }
1264 
CheckDataSourceMemoryTask()1265 void HeapprofdProducer::CheckDataSourceMemoryTask() {
1266   auto weak_producer = weak_factory_.GetWeakPtr();
1267   task_runner_->PostDelayedTask(
1268       [weak_producer] {
1269         if (!weak_producer)
1270           return;
1271         weak_producer->CheckDataSourceMemoryTask();
1272       },
1273       kGuardrailIntervalMs);
1274   ProfilerMemoryGuardrails gr;
1275   for (auto& p : data_sources_) {
1276     DataSource& ds = p.second;
1277     if (gr.IsOverMemoryThreshold(ds.guardrail_config)) {
1278       ds.hit_guardrail = true;
1279       PERFETTO_LOG("Data source %" PRIu64
1280                    " hit memory guardrail. Shutting down.",
1281                    ds.id);
1282       ShutdownDataSource(&ds);
1283     }
1284   }
1285 }
1286 
1287 }  // namespace profiling
1288 }  // namespace perfetto
1289