/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/perf/perf_producer.h"

#include <random>
#include <utility>

#include <unistd.h>

#include <unwindstack/Error.h>
#include <unwindstack/Unwinder.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/producer_support.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/common/unwind_support.h"
#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"

#include "protos/perfetto/common/builtin_clock.pbzero.h"
#include "protos/perfetto/common/perf_events.gen.h"
#include "protos/perfetto/common/perf_events.pbzero.h"
#include "protos/perfetto/config/profiling/perf_event_config.gen.h"
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"
namespace perfetto {
namespace profiling {
namespace {

// TODO(b/151835887): on Android, when using signals, there is a vulnerable
// window between a process image being replaced by execve, and the new libc
// instance reinstalling the proper signal handlers. During this window, the
// signal disposition defaults to terminating the process.
// This is a best-effort mitigation from the daemon's side, using the
// heuristic that most execve calls follow a fork. So if we get a sample for a
// very fresh process, the grace period gives it a chance to reach a properly
// initialised state before being signalled. This doesn't help cases where a
// mature process calls execve, or where the target gets descheduled (since
// this is a naive walltime wait).
// The proper fix is in the platform; see the bug for progress.
constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;

constexpr uint32_t kMemoryLimitCheckPeriodMs = 5 * 1000;

constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

constexpr char kProducerName[] = "perfetto.traced_perf";
constexpr char kDataSourceName[] = "linux.perf";

size_t NumberOfCpus() {
  return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
}

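// Starts a trace packet that depends on interning data emitted earlier on
// this sequence. SEQ_NEEDS_INCREMENTAL_STATE lets the consumer drop the
// packet if that earlier state was lost (e.g. after a buffer wrap).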
TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
  return packet;
}

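// Emits a packet that restarts the incremental state generation for this
// sequence, and records the defaults (timestamp clock, sampling timebase)
// that subsequent PerfSample packets on the sequence rely on.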
void WritePerfEventDefaultsPacket(const EventConfig& event_config,
                                  TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);

  // start new incremental state generation:
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);

  // default packet timestamp clockid:
  auto* defaults = packet->set_trace_packet_defaults();
  defaults->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW);
  PERFETTO_DCHECK(event_config.perf_attr()->clockid == CLOCK_MONOTONIC_RAW);

  auto* perf_defaults = defaults->set_perf_sample_defaults();
  auto* timebase_pb = perf_defaults->set_timebase();

  // frequency/period:
  perf_event_attr* perf_attr = event_config.perf_attr();
  if (perf_attr->freq) {
    timebase_pb->set_frequency(perf_attr->sample_freq);
  } else {
    timebase_pb->set_period(perf_attr->sample_period);
  }

  // event:
  const PerfCounter& timebase = event_config.timebase_event();
  if (timebase.is_counter()) {
    timebase_pb->set_counter(
        static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
  } else {
    PERFETTO_DCHECK(timebase.is_tracepoint());
    // TODO(rsavitski): reconsider using a struct with two strings instead
    // of the ::gen::Tracepoint class in the C++ code.
    auto* tracepoint_pb = timebase_pb->set_tracepoint();
    tracepoint_pb->set_name(timebase.tracepoint.name());
    tracepoint_pb->set_filter(timebase.tracepoint.filter());
  }
}

uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
  // Normally, we'd schedule the next tick at the next |period_ms|
  // boundary of the boot clock. However, to avoid aligning the read tasks of
  // all concurrent data sources, we select a deterministic offset based on the
  // data source id.
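  // E.g. with period_ms = 250 and a derived offset of 100, a call at
  // now_ms = 1000 yields 250 - ((1000 - 100) % 250) = 100, so the tick fires
  // at 1100 -- at this source's 100 ms phase within each 250 ms period.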
  std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
  std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
  uint32_t ds_period_offset = dist(prng);

  uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
  return period_ms - ((now_ms - ds_period_offset) % period_ms);
}

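// Evaluation order of the target filter (summarising the logic below):
// explicit exclusions (cmdline/pid) reject first, explicit inclusions accept
// next, an entirely empty filter accepts everything, and finally up to
// |additional_cmdline_count| distinct cmdlines that matched none of the above
// are admitted on a first-seen basis.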
bool ShouldRejectDueToFilter(pid_t pid,
                             base::FlatSet<std::string>* additional_cmdlines,
                             const TargetFilter& filter) {
  PERFETTO_CHECK(additional_cmdlines);
  std::string cmdline;
  bool have_cmdline = GetCmdlineForPID(pid, &cmdline);  // normalized form
  if (!have_cmdline) {
    PERFETTO_DLOG("Failed to look up cmdline for pid [%d]",
                  static_cast<int>(pid));
  }

  if (have_cmdline && filter.exclude_cmdlines.count(cmdline)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
                  static_cast<int>(pid));
    return true;
  }
  if (filter.exclude_pids.count(pid)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
                  static_cast<int>(pid));
    return true;
  }

  if (have_cmdline && filter.cmdlines.count(cmdline)) {
    return false;
  }
  if (filter.pids.count(pid)) {
    return false;
  }
  if (filter.cmdlines.empty() && filter.pids.empty() &&
      !filter.additional_cmdline_count) {
    // If no filters are set, allow everything.
    return false;
  }

  // If we failed to read the cmdline, we are unlikely to be able to profile
  // the process either.
  if (have_cmdline) {
    if (additional_cmdlines->count(cmdline)) {
      return false;
    }
    if (additional_cmdlines->size() < filter.additional_cmdline_count) {
      additional_cmdlines->insert(cmdline);
      return false;
    }
  }

  PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
  return true;
}

protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
  using Profiling = protos::pbzero::Profiling;
  switch (perf_cpu_mode) {
    case PERF_RECORD_MISC_KERNEL:
      return Profiling::MODE_KERNEL;
    case PERF_RECORD_MISC_USER:
      return Profiling::MODE_USER;
    case PERF_RECORD_MISC_HYPERVISOR:
      return Profiling::MODE_HYPERVISOR;
    case PERF_RECORD_MISC_GUEST_KERNEL:
      return Profiling::MODE_GUEST_KERNEL;
    case PERF_RECORD_MISC_GUEST_USER:
      return Profiling::MODE_GUEST_USER;
    default:
      return Profiling::MODE_UNKNOWN;
  }
}

protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
    unwindstack::ErrorCode error_code) {
  using Profiling = protos::pbzero::Profiling;
  switch (error_code) {
    case unwindstack::ERROR_NONE:
      return Profiling::UNWIND_ERROR_NONE;
    case unwindstack::ERROR_MEMORY_INVALID:
      return Profiling::UNWIND_ERROR_MEMORY_INVALID;
    case unwindstack::ERROR_UNWIND_INFO:
      return Profiling::UNWIND_ERROR_UNWIND_INFO;
    case unwindstack::ERROR_UNSUPPORTED:
      return Profiling::UNWIND_ERROR_UNSUPPORTED;
    case unwindstack::ERROR_INVALID_MAP:
      return Profiling::UNWIND_ERROR_INVALID_MAP;
    case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
      return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
    case unwindstack::ERROR_REPEATED_FRAME:
      return Profiling::UNWIND_ERROR_REPEATED_FRAME;
    case unwindstack::ERROR_INVALID_ELF:
      return Profiling::UNWIND_ERROR_INVALID_ELF;
    case unwindstack::ERROR_SYSTEM_CALL:
      return Profiling::UNWIND_ERROR_SYSTEM_CALL;
    case unwindstack::ERROR_THREAD_TIMEOUT:
      return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
    case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
      return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
  }
  return Profiling::UNWIND_ERROR_UNKNOWN;
}

}  // namespace

PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                           base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      proc_fd_getter_(proc_fd_getter),
      unwinding_worker_(this),
      weak_factory_(this) {
  proc_fd_getter->SetDelegate(this);
}

void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                   const DataSourceConfig&) {}

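// Sets up a linux.perf session: parses the raw PerfEventConfig, opens a perf
// event and ring buffer per cpu, writes the per-sequence defaults packet,
// then kicks off the periodic reads and the optional daemon memory guardrail
// check.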
void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                   const DataSourceConfig& config) {
  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(ds_id),
               config.name().c_str());

  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  // Tracepoint name -> id lookup in case the config asks for tracepoints:
  auto tracepoint_id_lookup = [this](const std::string& group,
                                     const std::string& name) {
    if (!tracefs_)  // lazy init or retry
      tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
    if (!tracefs_)  // still didn't find an accessible tracefs
      return 0u;
    return tracefs_->ReadEventId(group, name);
  };

  protos::gen::PerfEventConfig event_config_pb;
  if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
    PERFETTO_ELOG("PerfEventConfig could not be parsed.");
    return;
  }
  base::Optional<EventConfig> event_config =
      EventConfig::Create(event_config_pb, config, tracepoint_id_lookup);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  size_t num_cpus = NumberOfCpus();
  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu = 0; cpu < num_cpus; cpu++) {
    base::Optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(ds_id),
      std::forward_as_tuple(event_config.value(), std::move(writer),
                            std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Start the configured events.
  for (auto& per_cpu_reader : ds.per_cpu_readers) {
    per_cpu_reader.EnableEvents();
  }

  WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get(),
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  unwinding_worker_->PostStartDataSource(ds_id,
                                         ds.event_config.kernel_frames());
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        ds_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(ds_id);
      },
      TimeToNextReadTickMs(ds_id, tick_period_ms));

  // Optionally kick off periodic memory footprint limit check.
  uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
  if (max_daemon_memory_kb > 0) {
    task_runner_->PostDelayedTask(
        [weak_this, ds_id, max_daemon_memory_kb] {
          if (weak_this)
            weak_this->CheckMemoryFootprintPeriodic(ds_id,
                                                    max_daemon_memory_kb);
        },
        kMemoryLimitCheckPeriodMs);
  }
}

void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
                                                uint32_t max_daemon_memory_kb) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;  // stop recurring

  GuardrailConfig gconfig = {};
  gconfig.memory_guardrail_kb = max_daemon_memory_kb;

  ProfilerMemoryGuardrails footprint_snapshot;
  if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
    PurgeDataSource(ds_id);
    return;  // stop recurring
  }

  // repost
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, max_daemon_memory_kb] {
        if (weak_this)
          weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
      },
      kMemoryLimitCheckPeriodMs);
}

void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));

  // Metatrace: stop immediately (will miss the events from the
  // asynchronous shutdown of the primary data source).
  auto meta_it = metatrace_writers_.find(ds_id);
  if (meta_it != metatrace_writers_.end()) {
    meta_it->second.WriteAllAndFlushTraceWriter([] {});
    metatrace_writers_.erase(meta_it);
    return;
  }

  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    // Most likely, the source is missing due to an abrupt stop (via
    // |PurgeDataSource|). Tell the service that we've stopped the source now,
    // so that it doesn't wait for the ack until the timeout.
    endpoint_->NotifyDataSourceStopped(ds_id);
    return;
  }

  // Start shutting down the reading frontend, which will propagate the stop
  // further as the intermediate buffers are cleared.
  DataSourceState& ds = ds_it->second;
  InitiateReaderStop(&ds);
}

// The perf data sources ignore flush requests, as flushing would be
// unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
// Instead of responding to explicit flushes, we can ensure that we're otherwise
// well-behaved (do not reorder packets too much), and let the service scrape
// the SMB.
void PerfProducer::Flush(FlushRequestID flush_id,
                         const DataSourceInstanceID* data_source_ids,
                         size_t num_data_sources) {
  // Flush metatracing if requested.
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));

    auto meta_it = metatrace_writers_.find(ds_id);
    if (meta_it != metatrace_writers_.end()) {
      meta_it->second.WriteAllAndFlushTraceWriter([] {});
    }
  }

  endpoint_->NotifyFlushComplete(flush_id);
}

void PerfProducer::ClearIncrementalState(
    const DataSourceInstanceID* data_source_ids,
    size_t num_data_sources) {
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));

    if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
      continue;

    auto ds_it = data_sources_.find(ds_id);
    if (ds_it == data_sources_.end()) {
      PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
                    static_cast<size_t>(ds_id));
      continue;
    }
    DataSourceState& ds = ds_it->second;

    WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

    // Forget which incremental state we've emitted before.
    ds.interning_output.ClearHistory();
    InterningOutputTracker::WriteFixedInterningsPacket(
        ds.trace_writer.get(),
        protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

    // Drop the cross-datasource callstack interning trie. This is not
    // necessary for correctness (the preceding step is sufficient). However,
    // incremental clearing is likely to be used in ring buffer traces, where
    // it makes sense to reset the trie's size periodically, and this is a
    // reasonable point to do so. The trie keeps the monotonic interning IDs,
    // so there is no confusion for other concurrent data sources. We do not
    // bother with clearing concurrent sources' interning output trackers as
    // their footprint should be trivial.
    callstack_trie_.ClearTrie();
  }
}

void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
  auto it = data_sources_.find(ds_id);
  if (it == data_sources_.end()) {
    PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = it->second;

  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);

  // Make a pass over all per-cpu readers.
  uint64_t max_samples = ds.event_config.samples_per_tick_limit();
  bool more_records_available = false;
  for (EventReader& reader : ds.per_cpu_readers) {
    if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
      more_records_available = true;
    }
  }

  // Wake up the unwinder as we've (likely) pushed samples into its queue.
  unwinding_worker_->PostProcessQueue();

  if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
      !more_records_available) {
    unwinding_worker_->PostInitiateDataSourceStop(ds_id);
  } else {
    // otherwise, keep reading
    auto tick_period_ms = it->second.event_config.read_tick_period_ms();
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, ds_id] {
          if (weak_this)
            weak_this->TickDataSourceRead(ds_id);
        },
        TimeToNextReadTickMs(ds_id, tick_period_ms));
  }
}

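// Per-process tracking states, as driven by the logic below and the
// descriptor callbacks: kInitial -> kResolving (proc-fd lookup posted) ->
// kResolved (fds handed to the unwinder) or kExpired (lookup timed out; a
// late response can still upgrade it to kResolved). kRejected (failed the
// target filter) is terminal.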
bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint64_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint64_t i = 0; i < max_samples; i++) {
    base::Optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    // Counter-only mode: skip the unwinding stage, enqueue the sample for
    // output immediately.
    if (!ds->event_config.sample_callstacks()) {
      CompletedSample output;
      output.common = sample->common;
      EmitSample(ds_id, std::move(output));
      continue;
    }

    // If sampling callstacks, we're not interested in kernel threads/workers.
    if (!sample->regs) {
      continue;
    }

    // Request proc-fds for the process if this is the first time we see it.
    pid_t pid = sample->common.pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    if (process_state == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
                    static_cast<int>(pid));
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kReadStage);
      continue;
    }

    // Previously failed the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time.
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Check whether samples for this new process should be
      // dropped due to the target filtering.
      const TargetFilter& filter = ds->event_config.filter();
      if (ShouldRejectDueToFilter(pid, &ds->additional_cmdlines, filter)) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest, so start
      // resolving the proc-fds. Response is async.
      process_state = ProcessTrackingStatus::kResolving;
      InitiateDescriptorLookup(ds_id, pid,
                               ds->event_config.remote_descriptor_timeout_ms());
    }

    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
                   process_state == ProcessTrackingStatus::kResolving);

    // Optionally: drop sample if above a given threshold of sampled stacks
    // that are waiting in the unwinding queue.
    uint64_t max_footprint_bytes =
        ds->event_config.max_enqueued_footprint_bytes();
    uint64_t sample_stack_size = sample->stack.size();
    if (max_footprint_bytes) {
      uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
      if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
        PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
        EmitSkippedSample(ds_id, std::move(sample.value()),
                          SampleSkipReason::kUnwindEnqueue);
        continue;
      }
    }

    // Push the sample into the unwinding queue if there is room.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
      unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kUnwindEnqueue);
    }
  }

  // Most likely more events in the kernel buffer. Though we might be exactly
  // on the boundary due to |max_samples|.
  return true;
}

// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
                                     uid_t uid,
                                     base::ScopedFile maps_fd,
                                     base::ScopedFile mem_fd) {
  // Find first-fit data source that requested descriptors for the process.
  for (auto& it : data_sources_) {
    DataSourceState& ds = it.second;
    auto proc_status_it = ds.process_states.find(pid);
    if (proc_status_it == ds.process_states.end())
      continue;

    if (!CanProfile(ds.event_config.raw_ds_config(), uid,
                    ds.event_config.target_installed_by())) {
      PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
                    static_cast<int>(pid), static_cast<int>(uid),
                    static_cast<size_t>(it.first));
      continue;
    }

    // Match against either resolving, or expired state. In the latter
    // case, it means that the async response was slow enough that we've marked
    // the lookup as expired (but can now recover for future samples).
    auto proc_status = proc_status_it->second;
    if (proc_status == ProcessTrackingStatus::kResolving ||
        proc_status == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
                    static_cast<int>(pid), static_cast<size_t>(it.first));

      proc_status_it->second = ProcessTrackingStatus::kResolved;
      unwinding_worker_->PostAdoptProcDescriptors(
          it.first, pid, std::move(maps_fd), std::move(mem_fd));
      return;  // done
    }
  }
  PERFETTO_DLOG(
      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
      static_cast<int>(pid));
}

void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
                                            pid_t pid,
                                            uint32_t timeout_ms) {
  if (!proc_fd_getter_->RequiresDelayedRequest()) {
    StartDescriptorLookup(ds_id, pid, timeout_ms);
    return;
  }

  // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid, timeout_ms] {
        if (weak_this)
          weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
      },
      kProcDescriptorsAndroidDelayMs);
}

void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
                                         pid_t pid,
                                         uint32_t timeout_ms) {
  proc_fd_getter_->GetDescriptorsForPid(pid);

  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid] {
        if (weak_this)
          weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
      },
      timeout_ms);
}

void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
                                                   pid_t pid) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSourceState& ds = ds_it->second;
  auto proc_status_it = ds.process_states.find(pid);
  if (proc_status_it == ds.process_states.end())
    return;

  // If the request is still outstanding, mark the process as expired (causing
  // outstanding and future samples to be discarded).
  auto proc_status = proc_status_it->second;
  if (proc_status == ProcessTrackingStatus::kResolving) {
    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));

    proc_status_it->second = ProcessTrackingStatus::kExpired;
    // Also inform the unwinder of the state change (so that it can discard any
    // of the already-enqueued samples).
    unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
  }
}

void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
                                  CompletedSample sample) {
  // hack: c++11 lambdas can't capture by move, so stash the sample on the
  // heap.
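  // (Move-capture alone wouldn't help here either: base::TaskRunner::PostTask
  // takes a std::function, which requires a copyable callable.)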
  CompletedSample* raw_sample = new CompletedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample] {
    if (weak_this)
      weak_this->EmitSample(ds_id, std::move(*raw_sample));
    delete raw_sample;
  });
}

void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                              CompletedSample sample) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;

  // intern callsite
  GlobalCallstackTrie::Node* callstack_root =
      callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
  uint64_t callstack_iid = callstack_root->id();

  // start packet, timestamp domain defaults to monotonic_raw
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);

  // write new interning data (if any)
  protos::pbzero::InternedData* interned_out = packet->set_interned_data();
  ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
                                     interned_out);

  // write the sample itself
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);
  perf_sample->set_callstack_iid(callstack_iid);
  if (sample.unwind_error != unwindstack::ERROR_NONE) {
    perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
  }
}

void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
                                      size_t cpu,
                                      uint64_t records_lost) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;
  PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                static_cast<size_t>(ds_id), cpu, records_lost);

  // The data loss record relates to a single ring buffer, and indicates loss
  // since the last successfully-written record in that buffer. Therefore the
  // data loss record itself has no timestamp.
  // We timestamp the packet with the boot clock for packet ordering purposes,
  // but it no longer has a (precise) interpretation relative to the sample
  // stream from that per-cpu buffer. See the proto comments for more details.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(
      protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);

  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(static_cast<uint32_t>(cpu));
  perf_sample->set_kernel_records_lost(records_lost);
}

void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
                                                 ParsedSample sample) {
  PostEmitSkippedSample(ds_id, std::move(sample),
                        SampleSkipReason::kUnwindStage);
}

void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
                                         ParsedSample sample,
                                         SampleSkipReason reason) {
  // hack: c++11 lambdas can't capture by move, so stash the sample on the
  // heap.
  ParsedSample* raw_sample = new ParsedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
    if (weak_this)
      weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
    delete raw_sample;
  });
}

void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
                                     ParsedSample sample,
                                     SampleSkipReason reason) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  // Note: timestamp defaults to the monotonic_raw domain.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);

  using PerfSample = protos::pbzero::PerfSample;
  switch (reason) {
    case SampleSkipReason::kReadStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_READ_STAGE);
      break;
    case SampleSkipReason::kUnwindEnqueue:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
      break;
    case SampleSkipReason::kUnwindStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_STAGE);
      break;
  }
}

void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
  PERFETTO_DLOG("InitiateReaderStop");
  PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);

  ds->status = DataSourceState::Status::kShuttingDown;
  for (auto& event_reader : ds->per_cpu_readers) {
    event_reader.DisableEvents();
  }
}

void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_producer, ds_id] {
    if (weak_producer)
      weak_producer->FinishDataSourceStop(ds_id);
  });
}

void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;
  PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  endpoint_->NotifyDataSourceStopped(ds_id);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

// TODO(rsavitski): maybe make the tracing service respect premature
// producer-driven stops, and then issue a NotifyDataSourceStopped here.
// Alternatively (and at the expense of higher complexity) introduce a new data
// source status of "tombstoned", and propagate it until the source is stopped
// by the service (this would technically allow for stricter lifetime checking
// of data sources, and help with discarding periodic flushes).
// TODO(rsavitski): Purging while stopping will currently leave the stop
// unacknowledged. Consider checking whether the DS is stopping here, and if so,
// notifying immediately after erasing.
void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
               static_cast<size_t>(ds_id));

  unwinding_worker_->PostPurgeDataSource(ds_id);

  // Write a packet indicating the abrupt stop.
  {
    auto packet = StartTracePacket(ds.trace_writer.get());
    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
    packet->set_timestamp_clock_id(
        protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
    auto* perf_sample = packet->set_perf_sample();
    auto* producer_event = perf_sample->set_producer_event();
    producer_event->set_source_stop_reason(
        protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
  }

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                        BufferID target_buffer) {
  auto writer = endpoint_->CreateTraceWriter(target_buffer);

  auto it_and_inserted = metatrace_writers_.emplace(
      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Note: only the first concurrent writer will actually be active.
  metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
                                   metatrace::TAG_ANY);
}

void PerfProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_socket_name_ = socket_name;
  ConnectService();
}

void PerfProducer::ConnectService() {
  PERFETTO_DCHECK(state_ == kNotConnected);
  state_ = kConnecting;
  endpoint_ = ProducerIPCClient::Connect(
      producer_socket_name_, this, kProducerName, task_runner_,
      TracingService::ProducerSMBScrapingMode::kEnabled);
}

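// Exponential backoff for reconnection attempts: starting at
// kInitialConnectionBackoffMs, the delay doubles on each failure (100ms,
// 200ms, 400ms, ...) and is capped at kMaxConnectionBackoffMs (30s).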
void PerfProducer::IncreaseConnectionBackoff() {
  connection_backoff_ms_ *= 2;
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
}

void PerfProducer::ResetConnectionBackoff() {
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
}

void PerfProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service");

  {
    // linux.perf
    DataSourceDescriptor desc;
    desc.set_name(kDataSourceName);
    desc.set_handles_incremental_state_clear(true);
    desc.set_will_notify_on_stop(true);
    endpoint_->RegisterDataSource(desc);
  }
  {
    // metatrace
    DataSourceDescriptor desc;
    desc.set_name(MetatraceWriter::kDataSourceName);
    endpoint_->RegisterDataSource(desc);
  }
}

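// Two recovery paths, per the logic below: if we had fully connected, rebuild
// the producer from scratch (Restart()); if the connection attempt itself
// failed, retry with exponential backoff.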
void PerfProducer::OnDisconnect() {
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
  PERFETTO_LOG("Disconnected from tracing service");

  auto weak_producer = weak_factory_.GetWeakPtr();
  if (state_ == kConnected)
    return task_runner_->PostTask([weak_producer] {
      if (weak_producer)
        weak_producer->Restart();
    });

  state_ = kNotConnected;
  IncreaseConnectionBackoff();
  task_runner_->PostDelayedTask(
      [weak_producer] {
        if (weak_producer)
          weak_producer->ConnectService();
      },
      connection_backoff_ms_);
}

void PerfProducer::Restart() {
  // We lost the connection with the tracing service. At this point we need
  // to reset all the data sources. Trying to handle that manually is going to
  // be error prone. What we do here is simply destroy the instance and
  // recreate it again.
  base::TaskRunner* task_runner = task_runner_;
  const char* socket_name = producer_socket_name_;
  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;

  // Invoke destructor and then the constructor again.
  this->~PerfProducer();
  new (this) PerfProducer(proc_fd_getter, task_runner);

  ConnectWithRetries(socket_name);
}

}  // namespace profiling
}  // namespace perfetto