• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/profiling/memory/heapprofd_producer.h"
18 
19 #include <inttypes.h>
20 #include <signal.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24 
25 #include "perfetto/base/file_utils.h"
26 #include "perfetto/base/string_utils.h"
27 #include "perfetto/base/thread_task_runner.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 #include "perfetto/tracing/core/trace_writer.h"
31 #include "perfetto/tracing/ipc/producer_ipc_client.h"
32 
33 namespace perfetto {
34 namespace profiling {
35 namespace {
36 using ::perfetto::protos::pbzero::ProfilePacket;
37 
38 constexpr char kHeapprofdDataSource[] = "android.heapprofd";
39 constexpr size_t kUnwinderThreads = 5;
40 constexpr int kHeapprofdSignal = 36;
41 
42 constexpr uint32_t kInitialConnectionBackoffMs = 100;
43 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
44 
45 constexpr uint32_t kChildModeWatchdogPeriodMs = 10 * 1000;
46 
47 constexpr uint64_t kDefaultShmemSize = 8 * 1048576;  // ~8 MB
48 constexpr uint64_t kMaxShmemSize = 500 * 1048576;    // ~500 MB
49 
MakeClientConfiguration(const DataSourceConfig & cfg)50 ClientConfiguration MakeClientConfiguration(const DataSourceConfig& cfg) {
51   ClientConfiguration client_config;
52   client_config.interval = cfg.heapprofd_config().sampling_interval_bytes();
53   client_config.block_client = cfg.heapprofd_config().block_client();
54   return client_config;
55 }
56 
MakeUnwindingWorkers(HeapprofdProducer * delegate,size_t n)57 std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
58                                                   size_t n) {
59   std::vector<UnwindingWorker> ret;
60   for (size_t i = 0; i < n; ++i) {
61     ret.emplace_back(delegate, base::ThreadTaskRunner::CreateAndStart());
62   }
63   return ret;
64 }
65 
ConfigTargetsProcess(const HeapprofdConfig & cfg,const Process & proc,const std::vector<std::string> & normalized_cmdlines)66 bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
67                           const Process& proc,
68                           const std::vector<std::string>& normalized_cmdlines) {
69   if (cfg.all())
70     return true;
71 
72   const auto& pids = cfg.pid();
73   if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
74       pids.cend()) {
75     return true;
76   }
77 
78   if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
79                 proc.cmdline) != normalized_cmdlines.cend()) {
80     return true;
81   }
82   return false;
83 }
84 
85 // Return largest n such that pow(2, n) < value.
Log2LessThan(uint64_t value)86 size_t Log2LessThan(uint64_t value) {
87   size_t i = 0;
88   while (value) {
89     i++;
90     value >>= 1;
91   }
92   return i;
93 }
94 
95 }  // namespace
96 
97 const uint64_t LogHistogram::kMaxBucket = 0;
98 
GetData()99 std::vector<std::pair<uint64_t, uint64_t>> LogHistogram::GetData() {
100   std::vector<std::pair<uint64_t, uint64_t>> data;
101   data.reserve(kBuckets);
102   for (size_t i = 0; i < kBuckets; ++i) {
103     if (i == kBuckets - 1)
104       data.emplace_back(kMaxBucket, values_[i]);
105     else
106       data.emplace_back(1 << i, values_[i]);
107   }
108   return data;
109 }
110 
GetBucket(uint64_t value)111 size_t LogHistogram::GetBucket(uint64_t value) {
112   if (value == 0)
113     return 0;
114 
115   size_t hibit = Log2LessThan(value);
116   if (hibit >= kBuckets)
117     return kBuckets - 1;
118   return hibit;
119 }
120 
121 // We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
122 // thread.
HeapprofdProducer(HeapprofdMode mode,base::TaskRunner * task_runner)123 HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
124                                      base::TaskRunner* task_runner)
125     : task_runner_(task_runner),
126       mode_(mode),
127       unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)),
128       socket_delegate_(this),
129       weak_factory_(this) {
130   if (mode == HeapprofdMode::kCentral) {
131     listening_socket_ = MakeListeningSocket();
132   }
133 }
134 
~HeapprofdProducer()135 HeapprofdProducer::~HeapprofdProducer() {
136   // We only borrowed this from the environment variable.
137   // UnixSocket always owns the socket, so we need to manually release it
138   // here.
139   if (mode_ == HeapprofdMode::kCentral && bool(listening_socket_))
140     listening_socket_->ReleaseSocket().ReleaseFd().release();
141 }
142 
MakeListeningSocket()143 std::unique_ptr<base::UnixSocket> HeapprofdProducer::MakeListeningSocket() {
144   const char* sock_fd = getenv(kHeapprofdSocketEnvVar);
145   if (sock_fd == nullptr) {
146     unlink(kHeapprofdSocketFile);
147     return base::UnixSocket::Listen(kHeapprofdSocketFile, &socket_delegate_,
148                                     task_runner_);
149   }
150   char* end;
151   int raw_fd = static_cast<int>(strtol(sock_fd, &end, 10));
152   if (*end != '\0')
153     PERFETTO_FATAL("Invalid %s. Expected decimal integer.",
154                    kHeapprofdSocketEnvVar);
155   return base::UnixSocket::Listen(base::ScopedFile(raw_fd), &socket_delegate_,
156                                   task_runner_);
157 }
158 
SetTargetProcess(pid_t target_pid,std::string target_cmdline,base::ScopedFile inherited_socket)159 void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
160                                          std::string target_cmdline,
161                                          base::ScopedFile inherited_socket) {
162   target_process_.pid = target_pid;
163   target_process_.cmdline = target_cmdline;
164   inherited_fd_ = std::move(inherited_socket);
165 }
166 
AdoptTargetProcessSocket()167 void HeapprofdProducer::AdoptTargetProcessSocket() {
168   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
169   auto socket = base::UnixSocket::AdoptConnected(
170       std::move(inherited_fd_), &socket_delegate_, task_runner_,
171       base::SockType::kStream);
172 
173   HandleClientConnection(std::move(socket), target_process_);
174 }
175 
OnConnect()176 void HeapprofdProducer::OnConnect() {
177   PERFETTO_DCHECK(state_ == kConnecting);
178   state_ = kConnected;
179   ResetConnectionBackoff();
180   PERFETTO_LOG("Connected to the service, mode [%s].",
181                mode_ == HeapprofdMode::kCentral ? "central" : "child");
182 
183   DataSourceDescriptor desc;
184   desc.set_name(kHeapprofdDataSource);
185   endpoint_->RegisterDataSource(desc);
186 }
187 
OnDisconnect()188 void HeapprofdProducer::OnDisconnect() {
189   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
190   PERFETTO_LOG("Disconnected from tracing service");
191 
192   // Do not attempt to reconnect if we're a process-private process, just quit.
193   if (mode_ == HeapprofdMode::kChild) {
194     TerminateProcess(/*exit_status=*/1);  // does not return
195   }
196 
197   // Central mode - attempt to reconnect.
198   auto weak_producer = weak_factory_.GetWeakPtr();
199   if (state_ == kConnected)
200     return task_runner_->PostTask([weak_producer] {
201       if (!weak_producer)
202         return;
203       weak_producer->Restart();
204     });
205 
206   state_ = kNotConnected;
207   IncreaseConnectionBackoff();
208   task_runner_->PostDelayedTask(
209       [weak_producer] {
210         if (!weak_producer)
211           return;
212         weak_producer->ConnectService();
213       },
214       connection_backoff_ms_);
215 }
216 
ConnectWithRetries(const char * socket_name)217 void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
218   PERFETTO_DCHECK(state_ == kNotStarted);
219   state_ = kNotConnected;
220 
221   ResetConnectionBackoff();
222   producer_sock_name_ = socket_name;
223   ConnectService();
224 }
225 
ConnectService()226 void HeapprofdProducer::ConnectService() {
227   SetProducerEndpoint(ProducerIPCClient::Connect(
228       producer_sock_name_, this, "android.heapprofd", task_runner_));
229 }
230 
SetProducerEndpoint(std::unique_ptr<TracingService::ProducerEndpoint> endpoint)231 void HeapprofdProducer::SetProducerEndpoint(
232     std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
233   PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
234   state_ = kConnecting;
235   endpoint_ = std::move(endpoint);
236 }
237 
IncreaseConnectionBackoff()238 void HeapprofdProducer::IncreaseConnectionBackoff() {
239   connection_backoff_ms_ *= 2;
240   if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
241     connection_backoff_ms_ = kMaxConnectionBackoffMs;
242 }
243 
ResetConnectionBackoff()244 void HeapprofdProducer::ResetConnectionBackoff() {
245   connection_backoff_ms_ = kInitialConnectionBackoffMs;
246 }
247 
Restart()248 void HeapprofdProducer::Restart() {
249   // We lost the connection with the tracing service. At this point we need
250   // to reset all the data sources. Trying to handle that manually is going to
251   // be error prone. What we do here is simply destroy the instance and
252   // recreate it again.
253 
254   // Child mode producer should not attempt restarts. Note that this also means
255   // the rest of this method doesn't have to handle child-specific state.
256   if (mode_ == HeapprofdMode::kChild)
257     PERFETTO_FATAL("Attempting to restart a child mode producer.");
258 
259   HeapprofdMode mode = mode_;
260   base::TaskRunner* task_runner = task_runner_;
261   const char* socket_name = producer_sock_name_;
262 
263   // Invoke destructor and then the constructor again.
264   this->~HeapprofdProducer();
265   new (this) HeapprofdProducer(mode, task_runner);
266 
267   ConnectWithRetries(socket_name);
268 }
269 
ScheduleActiveDataSourceWatchdog()270 void HeapprofdProducer::ScheduleActiveDataSourceWatchdog() {
271   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
272 
273   // Post the first check after a delay, to let the freshly forked heapprofd
274   // to receive the active data sources from traced. The checks will reschedule
275   // themselves from that point onwards.
276   auto weak_producer = weak_factory_.GetWeakPtr();
277   task_runner_->PostDelayedTask(
278       [weak_producer]() {
279         if (!weak_producer)
280           return;
281         weak_producer->ActiveDataSourceWatchdogCheck();
282       },
283       kChildModeWatchdogPeriodMs);
284 }
285 
ActiveDataSourceWatchdogCheck()286 void HeapprofdProducer::ActiveDataSourceWatchdogCheck() {
287   PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);
288 
289   // Fork mode heapprofd should be working on exactly one data source matching
290   // its target process.
291   if (data_sources_.empty()) {
292     PERFETTO_LOG(
293         "Child heapprofd exiting as it never received a data source for the "
294         "target process, or somehow lost/finished the task without exiting.");
295     TerminateProcess(/*exit_status=*/1);
296   } else {
297     // reschedule check.
298     auto weak_producer = weak_factory_.GetWeakPtr();
299     task_runner_->PostDelayedTask(
300         [weak_producer]() {
301           if (!weak_producer)
302             return;
303           weak_producer->ActiveDataSourceWatchdogCheck();
304         },
305         kChildModeWatchdogPeriodMs);
306   }
307 }
308 
309 // TODO(rsavitski): would be cleaner to shut down the event loop instead
310 // (letting main exit). One test-friendly approach is to supply a shutdown
311 // callback in the constructor.
TerminateProcess(int exit_status)312 __attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
313     int exit_status) {
314   PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
315   exit(exit_status);
316 }
317 
OnTracingSetup()318 void HeapprofdProducer::OnTracingSetup() {}
319 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)320 void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
321                                         const DataSourceConfig& cfg) {
322   PERFETTO_DLOG("Setting up data source.");
323   if (mode_ == HeapprofdMode::kChild && cfg.enable_extra_guardrails()) {
324     PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
325     return;
326   }
327 
328   const HeapprofdConfig& heapprofd_config = cfg.heapprofd_config();
329   if (heapprofd_config.all() && !heapprofd_config.pid().empty())
330     PERFETTO_ELOG("No point setting all and pid");
331   if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
332     PERFETTO_ELOG("No point setting all and process_cmdline");
333 
334   if (cfg.name() != kHeapprofdDataSource) {
335     PERFETTO_DLOG("Invalid data source name.");
336     return;
337   }
338 
339   auto it = data_sources_.find(id);
340   if (it != data_sources_.end()) {
341     PERFETTO_DFATAL("Received duplicated data source instance id: %" PRIu64,
342                     id);
343     return;
344   }
345 
346   std::vector<std::string> normalized_cmdlines =
347       NormalizeCmdlines(heapprofd_config.process_cmdline());
348 
349   // Child mode is only interested in the first data source matching the
350   // already-connected process.
351   if (mode_ == HeapprofdMode::kChild) {
352     if (!ConfigTargetsProcess(heapprofd_config, target_process_,
353                               normalized_cmdlines)) {
354       PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
355       return;
356     }
357 
358     if (!data_sources_.empty()) {
359       PERFETTO_LOG("Child mode skipping concurrent data source.");
360 
361       // Manually write one ProfilePacket about the rejected session.
362       auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
363       auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
364       auto trace_packet = trace_writer->NewTracePacket();
365       trace_packet->set_timestamp(
366           static_cast<uint64_t>(base::GetBootTimeNs().count()));
367       auto profile_packet = trace_packet->set_profile_packet();
368       auto process_dump = profile_packet->add_process_dumps();
369       process_dump->set_pid(static_cast<uint64_t>(target_process_.pid));
370       process_dump->set_rejected_concurrent(true);
371       trace_packet->Finalize();
372       trace_writer->Flush();
373       return;
374     }
375   }
376 
377   DataSource data_source;
378   data_source.id = id;
379   data_source.client_configuration = MakeClientConfiguration(cfg);
380   data_source.config = heapprofd_config;
381   auto buffer_id = static_cast<BufferID>(cfg.target_buffer());
382   data_source.trace_writer = endpoint_->CreateTraceWriter(buffer_id);
383   data_source.normalized_cmdlines = std::move(normalized_cmdlines);
384 
385   data_sources_.emplace(id, std::move(data_source));
386   PERFETTO_DLOG("Set up data source.");
387 
388   if (mode_ == HeapprofdMode::kChild)
389     AdoptTargetProcessSocket();
390 }
391 
IsPidProfiled(pid_t pid)392 bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
393   for (const auto& pair : data_sources_) {
394     const DataSource& ds = pair.second;
395     if (ds.process_states.find(pid) != ds.process_states.cend())
396       return true;
397   }
398   return false;
399 }
400 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)401 void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
402                                         const DataSourceConfig& cfg) {
403   PERFETTO_DLOG("Start DataSource");
404   const HeapprofdConfig& heapprofd_config = cfg.heapprofd_config();
405 
406   auto it = data_sources_.find(id);
407   if (it == data_sources_.end()) {
408     // This is expected in child heapprofd, where we reject uninteresting data
409     // sources in SetupDataSource.
410     if (mode_ == HeapprofdMode::kCentral) {
411       PERFETTO_DFATAL(
412           "Received invalid data source instance to start: %" PRIu64, id);
413     }
414     return;
415   }
416   DataSource& data_source = it->second;
417 
418   // Central daemon - set system properties for any targets that start later,
419   // and signal already-running targets to start the profiling client.
420   if (mode_ == HeapprofdMode::kCentral) {
421     if (heapprofd_config.all())
422       data_source.properties.emplace_back(properties_.SetAll());
423 
424     for (std::string cmdline : data_source.normalized_cmdlines)
425       data_source.properties.emplace_back(
426           properties_.SetProperty(std::move(cmdline)));
427 
428     std::set<pid_t> pids;
429     if (heapprofd_config.all())
430       FindAllProfilablePids(&pids);
431     for (uint64_t pid : heapprofd_config.pid())
432       pids.emplace(static_cast<pid_t>(pid));
433 
434     if (!data_source.normalized_cmdlines.empty())
435       FindPidsForCmdlines(data_source.normalized_cmdlines, &pids);
436 
437     for (auto pid_it = pids.cbegin(); pid_it != pids.cend();) {
438       pid_t pid = *pid_it;
439       if (IsPidProfiled(pid)) {
440         PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
441                      static_cast<intmax_t>(pid));
442         data_source.rejected_pids.emplace(pid);
443         pid_it = pids.erase(pid_it);
444         continue;
445       }
446 
447       PERFETTO_DLOG("Sending %d to %d", kHeapprofdSignal, pid);
448       if (kill(pid, kHeapprofdSignal) != 0) {
449         PERFETTO_DPLOG("kill");
450       }
451       ++pid_it;
452     }
453     data_source.signaled_pids = std::move(pids);
454   }
455 
456   const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
457   uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
458   if (dump_interval) {
459     auto weak_producer = weak_factory_.GetWeakPtr();
460     task_runner_->PostDelayedTask(
461         [weak_producer, id, dump_interval] {
462           if (!weak_producer)
463             return;
464           weak_producer->DoContinuousDump(id, dump_interval);
465         },
466         continuous_dump_config.dump_phase_ms());
467   }
468   PERFETTO_DLOG("Started DataSource");
469 }
470 
UnwinderForPID(pid_t pid)471 UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
472   return unwinding_workers_[static_cast<uint64_t>(pid) % kUnwinderThreads];
473 }
474 
StopDataSource(DataSourceInstanceID id)475 void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
476   auto it = data_sources_.find(id);
477   if (it == data_sources_.end()) {
478     if (mode_ == HeapprofdMode::kCentral)
479       PERFETTO_DFATAL("Trying to stop non existing data source: %" PRIu64, id);
480     return;
481   }
482 
483   DataSource& data_source = it->second;
484   for (const auto& pid_and_process_state : data_source.process_states) {
485     pid_t pid = pid_and_process_state.first;
486     UnwinderForPID(pid).PostDisconnectSocket(pid);
487   }
488 
489   data_sources_.erase(it);
490 
491   if (mode_ == HeapprofdMode::kChild)
492     TerminateProcess(/*exit_status=*/0);  // does not return
493 }
494 
DoContinuousDump(DataSourceInstanceID id,uint32_t dump_interval)495 void HeapprofdProducer::DoContinuousDump(DataSourceInstanceID id,
496                                          uint32_t dump_interval) {
497   if (!Dump(id, 0 /* flush_id */, false /* is_flush */))
498     return;
499   auto weak_producer = weak_factory_.GetWeakPtr();
500   task_runner_->PostDelayedTask(
501       [weak_producer, id, dump_interval] {
502         if (!weak_producer)
503           return;
504         weak_producer->DoContinuousDump(id, dump_interval);
505       },
506       dump_interval);
507 }
508 
Dump(DataSourceInstanceID id,FlushRequestID flush_id,bool has_flush_id)509 bool HeapprofdProducer::Dump(DataSourceInstanceID id,
510                              FlushRequestID flush_id,
511                              bool has_flush_id) {
512   auto it = data_sources_.find(id);
513   if (it == data_sources_.end()) {
514     PERFETTO_LOG(
515         "Data source not found (harmless if using continuous_dump_config).");
516     return false;
517   }
518   DataSource& data_source = it->second;
519 
520   DumpState dump_state(data_source.trace_writer.get(),
521                        &data_source.next_index_);
522 
523   for (pid_t rejected_pid : data_source.rejected_pids) {
524     ProfilePacket::ProcessHeapSamples* proto =
525         dump_state.current_profile_packet->add_process_dumps();
526     proto->set_pid(static_cast<uint64_t>(rejected_pid));
527     proto->set_rejected_concurrent(true);
528   }
529 
530   for (std::pair<const pid_t, ProcessState>& pid_and_process_state :
531        data_source.process_states) {
532     pid_t pid = pid_and_process_state.first;
533     ProcessState& process_state = pid_and_process_state.second;
534     HeapTracker& heap_tracker = process_state.heap_tracker;
535     bool from_startup =
536         data_source.signaled_pids.find(pid) == data_source.signaled_pids.cend();
537     auto new_heapsamples = [pid, from_startup, &process_state](
538                                ProfilePacket::ProcessHeapSamples* proto) {
539       proto->set_pid(static_cast<uint64_t>(pid));
540       proto->set_from_startup(from_startup);
541       proto->set_disconnected(process_state.disconnected);
542       proto->set_buffer_overran(process_state.buffer_overran);
543       proto->set_buffer_corrupted(process_state.buffer_corrupted);
544       auto* stats = proto->set_stats();
545       stats->set_unwinding_errors(process_state.unwinding_errors);
546       stats->set_heap_samples(process_state.heap_samples);
547       stats->set_map_reparses(process_state.map_reparses);
548       stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
549       auto* unwinding_hist = stats->set_unwinding_time_us();
550       for (const auto& p : process_state.unwinding_time_us.GetData()) {
551         auto* bucket = unwinding_hist->add_buckets();
552         if (p.first == LogHistogram::kMaxBucket)
553           bucket->set_max_bucket(true);
554         else
555           bucket->set_upper_limit(p.first);
556         bucket->set_count(p.second);
557       }
558     };
559     heap_tracker.Dump(std::move(new_heapsamples), &dump_state);
560   }
561 
562   for (GlobalCallstackTrie::Node* node : dump_state.callstacks_to_dump) {
563     // There need to be two separate loops over built_callstack because
564     // protozero cannot interleave different messages.
565     auto built_callstack = callsites_.BuildCallstack(node);
566     for (const Interned<Frame>& frame : built_callstack)
567       dump_state.WriteFrame(frame);
568     ProfilePacket::Callstack* callstack =
569         dump_state.current_profile_packet->add_callstacks();
570     callstack->set_id(node->id());
571     for (const Interned<Frame>& frame : built_callstack)
572       callstack->add_frame_ids(frame.id());
573   }
574 
575   dump_state.current_trace_packet->Finalize();
576   if (has_flush_id) {
577     auto weak_producer = weak_factory_.GetWeakPtr();
578     auto callback = [weak_producer, flush_id] {
579       if (weak_producer)
580         return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
581           if (weak_producer)
582             return weak_producer->FinishDataSourceFlush(flush_id);
583         });
584     };
585     data_source.trace_writer->Flush(std::move(callback));
586   }
587   return true;
588 }
589 
DumpAll()590 void HeapprofdProducer::DumpAll() {
591   for (const auto& id_and_data_source : data_sources_) {
592     if (!Dump(id_and_data_source.first, 0 /* flush_id */, false /* is_flush */))
593       PERFETTO_DLOG("Failed to dump %" PRIu64, id_and_data_source.first);
594   }
595 }
596 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * ids,size_t num_ids)597 void HeapprofdProducer::Flush(FlushRequestID flush_id,
598                               const DataSourceInstanceID* ids,
599                               size_t num_ids) {
600   if (num_ids == 0)
601     return;
602 
603   size_t& flush_in_progress = flushes_in_progress_[flush_id];
604   PERFETTO_DCHECK(flush_in_progress == 0);
605   flush_in_progress = num_ids;
606   for (size_t i = 0; i < num_ids; ++i)
607     Dump(ids[i], flush_id, true);
608 }
609 
FinishDataSourceFlush(FlushRequestID flush_id)610 void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
611   auto it = flushes_in_progress_.find(flush_id);
612   if (it == flushes_in_progress_.end()) {
613     PERFETTO_DFATAL("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
614     return;
615   }
616   size_t& flush_in_progress = it->second;
617   if (--flush_in_progress == 0) {
618     endpoint_->NotifyFlushComplete(flush_id);
619     flushes_in_progress_.erase(flush_id);
620   }
621 }
622 
OnDisconnect(base::UnixSocket * self)623 void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
624   auto it = producer_->pending_processes_.find(self->peer_pid());
625   if (it == producer_->pending_processes_.end()) {
626     PERFETTO_DFATAL("Unexpected disconnect.");
627     return;
628   }
629 
630   if (self == it->second.sock.get())
631     producer_->pending_processes_.erase(it);
632 }
633 
OnNewIncomingConnection(base::UnixSocket *,std::unique_ptr<base::UnixSocket> new_connection)634 void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
635     base::UnixSocket*,
636     std::unique_ptr<base::UnixSocket> new_connection) {
637   Process peer_process;
638   peer_process.pid = new_connection->peer_pid();
639   if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
640     PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
641 
642   producer_->HandleClientConnection(std::move(new_connection), peer_process);
643 }
644 
OnDataAvailable(base::UnixSocket * self)645 void HeapprofdProducer::SocketDelegate::OnDataAvailable(
646     base::UnixSocket* self) {
647   auto it = producer_->pending_processes_.find(self->peer_pid());
648   if (it == producer_->pending_processes_.end()) {
649     PERFETTO_DFATAL("Unexpected data.");
650     return;
651   }
652 
653   PendingProcess& pending_process = it->second;
654 
655   base::ScopedFile fds[kHandshakeSize];
656   char buf[1];
657   self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
658 
659   static_assert(kHandshakeSize == 2, "change if below.");
660   if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
661     auto ds_it =
662         producer_->data_sources_.find(pending_process.data_source_instance_id);
663     if (ds_it == producer_->data_sources_.end()) {
664       producer_->pending_processes_.erase(it);
665       return;
666     }
667 
668     DataSource& data_source = ds_it->second;
669     data_source.process_states.emplace(self->peer_pid(),
670                                        &producer_->callsites_);
671 
672     PERFETTO_DLOG("%d: Received FDs.", self->peer_pid());
673     int raw_fd = pending_process.shmem.fd();
674     // TODO(fmayer): Full buffer could deadlock us here.
675     self->Send(&data_source.client_configuration,
676                sizeof(data_source.client_configuration), &raw_fd, 1,
677                base::UnixSocket::BlockingMode::kBlocking);
678 
679     UnwindingWorker::HandoffData handoff_data;
680     handoff_data.data_source_instance_id =
681         pending_process.data_source_instance_id;
682     handoff_data.sock = self->ReleaseSocket();
683     for (size_t i = 0; i < kHandshakeSize; ++i)
684       handoff_data.fds[i] = std::move(fds[i]);
685     handoff_data.shmem = std::move(pending_process.shmem);
686     handoff_data.client_config = data_source.client_configuration;
687 
688     producer_->UnwinderForPID(self->peer_pid())
689         .PostHandoffSocket(std::move(handoff_data));
690     producer_->pending_processes_.erase(it);
691   } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
692     PERFETTO_DFATAL("%d: Received partial FDs.", self->peer_pid());
693     producer_->pending_processes_.erase(it);
694   } else {
695     PERFETTO_DLOG("%d: Received no FDs.", self->peer_pid());
696   }
697 }
698 
GetDataSourceForProcess(const Process & proc)699 HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
700     const Process& proc) {
701   for (auto& ds_id_and_datasource : data_sources_) {
702     DataSource& ds = ds_id_and_datasource.second;
703     if (ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
704       return &ds;
705   }
706   return nullptr;
707 }
708 
RecordOtherSourcesAsRejected(DataSource * active_ds,const Process & proc)709 void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
710                                                      const Process& proc) {
711   for (auto& ds_id_and_datasource : data_sources_) {
712     DataSource& ds = ds_id_and_datasource.second;
713     if (&ds != active_ds &&
714         ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
715       ds.rejected_pids.emplace(proc.pid);
716   }
717 }
718 
HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,Process process)719 void HeapprofdProducer::HandleClientConnection(
720     std::unique_ptr<base::UnixSocket> new_connection,
721     Process process) {
722   DataSource* data_source = GetDataSourceForProcess(process);
723   if (!data_source) {
724     PERFETTO_LOG("No data source found.");
725     return;
726   }
727   RecordOtherSourcesAsRejected(data_source, process);
728 
729   uint64_t shmem_size = data_source->config.shmem_size_bytes();
730   if (!shmem_size)
731     shmem_size = kDefaultShmemSize;
732   if (shmem_size > kMaxShmemSize)
733     shmem_size = kMaxShmemSize;
734 
735   auto shmem = SharedRingBuffer::Create(shmem_size);
736   if (!shmem || !shmem->is_valid()) {
737     PERFETTO_LOG("Failed to create shared memory.");
738     return;
739   }
740 
741   pid_t peer_pid = new_connection->peer_pid();
742   if (peer_pid != process.pid) {
743     PERFETTO_DFATAL("Invalid PID connected.");
744     return;
745   }
746 
747   PendingProcess pending_process;
748   pending_process.sock = std::move(new_connection);
749   pending_process.data_source_instance_id = data_source->id;
750   pending_process.shmem = std::move(*shmem);
751   pending_processes_.emplace(peer_pid, std::move(pending_process));
752 }
753 
PostAllocRecord(AllocRecord alloc_rec)754 void HeapprofdProducer::PostAllocRecord(AllocRecord alloc_rec) {
755   // Once we can use C++14, this should be std::moved into the lambda instead.
756   AllocRecord* raw_alloc_rec = new AllocRecord(std::move(alloc_rec));
757   auto weak_this = weak_factory_.GetWeakPtr();
758   task_runner_->PostTask([weak_this, raw_alloc_rec] {
759     if (weak_this)
760       weak_this->HandleAllocRecord(std::move(*raw_alloc_rec));
761     delete raw_alloc_rec;
762   });
763 }
764 
PostFreeRecord(FreeRecord free_rec)765 void HeapprofdProducer::PostFreeRecord(FreeRecord free_rec) {
766   // Once we can use C++14, this should be std::moved into the lambda instead.
767   FreeRecord* raw_free_rec = new FreeRecord(std::move(free_rec));
768   auto weak_this = weak_factory_.GetWeakPtr();
769   task_runner_->PostTask([weak_this, raw_free_rec] {
770     if (weak_this)
771       weak_this->HandleFreeRecord(std::move(*raw_free_rec));
772     delete raw_free_rec;
773   });
774 }
775 
PostSocketDisconnected(DataSourceInstanceID ds_id,pid_t pid,SharedRingBuffer::Stats stats)776 void HeapprofdProducer::PostSocketDisconnected(DataSourceInstanceID ds_id,
777                                                pid_t pid,
778                                                SharedRingBuffer::Stats stats) {
779   auto weak_this = weak_factory_.GetWeakPtr();
780   task_runner_->PostTask([weak_this, ds_id, pid, stats] {
781     if (weak_this)
782       weak_this->HandleSocketDisconnected(ds_id, pid, stats);
783   });
784 }
785 
HandleAllocRecord(AllocRecord alloc_rec)786 void HeapprofdProducer::HandleAllocRecord(AllocRecord alloc_rec) {
787   const AllocMetadata& alloc_metadata = alloc_rec.alloc_metadata;
788   auto it = data_sources_.find(alloc_rec.data_source_instance_id);
789   if (it == data_sources_.end()) {
790     PERFETTO_LOG("Invalid data source in alloc record.");
791     return;
792   }
793 
794   DataSource& ds = it->second;
795   auto process_state_it = ds.process_states.find(alloc_rec.pid);
796   if (process_state_it == ds.process_states.end()) {
797     PERFETTO_LOG("Invalid PID in alloc record.");
798     return;
799   }
800 
801   const auto& prefixes = ds.config.skip_symbol_prefix();
802   if (!prefixes.empty()) {
803     for (FrameData& frame_data : alloc_rec.frames) {
804       const std::string& map = frame_data.frame.map_name;
805       if (std::find_if(prefixes.cbegin(), prefixes.cend(),
806                        [&map](const std::string& prefix) {
807                          return base::StartsWith(map, prefix);
808                        }) != prefixes.cend()) {
809         frame_data.frame.function_name = "FILTERED";
810       }
811     }
812   }
813 
814   ProcessState& process_state = process_state_it->second;
815   HeapTracker& heap_tracker = process_state.heap_tracker;
816 
817   if (alloc_rec.error)
818     process_state.unwinding_errors++;
819   if (alloc_rec.reparsed_map)
820     process_state.map_reparses++;
821   process_state.heap_samples++;
822   process_state.unwinding_time_us.Add(alloc_rec.unwinding_time_us);
823   process_state.total_unwinding_time_us += alloc_rec.unwinding_time_us;
824 
825   heap_tracker.RecordMalloc(alloc_rec.frames, alloc_metadata.alloc_address,
826                             alloc_metadata.total_size,
827                             alloc_metadata.sequence_number,
828                             alloc_metadata.clock_monotonic_coarse_timestamp);
829 }
830 
HandleFreeRecord(FreeRecord free_rec)831 void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
832   const FreeBatch& free_batch = free_rec.free_batch;
833   auto it = data_sources_.find(free_rec.data_source_instance_id);
834   if (it == data_sources_.end()) {
835     PERFETTO_LOG("Invalid data source in free record.");
836     return;
837   }
838 
839   DataSource& ds = it->second;
840   auto process_state_it = ds.process_states.find(free_rec.pid);
841   if (process_state_it == ds.process_states.end()) {
842     PERFETTO_LOG("Invalid PID in free record.");
843     return;
844   }
845 
846   ProcessState& process_state = process_state_it->second;
847   HeapTracker& heap_tracker = process_state.heap_tracker;
848 
849   const FreeBatchEntry* entries = free_batch.entries;
850   uint64_t num_entries = free_batch.num_entries;
851   if (num_entries > kFreeBatchSize) {
852     PERFETTO_DFATAL("Malformed free page.");
853     return;
854   }
855   for (size_t i = 0; i < num_entries; ++i) {
856     const FreeBatchEntry& entry = entries[i];
857     heap_tracker.RecordFree(entry.addr, entry.sequence_number,
858                             free_batch.clock_monotonic_coarse_timestamp);
859   }
860 }
861 
HandleSocketDisconnected(DataSourceInstanceID id,pid_t pid,SharedRingBuffer::Stats stats)862 void HeapprofdProducer::HandleSocketDisconnected(
863     DataSourceInstanceID id,
864     pid_t pid,
865     SharedRingBuffer::Stats stats) {
866   auto it = data_sources_.find(id);
867   if (it == data_sources_.end())
868     return;
869   DataSource& ds = it->second;
870 
871   auto process_state_it = ds.process_states.find(pid);
872   if (process_state_it == ds.process_states.end())
873     return;
874   ProcessState& process_state = process_state_it->second;
875   process_state.disconnected = true;
876   process_state.buffer_overran = stats.num_writes_overflow > 0;
877   process_state.buffer_corrupted =
878       stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
879 
880   // TODO(fmayer): Dump on process disconnect rather than data source
881   // destruction. This prevents us needing to hold onto the bookkeeping data
882   // after the process disconnected.
883 }
884 
885 }  // namespace profiling
886 }  // namespace perfetto
887