/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/perf/perf_producer.h"

#include <random>
#include <utility>

#include <malloc.h>
#include <unistd.h>

#include <unwindstack/Error.h>
#include <unwindstack/Unwinder.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/unwind_support.h"
#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"

#include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {
namespace profiling {
namespace {

// TODO(b/151835887): on Android, when using signals, there exists a vulnerable
// window between a process image being replaced by execve, and the new
// libc instance reinstalling the proper signal handlers. During this window,
// the signal disposition is defaulted to terminating the process.
// This is a best-effort mitigation from the daemon's side, using a heuristic
// that most execve calls follow a fork. So if we get a sample for a very fresh
// process, the grace period will give it a chance to get to
// a properly initialised state prior to getting signalled. This doesn't help
// cases when a mature process calls execve, or when the target gets descheduled
// (since this is a naive walltime wait).
// The proper fix is in the platform, see bug for progress.
constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;

constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

constexpr char kProducerName[] = "perfetto.traced_perf";
constexpr char kDataSourceName[] = "linux.perf";

size_t NumberOfCpus() {
  return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
}

uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
  // Normally, we'd schedule the next tick at the next |period_ms|
  // boundary of the boot clock. However, to avoid aligning the read tasks of
  // all concurrent data sources, we select a deterministic offset based on the
  // data source id.
  std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
  std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
  uint32_t ds_period_offset = dist(prng);

  uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
  return period_ms - ((now_ms - ds_period_offset) % period_ms);
}
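
// Worked example (illustrative numbers only): with period_ms = 250, suppose
// the seeded PRNG yields ds_period_offset = 90. Ticks then align to walltimes
// where (now_ms - 90) % 250 == 0. At now_ms = 1000400 we get
// (1000400 - 90) % 250 = 60, so the next tick is scheduled 250 - 60 = 190 ms
// out, at 1000590, which again satisfies the alignment. Distinct ds_ids seed
// the PRNG differently, so concurrent data sources tick out of phase.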

bool ShouldRejectDueToFilter(pid_t pid, const TargetFilter& filter) {
  bool reject_cmd = false;
  std::string cmdline;
  if (GetCmdlineForPID(pid, &cmdline)) {  // normalized form
    // reject if absent from non-empty whitelist, or present in blacklist
    reject_cmd = (filter.cmdlines.size() && !filter.cmdlines.count(cmdline)) ||
                 filter.exclude_cmdlines.count(cmdline);
  } else {
    PERFETTO_DLOG("Failed to look up cmdline for pid [%d]",
                  static_cast<int>(pid));
    // reject only if there's a whitelist present
    reject_cmd = filter.cmdlines.size() > 0;
  }

  bool reject_pid = (filter.pids.size() && !filter.pids.count(pid)) ||
                    filter.exclude_pids.count(pid);

  if (reject_cmd || reject_pid) {
    PERFETTO_DLOG(
        "Rejecting samples for pid [%d] due to cmdline(%d) or pid(%d)",
        static_cast<int>(pid), reject_cmd, reject_pid);

    return true;
  }
  return false;
}
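
// Example (hypothetical filter contents): with filter.cmdlines =
// {"com.example.app"} and empty pid sets, a sample whose normalized cmdline
// is "com.example.other" is rejected (absent from the non-empty whitelist),
// while "com.example.app" passes. An empty filter.cmdlines set rejects
// nothing on its own; only filter.exclude_cmdlines entries would then cause
// a cmdline-based rejection.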

void MaybeReleaseAllocatorMemToOS() {
#if defined(__BIONIC__)
  // TODO(b/152414415): libunwindstack's volume of small allocations is
  // adversarial to scudo, which doesn't automatically release small
  // allocation regions back to the OS. Forceful purge does reclaim all size
  // classes.
  mallopt(M_PURGE, 0);
#endif
}

protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
  using Profiling = protos::pbzero::Profiling;
  switch (perf_cpu_mode) {
    case PERF_RECORD_MISC_KERNEL:
      return Profiling::MODE_KERNEL;
    case PERF_RECORD_MISC_USER:
      return Profiling::MODE_USER;
    case PERF_RECORD_MISC_HYPERVISOR:
      return Profiling::MODE_HYPERVISOR;
    case PERF_RECORD_MISC_GUEST_KERNEL:
      return Profiling::MODE_GUEST_KERNEL;
    case PERF_RECORD_MISC_GUEST_USER:
      return Profiling::MODE_GUEST_USER;
    default:
      return Profiling::MODE_UNKNOWN;
  }
}

protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
    unwindstack::ErrorCode error_code) {
  using Profiling = protos::pbzero::Profiling;
  switch (error_code) {
    case unwindstack::ERROR_NONE:
      return Profiling::UNWIND_ERROR_NONE;
    case unwindstack::ERROR_MEMORY_INVALID:
      return Profiling::UNWIND_ERROR_MEMORY_INVALID;
    case unwindstack::ERROR_UNWIND_INFO:
      return Profiling::UNWIND_ERROR_UNWIND_INFO;
    case unwindstack::ERROR_UNSUPPORTED:
      return Profiling::UNWIND_ERROR_UNSUPPORTED;
    case unwindstack::ERROR_INVALID_MAP:
      return Profiling::UNWIND_ERROR_INVALID_MAP;
    case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
      return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
    case unwindstack::ERROR_REPEATED_FRAME:
      return Profiling::UNWIND_ERROR_REPEATED_FRAME;
    case unwindstack::ERROR_INVALID_ELF:
      return Profiling::UNWIND_ERROR_INVALID_ELF;
  }
  return Profiling::UNWIND_ERROR_UNKNOWN;
}

}  // namespace

PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                           base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      proc_fd_getter_(proc_fd_getter),
      unwinding_worker_(this),
      weak_factory_(this) {
  proc_fd_getter->SetDelegate(this);
}

// TODO(rsavitski): consider configure at setup + enable at start instead.
void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                   const DataSourceConfig&) {}

void PerfProducer::StartDataSource(DataSourceInstanceID instance_id,
                                   const DataSourceConfig& config) {
  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(instance_id),
               config.name().c_str());

  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(instance_id,
                         static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  base::Optional<EventConfig> event_config = EventConfig::Create(config);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  // TODO(rsavitski): consider supporting specific cpu subsets.
  if (!event_config->target_all_cpus()) {
    PERFETTO_ELOG("PerfEventConfig{all_cpus} required");
    return;
  }
  size_t num_cpus = NumberOfCpus();
  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu = 0; cpu < num_cpus; cpu++) {
    base::Optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(instance_id),
      std::forward_as_tuple(event_config.value(), std::move(writer),
                            std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Write out a packet to initialize the incremental state for this sequence.
  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get());

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  unwinding_worker_->PostStartDataSource(instance_id);
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        instance_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, instance_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(instance_id);
      },
      TimeToNextReadTickMs(instance_id, tick_period_ms));
}

void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(instance_id));

  // Metatrace: stop immediately (will miss the events from the
  // asynchronous shutdown of the primary data source).
  auto meta_it = metatrace_writers_.find(instance_id);
  if (meta_it != metatrace_writers_.end()) {
    meta_it->second.WriteAllAndFlushTraceWriter([] {});
    metatrace_writers_.erase(meta_it);
    return;
  }

  auto ds_it = data_sources_.find(instance_id);
  if (ds_it == data_sources_.end())
    return;

  // Start shutting down the reading frontend, which will propagate the stop
  // further as the intermediate buffers are cleared.
  DataSourceState& ds = ds_it->second;
  InitiateReaderStop(&ds);
}
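
// Sketch of the asynchronous stop sequence started above (the unwinder side
// lives outside this file, so that step is described from its interface):
//   1. InitiateReaderStop() marks the source kShuttingDown and pauses the
//      per-cpu perf events.
//   2. TickDataSourceRead() keeps draining the kernel buffers; once no more
//      records are available, it calls PostInitiateDataSourceStop() on the
//      unwinding worker.
//   3. The unwinder, after draining its own queue, is expected to call back
//      via PostFinishDataSourceStop(); FinishDataSourceStop() then flushes
//      the trace writer, erases the instance, and notifies the service.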

// The perf data sources ignore flush requests, as flushing would be
// unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
// Instead of responding to explicit flushes, we can ensure that we're otherwise
// well-behaved (do not reorder packets too much), and let the service scrape
// the SMB.
void PerfProducer::Flush(FlushRequestID flush_id,
                         const DataSourceInstanceID* data_source_ids,
                         size_t num_data_sources) {
  bool should_ack_flush = false;
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));

    auto meta_it = metatrace_writers_.find(ds_id);
    if (meta_it != metatrace_writers_.end()) {
      meta_it->second.WriteAllAndFlushTraceWriter([] {});
      should_ack_flush = true;
    }
    if (data_sources_.find(ds_id) != data_sources_.end()) {
      should_ack_flush = true;
    }
  }
  if (should_ack_flush)
    endpoint_->NotifyFlushComplete(flush_id);
}

void PerfProducer::ClearIncrementalState(
    const DataSourceInstanceID* data_source_ids,
    size_t num_data_sources) {
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));

    if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
      continue;

    auto ds_it = data_sources_.find(ds_id);
    if (ds_it == data_sources_.end()) {
      PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
                    static_cast<size_t>(ds_id));
      continue;
    }
    DataSourceState& ds = ds_it->second;

    // Forget which incremental state we've emitted before.
    ds.interning_output.ClearHistory();
    InterningOutputTracker::WriteFixedInterningsPacket(ds.trace_writer.get());

    // Drop the cross-datasource callstack interning trie. This is not
    // necessary for correctness (the preceding step is sufficient). However,
    // incremental clearing is likely to be used in ring buffer traces, where
    // it makes sense to reset the trie's size periodically, and this is a
    // reasonable point to do so. The trie keeps the monotonic interning IDs,
    // so there is no confusion for other concurrent data sources. We do not
    // bother with clearing concurrent sources' interning output trackers as
    // their footprint should be trivial.
    callstack_trie_.ClearTrie();
  }
}

void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
  auto it = data_sources_.find(ds_id);
  if (it == data_sources_.end()) {
    PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = it->second;

  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);

  // Make a pass over all per-cpu readers.
  uint32_t max_samples = ds.event_config.samples_per_tick_limit();
  bool more_records_available = false;
  for (EventReader& reader : ds.per_cpu_readers) {
    if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
      more_records_available = true;
    }
  }

  // Wake up the unwinder as we've (likely) pushed samples into its queue.
  unwinding_worker_->PostProcessQueue();

  if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
      !more_records_available) {
    unwinding_worker_->PostInitiateDataSourceStop(ds_id);
  } else {
    // otherwise, keep reading
    auto tick_period_ms = it->second.event_config.read_tick_period_ms();
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, ds_id] {
          if (weak_this)
            weak_this->TickDataSourceRead(ds_id);
        },
        TimeToNextReadTickMs(ds_id, tick_period_ms));
  }
}

bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint32_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint32_t i = 0; i < max_samples; i++) {
    base::Optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    if (!sample->regs) {
      continue;  // skip kernel threads/workers
    }

    // Request proc-fds for the process if this is the first time we see it.
    pid_t pid = sample->pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    if (process_state == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
                    static_cast<int>(pid));
      PostEmitSkippedSample(ds_id, std::move(sample.value()),
                            SampleSkipReason::kReadStage);
      continue;
    }

    // Previously failed the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time.
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Check whether samples for this new process should be
      // dropped due to the target whitelist/blacklist.
      const TargetFilter& filter = ds->event_config.filter();
      if (ShouldRejectDueToFilter(pid, filter)) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest, so start
      // resolving the proc-fds. Response is async.
      process_state = ProcessTrackingStatus::kResolving;
      InitiateDescriptorLookup(ds_id, pid,
                               ds->event_config.remote_descriptor_timeout_ms());
    }

    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
                   process_state == ProcessTrackingStatus::kResolving);

    // Push the sample into the unwinding queue if there is room.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      PostEmitSkippedSample(ds_id, std::move(sample.value()),
                            SampleSkipReason::kUnwindEnqueue);
    }
  }

  // Most likely more events in the kernel buffer. Though we might be exactly on
  // the boundary due to |max_samples|.
  return true;
}
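
// Sketch of the per-process tracking states used above (transitions are
// spread across this file; kResolved/kExpired are set from the
// descriptor-lookup paths further below):
//
//   kInitial   --filter reject-------------> kRejected  (samples dropped)
//   kInitial   --filter pass---------------> kResolving (lookup in flight)
//   kResolving --OnProcDescriptors---------> kResolved  (samples unwindable)
//   kResolving --lookup timeout------------> kExpired   (samples dropped)
//   kExpired   --late OnProcDescriptors----> kResolved  (recovers)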

// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
                                     base::ScopedFile maps_fd,
                                     base::ScopedFile mem_fd) {
  // Find first-fit data source that requested descriptors for the process.
  for (auto& it : data_sources_) {
    DataSourceState& ds = it.second;
    auto proc_status_it = ds.process_states.find(pid);
    if (proc_status_it == ds.process_states.end())
      continue;

    // Match against either resolving, or expired state. In the latter
    // case, it means that the async response was slow enough that we've marked
    // the lookup as expired (but can now recover for future samples).
    auto proc_status = proc_status_it->second;
    if (proc_status == ProcessTrackingStatus::kResolving ||
        proc_status == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
                    static_cast<int>(pid), static_cast<size_t>(it.first));

      proc_status_it->second = ProcessTrackingStatus::kResolved;
      unwinding_worker_->PostAdoptProcDescriptors(
          it.first, pid, std::move(maps_fd), std::move(mem_fd));
      return;  // done
    }
  }
  PERFETTO_DLOG(
      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
      static_cast<int>(pid));
}

void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
                                            pid_t pid,
                                            uint32_t timeout_ms) {
  if (!proc_fd_getter_->RequiresDelayedRequest()) {
    StartDescriptorLookup(ds_id, pid, timeout_ms);
    return;
  }

  // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid, timeout_ms] {
        if (weak_this)
          weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
      },
      kProcDescriptorsAndroidDelayMs);
}

void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
                                         pid_t pid,
                                         uint32_t timeout_ms) {
  proc_fd_getter_->GetDescriptorsForPid(pid);

  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid] {
        if (weak_this)
          weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
      },
      timeout_ms);
}

void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
                                                   pid_t pid) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSourceState& ds = ds_it->second;
  auto proc_status_it = ds.process_states.find(pid);
  if (proc_status_it == ds.process_states.end())
    return;

  // If the request is still outstanding, mark the process as expired (causing
  // outstanding and future samples to be discarded).
  auto proc_status = proc_status_it->second;
  if (proc_status == ProcessTrackingStatus::kResolving) {
    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));

    proc_status_it->second = ProcessTrackingStatus::kExpired;
    // Also inform the unwinder of the state change (so that it can discard any
    // of the already-enqueued samples).
    unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
  }
}

void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
                                  CompletedSample sample) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  CompletedSample* raw_sample = new CompletedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample] {
    if (weak_this)
      weak_this->EmitSample(ds_id, std::move(*raw_sample));
    delete raw_sample;
  });
}
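
// Aside (sketch, not used here since this file targets C++11): a C++14
// generalized lambda capture could move the sample directly, e.g.
//   task_runner_->PostTask(
//       [weak_this, ds_id, s = std::move(sample)]() mutable {
//         if (weak_this)
//           weak_this->EmitSample(ds_id, std::move(s));
//       });
// though a std::function-based PostTask additionally requires the resulting
// callable (and thus the captured sample) to be copyable.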

void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                              CompletedSample sample) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;

  // intern callsite
  GlobalCallstackTrie::Node* callstack_root =
      callstack_trie_.CreateCallsite(sample.frames);
  uint64_t callstack_iid = callstack_root->id();

  // start packet
  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(sample.timestamp);

  // write new interning data (if any)
  protos::pbzero::InternedData* interned_out = packet->set_interned_data();
  ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
                                     interned_out);

  // write the sample itself
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.cpu_mode));
  perf_sample->set_callstack_iid(callstack_iid);
  if (sample.unwind_error != unwindstack::ERROR_NONE) {
    perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
  }
}

void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
                                      size_t cpu,
                                      uint64_t records_lost) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;
  PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                static_cast<size_t>(ds_id), cpu, records_lost);

  // The data loss record relates to a single ring buffer, and indicates loss
  // since the last successfully-written record in that buffer. Therefore the
  // data loss record itself has no timestamp.
  // We timestamp the packet with the boot clock for packet ordering purposes,
  // but it no longer has a (precise) interpretation relative to the sample
  // stream from that per-cpu buffer. See the proto comments for more details.
  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));

  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(static_cast<uint32_t>(cpu));
  perf_sample->set_kernel_records_lost(records_lost);
}

void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
                                                 ParsedSample sample) {
  PostEmitSkippedSample(ds_id, std::move(sample),
                        SampleSkipReason::kUnwindStage);
}

void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
                                         ParsedSample sample,
                                         SampleSkipReason reason) {
  // hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
  ParsedSample* raw_sample = new ParsedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
    if (weak_this)
      weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
    delete raw_sample;
  });
}

void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
                                     ParsedSample sample,
                                     SampleSkipReason reason) {
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;

  auto packet = ds.trace_writer->NewTracePacket();
  packet->set_timestamp(sample.timestamp);
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.cpu_mode));

  using PerfSample = protos::pbzero::PerfSample;
  switch (reason) {
    case SampleSkipReason::kReadStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_READ_STAGE);
      break;
    case SampleSkipReason::kUnwindEnqueue:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
      break;
    case SampleSkipReason::kUnwindStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_STAGE);
      break;
  }
}

void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
  PERFETTO_DLOG("InitiateReaderStop");
  PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);

  ds->status = DataSourceState::Status::kShuttingDown;
  for (auto& event_reader : ds->per_cpu_readers) {
    event_reader.PauseEvents();
  }
}

void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_producer, ds_id] {
    if (weak_producer)
      weak_producer->FinishDataSourceStop(ds_id);
  });
}

void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
  auto ds_it = data_sources_.find(ds_id);
  PERFETTO_CHECK(ds_it != data_sources_.end());
  DataSourceState& ds = ds_it->second;
  PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  endpoint_->NotifyDataSourceStopped(ds_id);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    MaybeReleaseAllocatorMemToOS();
  }
}

void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                        BufferID target_buffer) {
  auto writer = endpoint_->CreateTraceWriter(target_buffer);

  auto it_and_inserted = metatrace_writers_.emplace(
      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Note: only the first concurrent writer will actually be active.
  metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
                                   metatrace::TAG_ANY);
}

void PerfProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_socket_name_ = socket_name;
  ConnectService();
}

void PerfProducer::ConnectService() {
  PERFETTO_DCHECK(state_ == kNotConnected);
  state_ = kConnecting;
  endpoint_ = ProducerIPCClient::Connect(
      producer_socket_name_, this, kProducerName, task_runner_,
      TracingService::ProducerSMBScrapingMode::kEnabled);
}

void PerfProducer::IncreaseConnectionBackoff() {
  connection_backoff_ms_ *= 2;
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
}
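
// Example progression (from the constants above): connection_backoff_ms_
// starts at kInitialConnectionBackoffMs (100), and OnDisconnect() doubles it
// before scheduling the retry, so successive retry delays run
// 200, 400, 800, ... ms until capped at kMaxConnectionBackoffMs (30 s).
// A successful OnConnect() resets it back to 100 ms.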

void PerfProducer::ResetConnectionBackoff() {
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
}

void PerfProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service");

  {
    // linux.perf
    DataSourceDescriptor desc;
    desc.set_name(kDataSourceName);
    desc.set_handles_incremental_state_clear(true);
    desc.set_will_notify_on_stop(true);
    endpoint_->RegisterDataSource(desc);
  }
  {
    // metatrace
    DataSourceDescriptor desc;
    desc.set_name(MetatraceWriter::kDataSourceName);
    endpoint_->RegisterDataSource(desc);
  }
}

void PerfProducer::OnDisconnect() {
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
  PERFETTO_LOG("Disconnected from tracing service");

  auto weak_producer = weak_factory_.GetWeakPtr();
  if (state_ == kConnected)
    return task_runner_->PostTask([weak_producer] {
      if (weak_producer)
        weak_producer->Restart();
    });

  state_ = kNotConnected;
  IncreaseConnectionBackoff();
  task_runner_->PostDelayedTask(
      [weak_producer] {
        if (weak_producer)
          weak_producer->ConnectService();
      },
      connection_backoff_ms_);
}

void PerfProducer::Restart() {
  // We lost the connection with the tracing service. At this point we need
  // to reset all the data sources. Trying to handle that manually is going to
  // be error prone. What we do here is simply destroy the instance and
  // recreate it again.
  base::TaskRunner* task_runner = task_runner_;
  const char* socket_name = producer_socket_name_;
  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;

  // Invoke destructor and then the constructor again.
  this->~PerfProducer();
  new (this) PerfProducer(proc_fd_getter, task_runner);

  ConnectWithRetries(socket_name);
}

}  // namespace profiling
}  // namespace perfetto