1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/profiling/perf/perf_producer.h"
18 
19 #include <optional>
20 #include <random>
21 #include <utility>
22 #include <vector>
23 
24 #include <unistd.h>
25 
26 #include <unwindstack/Error.h>
27 #include <unwindstack/Unwinder.h>
28 
29 #include "perfetto/base/logging.h"
30 #include "perfetto/base/task_runner.h"
31 #include "perfetto/ext/base/file_utils.h"
32 #include "perfetto/ext/base/metatrace.h"
33 #include "perfetto/ext/base/string_utils.h"
34 #include "perfetto/ext/base/utils.h"
35 #include "perfetto/ext/base/weak_ptr.h"
36 #include "perfetto/ext/tracing/core/basic_types.h"
37 #include "perfetto/ext/tracing/core/producer.h"
38 #include "perfetto/ext/tracing/core/tracing_service.h"
39 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40 #include "perfetto/tracing/core/data_source_config.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "src/profiling/common/callstack_trie.h"
43 #include "src/profiling/common/proc_cmdline.h"
44 #include "src/profiling/common/producer_support.h"
45 #include "src/profiling/common/profiler_guardrails.h"
46 #include "src/profiling/common/unwind_support.h"
47 #include "src/profiling/perf/common_types.h"
48 #include "src/profiling/perf/event_reader.h"
49 
50 #include "protos/perfetto/common/builtin_clock.pbzero.h"
51 #include "protos/perfetto/common/perf_events.gen.h"
52 #include "protos/perfetto/common/perf_events.pbzero.h"
53 #include "protos/perfetto/config/profiling/perf_event_config.gen.h"
54 #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
55 #include "protos/perfetto/trace/trace_packet.pbzero.h"
56 #include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"
57 
58 namespace perfetto {
59 namespace profiling {
60 namespace {
61 
62 // TODO(b/151835887): on Android, when using signals, there exists a vulnerable
63 // window between a process image being replaced by execve, and the new
64 // libc instance reinstalling the proper signal handlers. During this window,
65 // the signal disposition is defaulted to terminating the process.
66 // This is a best-effort mitigation from the daemon's side, using a heuristic
67 // that most execve calls follow a fork. So if we get a sample for a very fresh
68 // process, the grace period will give it a chance to get to
69 // a properly initialised state prior to getting signalled. This doesn't help
70 // cases when a mature process calls execve, or when the target gets descheduled
71 // (since this is a naive walltime wait).
72 // The proper fix is in the platform, see bug for progress.
73 constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;
74 
75 constexpr uint32_t kMemoryLimitCheckPeriodMs = 1000;
76 
77 constexpr uint32_t kInitialConnectionBackoffMs = 100;
78 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
79 
80 constexpr char kProducerName[] = "perfetto.traced_perf";
81 constexpr char kDataSourceName[] = "linux.perf";
82 
83 size_t NumberOfCpus() {
84   return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
85 }
86 
87 std::vector<uint32_t> GetOnlineCpus() {
88   size_t cpu_count = NumberOfCpus();
89   if (cpu_count == 0) {
90     return {};
91   }
92 
93   static constexpr char kOnlineValue[] = "1\n";
94   std::vector<uint32_t> online_cpus;
95   online_cpus.reserve(cpu_count);
96   for (uint32_t cpu = 0; cpu < cpu_count; ++cpu) {
97     std::string res;
98     base::StackString<1024> path("/sys/devices/system/cpu/cpu%u/online", cpu);
99     if (!base::ReadFile(path.c_str(), &res)) {
100       // Always consider CPU 0 to be online if the "online" file does not exist
101       // for it. There seem to be several assumptions in the kernel which make
102       // CPU 0 special so this is a pretty safe bet.
103       if (cpu != 0) {
104         return {};
105       }
106       res = kOnlineValue;
107     }
108     if (res != kOnlineValue) {
109       continue;
110     }
111     online_cpus.push_back(cpu);
112   }
113   return online_cpus;
114 }
115 
116 int32_t ToBuiltinClock(int32_t clockid) {
117   switch (clockid) {
118     case CLOCK_REALTIME:
119       return protos::pbzero::BUILTIN_CLOCK_REALTIME;
120     case CLOCK_MONOTONIC:
121       return protos::pbzero::BUILTIN_CLOCK_MONOTONIC;
122     case CLOCK_MONOTONIC_RAW:
123       return protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW;
124     case CLOCK_BOOTTIME:
125       return protos::pbzero::BUILTIN_CLOCK_BOOTTIME;
126     // Should never get invalid input here as otherwise the syscall itself
127     // would've failed earlier.
128     default:
129       return protos::pbzero::BUILTIN_CLOCK_UNKNOWN;
130   }
131 }
132 
133 TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
134   auto packet = trace_writer->NewTracePacket();
135   packet->set_sequence_flags(
136       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
137   return packet;
138 }
139 
140 void WritePerfEventDefaultsPacket(const EventConfig& event_config,
141                                   TraceWriter* trace_writer) {
142   auto packet = trace_writer->NewTracePacket();
143   packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
144   packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);
145 
146   // start new incremental state generation:
147   packet->set_sequence_flags(
148       protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
149 
150   // default packet timestamp clock for the samples:
151   perf_event_attr* perf_attr = event_config.perf_attr();
152   auto* defaults = packet->set_trace_packet_defaults();
153   int32_t builtin_clock = ToBuiltinClock(perf_attr->clockid);
154   defaults->set_timestamp_clock_id(static_cast<uint32_t>(builtin_clock));
155 
156   auto* perf_defaults = defaults->set_perf_sample_defaults();
157   auto* timebase_pb = perf_defaults->set_timebase();
158 
159   // frequency/period:
160   if (perf_attr->freq) {
161     timebase_pb->set_frequency(perf_attr->sample_freq);
162   } else {
163     timebase_pb->set_period(perf_attr->sample_period);
164   }
165 
166   // event:
167   const PerfCounter& timebase = event_config.timebase_event();
168   switch (timebase.event_type()) {
169     case PerfCounter::Type::kBuiltinCounter: {
170       timebase_pb->set_counter(
171           static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
172       break;
173     }
174     case PerfCounter::Type::kTracepoint: {
175       auto* tracepoint_pb = timebase_pb->set_tracepoint();
176       tracepoint_pb->set_name(timebase.tracepoint_name);
177       tracepoint_pb->set_filter(timebase.tracepoint_filter);
178       break;
179     }
180     case PerfCounter::Type::kRawEvent: {
181       auto* raw_pb = timebase_pb->set_raw_event();
182       raw_pb->set_type(timebase.attr_type);
183       raw_pb->set_config(timebase.attr_config);
184       raw_pb->set_config1(timebase.attr_config1);
185       raw_pb->set_config2(timebase.attr_config2);
186       break;
187     }
188   }
189 
190   // optional name to identify the counter during parsing:
191   if (!timebase.name.empty()) {
192     timebase_pb->set_name(timebase.name);
193   }
194 
195   // Not setting timebase.timestamp_clock since the field that matters during
196   // parsing is the root timestamp_clock_id set above.
197 
198   // Record the random shard we've chosen so that the post-processing can infer
199   // which processes would've been unwound if sampled. In particular this lets
200   // us distinguish between "running but not chosen" and "running and chosen,
201   // but not sampled" cases.
202   const auto& process_sharding = event_config.filter().process_sharding;
203   if (process_sharding.has_value()) {
204     perf_defaults->set_process_shard_count(process_sharding->shard_count);
205     perf_defaults->set_chosen_process_shard(process_sharding->chosen_shard);
206   }
207 }
208 
209 uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
210   // Normally, we'd schedule the next tick at the next |period_ms|
211   // boundary of the boot clock. However, to avoid aligning the read tasks of
212   // all concurrent data sources, we select a deterministic offset based on the
213   // data source id.
214   std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
215   std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
216   uint32_t ds_period_offset = dist(prng);
217 
218   uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
219   return period_ms - ((now_ms - ds_period_offset) % period_ms);
220 }
221 
222 protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
223   using Profiling = protos::pbzero::Profiling;
224   switch (perf_cpu_mode) {
225     case PERF_RECORD_MISC_KERNEL:
226       return Profiling::MODE_KERNEL;
227     case PERF_RECORD_MISC_USER:
228       return Profiling::MODE_USER;
229     case PERF_RECORD_MISC_HYPERVISOR:
230       return Profiling::MODE_HYPERVISOR;
231     case PERF_RECORD_MISC_GUEST_KERNEL:
232       return Profiling::MODE_GUEST_KERNEL;
233     case PERF_RECORD_MISC_GUEST_USER:
234       return Profiling::MODE_GUEST_USER;
235     default:
236       return Profiling::MODE_UNKNOWN;
237   }
238 }
239 
240 protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
241     unwindstack::ErrorCode error_code) {
242   using Profiling = protos::pbzero::Profiling;
243   switch (error_code) {
244     case unwindstack::ERROR_NONE:
245       return Profiling::UNWIND_ERROR_NONE;
246     case unwindstack::ERROR_MEMORY_INVALID:
247       return Profiling::UNWIND_ERROR_MEMORY_INVALID;
248     case unwindstack::ERROR_UNWIND_INFO:
249       return Profiling::UNWIND_ERROR_UNWIND_INFO;
250     case unwindstack::ERROR_UNSUPPORTED:
251       return Profiling::UNWIND_ERROR_UNSUPPORTED;
252     case unwindstack::ERROR_INVALID_MAP:
253       return Profiling::UNWIND_ERROR_INVALID_MAP;
254     case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
255       return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
256     case unwindstack::ERROR_REPEATED_FRAME:
257       return Profiling::UNWIND_ERROR_REPEATED_FRAME;
258     case unwindstack::ERROR_INVALID_ELF:
259       return Profiling::UNWIND_ERROR_INVALID_ELF;
260     case unwindstack::ERROR_SYSTEM_CALL:
261       return Profiling::UNWIND_ERROR_SYSTEM_CALL;
262     case unwindstack::ERROR_THREAD_TIMEOUT:
263       return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
264     case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
265       return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
266     case unwindstack::ERROR_BAD_ARCH:
267       return Profiling::UNWIND_ERROR_BAD_ARCH;
268     case unwindstack::ERROR_MAPS_PARSE:
269       return Profiling::UNWIND_ERROR_MAPS_PARSE;
270     case unwindstack::ERROR_INVALID_PARAMETER:
271       return Profiling::UNWIND_ERROR_INVALID_PARAMETER;
272     case unwindstack::ERROR_PTRACE_CALL:
273       return Profiling::UNWIND_ERROR_PTRACE_CALL;
274   }
275   return Profiling::UNWIND_ERROR_UNKNOWN;
276 }
277 
278 }  // namespace
279 
280 // static
281 bool PerfProducer::ShouldRejectDueToFilter(
282     pid_t pid,
283     const TargetFilter& filter,
284     bool skip_cmdline,
285     base::FlatSet<std::string>* additional_cmdlines,
286     std::function<bool(std::string*)> read_proc_pid_cmdline) {
287   PERFETTO_CHECK(additional_cmdlines);
288 
289   std::string cmdline;
290   bool have_cmdline = false;
291   if (!skip_cmdline)
292     have_cmdline = read_proc_pid_cmdline(&cmdline);
293 
294   const char* binname = "";
295   if (have_cmdline) {
296     binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
297   }
298 
299   auto has_matching_pattern = [](const std::vector<std::string>& patterns,
300                                  const char* cmd, const char* name) {
301     for (const std::string& pattern : patterns) {
302       if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
303         return true;
304       }
305     }
306     return false;
307   };
308 
309   if (have_cmdline &&
310       has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
311     PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
312                   static_cast<int>(pid));
313     return true;
314   }
315   if (filter.exclude_pids.count(pid)) {
316     PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
317                   static_cast<int>(pid));
318     return true;
319   }
320 
321   if (have_cmdline &&
322       has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
323     return false;
324   }
325   if (filter.pids.count(pid)) {
326     return false;
327   }
328 
329   // Empty allow filter means keep everything that isn't explicitly excluded.
330   if (filter.cmdlines.empty() && filter.pids.empty() &&
331       !filter.additional_cmdline_count &&
332       !filter.process_sharding.has_value()) {
333     return false;
334   }
335 
336   // Niche option: process sharding to amortise systemwide unwinding costs.
337   // Selects a subset of all processes by using the low order bits of their pid.
338   if (filter.process_sharding.has_value()) {
339     uint32_t upid = static_cast<uint32_t>(pid);
340     if (upid % filter.process_sharding->shard_count ==
341         filter.process_sharding->chosen_shard) {
342       PERFETTO_DLOG("Process sharding: keeping pid [%d]",
343                     static_cast<int>(pid));
344       return false;
345     } else {
346       PERFETTO_DLOG("Process sharding: rejecting pid [%d]",
347                     static_cast<int>(pid));
348       return true;
349     }
350   }
351 
352   // Niche option: additionally remember the first seen N process cmdlines, and
353   // keep all processes with those names.
354   if (have_cmdline) {
355     if (additional_cmdlines->count(cmdline)) {
356       return false;
357     }
358     if (additional_cmdlines->size() < filter.additional_cmdline_count) {
359       additional_cmdlines->insert(cmdline);
360       return false;
361     }
362   }
363 
364   PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
365   return true;
366 }
367 
368 PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
369                            base::TaskRunner* task_runner)
370     : task_runner_(task_runner),
371       proc_fd_getter_(proc_fd_getter),
372       unwinding_worker_(this),
373       weak_factory_(this) {
374   proc_fd_getter->SetDelegate(this);
375 }
376 
377 void PerfProducer::SetupDataSource(DataSourceInstanceID,
378                                    const DataSourceConfig&) {}
379 
380 void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
381                                    const DataSourceConfig& config) {
382   uint64_t tracing_session_id = config.tracing_session_id();
383   PERFETTO_LOG("StartDataSource(ds %zu, session %" PRIu64 ", name %s)",
384                static_cast<size_t>(ds_id), tracing_session_id,
385                config.name().c_str());
386 
387   if (config.name() == MetatraceWriter::kDataSourceName) {
388     StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
389     return;
390   }
391 
392   // linux.perf data source
393   if (config.name() != kDataSourceName)
394     return;
395 
396   // Tracepoint name -> id lookup in case the config asks for tracepoints:
397   auto tracepoint_id_lookup = [this](const std::string& group,
398                                      const std::string& name) {
399     if (!tracefs_)  // lazy init or retry
400       tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
401     if (!tracefs_)  // still didn't find an accessible tracefs
402       return 0u;
403     return tracefs_->ReadEventId(group, name);
404   };
405 
406   protos::gen::PerfEventConfig event_config_pb;
407   if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
408     PERFETTO_ELOG("PerfEventConfig could not be parsed.");
409     return;
410   }
411 
412   // Unlikely: handle a callstack sampling option that shares a random decision
413   // between all data sources within a tracing session. Instead of introducing
414   // session-scoped data, we replicate the decision in each per-DS EventConfig.
415   std::optional<ProcessSharding> process_sharding;
416   uint32_t shard_count =
417       event_config_pb.callstack_sampling().scope().process_shard_count();
418   if (shard_count > 0) {
419     process_sharding =
420         GetOrChooseCallstackProcessShard(tracing_session_id, shard_count);
421   }
422 
423   std::optional<EventConfig> event_config = EventConfig::Create(
424       event_config_pb, config, process_sharding, tracepoint_id_lookup);
425   if (!event_config.has_value()) {
426     PERFETTO_ELOG("PerfEventConfig rejected.");
427     return;
428   }
429 
430   std::vector<uint32_t> online_cpus = GetOnlineCpus();
431   if (online_cpus.empty()) {
432     PERFETTO_ELOG("No online CPUs found.");
433     return;
434   }
435 
436   std::vector<EventReader> per_cpu_readers;
437   for (uint32_t cpu : online_cpus) {
438     std::optional<EventReader> event_reader =
439         EventReader::ConfigureEvents(cpu, event_config.value());
440     if (!event_reader.has_value()) {
441       PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
442                     ", discarding data source.",
443                     cpu);
444       return;
445     }
446     per_cpu_readers.emplace_back(std::move(event_reader.value()));
447   }
448 
449   auto buffer_id = static_cast<BufferID>(config.target_buffer());
450   auto writer = endpoint_->CreateTraceWriter(buffer_id);
451 
452   // Construct the data source instance.
453   std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
454   bool inserted;
455   std::tie(ds_it, inserted) = data_sources_.emplace(
456       std::piecewise_construct, std::forward_as_tuple(ds_id),
457       std::forward_as_tuple(event_config.value(), tracing_session_id,
458                             std::move(writer), std::move(per_cpu_readers)));
459   PERFETTO_CHECK(inserted);
460   DataSourceState& ds = ds_it->second;
461 
462   // Start the configured events.
463   for (auto& per_cpu_reader : ds.per_cpu_readers) {
464     per_cpu_reader.EnableEvents();
465   }
466 
467   WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());
468 
469   InterningOutputTracker::WriteFixedInterningsPacket(
470       ds_it->second.trace_writer.get(),
471       protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
472 
473   // Inform unwinder of the new data source instance, and optionally start a
474   // periodic task to clear its cached state.
475   unwinding_worker_->PostStartDataSource(ds_id,
476                                          ds.event_config.kernel_frames());
477   if (ds.event_config.unwind_state_clear_period_ms()) {
478     unwinding_worker_->PostClearCachedStatePeriodic(
479         ds_id, ds.event_config.unwind_state_clear_period_ms());
480   }
481 
482   // Kick off periodic read task.
483   auto tick_period_ms = ds.event_config.read_tick_period_ms();
484   auto weak_this = weak_factory_.GetWeakPtr();
485   task_runner_->PostDelayedTask(
486       [weak_this, ds_id] {
487         if (weak_this)
488           weak_this->TickDataSourceRead(ds_id);
489       },
490       TimeToNextReadTickMs(ds_id, tick_period_ms));
491 
492   // Optionally kick off periodic memory footprint limit check.
493   uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
494   if (max_daemon_memory_kb > 0) {
495     task_runner_->PostDelayedTask(
496         [weak_this, ds_id, max_daemon_memory_kb] {
497           if (weak_this)
498             weak_this->CheckMemoryFootprintPeriodic(ds_id,
499                                                     max_daemon_memory_kb);
500         },
501         kMemoryLimitCheckPeriodMs);
502   }
503 }
504 
505 void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
506                                                 uint32_t max_daemon_memory_kb) {
507   auto ds_it = data_sources_.find(ds_id);
508   if (ds_it == data_sources_.end())
509     return;  // stop recurring
510 
511   GuardrailConfig gconfig = {};
512   gconfig.memory_guardrail_kb = max_daemon_memory_kb;
513 
514   ProfilerMemoryGuardrails footprint_snapshot;
515   if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
516     PurgeDataSource(ds_id);
517     return;  // stop recurring
518   }
519 
520   // repost
521   auto weak_this = weak_factory_.GetWeakPtr();
522   task_runner_->PostDelayedTask(
523       [weak_this, ds_id, max_daemon_memory_kb] {
524         if (weak_this)
525           weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
526       },
527       kMemoryLimitCheckPeriodMs);
528 }
529 
530 void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
531   PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));
532 
533   // Metatrace: stop immediately (will miss the events from the
534   // asynchronous shutdown of the primary data source).
535   auto meta_it = metatrace_writers_.find(ds_id);
536   if (meta_it != metatrace_writers_.end()) {
537     meta_it->second.WriteAllAndFlushTraceWriter([] {});
538     metatrace_writers_.erase(meta_it);
539     return;
540   }
541 
542   auto ds_it = data_sources_.find(ds_id);
543   if (ds_it == data_sources_.end()) {
544     // Most likely, the source is missing due to an abrupt stop (via
545     // |PurgeDataSource|). Tell the service that we've stopped the source now,
546     // so that it doesn't wait for the ack until the timeout.
547     endpoint_->NotifyDataSourceStopped(ds_id);
548     return;
549   }
550 
551   // Start shutting down the reading frontend, which will propagate the stop
552   // further as the intermediate buffers are cleared.
553   DataSourceState& ds = ds_it->second;
554   InitiateReaderStop(&ds);
555 }
556 
557 // The perf data sources ignore flush requests, as flushing would be
558 // unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
559 // Instead of responding to explicit flushes, we can ensure that we're otherwise
560 // well-behaved (do not reorder packets too much), and let the service scrape
561 // the SMB.
562 void PerfProducer::Flush(FlushRequestID flush_id,
563                          const DataSourceInstanceID* data_source_ids,
564                          size_t num_data_sources,
565                          FlushFlags) {
566   // Flush metatracing if requested.
567   for (size_t i = 0; i < num_data_sources; i++) {
568     auto ds_id = data_source_ids[i];
569     PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));
570 
571     auto meta_it = metatrace_writers_.find(ds_id);
572     if (meta_it != metatrace_writers_.end()) {
573       meta_it->second.WriteAllAndFlushTraceWriter([] {});
574     }
575   }
576 
577   endpoint_->NotifyFlushComplete(flush_id);
578 }
579 
580 void PerfProducer::ClearIncrementalState(
581     const DataSourceInstanceID* data_source_ids,
582     size_t num_data_sources) {
583   for (size_t i = 0; i < num_data_sources; i++) {
584     auto ds_id = data_source_ids[i];
585     PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));
586 
587     if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
588       continue;
589 
590     auto ds_it = data_sources_.find(ds_id);
591     if (ds_it == data_sources_.end()) {
592       PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
593                     static_cast<size_t>(ds_id));
594       continue;
595     }
596     DataSourceState& ds = ds_it->second;
597 
598     WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());
599 
600     // Forget which incremental state we've emitted before.
601     ds.interning_output.ClearHistory();
602     InterningOutputTracker::WriteFixedInterningsPacket(
603         ds.trace_writer.get(),
604         protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
605 
606     // Drop the cross-datasource callstack interning trie. This is not
607     // necessary for correctness (the preceding step is sufficient). However,
608     // incremental clearing is likely to be used in ring buffer traces, where
609     // it makes sense to reset the trie's size periodically, and this is a
610     // reasonable point to do so. The trie keeps the monotonic interning IDs,
611     // so there is no confusion for other concurrent data sources. We do not
612     // bother with clearing concurrent sources' interning output trackers as
613     // their footprint should be trivial.
614     callstack_trie_.ClearTrie();
615   }
616 }
617 
618 void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
619   auto it = data_sources_.find(ds_id);
620   if (it == data_sources_.end()) {
621     PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
622                   static_cast<size_t>(ds_id));
623     return;
624   }
625   DataSourceState& ds = it->second;
626 
627   PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);
628 
629   // Make a pass over all per-cpu readers.
630   uint64_t max_samples = ds.event_config.samples_per_tick_limit();
631   bool more_records_available = false;
632   for (EventReader& reader : ds.per_cpu_readers) {
633     if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
634       more_records_available = true;
635     }
636   }
637 
638   // Wake up the unwinder as we've (likely) pushed samples into its queue.
639   unwinding_worker_->PostProcessQueue();
640 
641   if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
642       !more_records_available) {
643     unwinding_worker_->PostInitiateDataSourceStop(ds_id);
644   } else {
645     // otherwise, keep reading
646     auto tick_period_ms = it->second.event_config.read_tick_period_ms();
647     auto weak_this = weak_factory_.GetWeakPtr();
648     task_runner_->PostDelayedTask(
649         [weak_this, ds_id] {
650           if (weak_this)
651             weak_this->TickDataSourceRead(ds_id);
652         },
653         TimeToNextReadTickMs(ds_id, tick_period_ms));
654   }
655 }
656 
657 bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
658                                             uint64_t max_samples,
659                                             DataSourceInstanceID ds_id,
660                                             DataSourceState* ds) {
661   PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);
662 
663   // If the kernel ring buffer dropped data, record it in the trace.
664   size_t cpu = reader->cpu();
665   auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
666     auto weak_this = weak_factory_.GetWeakPtr();
667     task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
668       if (weak_this)
669         weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
670     });
671   };
672 
673   for (uint64_t i = 0; i < max_samples; i++) {
674     std::optional<ParsedSample> sample =
675         reader->ReadUntilSample(records_lost_callback);
676     if (!sample) {
677       return false;  // caught up to the writer
678     }
679 
680     // Counter-only mode: skip the unwinding stage, serialise the sample
681     // immediately.
682     const EventConfig& event_config = ds->event_config;
683     if (!event_config.sample_callstacks()) {
684       CompletedSample output;
685       output.common = sample->common;
686       EmitSample(ds_id, std::move(output));
687       continue;
688     }
689 
690     // Sampling either or both of userspace and kernel callstacks.
691     pid_t pid = sample->common.pid;
692     auto& process_state = ds->process_states[pid];  // insert if new
693 
694     // Asynchronous proc-fd lookup timed out.
695     if (process_state == ProcessTrackingStatus::kFdsTimedOut) {
696       PERFETTO_DLOG("Skipping sample for pid [%d]: kFdsTimedOut",
697                     static_cast<int>(pid));
698       EmitSkippedSample(ds_id, std::move(sample.value()),
699                         SampleSkipReason::kReadStage);
700       continue;
701     }
702 
703     // Previously excluded, e.g. due to failing the target filter check.
704     if (process_state == ProcessTrackingStatus::kRejected) {
705       PERFETTO_DLOG("Skipping sample for pid [%d]: kRejected",
706                     static_cast<int>(pid));
707       continue;
708     }
709 
710     // Seeing pid for the first time. We need to consider whether the process
711     // is a kernel thread, and which callstacks we're recording.
712     //
713     // {user} stacks -> user processes: signal for proc-fd lookup
714     //               -> kthreads: reject
715     //
716     // {kernel} stacks -> user processes: accept without proc-fds
717     //                 -> kthreads: accept without proc-fds
718     //
719     // {kernel+user} stacks -> user processes: signal for proc-fd lookup
720     //                      -> kthreads: accept without proc-fds
721     //
722     if (process_state == ProcessTrackingStatus::kInitial) {
723       PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));
724 
725       // Kernel threads (which have no userspace state) are never relevant if
726       // we're not recording kernel callchains.
727       bool is_kthread = !sample->regs;  // no userspace regs
728       if (is_kthread && !event_config.kernel_frames()) {
729         process_state = ProcessTrackingStatus::kRejected;
730         continue;
731       }
732 
733       // Check whether samples for this new process should be dropped due to
734       // the target filtering. Kernel threads don't have a cmdline, but we
735       // still check against pid inclusion/exclusion.
736       if (ShouldRejectDueToFilter(
737               pid, event_config.filter(), is_kthread, &ds->additional_cmdlines,
738               [pid](std::string* cmdline) {
739                 return glob_aware::ReadProcCmdlineForPID(pid, cmdline);
740               })) {
741         process_state = ProcessTrackingStatus::kRejected;
742         continue;
743       }
744 
745       // At this point, sampled process is known to be of interest.
746       if (!is_kthread && event_config.user_frames()) {
747         // Start resolving the proc-fds. Response is async.
748         process_state = ProcessTrackingStatus::kFdsResolving;
749         InitiateDescriptorLookup(ds_id, pid,
750                                  event_config.remote_descriptor_timeout_ms());
751         // note: fallthrough
752       } else {
753         // Either a kernel thread (no need to obtain proc-fds), or a userspace
754         // process but we're not recording userspace callstacks.
755         process_state = ProcessTrackingStatus::kAccepted;
756         unwinding_worker_->PostRecordNoUserspaceProcess(ds_id, pid);
757         // note: fallthrough
758       }
759     }
760 
761     PERFETTO_CHECK(process_state == ProcessTrackingStatus::kAccepted ||
762                    process_state == ProcessTrackingStatus::kFdsResolving);
763 
764     // If we're only interested in the kernel callchains, then userspace
765     // process samples are relevant only if they were sampled during kernel
766     // context.
767     if (!event_config.user_frames() &&
768         sample->common.cpu_mode == PERF_RECORD_MISC_USER) {
769       PERFETTO_DLOG("Skipping usermode sample for kernel-only config");
770       continue;
771     }
772 
773     // Optionally: drop sample if above a given threshold of sampled stacks
774     // that are waiting in the unwinding queue.
775     uint64_t max_footprint_bytes = event_config.max_enqueued_footprint_bytes();
776     uint64_t sample_stack_size = sample->stack.size();
777     if (max_footprint_bytes) {
778       uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
779       if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
780         PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
781         EmitSkippedSample(ds_id, std::move(sample.value()),
782                           SampleSkipReason::kUnwindEnqueue);
783         continue;
784       }
785     }
786 
787     // Push the sample into the unwinding queue if there is room.
788     auto& queue = unwinding_worker_->unwind_queue();
789     WriteView write_view = queue.BeginWrite();
790     if (write_view.valid) {
791       queue.at(write_view.write_pos) =
792           UnwindEntry{ds_id, std::move(sample.value())};
793       queue.CommitWrite();
794       unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
795     } else {
796       PERFETTO_DLOG("Unwinder queue full, skipping sample");
797       EmitSkippedSample(ds_id, std::move(sample.value()),
798                         SampleSkipReason::kUnwindEnqueue);
799     }
800   }  // for (i < max_samples)
801 
802   // Most likely more events in the kernel buffer. Though we might be exactly on
803   // the boundary due to |max_samples|.
804   return true;
805 }
806 
807 // Note: first-fit makes descriptor request fulfillment not true FIFO. But the
808 // edge-cases where it matters are very unlikely.
809 void PerfProducer::OnProcDescriptors(pid_t pid,
810                                      uid_t uid,
811                                      base::ScopedFile maps_fd,
812                                      base::ScopedFile mem_fd) {
813   // Find first-fit data source that requested descriptors for the process.
814   for (auto& it : data_sources_) {
815     DataSourceState& ds = it.second;
816     auto proc_status_it = ds.process_states.find(pid);
817     if (proc_status_it == ds.process_states.end())
818       continue;
819 
820     // TODO(rsavitski): consider checking ProcessTrackingStatus before
821     // CanProfile.
822     if (!CanProfile(ds.event_config.raw_ds_config(), uid,
823                     ds.event_config.target_installed_by())) {
824       PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
825                     static_cast<int>(pid), static_cast<int>(uid),
826                     static_cast<size_t>(it.first));
827       continue;
828     }
829 
830     // Match against either resolving, or expired state. In the latter
831     // case, it means that the async response was slow enough that we've marked
832     // the lookup as expired (but can now recover for future samples).
833     auto proc_status = proc_status_it->second;
834     if (proc_status == ProcessTrackingStatus::kFdsResolving ||
835         proc_status == ProcessTrackingStatus::kFdsTimedOut) {
836       PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
837                     static_cast<int>(pid), static_cast<size_t>(it.first));
838 
839       proc_status_it->second = ProcessTrackingStatus::kAccepted;
840       unwinding_worker_->PostAdoptProcDescriptors(
841           it.first, pid, std::move(maps_fd), std::move(mem_fd));
842       return;  // done
843     }
844   }
845   PERFETTO_DLOG(
846       "Discarding proc-fds for pid [%d] as found no outstanding requests.",
847       static_cast<int>(pid));
848 }
849 
850 void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
851                                             pid_t pid,
852                                             uint32_t timeout_ms) {
853   if (!proc_fd_getter_->RequiresDelayedRequest()) {
854     StartDescriptorLookup(ds_id, pid, timeout_ms);
855     return;
856   }
857 
858   // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
859   auto weak_this = weak_factory_.GetWeakPtr();
860   task_runner_->PostDelayedTask(
861       [weak_this, ds_id, pid, timeout_ms] {
862         if (weak_this)
863           weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
864       },
865       kProcDescriptorsAndroidDelayMs);
866 }
867 
868 void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
869                                          pid_t pid,
870                                          uint32_t timeout_ms) {
871   proc_fd_getter_->GetDescriptorsForPid(pid);
872 
873   auto weak_this = weak_factory_.GetWeakPtr();
874   task_runner_->PostDelayedTask(
875       [weak_this, ds_id, pid] {
876         if (weak_this)
877           weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
878       },
879       timeout_ms);
880 }
881 
882 void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
883                                                    pid_t pid) {
884   auto ds_it = data_sources_.find(ds_id);
885   if (ds_it == data_sources_.end())
886     return;
887 
888   DataSourceState& ds = ds_it->second;
889   auto proc_status_it = ds.process_states.find(pid);
890   if (proc_status_it == ds.process_states.end())
891     return;
892 
893   // If the request is still outstanding, mark the process as expired (causing
894   // outstanding and future samples to be discarded).
895   auto proc_status = proc_status_it->second;
896   if (proc_status == ProcessTrackingStatus::kFdsResolving) {
897     PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
898                   static_cast<int>(pid), static_cast<size_t>(ds_it->first));
899 
900     proc_status_it->second = ProcessTrackingStatus::kFdsTimedOut;
901     // Also inform the unwinder of the state change (so that it can discard any
902     // of the already-enqueued samples).
903     unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
904   }
905 }
906 
907 void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
908                                   CompletedSample sample) {
909   // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
910   CompletedSample* raw_sample = new CompletedSample(std::move(sample));
911   auto weak_this = weak_factory_.GetWeakPtr();
912   task_runner_->PostTask([weak_this, ds_id, raw_sample] {
913     if (weak_this)
914       weak_this->EmitSample(ds_id, std::move(*raw_sample));
915     delete raw_sample;
916   });
917 }
918 
919 void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
920                               CompletedSample sample) {
921   auto ds_it = data_sources_.find(ds_id);
922   if (ds_it == data_sources_.end()) {
923     PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
924                   static_cast<size_t>(ds_id));
925     return;
926   }
927   DataSourceState& ds = ds_it->second;
928 
929   // intern callsite
930   GlobalCallstackTrie::Node* callstack_root =
931       callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
932   uint64_t callstack_iid = callstack_root->id();
933 
934   // start packet, timestamp domain defaults to monotonic_raw
935   auto packet = StartTracePacket(ds.trace_writer.get());
936   packet->set_timestamp(sample.common.timestamp);
937 
938   // write new interning data (if any)
939   protos::pbzero::InternedData* interned_out = packet->set_interned_data();
940   ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
941                                      interned_out);
942 
943   // write the sample itself
944   auto* perf_sample = packet->set_perf_sample();
945   perf_sample->set_cpu(sample.common.cpu);
946   perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
947   perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
948   perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
949   perf_sample->set_timebase_count(sample.common.timebase_count);
950   perf_sample->set_callstack_iid(callstack_iid);
951   if (sample.unwind_error != unwindstack::ERROR_NONE) {
952     perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
953   }
954 }
955 
956 void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
957                                       size_t cpu,
958                                       uint64_t records_lost) {
959   auto ds_it = data_sources_.find(ds_id);
960   if (ds_it == data_sources_.end())
961     return;
962   DataSourceState& ds = ds_it->second;
963   PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
964                 static_cast<size_t>(ds_id), cpu, records_lost);
965 
966   // The data loss record relates to a single ring buffer, and indicates loss
967   // since the last successfully-written record in that buffer. Therefore the
968   // data loss record itself has no timestamp.
969   // We timestamp the packet with the boot clock for packet ordering purposes,
970   // but it no longer has a (precise) interpretation relative to the sample
971   // stream from that per-cpu buffer. See the proto comments for more details.
972   auto packet = StartTracePacket(ds.trace_writer.get());
973   packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
974   packet->set_timestamp_clock_id(
975       protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
976 
977   auto* perf_sample = packet->set_perf_sample();
978   perf_sample->set_cpu(static_cast<uint32_t>(cpu));
979   perf_sample->set_kernel_records_lost(records_lost);
980 }
981 
982 void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
983                                                  ParsedSample sample) {
984   PostEmitSkippedSample(ds_id, std::move(sample),
985                         SampleSkipReason::kUnwindStage);
986 }
987 
988 void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
989                                          ParsedSample sample,
990                                          SampleSkipReason reason) {
991   // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
992   ParsedSample* raw_sample = new ParsedSample(std::move(sample));
993   auto weak_this = weak_factory_.GetWeakPtr();
994   task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
995     if (weak_this)
996       weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
997     delete raw_sample;
998   });
999 }
1000 
1001 void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
1002                                      ParsedSample sample,
1003                                      SampleSkipReason reason) {
1004   auto ds_it = data_sources_.find(ds_id);
1005   if (ds_it == data_sources_.end())
1006     return;
1007   DataSourceState& ds = ds_it->second;
1008 
1009   // Note: timestamp defaults to the monotonic_raw domain.
1010   auto packet = StartTracePacket(ds.trace_writer.get());
1011   packet->set_timestamp(sample.common.timestamp);
1012   auto* perf_sample = packet->set_perf_sample();
1013   perf_sample->set_cpu(sample.common.cpu);
1014   perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
1015   perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
1016   perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
1017   perf_sample->set_timebase_count(sample.common.timebase_count);
1018 
1019   using PerfSample = protos::pbzero::PerfSample;
1020   switch (reason) {
1021     case SampleSkipReason::kReadStage:
1022       perf_sample->set_sample_skipped_reason(
1023           PerfSample::PROFILER_SKIP_READ_STAGE);
1024       break;
1025     case SampleSkipReason::kUnwindEnqueue:
1026       perf_sample->set_sample_skipped_reason(
1027           PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
1028       break;
1029     case SampleSkipReason::kUnwindStage:
1030       perf_sample->set_sample_skipped_reason(
1031           PerfSample::PROFILER_SKIP_UNWIND_STAGE);
1032       break;
1033   }
1034 }
1035 
1036 void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
1037   PERFETTO_DLOG("InitiateReaderStop");
1038   PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);
1039 
1040   ds->status = DataSourceState::Status::kShuttingDown;
1041   for (auto& event_reader : ds->per_cpu_readers) {
1042     event_reader.DisableEvents();
1043   }
1044 }
1045 
1046 void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
1047   auto weak_producer = weak_factory_.GetWeakPtr();
1048   task_runner_->PostTask([weak_producer, ds_id] {
1049     if (weak_producer)
1050       weak_producer->FinishDataSourceStop(ds_id);
1051   });
1052 }
1053 
1054 void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
1055   PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
1056   auto ds_it = data_sources_.find(ds_id);
1057   if (ds_it == data_sources_.end()) {
1058     PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
1059                   static_cast<size_t>(ds_id));
1060     return;
1061   }
1062   DataSourceState& ds = ds_it->second;
1063   PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
1064 
1065   ds.trace_writer->Flush();
1066   data_sources_.erase(ds_it);
1067 
1068   endpoint_->NotifyDataSourceStopped(ds_id);
1069 
1070   // Clean up resources if there are no more active sources.
1071   if (data_sources_.empty()) {
1072     callstack_trie_.ClearTrie();  // purge internings
1073     base::MaybeReleaseAllocatorMemToOS();
1074   }
1075 }
1076 
1077 // TODO(rsavitski): maybe make the tracing service respect premature
1078 // producer-driven stops, and then issue a NotifyDataSourceStopped here.
1079 // Alternatively (and at the expense of higher complexity) introduce a new data
1080 // source status of "tombstoned", and propagate it until the source is stopped
1081 // by the service (this would technically allow for stricter lifetime checking
1082 // of data sources, and help with discarding periodic flushes).
1083 // TODO(rsavitski): Purging while stopping will currently leave the stop
1084 // unacknowledged. Consider checking whether the DS is stopping here, and if so,
1085 // notifying immediately after erasing.
1086 void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
1087   auto ds_it = data_sources_.find(ds_id);
1088   if (ds_it == data_sources_.end())
1089     return;
1090   DataSourceState& ds = ds_it->second;
1091 
1092   PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
1093                static_cast<size_t>(ds_id));
1094 
1095   unwinding_worker_->PostPurgeDataSource(ds_id);
1096 
1097   // Write a packet indicating the abrupt stop.
1098   {
1099     auto packet = StartTracePacket(ds.trace_writer.get());
1100     packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
1101     packet->set_timestamp_clock_id(
1102         protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
1103     auto* perf_sample = packet->set_perf_sample();
1104     auto* producer_event = perf_sample->set_producer_event();
1105     producer_event->set_source_stop_reason(
1106         protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
1107   }
1108 
1109   ds.trace_writer->Flush();
1110   data_sources_.erase(ds_it);
1111 
1112   // Clean up resources if there are no more active sources.
1113   if (data_sources_.empty()) {
1114     callstack_trie_.ClearTrie();  // purge internings
1115     base::MaybeReleaseAllocatorMemToOS();
1116   }
1117 }
1118 
1119 // Either:
1120 // * choose a random number up to |shard_count|.
1121 // * reuse a choice made previously by a data source within this tracing
1122 //   session. The config option requires that all data sources within one config
1123 //   have the same shard count.
1124 std::optional<ProcessSharding> PerfProducer::GetOrChooseCallstackProcessShard(
1125     uint64_t tracing_session_id,
1126     uint32_t shard_count) {
1127   for (auto& it : data_sources_) {
1128     const DataSourceState& ds = it.second;
1129     const auto& sharding = ds.event_config.filter().process_sharding;
1130     if ((ds.tracing_session_id != tracing_session_id) || !sharding.has_value())
1131       continue;
1132 
1133     // Found existing data source, reuse its decision while doing best-effort
1134     // error reporting (logging) if the shard count is not the same.
1135     if (sharding->shard_count != shard_count) {
1136       PERFETTO_ELOG(
1137           "Mismatch of process_shard_count between data sources in tracing "
1138           "session %" PRIu64 ". Overriding shard count to match.",
1139           tracing_session_id);
1140     }
1141     return sharding;
1142   }
1143 
1144   // First data source in this session, choose random shard.
1145   std::random_device r;
1146   std::minstd_rand minstd(r());
1147   std::uniform_int_distribution<uint32_t> dist(0, shard_count - 1);
1148   uint32_t chosen_shard = dist(minstd);
1149 
1150   ProcessSharding ret;
1151   ret.shard_count = shard_count;
1152   ret.chosen_shard = chosen_shard;
1153 
1154   PERFETTO_DCHECK(ret.shard_count && ret.chosen_shard < ret.shard_count);
1155   return ret;
1156 }
1157 
1158 void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
1159                                         BufferID target_buffer) {
1160   auto writer = endpoint_->CreateTraceWriter(target_buffer);
1161 
1162   auto it_and_inserted = metatrace_writers_.emplace(
1163       std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
1164   PERFETTO_DCHECK(it_and_inserted.second);
1165   // Note: only the first concurrent writer will actually be active.
1166   metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
1167                                    metatrace::TAG_ANY);
1168 }
1169 
1170 void PerfProducer::ConnectWithRetries(const char* socket_name) {
1171   PERFETTO_DCHECK(state_ == kNotStarted);
1172   state_ = kNotConnected;
1173 
1174   ResetConnectionBackoff();
1175   producer_socket_name_ = socket_name;
1176   ConnectService();
1177 }
1178 
1179 void PerfProducer::ConnectService() {
1180   PERFETTO_DCHECK(state_ == kNotConnected);
1181   state_ = kConnecting;
1182   endpoint_ = ProducerIPCClient::Connect(
1183       producer_socket_name_, this, kProducerName, task_runner_,
1184       TracingService::ProducerSMBScrapingMode::kEnabled);
1185 }
1186 
1187 void PerfProducer::IncreaseConnectionBackoff() {
1188   connection_backoff_ms_ *= 2;
1189   if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
1190     connection_backoff_ms_ = kMaxConnectionBackoffMs;
1191 }
1192 
1193 void PerfProducer::ResetConnectionBackoff() {
1194   connection_backoff_ms_ = kInitialConnectionBackoffMs;
1195 }
1196 
1197 void PerfProducer::OnConnect() {
1198   PERFETTO_DCHECK(state_ == kConnecting);
1199   state_ = kConnected;
1200   ResetConnectionBackoff();
1201   PERFETTO_LOG("Connected to the service");
1202 
1203   {
1204     // linux.perf
1205     DataSourceDescriptor desc;
1206     desc.set_name(kDataSourceName);
1207     desc.set_handles_incremental_state_clear(true);
1208     desc.set_will_notify_on_stop(true);
1209     endpoint_->RegisterDataSource(desc);
1210   }
1211   {
1212     // metatrace
1213     DataSourceDescriptor desc;
1214     desc.set_name(MetatraceWriter::kDataSourceName);
1215     endpoint_->RegisterDataSource(desc);
1216   }
1217   // Used by tracebox to synchronize with traced_probes being registered.
1218   if (all_data_sources_registered_cb_) {
1219     endpoint_->Sync(all_data_sources_registered_cb_);
1220   }
1221 }
1222 
1223 void PerfProducer::OnDisconnect() {
1224   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
1225   PERFETTO_LOG("Disconnected from tracing service");
1226 
1227   auto weak_producer = weak_factory_.GetWeakPtr();
1228   if (state_ == kConnected)
1229     return task_runner_->PostTask([weak_producer] {
1230       if (weak_producer)
1231         weak_producer->Restart();
1232     });
1233 
1234   state_ = kNotConnected;
1235   IncreaseConnectionBackoff();
1236   task_runner_->PostDelayedTask(
1237       [weak_producer] {
1238         if (weak_producer)
1239           weak_producer->ConnectService();
1240       },
1241       connection_backoff_ms_);
1242 }
1243 
1244 void PerfProducer::Restart() {
1245   // We lost the connection with the tracing service. At this point we need
1246   // to reset all the data sources. Trying to handle that manually is going to
1247   // be error prone. What we do here is simply destroy the instance and
1248   // recreate it again.
1249   base::TaskRunner* task_runner = task_runner_;
1250   const char* socket_name = producer_socket_name_;
1251   ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;
1252 
1253   // Invoke destructor and then the constructor again.
1254   this->~PerfProducer();
1255   new (this) PerfProducer(proc_fd_getter, task_runner);
1256 
1257   ConnectWithRetries(socket_name);
1258 }
1259 
1260 }  // namespace profiling
1261 }  // namespace perfetto
1262