/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/perf/perf_producer.h"

#include <random>
#include <utility>

#include <malloc.h>
#include <unistd.h>

#include <unwindstack/Error.h>
#include <unwindstack/Unwinder.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/unwind_support.h"
#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"

#include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {
namespace profiling {
namespace {

// TODO(b/151835887): on Android, when using signals, there exists a vulnerable
// window between a process image being replaced by execve, and the new
// libc instance reinstalling the proper signal handlers. During this window,
// the signal disposition is defaulted to terminating the process.
// This is a best-effort mitigation from the daemon's side, using a heuristic
// that most execve calls follow a fork. So if we get a sample for a very fresh
// process, the grace period will give it a chance to get to
// a properly initialised state prior to getting signalled. This doesn't help
// cases when a mature process calls execve, or when the target gets descheduled
// (since this is a naive walltime wait).
// The proper fix is in the platform, see bug for progress.
constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;

constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

constexpr char kProducerName[] = "perfetto.traced_perf";
constexpr char kDataSourceName[] = "linux.perf";

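// Number of configured CPUs (_SC_NPROCESSORS_CONF), i.e. including CPUs that
// are currently offline, so that a per-cpu reader can cover each of them.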
size_t NumberOfCpus() {
  return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
}

uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
  // Normally, we'd schedule the next tick at the next |period_ms|
  // boundary of the boot clock. However, to avoid aligning the read tasks of
  // all concurrent data sources, we select a deterministic offset based on the
  // data source id.
  std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
  std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
  uint32_t ds_period_offset = dist(prng);

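  // Delay until the next multiple of |period_ms| in walltime, shifted by this
  // data source's offset.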
  uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
  return period_ms - ((now_ms - ds_period_offset) % period_ms);
}

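// A process is rejected if it fails either the cmdline or the pid check below.
// Empty allow-lists are treated as matching everything.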
bool ShouldRejectDueToFilter(pid_t pid, const TargetFilter& filter) {
  bool reject_cmd = false;
  std::string cmdline;
  if (GetCmdlineForPID(pid, &cmdline)) {  // normalized form
    // reject if absent from non-empty whitelist, or present in blacklist
    reject_cmd = (filter.cmdlines.size() && !filter.cmdlines.count(cmdline)) ||
                 filter.exclude_cmdlines.count(cmdline);
  } else {
    PERFETTO_DLOG("Failed to look up cmdline for pid [%d]",
                  static_cast<int>(pid));
    // reject only if there's a whitelist present
    reject_cmd = filter.cmdlines.size() > 0;
  }

  bool reject_pid = (filter.pids.size() && !filter.pids.count(pid)) ||
                    filter.exclude_pids.count(pid);

  if (reject_cmd || reject_pid) {
    PERFETTO_DLOG(
        "Rejecting samples for pid [%d] due to cmdline(%d) or pid(%d)",
        static_cast<int>(pid), reject_cmd, reject_pid);

    return true;
  }
  return false;
}

void MaybeReleaseAllocatorMemToOS() {
#if defined(__BIONIC__)
  // TODO(b/152414415): libunwindstack's volume of small allocations is
  // adversarial to scudo, which doesn't automatically release small
  // allocation regions back to the OS. Forceful purge does reclaim all size
  // classes.
  mallopt(M_PURGE, 0);
#endif
}

protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
  using Profiling = protos::pbzero::Profiling;
  switch (perf_cpu_mode) {
    case PERF_RECORD_MISC_KERNEL:
      return Profiling::MODE_KERNEL;
    case PERF_RECORD_MISC_USER:
      return Profiling::MODE_USER;
    case PERF_RECORD_MISC_HYPERVISOR:
      return Profiling::MODE_HYPERVISOR;
    case PERF_RECORD_MISC_GUEST_KERNEL:
      return Profiling::MODE_GUEST_KERNEL;
    case PERF_RECORD_MISC_GUEST_USER:
      return Profiling::MODE_GUEST_USER;
    default:
      return Profiling::MODE_UNKNOWN;
  }
}

protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
    unwindstack::ErrorCode error_code) {
  using Profiling = protos::pbzero::Profiling;
  switch (error_code) {
    case unwindstack::ERROR_NONE:
      return Profiling::UNWIND_ERROR_NONE;
    case unwindstack::ERROR_MEMORY_INVALID:
      return Profiling::UNWIND_ERROR_MEMORY_INVALID;
    case unwindstack::ERROR_UNWIND_INFO:
      return Profiling::UNWIND_ERROR_UNWIND_INFO;
    case unwindstack::ERROR_UNSUPPORTED:
      return Profiling::UNWIND_ERROR_UNSUPPORTED;
    case unwindstack::ERROR_INVALID_MAP:
      return Profiling::UNWIND_ERROR_INVALID_MAP;
    case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
      return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
    case unwindstack::ERROR_REPEATED_FRAME:
      return Profiling::UNWIND_ERROR_REPEATED_FRAME;
    case unwindstack::ERROR_INVALID_ELF:
      return Profiling::UNWIND_ERROR_INVALID_ELF;
  }
  return Profiling::UNWIND_ERROR_UNKNOWN;
}

}  // namespace

PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                           base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      proc_fd_getter_(proc_fd_getter),
      unwinding_worker_(this),
      weak_factory_(this) {
  proc_fd_getter->SetDelegate(this);
}

// TODO(rsavitski): consider configuring at setup + enabling at start instead.
void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                   const DataSourceConfig&) {}

void PerfProducer::StartDataSource(DataSourceInstanceID instance_id,
                                   const DataSourceConfig& config) {
  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(instance_id),
               config.name().c_str());

  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(instance_id,
                         static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  base::Optional<EventConfig> event_config = EventConfig::Create(config);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  // TODO(rsavitski): consider supporting specific cpu subsets.
  if (!event_config->target_all_cpus()) {
    PERFETTO_ELOG("PerfEventConfig{all_cpus} required");
    return;
  }
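  // Set up one kernel ring buffer reader per cpu. If any of them fails to
  // configure its perf events, the whole data source is discarded.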
  size_t num_cpus = NumberOfCpus();
  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu = 0; cpu < num_cpus; cpu++) {
    base::Optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(instance_id),
      std::forward_as_tuple(event_config.value(), std::move(writer),
                            std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Write out a packet to initialize the incremental state for this sequence.
  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get());

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  unwinding_worker_->PostStartDataSource(instance_id);
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        instance_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, instance_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(instance_id);
      },
      TimeToNextReadTickMs(instance_id, tick_period_ms));
}

void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(instance_id));

  // Metatrace: stop immediately (will miss the events from the
  // asynchronous shutdown of the primary data source).
  auto meta_it = metatrace_writers_.find(instance_id);
  if (meta_it != metatrace_writers_.end()) {
    meta_it->second.WriteAllAndFlushTraceWriter([] {});
    metatrace_writers_.erase(meta_it);
    return;
  }

  auto ds_it = data_sources_.find(instance_id);
  if (ds_it == data_sources_.end())
    return;

  // Start shutting down the reading frontend, which will propagate the stop
  // further as the intermediate buffers are cleared.
  DataSourceState& ds = ds_it->second;
  InitiateReaderStop(&ds);
}

// The perf data sources ignore flush requests, as flushing would be
// unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
// Instead of responding to explicit flushes, we can ensure that we're otherwise
// well-behaved (do not reorder packets too much), and let the service scrape
// the SMB.
void PerfProducer::Flush(FlushRequestID flush_id,
                         const DataSourceInstanceID* data_source_ids,
                         size_t num_data_sources) {
  bool should_ack_flush = false;
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));

    auto meta_it = metatrace_writers_.find(ds_id);
    if (meta_it != metatrace_writers_.end()) {
      meta_it->second.WriteAllAndFlushTraceWriter([] {});
      should_ack_flush = true;
    }
    if (data_sources_.find(ds_id) != data_sources_.end()) {
      should_ack_flush = true;
    }
  }
  if (should_ack_flush)
    endpoint_->NotifyFlushComplete(flush_id);
}

void PerfProducer::ClearIncrementalState(
    const DataSourceInstanceID* data_source_ids,
    size_t num_data_sources) {
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));

    if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
      continue;

    auto ds_it = data_sources_.find(ds_id);
    if (ds_it == data_sources_.end()) {
      PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
                    static_cast<size_t>(ds_id));
      continue;
    }
    DataSourceState& ds = ds_it->second;

    // Forget which incremental state we've emitted before.
    ds.interning_output.ClearHistory();
    InterningOutputTracker::WriteFixedInterningsPacket(ds.trace_writer.get());

    // Drop the cross-datasource callstack interning trie. This is not
    // necessary for correctness (the preceding step is sufficient). However,
    // incremental clearing is likely to be used in ring buffer traces, where
    // it makes sense to reset the trie's size periodically, and this is a
    // reasonable point to do so. The trie keeps the monotonic interning IDs,
    // so there is no confusion for other concurrent data sources. We do not
    // bother with clearing concurrent sources' interning output trackers as
    // their footprint should be trivial.
    callstack_trie_.ClearTrie();
  }
}

void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
  auto it = data_sources_.find(ds_id);
  if (it == data_sources_.end()) {
    PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = it->second;

  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);

  // Make a pass over all per-cpu readers.
  uint32_t max_samples = ds.event_config.samples_per_tick_limit();
  bool more_records_available = false;
  for (EventReader& reader : ds.per_cpu_readers) {
    if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
      more_records_available = true;
    }
  }

  // Wake up the unwinder as we've (likely) pushed samples into its queue.
  unwinding_worker_->PostProcessQueue();

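  // If the data source is being stopped and all per-cpu buffers have been
  // drained, hand the next stage of the shutdown over to the unwinder.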
  if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
      !more_records_available) {
    unwinding_worker_->PostInitiateDataSourceStop(ds_id);
  } else {
    // otherwise, keep reading
    auto tick_period_ms = it->second.event_config.read_tick_period_ms();
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, ds_id] {
          if (weak_this)
            weak_this->TickDataSourceRead(ds_id);
        },
        TimeToNextReadTickMs(ds_id, tick_period_ms));
  }
}

bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint32_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint32_t i = 0; i < max_samples; i++) {
    base::Optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    if (!sample->regs) {
      continue;  // skip kernel threads/workers
    }

    // Request proc-fds for the process if this is the first time we see it.
    pid_t pid = sample->pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    if (process_state == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
                    static_cast<int>(pid));
      PostEmitSkippedSample(ds_id, std::move(sample.value()),
                            SampleSkipReason::kReadStage);
      continue;
    }

    // Previously failed the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time.
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Check whether samples for this new process should be
      // dropped due to the target whitelist/blacklist.
      const TargetFilter& filter = ds->event_config.filter();
      if (ShouldRejectDueToFilter(pid, filter)) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest, so start
      // resolving the proc-fds. Response is async.
      process_state = ProcessTrackingStatus::kResolving;
      InitiateDescriptorLookup(ds_id, pid,
                               ds->event_config.remote_descriptor_timeout_ms());
    }

    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
                   process_state == ProcessTrackingStatus::kResolving);

    // Push the sample into the unwinding queue if there is room.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      PostEmitSkippedSample(ds_id, std::move(sample.value()),
                            SampleSkipReason::kUnwindEnqueue);
    }
  }

  // Most likely more events in the kernel buffer. Though we might be exactly on
  // the boundary due to |max_samples|.
  return true;
}

// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
                                     base::ScopedFile maps_fd,
                                     base::ScopedFile mem_fd) {
  // Find first-fit data source that requested descriptors for the process.
  for (auto& it : data_sources_) {
    DataSourceState& ds = it.second;
    auto proc_status_it = ds.process_states.find(pid);
    if (proc_status_it == ds.process_states.end())
      continue;

    // Match against either resolving, or expired state. In the latter
    // case, it means that the async response was slow enough that we've marked
    // the lookup as expired (but can now recover for future samples).
    auto proc_status = proc_status_it->second;
    if (proc_status == ProcessTrackingStatus::kResolving ||
        proc_status == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
                    static_cast<int>(pid), static_cast<size_t>(it.first));

      proc_status_it->second = ProcessTrackingStatus::kResolved;
      unwinding_worker_->PostAdoptProcDescriptors(
          it.first, pid, std::move(maps_fd), std::move(mem_fd));
      return;  // done
    }
  }
  PERFETTO_DLOG(
      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
      static_cast<int>(pid));
}

void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
                                            pid_t pid,
                                            uint32_t timeout_ms) {
  if (!proc_fd_getter_->RequiresDelayedRequest()) {
    StartDescriptorLookup(ds_id, pid, timeout_ms);
    return;
  }

  // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid, timeout_ms] {
        if (weak_this)
          weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
      },
      kProcDescriptorsAndroidDelayMs);
}

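// Requests the proc-fds asynchronously, and schedules a follow-up task that
// checks whether the lookup has timed out.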
void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
                                         pid_t pid,
                                         uint32_t timeout_ms) {
  proc_fd_getter_->GetDescriptorsForPid(pid);

  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid] {
        if (weak_this)
          weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
      },
      timeout_ms);
}

void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
                                                   pid_t pid) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSourceState& ds = ds_it->second;
  auto proc_status_it = ds.process_states.find(pid);
  if (proc_status_it == ds.process_states.end())
    return;

  // If the request is still outstanding, mark the process as expired (causing
  // outstanding and future samples to be discarded).
  auto proc_status = proc_status_it->second;
  if (proc_status == ProcessTrackingStatus::kResolving) {
    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));

    proc_status_it->second = ProcessTrackingStatus::kExpired;
    // Also inform the unwinder of the state change (so that it can discard any
    // of the already-enqueued samples).
    unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
  }
}

void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
                                  CompletedSample sample) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  CompletedSample* raw_sample = new CompletedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample] {
    if (weak_this)
      weak_this->EmitSample(ds_id, std::move(*raw_sample));
    delete raw_sample;
  });
}

void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                              CompletedSample sample) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;

  // intern callsite
  GlobalCallstackTrie::Node* callstack_root =
      callstack_trie_.CreateCallsite(sample.frames);
  uint64_t callstack_iid = callstack_root->id();

  // start packet
  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(sample.timestamp);

  // write new interning data (if any)
  protos::pbzero::InternedData* interned_out = packet->set_interned_data();
  ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
                                     interned_out);

  // write the sample itself
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.cpu_mode));
  perf_sample->set_callstack_iid(callstack_iid);
  if (sample.unwind_error != unwindstack::ERROR_NONE) {
    perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
  }
}

void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
                                      size_t cpu,
                                      uint64_t records_lost) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;
  PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                static_cast<size_t>(ds_id), cpu, records_lost);

  // The data loss record relates to a single ring buffer, and indicates loss
  // since the last successfully-written record in that buffer. Therefore the
  // data loss record itself has no timestamp.
  // We timestamp the packet with the boot clock for packet ordering purposes,
  // but it no longer has a (precise) interpretation relative to the sample
  // stream from that per-cpu buffer. See the proto comments for more details.
  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));

  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(static_cast<uint32_t>(cpu));
  perf_sample->set_kernel_records_lost(records_lost);
}

void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
                                                 ParsedSample sample) {
  PostEmitSkippedSample(ds_id, std::move(sample),
                        SampleSkipReason::kUnwindStage);
}

void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
                                         ParsedSample sample,
                                         SampleSkipReason reason) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  ParsedSample* raw_sample = new ParsedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
    if (weak_this)
      weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
    delete raw_sample;
  });
}

void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
                                     ParsedSample sample,
                                     SampleSkipReason reason) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;

  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(sample.timestamp);
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.cpu_mode));

  using PerfSample = protos::pbzero::PerfSample;
  switch (reason) {
    case SampleSkipReason::kReadStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_READ_STAGE);
      break;
    case SampleSkipReason::kUnwindEnqueue:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
      break;
    case SampleSkipReason::kUnwindStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_STAGE);
      break;
  }
}

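// Pauses the per-cpu readers so no new samples are produced; records already
// buffered in the kernel are still drained by the remaining read ticks before
// the data source is fully torn down.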
void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
  PERFETTO_DLOG("InitiateReaderStop");
  PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);

  ds->status = DataSourceState::Status::kShuttingDown;
  for (auto& event_reader : ds->per_cpu_readers) {
    event_reader.PauseEvents();
  }
}

void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_producer, ds_id] {
    if (weak_producer)
      weak_producer->FinishDataSourceStop(ds_id);
  });
}

void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;
  PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  endpoint_->NotifyDataSourceStopped(ds_id);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    MaybeReleaseAllocatorMemToOS();
  }
}

void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                        BufferID target_buffer) {
  auto writer = endpoint_->CreateTraceWriter(target_buffer);

  auto it_and_inserted = metatrace_writers_.emplace(
      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Note: only the first concurrent writer will actually be active.
  metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
                                   metatrace::TAG_ANY);
}

void PerfProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_socket_name_ = socket_name;
  ConnectService();
}

void PerfProducer::ConnectService() {
  PERFETTO_DCHECK(state_ == kNotConnected);
  state_ = kConnecting;
  endpoint_ = ProducerIPCClient::Connect(
      producer_socket_name_, this, kProducerName, task_runner_,
      TracingService::ProducerSMBScrapingMode::kEnabled);
}

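// Exponential backoff for reconnection attempts, capped at
// |kMaxConnectionBackoffMs|.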
void PerfProducer::IncreaseConnectionBackoff() {
  connection_backoff_ms_ *= 2;
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
}

void PerfProducer::ResetConnectionBackoff() {
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
}

void PerfProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service");

  {
    // linux.perf
    DataSourceDescriptor desc;
    desc.set_name(kDataSourceName);
    desc.set_handles_incremental_state_clear(true);
    desc.set_will_notify_on_stop(true);
    endpoint_->RegisterDataSource(desc);
  }
  {
    // metatrace
    DataSourceDescriptor desc;
    desc.set_name(MetatraceWriter::kDataSourceName);
    endpoint_->RegisterDataSource(desc);
  }
}

void PerfProducer::OnDisconnect() {
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
  PERFETTO_LOG("Disconnected from tracing service");

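  // If we had fully connected, the connection was lost mid-session: tear down
  // and recreate this producer instance. Otherwise keep retrying the initial
  // connection with exponential backoff.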
  auto weak_producer = weak_factory_.GetWeakPtr();
  if (state_ == kConnected)
    return task_runner_->PostTask([weak_producer] {
      if (weak_producer)
        weak_producer->Restart();
    });

  state_ = kNotConnected;
  IncreaseConnectionBackoff();
  task_runner_->PostDelayedTask(
      [weak_producer] {
        if (weak_producer)
          weak_producer->ConnectService();
      },
      connection_backoff_ms_);
}

void PerfProducer::Restart() {
  // We lost the connection with the tracing service. At this point we need
  // to reset all the data sources. Trying to handle that manually is going to
  // be error prone. What we do here is simply destroy the instance and
  // recreate it again.
  base::TaskRunner* task_runner = task_runner_;
  const char* socket_name = producer_socket_name_;
  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;

  // Invoke destructor and then the constructor again.
  this->~PerfProducer();
  new (this) PerfProducer(proc_fd_getter, task_runner);

  ConnectWithRetries(socket_name);
}

}  // namespace profiling
}  // namespace perfetto