/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/profiling/perf/perf_producer.h"

#include <random>
#include <utility>

#include <unistd.h>

#include <unwindstack/Error.h>
#include <unwindstack/Unwinder.h>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/metatrace.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_cmdline.h"
#include "src/profiling/common/producer_support.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/common/unwind_support.h"
#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"

#include "protos/perfetto/common/builtin_clock.pbzero.h"
#include "protos/perfetto/common/perf_events.gen.h"
#include "protos/perfetto/common/perf_events.pbzero.h"
#include "protos/perfetto/config/profiling/perf_event_config.gen.h"
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"
#include "protos/perfetto/trace/trace_packet_defaults.pbzero.h"

namespace perfetto {
namespace profiling {
namespace {

// TODO(b/151835887): on Android, when using signals, there exists a vulnerable
// window between a process image being replaced by execve, and the new
// libc instance reinstalling the proper signal handlers. During this window,
// the signal disposition is defaulted to terminating the process.
// This is a best-effort mitigation from the daemon's side, using a heuristic
// that most execve calls follow a fork. So if we get a sample for a very fresh
// process, the grace period will give it a chance to get to a properly
// initialised state prior to getting signalled. This doesn't help cases when a
// mature process calls execve, or when the target gets descheduled (since this
// is a naive walltime wait).
// The proper fix is in the platform, see bug for progress.
constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50;

constexpr uint32_t kMemoryLimitCheckPeriodMs = 1000;

constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;

constexpr char kProducerName[] = "perfetto.traced_perf";
constexpr char kDataSourceName[] = "linux.perf";

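// Note: _SC_NPROCESSORS_CONF counts configured cpus, including those currently
// offline, presumably so that sampling also covers cpus that come online
// mid-trace.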
size_t NumberOfCpus() {
  return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
}

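// Maps the sampling clock chosen for the perf session
// (perf_event_attr.clockid) onto the corresponding perfetto builtin clock,
// used as the default timestamp domain for the sample packets.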
int32_t ToBuiltinClock(int32_t clockid) {
  switch (clockid) {
    case CLOCK_REALTIME:
      return protos::pbzero::BUILTIN_CLOCK_REALTIME;
    case CLOCK_MONOTONIC:
      return protos::pbzero::BUILTIN_CLOCK_MONOTONIC;
    case CLOCK_MONOTONIC_RAW:
      return protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW;
    case CLOCK_BOOTTIME:
      return protos::pbzero::BUILTIN_CLOCK_BOOTTIME;
    // Should never get invalid input here as otherwise the syscall itself
    // would've failed earlier.
    default:
      return protos::pbzero::BUILTIN_CLOCK_UNKNOWN;
  }
}

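// Starts a packet that is marked as depending on the sequence's incremental
// state (e.g. interned callstacks), letting trace consumers skip it if that
// state was lost.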
TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
  return packet;
}

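// Emits a packet that begins a new incremental state generation and records
// the trace packet defaults for this sequence: the timestamp clock, and a
// description of the timebase event (frequency/period and identity) against
// which subsequent PerfSample packets are interpreted.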
void WritePerfEventDefaultsPacket(const EventConfig& event_config,
                                  TraceWriter* trace_writer) {
  auto packet = trace_writer->NewTracePacket();
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME);

  // start new incremental state generation:
  packet->set_sequence_flags(
      protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);

  // default packet timestamp clock for the samples:
  perf_event_attr* perf_attr = event_config.perf_attr();
  auto* defaults = packet->set_trace_packet_defaults();
  int32_t builtin_clock = ToBuiltinClock(perf_attr->clockid);
  defaults->set_timestamp_clock_id(static_cast<uint32_t>(builtin_clock));

  auto* perf_defaults = defaults->set_perf_sample_defaults();
  auto* timebase_pb = perf_defaults->set_timebase();

  // frequency/period:
  if (perf_attr->freq) {
    timebase_pb->set_frequency(perf_attr->sample_freq);
  } else {
    timebase_pb->set_period(perf_attr->sample_period);
  }

  // event:
  const PerfCounter& timebase = event_config.timebase_event();
  switch (timebase.event_type()) {
    case PerfCounter::Type::kBuiltinCounter: {
      timebase_pb->set_counter(
          static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter));
      break;
    }
    case PerfCounter::Type::kTracepoint: {
      auto* tracepoint_pb = timebase_pb->set_tracepoint();
      tracepoint_pb->set_name(timebase.tracepoint_name);
      tracepoint_pb->set_filter(timebase.tracepoint_filter);
      break;
    }
    case PerfCounter::Type::kRawEvent: {
      auto* raw_pb = timebase_pb->set_raw_event();
      raw_pb->set_type(timebase.attr_type);
      raw_pb->set_config(timebase.attr_config);
      raw_pb->set_config1(timebase.attr_config1);
      raw_pb->set_config2(timebase.attr_config2);
      break;
    }
  }

  // optional name to identify the counter during parsing:
  if (!timebase.name.empty()) {
    timebase_pb->set_name(timebase.name);
  }

  // Not setting timebase.timestamp_clock since the field that matters during
  // parsing is the root timestamp_clock_id set above.
}

uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
  // Normally, we'd schedule the next tick at the next |period_ms|
  // boundary of the boot clock. However, to avoid aligning the read tasks of
  // all concurrent data sources, we select a deterministic offset based on the
  // data source id.
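  // Worked example (illustrative numbers): with period_ms = 500 and a
  // ds_id-derived offset of 137, ticks are scheduled at times ...637, 1137,
  // 1637 ms. A call at now_ms = 1500 therefore returns 1637 - 1500 = 137.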
  std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id));
  std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1);
  uint32_t ds_period_offset = dist(prng);

  uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count());
  return period_ms - ((now_ms - ds_period_offset) % period_ms);
}

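// Decides whether samples for |pid| should be dropped. Exclusion rules
// (cmdline globs and pids) are evaluated first, then the allow-lists. If all
// allow-lists are empty and no additional-cmdline budget is configured,
// everything not explicitly excluded is kept. Otherwise a nonzero
// |additional_cmdline_count| additionally admits the first N distinct
// cmdlines seen.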
bool ShouldRejectDueToFilter(pid_t pid,
                             base::FlatSet<std::string>* additional_cmdlines,
                             const TargetFilter& filter) {
  PERFETTO_CHECK(additional_cmdlines);

  std::string cmdline;
  bool have_cmdline = glob_aware::ReadProcCmdlineForPID(pid, &cmdline);

  const char* binname = "";
  if (have_cmdline) {
    binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
  }

  auto has_matching_pattern = [](const std::vector<std::string>& patterns,
                                 const char* cmd, const char* name) {
    for (const std::string& pattern : patterns) {
      if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
        return true;
      }
    }
    return false;
  };

  if (have_cmdline &&
      has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
                  static_cast<int>(pid));
    return true;
  }
  if (filter.exclude_pids.count(pid)) {
    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
                  static_cast<int>(pid));
    return true;
  }

  if (have_cmdline &&
      has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
    return false;
  }
  if (filter.pids.count(pid)) {
    return false;
  }

  // Empty allow filter means keep everything that isn't explicitly excluded.
  if (filter.cmdlines.empty() && filter.pids.empty() &&
      !filter.additional_cmdline_count) {
    return false;
  }

  // Config option that allows profiling just the first N seen cmdlines.
  if (have_cmdline) {
    if (additional_cmdlines->count(cmdline)) {
      return false;
    }
    if (additional_cmdlines->size() < filter.additional_cmdline_count) {
      additional_cmdlines->insert(cmdline);
      return false;
    }
  }

  PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
  return true;
}

protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
  using Profiling = protos::pbzero::Profiling;
  switch (perf_cpu_mode) {
    case PERF_RECORD_MISC_KERNEL:
      return Profiling::MODE_KERNEL;
    case PERF_RECORD_MISC_USER:
      return Profiling::MODE_USER;
    case PERF_RECORD_MISC_HYPERVISOR:
      return Profiling::MODE_HYPERVISOR;
    case PERF_RECORD_MISC_GUEST_KERNEL:
      return Profiling::MODE_GUEST_KERNEL;
    case PERF_RECORD_MISC_GUEST_USER:
      return Profiling::MODE_GUEST_USER;
    default:
      return Profiling::MODE_UNKNOWN;
  }
}

protos::pbzero::Profiling::StackUnwindError ToProtoEnum(
    unwindstack::ErrorCode error_code) {
  using Profiling = protos::pbzero::Profiling;
  switch (error_code) {
    case unwindstack::ERROR_NONE:
      return Profiling::UNWIND_ERROR_NONE;
    case unwindstack::ERROR_MEMORY_INVALID:
      return Profiling::UNWIND_ERROR_MEMORY_INVALID;
    case unwindstack::ERROR_UNWIND_INFO:
      return Profiling::UNWIND_ERROR_UNWIND_INFO;
    case unwindstack::ERROR_UNSUPPORTED:
      return Profiling::UNWIND_ERROR_UNSUPPORTED;
    case unwindstack::ERROR_INVALID_MAP:
      return Profiling::UNWIND_ERROR_INVALID_MAP;
    case unwindstack::ERROR_MAX_FRAMES_EXCEEDED:
      return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED;
    case unwindstack::ERROR_REPEATED_FRAME:
      return Profiling::UNWIND_ERROR_REPEATED_FRAME;
    case unwindstack::ERROR_INVALID_ELF:
      return Profiling::UNWIND_ERROR_INVALID_ELF;
    case unwindstack::ERROR_SYSTEM_CALL:
      return Profiling::UNWIND_ERROR_SYSTEM_CALL;
    case unwindstack::ERROR_THREAD_TIMEOUT:
      return Profiling::UNWIND_ERROR_THREAD_TIMEOUT;
    case unwindstack::ERROR_THREAD_DOES_NOT_EXIST:
      return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST;
    case unwindstack::ERROR_BAD_ARCH:
      return Profiling::UNWIND_ERROR_BAD_ARCH;
    case unwindstack::ERROR_MAPS_PARSE:
      return Profiling::UNWIND_ERROR_MAPS_PARSE;
    case unwindstack::ERROR_INVALID_PARAMETER:
      return Profiling::UNWIND_ERROR_INVALID_PARAMETER;
  }
  return Profiling::UNWIND_ERROR_UNKNOWN;
}

}  // namespace

PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                           base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      proc_fd_getter_(proc_fd_getter),
      unwinding_worker_(this),
      weak_factory_(this) {
  proc_fd_getter->SetDelegate(this);
}

void PerfProducer::SetupDataSource(DataSourceInstanceID,
                                   const DataSourceConfig&) {}

void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                   const DataSourceConfig& config) {
  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(ds_id),
               config.name().c_str());

  if (config.name() == MetatraceWriter::kDataSourceName) {
    StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer()));
    return;
  }

  // linux.perf data source
  if (config.name() != kDataSourceName)
    return;

  // Tracepoint name -> id lookup in case the config asks for tracepoints:
  auto tracepoint_id_lookup = [this](const std::string& group,
                                     const std::string& name) {
    if (!tracefs_)  // lazy init or retry
      tracefs_ = FtraceProcfs::CreateGuessingMountPoint();
    if (!tracefs_)  // still didn't find an accessible tracefs
      return 0u;
    return tracefs_->ReadEventId(group, name);
  };

  protos::gen::PerfEventConfig event_config_pb;
  if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) {
    PERFETTO_ELOG("PerfEventConfig could not be parsed.");
    return;
  }
  base::Optional<EventConfig> event_config =
      EventConfig::Create(event_config_pb, config, tracepoint_id_lookup);
  if (!event_config.has_value()) {
    PERFETTO_ELOG("PerfEventConfig rejected.");
    return;
  }

  size_t num_cpus = NumberOfCpus();
  std::vector<EventReader> per_cpu_readers;
  for (uint32_t cpu = 0; cpu < num_cpus; cpu++) {
    base::Optional<EventReader> event_reader =
        EventReader::ConfigureEvents(cpu, event_config.value());
    if (!event_reader.has_value()) {
      PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32
                    ", discarding data source.",
                    cpu);
      return;
    }
    per_cpu_readers.emplace_back(std::move(event_reader.value()));
  }

  auto buffer_id = static_cast<BufferID>(config.target_buffer());
  auto writer = endpoint_->CreateTraceWriter(buffer_id);

  // Construct the data source instance.
  std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
  bool inserted;
  std::tie(ds_it, inserted) = data_sources_.emplace(
      std::piecewise_construct, std::forward_as_tuple(ds_id),
      std::forward_as_tuple(event_config.value(), std::move(writer),
                            std::move(per_cpu_readers)));
  PERFETTO_CHECK(inserted);
  DataSourceState& ds = ds_it->second;

  // Start the configured events.
  for (auto& per_cpu_reader : ds.per_cpu_readers) {
    per_cpu_reader.EnableEvents();
  }

  WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

  InterningOutputTracker::WriteFixedInterningsPacket(
      ds_it->second.trace_writer.get(),
      protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

  // Inform unwinder of the new data source instance, and optionally start a
  // periodic task to clear its cached state.
  unwinding_worker_->PostStartDataSource(ds_id,
                                         ds.event_config.kernel_frames());
  if (ds.event_config.unwind_state_clear_period_ms()) {
    unwinding_worker_->PostClearCachedStatePeriodic(
        ds_id, ds.event_config.unwind_state_clear_period_ms());
  }

  // Kick off periodic read task.
  auto tick_period_ms = ds.event_config.read_tick_period_ms();
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id] {
        if (weak_this)
          weak_this->TickDataSourceRead(ds_id);
      },
      TimeToNextReadTickMs(ds_id, tick_period_ms));

  // Optionally kick off periodic memory footprint limit check.
  uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb();
  if (max_daemon_memory_kb > 0) {
    task_runner_->PostDelayedTask(
        [weak_this, ds_id, max_daemon_memory_kb] {
          if (weak_this)
            weak_this->CheckMemoryFootprintPeriodic(ds_id,
                                                    max_daemon_memory_kb);
        },
        kMemoryLimitCheckPeriodMs);
  }
}

void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
                                                uint32_t max_daemon_memory_kb) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;  // stop recurring

  GuardrailConfig gconfig = {};
  gconfig.memory_guardrail_kb = max_daemon_memory_kb;

  ProfilerMemoryGuardrails footprint_snapshot;
  if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) {
    PurgeDataSource(ds_id);
    return;  // stop recurring
  }

  // repost
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, max_daemon_memory_kb] {
        if (weak_this)
          weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb);
      },
      kMemoryLimitCheckPeriodMs);
}

void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id));

  // Metatrace: stop immediately (will miss the events from the
  // asynchronous shutdown of the primary data source).
  auto meta_it = metatrace_writers_.find(ds_id);
  if (meta_it != metatrace_writers_.end()) {
    meta_it->second.WriteAllAndFlushTraceWriter([] {});
    metatrace_writers_.erase(meta_it);
    return;
  }

  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    // Most likely, the source is missing due to an abrupt stop (via
    // |PurgeDataSource|). Tell the service that we've stopped the source now,
    // so that it doesn't wait for the ack until the timeout.
    endpoint_->NotifyDataSourceStopped(ds_id);
    return;
  }

  // Start shutting down the reading frontend, which will propagate the stop
  // further as the intermediate buffers are cleared.
  DataSourceState& ds = ds_it->second;
  InitiateReaderStop(&ds);
}

// The perf data sources ignore flush requests, as flushing would be
// unnecessarily complicated given out-of-order unwinding and proc-fd timeouts.
// Instead of responding to explicit flushes, we can ensure that we're otherwise
// well-behaved (do not reorder packets too much), and let the service scrape
// the SMB.
void PerfProducer::Flush(FlushRequestID flush_id,
                         const DataSourceInstanceID* data_source_ids,
                         size_t num_data_sources) {
  // Flush metatracing if requested.
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id));

    auto meta_it = metatrace_writers_.find(ds_id);
    if (meta_it != metatrace_writers_.end()) {
      meta_it->second.WriteAllAndFlushTraceWriter([] {});
    }
  }

  endpoint_->NotifyFlushComplete(flush_id);
}

void PerfProducer::ClearIncrementalState(
    const DataSourceInstanceID* data_source_ids,
    size_t num_data_sources) {
  for (size_t i = 0; i < num_data_sources; i++) {
    auto ds_id = data_source_ids[i];
    PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id));

    if (metatrace_writers_.find(ds_id) != metatrace_writers_.end())
      continue;

    auto ds_it = data_sources_.find(ds_id);
    if (ds_it == data_sources_.end()) {
      PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry",
                    static_cast<size_t>(ds_id));
      continue;
    }
    DataSourceState& ds = ds_it->second;

    WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get());

    // Forget which incremental state we've emitted before.
    ds.interning_output.ClearHistory();
    InterningOutputTracker::WriteFixedInterningsPacket(
        ds.trace_writer.get(),
        protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);

    // Drop the cross-datasource callstack interning trie. This is not
    // necessary for correctness (the preceding step is sufficient). However,
    // incremental clearing is likely to be used in ring buffer traces, where
    // it makes sense to reset the trie's size periodically, and this is a
    // reasonable point to do so. The trie keeps the monotonic interning IDs,
    // so there is no confusion for other concurrent data sources. We do not
    // bother with clearing concurrent sources' interning output trackers as
    // their footprint should be trivial.
    callstack_trie_.ClearTrie();
  }
}

void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) {
  auto it = data_sources_.find(ds_id);
  if (it == data_sources_.end()) {
    PERFETTO_DLOG("TickDataSourceRead(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = it->second;

  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);

  // Make a pass over all per-cpu readers.
  uint64_t max_samples = ds.event_config.samples_per_tick_limit();
  bool more_records_available = false;
  for (EventReader& reader : ds.per_cpu_readers) {
    if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) {
      more_records_available = true;
    }
  }

  // Wake up the unwinder as we've (likely) pushed samples into its queue.
  unwinding_worker_->PostProcessQueue();

  if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
      !more_records_available) {
    unwinding_worker_->PostInitiateDataSourceStop(ds_id);
  } else {
    // otherwise, keep reading
    auto tick_period_ms = it->second.event_config.read_tick_period_ms();
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, ds_id] {
          if (weak_this)
            weak_this->TickDataSourceRead(ds_id);
        },
        TimeToNextReadTickMs(ds_id, tick_period_ms));
  }
}

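// Drains up to |max_samples| records from a single per-cpu ring buffer.
// Counter-only samples are emitted immediately, while samples with stacks are
// enqueued for asynchronous unwinding (or recorded as skipped). Returns true
// if the reader most likely has further records pending, false once it has
// caught up to the kernel's writer.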
bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
                                            uint64_t max_samples,
                                            DataSourceInstanceID ds_id,
                                            DataSourceState* ds) {
  PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);

  // If the kernel ring buffer dropped data, record it in the trace.
  size_t cpu = reader->cpu();
  auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) {
    auto weak_this = weak_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] {
      if (weak_this)
        weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost);
    });
  };

  for (uint64_t i = 0; i < max_samples; i++) {
    base::Optional<ParsedSample> sample =
        reader->ReadUntilSample(records_lost_callback);
    if (!sample) {
      return false;  // caught up to the writer
    }

    // Counter-only mode: skip the unwinding stage, enqueue the sample for
    // output immediately.
    if (!ds->event_config.sample_callstacks()) {
      CompletedSample output;
      output.common = sample->common;
      EmitSample(ds_id, std::move(output));
      continue;
    }

    // If sampling callstacks, we're not interested in kernel threads/workers.
    if (!sample->regs) {
      continue;
    }

    // Request proc-fds for the process if this is the first time we see it.
    pid_t pid = sample->common.pid;
    auto& process_state = ds->process_states[pid];  // insert if new

    if (process_state == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
                    static_cast<int>(pid));
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kReadStage);
      continue;
    }

    // Previously failed the target filter check.
    if (process_state == ProcessTrackingStatus::kRejected) {
      PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
                    static_cast<int>(pid));
      continue;
    }

    // Seeing pid for the first time.
    if (process_state == ProcessTrackingStatus::kInitial) {
      PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));

      // Check whether samples for this new process should be
      // dropped due to the target filtering.
      const TargetFilter& filter = ds->event_config.filter();
      if (ShouldRejectDueToFilter(pid, &ds->additional_cmdlines, filter)) {
        process_state = ProcessTrackingStatus::kRejected;
        continue;
      }

      // At this point, sampled process is known to be of interest, so start
      // resolving the proc-fds. Response is async.
      process_state = ProcessTrackingStatus::kResolving;
      InitiateDescriptorLookup(ds_id, pid,
                               ds->event_config.remote_descriptor_timeout_ms());
    }

    PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
                   process_state == ProcessTrackingStatus::kResolving);

    // Optionally: drop sample if above a given threshold of sampled stacks
    // that are waiting in the unwinding queue.
    uint64_t max_footprint_bytes =
        ds->event_config.max_enqueued_footprint_bytes();
    uint64_t sample_stack_size = sample->stack.size();
    if (max_footprint_bytes) {
      uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint();
      if (footprint_bytes + sample_stack_size >= max_footprint_bytes) {
        PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit.");
        EmitSkippedSample(ds_id, std::move(sample.value()),
                          SampleSkipReason::kUnwindEnqueue);
        continue;
      }
    }

    // Push the sample into the unwinding queue if there is room.
    auto& queue = unwinding_worker_->unwind_queue();
    WriteView write_view = queue.BeginWrite();
    if (write_view.valid) {
      queue.at(write_view.write_pos) =
          UnwindEntry{ds_id, std::move(sample.value())};
      queue.CommitWrite();
      unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size);
    } else {
      PERFETTO_DLOG("Unwinder queue full, skipping sample");
      EmitSkippedSample(ds_id, std::move(sample.value()),
                        SampleSkipReason::kUnwindEnqueue);
    }
  }

  // Most likely more events in the kernel buffer. Though we might be exactly on
  // the boundary due to |max_samples|.
  return true;
}

// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
                                     uid_t uid,
                                     base::ScopedFile maps_fd,
                                     base::ScopedFile mem_fd) {
  // Find first-fit data source that requested descriptors for the process.
  for (auto& it : data_sources_) {
    DataSourceState& ds = it.second;
    auto proc_status_it = ds.process_states.find(pid);
    if (proc_status_it == ds.process_states.end())
      continue;

    if (!CanProfile(ds.event_config.raw_ds_config(), uid,
                    ds.event_config.target_installed_by())) {
      PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]",
                    static_cast<int>(pid), static_cast<int>(uid),
                    static_cast<size_t>(it.first));
      continue;
    }

    // Match against either resolving, or expired state. In the latter
    // case, it means that the async response was slow enough that we've marked
    // the lookup as expired (but can now recover for future samples).
    auto proc_status = proc_status_it->second;
    if (proc_status == ProcessTrackingStatus::kResolving ||
        proc_status == ProcessTrackingStatus::kExpired) {
      PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
                    static_cast<int>(pid), static_cast<size_t>(it.first));

      proc_status_it->second = ProcessTrackingStatus::kResolved;
      unwinding_worker_->PostAdoptProcDescriptors(
          it.first, pid, std::move(maps_fd), std::move(mem_fd));
      return;  // done
    }
  }
  PERFETTO_DLOG(
      "Discarding proc-fds for pid [%d] as found no outstanding requests.",
      static_cast<int>(pid));
}

void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id,
                                            pid_t pid,
                                            uint32_t timeout_ms) {
  if (!proc_fd_getter_->RequiresDelayedRequest()) {
    StartDescriptorLookup(ds_id, pid, timeout_ms);
    return;
  }

  // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|.
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid, timeout_ms] {
        if (weak_this)
          weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms);
      },
      kProcDescriptorsAndroidDelayMs);
}

void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id,
                                         pid_t pid,
                                         uint32_t timeout_ms) {
  proc_fd_getter_->GetDescriptorsForPid(pid);

  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, ds_id, pid] {
        if (weak_this)
          weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid);
      },
      timeout_ms);
}

void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id,
                                                   pid_t pid) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSourceState& ds = ds_it->second;
  auto proc_status_it = ds.process_states.find(pid);
  if (proc_status_it == ds.process_states.end())
    return;

  // If the request is still outstanding, mark the process as expired (causing
  // outstanding and future samples to be discarded).
  auto proc_status = proc_status_it->second;
  if (proc_status == ProcessTrackingStatus::kResolving) {
    PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
                  static_cast<int>(pid), static_cast<size_t>(ds_it->first));

    proc_status_it->second = ProcessTrackingStatus::kExpired;
    // Also inform the unwinder of the state change (so that it can discard any
    // of the already-enqueued samples).
    unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
  }
}

void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
                                  CompletedSample sample) {
  // Hack: C++11 lambdas can't capture by move, so stash the sample on the
  // heap.
  CompletedSample* raw_sample = new CompletedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample] {
    if (weak_this)
      weak_this->EmitSample(ds_id, std::move(*raw_sample));
    delete raw_sample;
  });
}

void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
                              CompletedSample sample) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("EmitSample(ds: %zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;

  // intern callsite
  GlobalCallstackTrie::Node* callstack_root =
      callstack_trie_.CreateCallsite(sample.frames, sample.build_ids);
  uint64_t callstack_iid = callstack_root->id();

  // start packet, timestamp domain defaults to monotonic_raw
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);

  // write new interning data (if any)
  protos::pbzero::InternedData* interned_out = packet->set_interned_data();
  ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
                                     interned_out);

  // write the sample itself
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);
  perf_sample->set_callstack_iid(callstack_iid);
  if (sample.unwind_error != unwindstack::ERROR_NONE) {
    perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error));
  }
}

void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id,
                                      size_t cpu,
                                      uint64_t records_lost) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;
  PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
                static_cast<size_t>(ds_id), cpu, records_lost);

  // The data loss record relates to a single ring buffer, and indicates loss
  // since the last successfully-written record in that buffer. Therefore the
  // data loss record itself has no timestamp.
  // We timestamp the packet with the boot clock for packet ordering purposes,
  // but it no longer has a (precise) interpretation relative to the sample
  // stream from that per-cpu buffer. See the proto comments for more details.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
  packet->set_timestamp_clock_id(
      protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);

  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(static_cast<uint32_t>(cpu));
  perf_sample->set_kernel_records_lost(records_lost);
}

void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id,
                                                 ParsedSample sample) {
  PostEmitSkippedSample(ds_id, std::move(sample),
                        SampleSkipReason::kUnwindStage);
}

void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id,
                                         ParsedSample sample,
                                         SampleSkipReason reason) {
  // Hack: C++11 lambdas can't capture by move, so stash the sample on the
  // heap.
  ParsedSample* raw_sample = new ParsedSample(std::move(sample));
  auto weak_this = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] {
    if (weak_this)
      weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason);
    delete raw_sample;
  });
}

void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id,
                                     ParsedSample sample,
                                     SampleSkipReason reason) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  // Note: timestamp defaults to the monotonic_raw domain.
  auto packet = StartTracePacket(ds.trace_writer.get());
  packet->set_timestamp(sample.common.timestamp);
  auto* perf_sample = packet->set_perf_sample();
  perf_sample->set_cpu(sample.common.cpu);
  perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid));
  perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid));
  perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode));
  perf_sample->set_timebase_count(sample.common.timebase_count);

  using PerfSample = protos::pbzero::PerfSample;
  switch (reason) {
    case SampleSkipReason::kReadStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_READ_STAGE);
      break;
    case SampleSkipReason::kUnwindEnqueue:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE);
      break;
    case SampleSkipReason::kUnwindStage:
      perf_sample->set_sample_skipped_reason(
          PerfSample::PROFILER_SKIP_UNWIND_STAGE);
      break;
  }
}

void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
  PERFETTO_DLOG("InitiateReaderStop");
  PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);

  ds->status = DataSourceState::Status::kShuttingDown;
  for (auto& event_reader : ds->per_cpu_readers) {
    event_reader.DisableEvents();
  }
}

void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_producer, ds_id] {
    if (weak_producer)
      weak_producer->FinishDataSourceStop(ds_id);
  });
}

void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
  PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone",
                  static_cast<size_t>(ds_id));
    return;
  }
  DataSourceState& ds = ds_it->second;
  PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  endpoint_->NotifyDataSourceStopped(ds_id);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

// TODO(rsavitski): maybe make the tracing service respect premature
// producer-driven stops, and then issue a NotifyDataSourceStopped here.
// Alternatively (and at the expense of higher complexity) introduce a new data
// source status of "tombstoned", and propagate it until the source is stopped
// by the service (this would technically allow for stricter lifetime checking
// of data sources, and help with discarding periodic flushes).
// TODO(rsavitski): Purging while stopping will currently leave the stop
// unacknowledged. Consider checking whether the DS is stopping here, and if so,
// notifying immediately after erasing.
void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;
  DataSourceState& ds = ds_it->second;

  PERFETTO_LOG("Stopping DataSource(%zu) prematurely",
               static_cast<size_t>(ds_id));

  unwinding_worker_->PostPurgeDataSource(ds_id);

  // Write a packet indicating the abrupt stop.
  {
    auto packet = StartTracePacket(ds.trace_writer.get());
    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
    packet->set_timestamp_clock_id(
        protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME);
    auto* perf_sample = packet->set_perf_sample();
    auto* producer_event = perf_sample->set_producer_event();
    producer_event->set_source_stop_reason(
        protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL);
  }

  ds.trace_writer->Flush();
  data_sources_.erase(ds_it);

  // Clean up resources if there are no more active sources.
  if (data_sources_.empty()) {
    callstack_trie_.ClearTrie();  // purge internings
    base::MaybeReleaseAllocatorMemToOS();
  }
}

void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                        BufferID target_buffer) {
  auto writer = endpoint_->CreateTraceWriter(target_buffer);

  auto it_and_inserted = metatrace_writers_.emplace(
      std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Note: only the first concurrent writer will actually be active.
  metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer),
                                   metatrace::TAG_ANY);
}

void PerfProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_socket_name_ = socket_name;
  ConnectService();
}

void PerfProducer::ConnectService() {
  PERFETTO_DCHECK(state_ == kNotConnected);
  state_ = kConnecting;
  endpoint_ = ProducerIPCClient::Connect(
      producer_socket_name_, this, kProducerName, task_runner_,
      TracingService::ProducerSMBScrapingMode::kEnabled);
}

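// Exponential backoff for reconnection attempts: the delay doubles on each
// failed attempt (100 ms, 200 ms, 400 ms, ...), capped at 30 seconds.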
void PerfProducer::IncreaseConnectionBackoff() {
  connection_backoff_ms_ *= 2;
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
}

void PerfProducer::ResetConnectionBackoff() {
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
}

void PerfProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service");

  {
    // linux.perf
    DataSourceDescriptor desc;
    desc.set_name(kDataSourceName);
    desc.set_handles_incremental_state_clear(true);
    desc.set_will_notify_on_stop(true);
    endpoint_->RegisterDataSource(desc);
  }
  {
    // metatrace
    DataSourceDescriptor desc;
    desc.set_name(MetatraceWriter::kDataSourceName);
    endpoint_->RegisterDataSource(desc);
  }
}

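// A disconnect after a fully established connection tears the producer down
// and reconnects from scratch (see |Restart|), while a failed connection
// attempt is retried with exponential backoff.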
void PerfProducer::OnDisconnect() {
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
  PERFETTO_LOG("Disconnected from tracing service");

  auto weak_producer = weak_factory_.GetWeakPtr();
  if (state_ == kConnected)
    return task_runner_->PostTask([weak_producer] {
      if (weak_producer)
        weak_producer->Restart();
    });

  state_ = kNotConnected;
  IncreaseConnectionBackoff();
  task_runner_->PostDelayedTask(
      [weak_producer] {
        if (weak_producer)
          weak_producer->ConnectService();
      },
      connection_backoff_ms_);
}

void PerfProducer::Restart() {
  // We lost the connection with the tracing service. At this point we need
  // to reset all the data sources. Trying to handle that manually is going to
  // be error prone. What we do here is simply destroy the instance and
  // recreate it.
  base::TaskRunner* task_runner = task_runner_;
  const char* socket_name = producer_socket_name_;
  ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_;

  // Invoke destructor and then the constructor again.
  this->~PerfProducer();
  new (this) PerfProducer(proc_fd_getter, task_runner);

  ConnectWithRetries(socket_name);
}

}  // namespace profiling
}  // namespace perfetto