• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24 
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49 
50 #include "protos/perfetto/config/chrome/chrome_config.gen.h"
51 #include "protos/perfetto/config/interceptor_config.gen.h"
52 
53 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
54 #include <io.h>  // For dup()
55 #else
56 #include <unistd.h>  // For dup()
57 #endif
58 
59 namespace perfetto {
60 namespace internal {
61 
62 namespace {
63 
64 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
65 
66 // A task runner which prevents calls to DataSource::Trace() while an operation
67 // is in progress. Used to guard against unexpected re-entrancy where the
68 // user-provided task runner implementation tries to enter a trace point under
69 // the hood.
70 class NonReentrantTaskRunner : public base::TaskRunner {
71  public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)72   NonReentrantTaskRunner(TracingMuxer* muxer,
73                          std::unique_ptr<base::TaskRunner> task_runner)
74       : muxer_(muxer), task_runner_(std::move(task_runner)) {}
75 
76   // base::TaskRunner implementation.
PostTask(std::function<void ()> task)77   void PostTask(std::function<void()> task) override {
78     CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
79   }
80 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)81   void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
82     CallWithGuard(
83         [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
84   }
85 
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)86   void AddFileDescriptorWatch(base::PlatformHandle fd,
87                               std::function<void()> callback) override {
88     CallWithGuard(
89         [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
90   }
91 
RemoveFileDescriptorWatch(base::PlatformHandle fd)92   void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
93     CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
94   }
95 
RunsTasksOnCurrentThread() const96   bool RunsTasksOnCurrentThread() const override {
97     bool result;
98     CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
99     return result;
100   }
101 
102  private:
103   template <typename T>
CallWithGuard(T lambda) const104   void CallWithGuard(T lambda) const {
105     auto* root_tls = muxer_->GetOrCreateTracingTLS();
106     if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
107       lambda();
108       return;
109     }
110     ScopedReentrancyAnnotator scoped_annotator(*root_tls);
111     lambda();
112   }
113 
114   TracingMuxer* const muxer_;
115   std::unique_ptr<base::TaskRunner> task_runner_;
116 };
117 
118 class StopArgsImpl : public DataSourceBase::StopArgs {
119  public:
HandleStopAsynchronously() const120   std::function<void()> HandleStopAsynchronously() const override {
121     auto closure = std::move(async_stop_closure);
122     async_stop_closure = std::function<void()>();
123     return closure;
124   }
125 
126   mutable std::function<void()> async_stop_closure;
127 };
128 
129 class FlushArgsImpl : public DataSourceBase::FlushArgs {
130  public:
HandleFlushAsynchronously() const131   std::function<void()> HandleFlushAsynchronously() const override {
132     auto closure = std::move(async_flush_closure);
133     async_flush_closure = std::function<void()>();
134     return closure;
135   }
136 
137   mutable std::function<void()> async_flush_closure;
138 };
139 
ComputeConfigHash(const DataSourceConfig & config)140 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
141   base::Hasher hasher;
142   std::string config_bytes = config.SerializeAsString();
143   hasher.Update(config_bytes.data(), config_bytes.size());
144   return hasher.digest();
145 }
146 
147 // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
148 static TracingMuxerImpl* g_prev_instance{};
149 
ComputeStartupConfigHash(DataSourceConfig config)150 uint64_t ComputeStartupConfigHash(DataSourceConfig config) {
151   // Clear target buffer and tracing-service provided fields for comparison of
152   // configs for startup tracing, since these fields are not available when
153   // setting up data sources for startup tracing.
154   config.set_target_buffer(0);
155   config.set_tracing_session_id(0);
156   config.set_session_initiator(DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
157   config.set_trace_duration_ms(0);
158   config.set_stop_timeout_ms(0);
159   config.set_enable_extra_guardrails(false);
160   // Clear some fields inside Chrome config:
161   // * client_priority, because Chrome always sets the priority to
162   // USER_INITIATED when setting up startup tracing.
163   // * convert_to_legacy_json, because Telemetry initiates tracing with proto
164   // format, but in some cases adopts the tracing session later via devtools
165   // which expect json format.
166   // TODO(khokhlov): Don't clear client_priority when Chrome correctly sets it
167   // for startup tracing (and propagates it to all child processes).
168   if (config.has_chrome_config()) {
169     config.mutable_chrome_config()->set_client_priority(
170         perfetto::protos::gen::ChromeConfig::UNKNOWN);
171     config.mutable_chrome_config()->set_convert_to_legacy_json(false);
172   }
173   base::Hasher hasher;
174   std::string config_bytes = config.SerializeAsString();
175   hasher.Update(config_bytes.data(), config_bytes.size());
176   return hasher.digest();
177 }
178 
179 template <typename RegisteredBackend>
180 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anon3be2c9c50111::CompareBackendByType181   static int BackendTypePriority(BackendType type) {
182     switch (type) {
183       case kSystemBackend:
184         return 0;
185       case kInProcessBackend:
186         return 1;
187       case kCustomBackend:
188         return 2;
189       // The UnspecifiedBackend has the highest priority so that
190       // TracingBackendFake is the last one on the backend lists.
191       case kUnspecifiedBackend:
192         break;
193     }
194     return 3;
195   }
operator ()perfetto::internal::__anon3be2c9c50111::CompareBackendByType196   bool operator()(BackendType type, const RegisteredBackend& b) {
197     return BackendTypePriority(type) < BackendTypePriority(b.type);
198   }
199 };
200 
201 }  // namespace
202 
203 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms)204 TracingMuxerImpl::ProducerImpl::ProducerImpl(
205     TracingMuxerImpl* muxer,
206     TracingBackendId backend_id,
207     uint32_t shmem_batch_commits_duration_ms)
208     : muxer_(muxer),
209       backend_id_(backend_id),
210       shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
211 
~ProducerImpl()212 TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
213   muxer_ = nullptr;
214 }
215 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)216 void TracingMuxerImpl::ProducerImpl::Initialize(
217     std::unique_ptr<ProducerEndpoint> endpoint) {
218   PERFETTO_DCHECK_THREAD(thread_checker_);
219   PERFETTO_DCHECK(!connected_);
220   connection_id_.fetch_add(1, std::memory_order_relaxed);
221   is_producer_provided_smb_ = endpoint->shared_memory();
222   last_startup_target_buffer_reservation_ = 0;
223 
224   // Adopt the endpoint into a shared pointer so that we can safely share it
225   // across threads that create trace writers. The custom deleter function
226   // ensures that the endpoint is always destroyed on the muxer's thread. (Note
227   // that |task_runner| is assumed to outlive tracing sessions on all threads.)
228   auto* task_runner = muxer_->task_runner_.get();
229   auto deleter = [task_runner](ProducerEndpoint* e) {
230     if (task_runner->RunsTasksOnCurrentThread()) {
231       delete e;
232       return;
233     }
234     task_runner->PostTask([e] { delete e; });
235   };
236   std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
237   // This atomic store is needed because another thread might be concurrently
238   // creating a trace writer using the previous (disconnected) |service_|. See
239   // CreateTraceWriter().
240   std::atomic_store(&service_, std::move(service));
241   // Don't try to use the service here since it may not have connected yet. See
242   // OnConnect().
243 }
244 
OnConnect()245 void TracingMuxerImpl::ProducerImpl::OnConnect() {
246   PERFETTO_DLOG("Producer connected");
247   PERFETTO_DCHECK_THREAD(thread_checker_);
248   PERFETTO_DCHECK(!connected_);
249   if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
250     PERFETTO_ELOG(
251         "The service likely doesn't support producer-provided SMBs. Preventing "
252         "future attempts to use producer-provided SMB again with this "
253         "backend.");
254     producer_provided_smb_failed_ = true;
255     // Will call OnDisconnect() and cause a reconnect without producer-provided
256     // SMB.
257     service_->Disconnect();
258     return;
259   }
260   connected_ = true;
261   muxer_->UpdateDataSourcesOnAllBackends();
262   SendOnConnectTriggers();
263 }
264 
OnDisconnect()265 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
266   PERFETTO_DCHECK_THREAD(thread_checker_);
267   // If we're being destroyed, bail out.
268   if (!muxer_)
269     return;
270   connected_ = false;
271   // Active data sources for this producer will be stopped by
272   // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
273   // will have a different connection id (even before it has finished
274   // connecting).
275   registered_data_sources_.reset();
276   DisposeConnection();
277 
278   // Try reconnecting the producer.
279   muxer_->OnProducerDisconnected(this);
280 }
281 
DisposeConnection()282 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
283   // Keep the old service around as a dead connection in case it has active
284   // trace writers. If any tracing sessions were created, we can't clear
285   // |service_| here because other threads may be concurrently creating new
286   // trace writers. Any reconnection attempt will atomically swap the new
287   // service in place of the old one.
288   if (did_setup_tracing_ || did_setup_startup_tracing_) {
289     dead_services_.push_back(service_);
290   } else {
291     service_.reset();
292   }
293 }
294 
OnTracingSetup()295 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
296   PERFETTO_DCHECK_THREAD(thread_checker_);
297   did_setup_tracing_ = true;
298   service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
299       shmem_batch_commits_duration_ms_);
300 }
301 
OnStartupTracingSetup()302 void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
303   PERFETTO_DCHECK_THREAD(thread_checker_);
304   did_setup_startup_tracing_ = true;
305 }
306 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)307 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
308     DataSourceInstanceID id,
309     const DataSourceConfig& cfg) {
310   PERFETTO_DCHECK_THREAD(thread_checker_);
311   if (!muxer_)
312     return;
313   muxer_->SetupDataSource(
314       backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
315 }
316 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)317 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
318                                                      const DataSourceConfig&) {
319   PERFETTO_DCHECK_THREAD(thread_checker_);
320   if (!muxer_)
321     return;
322   muxer_->StartDataSource(backend_id_, id);
323   service_->NotifyDataSourceStarted(id);
324 }
325 
StopDataSource(DataSourceInstanceID id)326 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
327   PERFETTO_DCHECK_THREAD(thread_checker_);
328   if (!muxer_)
329     return;
330   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
331 }
332 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count)333 void TracingMuxerImpl::ProducerImpl::Flush(
334     FlushRequestID flush_id,
335     const DataSourceInstanceID* instances,
336     size_t instance_count) {
337   PERFETTO_DCHECK_THREAD(thread_checker_);
338   bool all_handled = true;
339   if (muxer_) {
340     for (size_t i = 0; i < instance_count; i++) {
341       DataSourceInstanceID ds_id = instances[i];
342       bool handled =
343           muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id, flush_id);
344       if (!handled) {
345         pending_flushes_[flush_id].insert(ds_id);
346         all_handled = false;
347       }
348     }
349   }
350 
351   if (all_handled) {
352     service_->NotifyFlushComplete(flush_id);
353   }
354 }
355 
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)356 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
357     const DataSourceInstanceID* instances,
358     size_t instance_count) {
359   PERFETTO_DCHECK_THREAD(thread_checker_);
360   if (!muxer_)
361     return;
362   for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
363     muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
364   }
365 }
366 
SweepDeadServices()367 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
368   PERFETTO_DCHECK_THREAD(thread_checker_);
369   auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
370     auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
371     return !arbiter || arbiter->TryShutdown();
372   };
373   for (auto it = dead_services_.begin(); it != dead_services_.end();) {
374     auto next_it = it;
375     next_it++;
376     if (is_unused(*it)) {
377       dead_services_.erase(it);
378     }
379     it = next_it;
380   }
381   return dead_services_.empty();
382 }
383 
SendOnConnectTriggers()384 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
385   PERFETTO_DCHECK_THREAD(thread_checker_);
386   base::TimeMillis now = base::GetWallTimeMs();
387   std::vector<std::string> triggers;
388   while (!on_connect_triggers_.empty()) {
389     // Skip if we passed TTL.
390     if (on_connect_triggers_.front().second > now) {
391       triggers.push_back(std::move(on_connect_triggers_.front().first));
392     }
393     on_connect_triggers_.pop_front();
394   }
395   if (!triggers.empty()) {
396     service_->ActivateTriggers(triggers);
397   }
398 }
399 
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)400 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
401     DataSourceInstanceID ds_id,
402     FlushRequestID flush_id) {
403   if (!connected_) {
404     return;
405   }
406 
407   {
408     auto it = pending_flushes_.find(flush_id);
409     if (it == pending_flushes_.end()) {
410       return;
411     }
412     std::set<DataSourceInstanceID>& ds_ids = it->second;
413     ds_ids.erase(ds_id);
414   }
415 
416   std::optional<DataSourceInstanceID> biggest_flush_id;
417   for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
418     if (it->second.empty()) {
419       biggest_flush_id = it->first;
420       it = pending_flushes_.erase(it);
421     } else {
422       break;
423     }
424   }
425 
426   if (biggest_flush_id) {
427     service_->NotifyFlushComplete(*biggest_flush_id);
428   }
429 }
430 
431 // ----- End of TracingMuxerImpl::ProducerImpl methods.
432 
433 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingSessionGlobalID session_id)434 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
435                                              BackendType backend_type,
436                                              TracingSessionGlobalID session_id)
437     : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
438 
~ConsumerImpl()439 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
440   muxer_ = nullptr;
441 }
442 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)443 void TracingMuxerImpl::ConsumerImpl::Initialize(
444     std::unique_ptr<ConsumerEndpoint> endpoint) {
445   PERFETTO_DCHECK_THREAD(thread_checker_);
446   service_ = std::move(endpoint);
447   // Don't try to use the service here since it may not have connected yet. See
448   // OnConnect().
449 }
450 
OnConnect()451 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
452   PERFETTO_DCHECK_THREAD(thread_checker_);
453   PERFETTO_DCHECK(!connected_);
454   connected_ = true;
455 
456   // Observe data source instance events so we get notified when tracing starts.
457   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
458                           ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
459 
460   // If the API client configured and started tracing before we connected,
461   // tell the backend about it now.
462   if (trace_config_)
463     muxer_->SetupTracingSession(session_id_, trace_config_);
464   if (start_pending_)
465     muxer_->StartTracingSession(session_id_);
466   if (get_trace_stats_pending_) {
467     auto callback = std::move(get_trace_stats_callback_);
468     get_trace_stats_callback_ = nullptr;
469     muxer_->GetTraceStats(session_id_, std::move(callback));
470   }
471   if (query_service_state_callback_) {
472     auto callback = std::move(query_service_state_callback_);
473     query_service_state_callback_ = nullptr;
474     muxer_->QueryServiceState(session_id_, std::move(callback));
475   }
476   if (stop_pending_)
477     muxer_->StopTracingSession(session_id_);
478 }
479 
OnDisconnect()480 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
481   PERFETTO_DCHECK_THREAD(thread_checker_);
482   // If we're being destroyed, bail out.
483   if (!muxer_)
484     return;
485 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
486   if (!connected_ && backend_type_ == kSystemBackend) {
487     PERFETTO_ELOG(
488         "Unable to connect to the system tracing service as a consumer. On "
489         "Android, use the \"perfetto\" command line tool instead to start "
490         "system-wide tracing sessions");
491   }
492 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
493 
494   // Notify the client about disconnection.
495   NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
496 
497   // Make sure the client doesn't hang in a blocking start/stop because of the
498   // disconnection.
499   NotifyStartComplete();
500   NotifyStopComplete();
501 
502   // It shouldn't be necessary to call StopTracingSession. If we get this call
503   // it means that the service did shutdown before us, so there is no point
504   // trying it to ask it to stop the session. We should just remember to cleanup
505   // the consumer vector.
506   connected_ = false;
507 
508   // Notify the muxer that it is safe to destroy |this|. This is needed because
509   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
510   // access until OnDisconnect() is called.
511   muxer_->OnConsumerDisconnected(this);
512 }
513 
Disconnect()514 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
515   // This is weird and deserves a comment.
516   //
517   // When we called the ConnectConsumer method on the service it returns
518   // us a ConsumerEndpoint which we stored in |service_|, however this
519   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
520   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
521   // the ConsumerImpl pointer has to be valid until the
522   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
523   // ConsumerEndpoint |service_|. Eventually this will call
524   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
525   // call the destructor of |this|.
526   service_.reset();
527 }
528 
OnTracingDisabled(const std::string & error)529 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
530     const std::string& error) {
531   PERFETTO_DCHECK_THREAD(thread_checker_);
532   PERFETTO_DCHECK(!stopped_);
533   stopped_ = true;
534 
535   if (!error.empty())
536     NotifyError(TracingError{TracingError::kTracingFailed, error});
537 
538   // If we're still waiting for the start event, fire it now. This may happen if
539   // there are no active data sources in the session.
540   NotifyStartComplete();
541   NotifyStopComplete();
542 }
543 
NotifyStartComplete()544 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
545   PERFETTO_DCHECK_THREAD(thread_checker_);
546   if (start_complete_callback_) {
547     muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
548     start_complete_callback_ = nullptr;
549   }
550   if (blocking_start_complete_callback_) {
551     muxer_->task_runner_->PostTask(
552         std::move(blocking_start_complete_callback_));
553     blocking_start_complete_callback_ = nullptr;
554   }
555 }
556 
NotifyError(const TracingError & error)557 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
558   PERFETTO_DCHECK_THREAD(thread_checker_);
559   if (error_callback_) {
560     muxer_->task_runner_->PostTask(
561         std::bind(std::move(error_callback_), error));
562   }
563 }
564 
NotifyStopComplete()565 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
566   PERFETTO_DCHECK_THREAD(thread_checker_);
567   if (stop_complete_callback_) {
568     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
569     stop_complete_callback_ = nullptr;
570   }
571   if (blocking_stop_complete_callback_) {
572     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
573     blocking_stop_complete_callback_ = nullptr;
574   }
575 }
576 
OnTraceData(std::vector<TracePacket> packets,bool has_more)577 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
578     std::vector<TracePacket> packets,
579     bool has_more) {
580   PERFETTO_DCHECK_THREAD(thread_checker_);
581   if (!read_trace_callback_)
582     return;
583 
584   size_t capacity = 0;
585   for (const auto& packet : packets) {
586     // 16 is an over-estimation of the proto preamble size
587     capacity += packet.size() + 16;
588   }
589 
590   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
591   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
592   buf->reserve(capacity);
593   for (auto& packet : packets) {
594     char* start;
595     size_t size;
596     std::tie(start, size) = packet.GetProtoPreamble();
597     buf->insert(buf->end(), start, start + size);
598     for (auto& slice : packet.slices()) {
599       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
600       buf->insert(buf->end(), slice_data, slice_data + slice.size);
601     }
602   }
603 
604   auto callback = read_trace_callback_;
605   muxer_->task_runner_->PostTask([callback, buf, has_more] {
606     TracingSession::ReadTraceCallbackArgs callback_arg{};
607     callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
608     callback_arg.size = buf->size();
609     callback_arg.has_more = has_more;
610     callback(callback_arg);
611   });
612 
613   if (!has_more)
614     read_trace_callback_ = nullptr;
615 }
616 
OnObservableEvents(const ObservableEvents & events)617 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
618     const ObservableEvents& events) {
619   if (events.instance_state_changes_size()) {
620     for (const auto& state_change : events.instance_state_changes()) {
621       DataSourceHandle handle{state_change.producer_name(),
622                               state_change.data_source_name()};
623       data_source_states_[handle] =
624           state_change.state() ==
625           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
626     }
627   }
628 
629   if (events.instance_state_changes_size() ||
630       events.all_data_sources_started()) {
631     // Data sources are first reported as being stopped before starting, so once
632     // all the data sources we know about have started we can declare tracing
633     // begun. In the case where there are no matching data sources for the
634     // session, the service will report the all_data_sources_started() event
635     // without adding any instances (only since Android S / Perfetto v10.0).
636     if (start_complete_callback_ || blocking_start_complete_callback_) {
637       bool all_data_sources_started = std::all_of(
638           data_source_states_.cbegin(), data_source_states_.cend(),
639           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
640       if (all_data_sources_started)
641         NotifyStartComplete();
642     }
643   }
644 }
645 
OnSessionCloned(const OnSessionClonedArgs &)646 void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
647     const OnSessionClonedArgs&) {
648   // CloneSession is not exposed in the SDK. This should never happen.
649   PERFETTO_DCHECK(false);
650 }
651 
OnTraceStats(bool success,const TraceStats & trace_stats)652 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
653     bool success,
654     const TraceStats& trace_stats) {
655   if (!get_trace_stats_callback_)
656     return;
657   TracingSession::GetTraceStatsCallbackArgs callback_arg{};
658   callback_arg.success = success;
659   callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
660   muxer_->task_runner_->PostTask(
661       std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
662   get_trace_stats_callback_ = nullptr;
663 }
664 
665 // The callbacks below are not used.
OnDetach(bool)666 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)667 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
668 // ----- End of TracingMuxerImpl::ConsumerImpl
669 
670 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
671 
672 // TracingSessionImpl is the RAII object returned to API clients when they
673 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
674 // tracing.
675 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)676 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
677     TracingMuxerImpl* muxer,
678     TracingSessionGlobalID session_id,
679     BackendType backend_type)
680     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
681 
682 // Can be destroyed from any thread.
~TracingSessionImpl()683 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
684   auto* muxer = muxer_;
685   auto session_id = session_id_;
686   muxer->task_runner_->PostTask(
687       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
688 }
689 
690 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)691 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
692                                                  int fd) {
693   auto* muxer = muxer_;
694   auto session_id = session_id_;
695   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
696   if (fd >= 0) {
697     base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
698 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
699     if (backend_type_ != kInProcessBackend) {
700       PERFETTO_FATAL(
701           "Passing a file descriptor to TracingSession::Setup() is only "
702           "supported with the kInProcessBackend on Windows. Use "
703           "TracingSession::ReadTrace() instead");
704     }
705 #endif
706     trace_config->set_write_into_file(true);
707     fd = dup(fd);
708   }
709   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
710     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
711   });
712 }
713 
714 // Can be called from any thread.
Start()715 void TracingMuxerImpl::TracingSessionImpl::Start() {
716   auto* muxer = muxer_;
717   auto session_id = session_id_;
718   muxer->task_runner_->PostTask(
719       [muxer, session_id] { muxer->StartTracingSession(session_id); });
720 }
721 
722 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)723 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
724     const TraceConfig& cfg) {
725   auto* muxer = muxer_;
726   auto session_id = session_id_;
727   muxer->task_runner_->PostTask([muxer, session_id, cfg] {
728     muxer->ChangeTracingSessionConfig(session_id, cfg);
729   });
730 }
731 
732 // Can be called from any thread except the service thread.
StartBlocking()733 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
734   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
735   auto* muxer = muxer_;
736   auto session_id = session_id_;
737   base::WaitableEvent tracing_started;
738   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
739     auto* consumer = muxer->FindConsumer(session_id);
740     if (!consumer) {
741       // TODO(skyostil): Signal an error to the user.
742       tracing_started.Notify();
743       return;
744     }
745     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
746     consumer->blocking_start_complete_callback_ = [&] {
747       tracing_started.Notify();
748     };
749     muxer->StartTracingSession(session_id);
750   });
751   tracing_started.Wait();
752 }
753 
754 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)755 void TracingMuxerImpl::TracingSessionImpl::Flush(
756     std::function<void(bool)> user_callback,
757     uint32_t timeout_ms) {
758   auto* muxer = muxer_;
759   auto session_id = session_id_;
760   muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
761     auto* consumer = muxer->FindConsumer(session_id);
762     if (!consumer) {
763       std::move(user_callback)(false);
764       return;
765     }
766     muxer->FlushTracingSession(session_id, timeout_ms,
767                                std::move(user_callback));
768   });
769 }
770 
771 // Can be called from any thread.
Stop()772 void TracingMuxerImpl::TracingSessionImpl::Stop() {
773   auto* muxer = muxer_;
774   auto session_id = session_id_;
775   muxer->task_runner_->PostTask(
776       [muxer, session_id] { muxer->StopTracingSession(session_id); });
777 }
778 
779 // Can be called from any thread except the service thread.
StopBlocking()780 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
781   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
782   auto* muxer = muxer_;
783   auto session_id = session_id_;
784   base::WaitableEvent tracing_stopped;
785   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
786     auto* consumer = muxer->FindConsumer(session_id);
787     if (!consumer) {
788       // TODO(skyostil): Signal an error to the user.
789       tracing_stopped.Notify();
790       return;
791     }
792     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
793     consumer->blocking_stop_complete_callback_ = [&] {
794       tracing_stopped.Notify();
795     };
796     muxer->StopTracingSession(session_id);
797   });
798   tracing_stopped.Wait();
799 }
800 
801 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)802 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
803   auto* muxer = muxer_;
804   auto session_id = session_id_;
805   muxer->task_runner_->PostTask([muxer, session_id, cb] {
806     muxer->ReadTracingSessionData(session_id, std::move(cb));
807   });
808 }
809 
810 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)811 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
812     std::function<void()> cb) {
813   auto* muxer = muxer_;
814   auto session_id = session_id_;
815   muxer->task_runner_->PostTask([muxer, session_id, cb] {
816     auto* consumer = muxer->FindConsumer(session_id);
817     if (!consumer)
818       return;
819     consumer->start_complete_callback_ = cb;
820   });
821 }
822 
823 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)824 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
825     std::function<void(TracingError)> cb) {
826   auto* muxer = muxer_;
827   auto session_id = session_id_;
828   muxer->task_runner_->PostTask([muxer, session_id, cb] {
829     auto* consumer = muxer->FindConsumer(session_id);
830     if (!consumer) {
831       // Notify the client about concurrent disconnection of the session.
832       if (cb)
833         cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
834       return;
835     }
836     consumer->error_callback_ = cb;
837   });
838 }
839 
840 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)841 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
842     std::function<void()> cb) {
843   auto* muxer = muxer_;
844   auto session_id = session_id_;
845   muxer->task_runner_->PostTask([muxer, session_id, cb] {
846     auto* consumer = muxer->FindConsumer(session_id);
847     if (!consumer)
848       return;
849     consumer->stop_complete_callback_ = cb;
850   });
851 }
852 
853 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)854 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
855     GetTraceStatsCallback cb) {
856   auto* muxer = muxer_;
857   auto session_id = session_id_;
858   muxer->task_runner_->PostTask([muxer, session_id, cb] {
859     muxer->GetTraceStats(session_id, std::move(cb));
860   });
861 }
862 
863 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)864 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
865     QueryServiceStateCallback cb) {
866   auto* muxer = muxer_;
867   auto session_id = session_id_;
868   muxer->task_runner_->PostTask([muxer, session_id, cb] {
869     muxer->QueryServiceState(session_id, std::move(cb));
870   });
871 }
872 
873 // ----- End of TracingMuxerImpl::TracingSessionImpl
874 
875 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
876 
StartupTracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)877 TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
878     TracingMuxerImpl* muxer,
879     TracingSessionGlobalID session_id,
880     BackendType backend_type)
881     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
882 
883 // Can be destroyed from any thread.
884 TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
885     default;
886 
Abort()887 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
888   auto* muxer = muxer_;
889   auto session_id = session_id_;
890   auto backend_type = backend_type_;
891   muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
892     muxer->AbortStartupTracingSession(session_id, backend_type);
893   });
894 }
895 
896 // Must not be called from the SDK's internal thread.
AbortBlocking()897 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
898   auto* muxer = muxer_;
899   auto session_id = session_id_;
900   auto backend_type = backend_type_;
901   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
902   base::WaitableEvent event;
903   muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
904     muxer->AbortStartupTracingSession(session_id, backend_type);
905     event.Notify();
906   });
907   event.Wait();
908 }
909 
910 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
911 
912 // static
913 TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
914 
915 // This is called by perfetto::Tracing::Initialize().
916 // Can be called on any thread. Typically, but not necessarily, that will be
917 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)918 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
919     : TracingMuxer(args.platform ? args.platform
920                                  : Platform::GetDefaultPlatform()) {
921   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
922   instance_ = this;
923 
924   // Create the thread where muxer, producers and service will live.
925   Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
926   task_runner_.reset(new NonReentrantTaskRunner(
927       this, platform_->CreateTaskRunner(std::move(tr_args))));
928 
929   // Run the initializer on that thread.
930   task_runner_->PostTask([this, args] {
931     Initialize(args);
932     AddBackends(args);
933   });
934 }
935 
Initialize(const TracingInitArgs & args)936 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
937   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.
938 
939   policy_ = args.tracing_policy;
940   supports_multiple_data_source_instances_ =
941       args.supports_multiple_data_source_instances;
942 
943   // Fallback backend for producer creation for an unsupported backend type.
944   PERFETTO_CHECK(producer_backends_.empty());
945   AddProducerBackend(internal::TracingBackendFake::GetInstance(),
946                      BackendType::kUnspecifiedBackend, args);
947   // Fallback backend for consumer creation for an unsupported backend type.
948   // This backend simply fails any attempt to start a tracing session.
949   PERFETTO_CHECK(consumer_backends_.empty());
950   AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
951                      BackendType::kUnspecifiedBackend);
952 }
953 
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)954 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
955                                           BackendType type) {
956   if (!backend) {
957     // We skip the log in release builds because the *_backend_fake.cc code
958     // has already an ELOG before returning a nullptr.
959     PERFETTO_DLOG("Consumer backend creation failed, type %d",
960                   static_cast<int>(type));
961     return;
962   }
963   // Keep the backends sorted by type.
964   auto it =
965       std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
966                        type, CompareBackendByType<RegisteredConsumerBackend>());
967   it = consumer_backends_.emplace(it);
968 
969   RegisteredConsumerBackend& rb = *it;
970   rb.backend = backend;
971   rb.type = type;
972 }
973 
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)974 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
975                                           BackendType type,
976                                           const TracingInitArgs& args) {
977   if (!backend) {
978     // We skip the log in release builds because the *_backend_fake.cc code
979     // has already an ELOG before returning a nullptr.
980     PERFETTO_DLOG("Producer backend creation failed, type %d",
981                   static_cast<int>(type));
982     return;
983   }
984   TracingBackendId backend_id = producer_backends_.size();
985   // Keep the backends sorted by type.
986   auto it =
987       std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
988                        type, CompareBackendByType<RegisteredProducerBackend>());
989   it = producer_backends_.emplace(it);
990 
991   RegisteredProducerBackend& rb = *it;
992   rb.backend = backend;
993   rb.id = backend_id;
994   rb.type = type;
995   rb.producer.reset(
996       new ProducerImpl(this, backend_id, args.shmem_batch_commits_duration_ms));
997   rb.producer_conn_args.producer = rb.producer.get();
998   rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
999   rb.producer_conn_args.task_runner = task_runner_.get();
1000   rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
1001   rb.producer_conn_args.shmem_page_size_hint_bytes =
1002       args.shmem_page_size_hint_kb * 1024;
1003   rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
1004 }
1005 
1006 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)1007 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
1008   for (RegisteredProducerBackend& b : producer_backends_) {
1009     if (b.id == id) {
1010       return &b;
1011     }
1012   }
1013   return nullptr;
1014 }
1015 
1016 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)1017 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
1018   for (RegisteredProducerBackend& b : producer_backends_) {
1019     if (b.type == type) {
1020       return &b;
1021     }
1022   }
1023   return nullptr;
1024 }
1025 
1026 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)1027 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
1028   for (RegisteredConsumerBackend& b : consumer_backends_) {
1029     if (b.type == type) {
1030       return &b;
1031     }
1032   }
1033   return nullptr;
1034 }
1035 
AddBackends(const TracingInitArgs & args)1036 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1037   if (args.backends & kSystemBackend) {
1038     PERFETTO_CHECK(args.system_producer_backend_factory_);
1039     if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1040       AddProducerBackend(args.system_producer_backend_factory_(),
1041                          kSystemBackend, args);
1042     }
1043     if (args.enable_system_consumer) {
1044       PERFETTO_CHECK(args.system_consumer_backend_factory_);
1045       if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1046         AddConsumerBackend(args.system_consumer_backend_factory_(),
1047                            kSystemBackend);
1048       }
1049     }
1050   }
1051 
1052   if (args.backends & kInProcessBackend) {
1053     TracingBackend* b = nullptr;
1054     if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1055       if (!b) {
1056         PERFETTO_CHECK(args.in_process_backend_factory_);
1057         b = args.in_process_backend_factory_();
1058       }
1059       AddProducerBackend(b, kInProcessBackend, args);
1060     }
1061     if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1062       if (!b) {
1063         PERFETTO_CHECK(args.in_process_backend_factory_);
1064         b = args.in_process_backend_factory_();
1065       }
1066       AddConsumerBackend(b, kInProcessBackend);
1067     }
1068   }
1069 
1070   if (args.backends & kCustomBackend) {
1071     PERFETTO_CHECK(args.custom_backend);
1072     if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1073       AddProducerBackend(args.custom_backend, kCustomBackend, args);
1074     }
1075     if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1076       AddConsumerBackend(args.custom_backend, kCustomBackend);
1077     }
1078   }
1079 
1080   if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1081     PERFETTO_FATAL("Unsupported tracing backend type");
1082   }
1083 }
1084 
1085 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,DataSourceStaticState * static_state)1086 bool TracingMuxerImpl::RegisterDataSource(
1087     const DataSourceDescriptor& descriptor,
1088     DataSourceFactory factory,
1089     DataSourceParams params,
1090     DataSourceStaticState* static_state) {
1091   // Ignore repeated registrations.
1092   if (static_state->index != kMaxDataSources)
1093     return true;
1094 
1095   uint32_t new_index = next_data_source_index_++;
1096   if (new_index >= kMaxDataSources) {
1097     PERFETTO_DLOG(
1098         "RegisterDataSource failed: too many data sources already registered");
1099     return false;
1100   }
1101 
1102   // Initialize the static state.
1103   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1104                 "instances[] size mismatch");
1105   for (size_t i = 0; i < static_state->instances.size(); i++)
1106     new (&static_state->instances[i]) DataSourceState{};
1107 
1108   static_state->index = new_index;
1109 
1110   // Generate a semi-unique id for this data source.
1111   base::Hasher hash;
1112   hash.Update(reinterpret_cast<intptr_t>(static_state));
1113   hash.Update(base::GetWallTimeNs().count());
1114   static_state->id = hash.digest() ? hash.digest() : 1;
1115 
1116   task_runner_->PostTask([this, descriptor, factory, static_state, params] {
1117     data_sources_.emplace_back();
1118     RegisteredDataSource& rds = data_sources_.back();
1119     rds.descriptor = descriptor;
1120     rds.factory = factory;
1121     rds.supports_multiple_instances =
1122         supports_multiple_data_source_instances_ &&
1123         params.supports_multiple_instances;
1124     rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1125     rds.static_state = static_state;
1126 
1127     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1128   });
1129   return true;
1130 }
1131 
1132 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1133 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1134     const DataSourceDescriptor& descriptor,
1135     const DataSourceStaticState* static_state) {
1136   task_runner_->PostTask([this, descriptor, static_state] {
1137     for (auto& rds : data_sources_) {
1138       if (rds.static_state == static_state) {
1139         PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1140         rds.descriptor = descriptor;
1141         rds.descriptor.set_id(static_state->id);
1142         UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1143         return;
1144       }
1145     }
1146   });
1147 }
1148 
1149 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1150 void TracingMuxerImpl::RegisterInterceptor(
1151     const InterceptorDescriptor& descriptor,
1152     InterceptorFactory factory,
1153     InterceptorBase::TLSFactory tls_factory,
1154     InterceptorBase::TracePacketCallback packet_callback) {
1155   task_runner_->PostTask(
1156       [this, descriptor, factory, tls_factory, packet_callback] {
1157         // Ignore repeated registrations.
1158         for (const auto& interceptor : interceptors_) {
1159           if (interceptor.descriptor.name() == descriptor.name()) {
1160             PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1161             PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1162             return;
1163           }
1164         }
1165         // Only allow certain interceptors for now.
1166         if (descriptor.name() != "test_interceptor" &&
1167             descriptor.name() != "console" &&
1168             descriptor.name() != "etwexport") {
1169           PERFETTO_ELOG(
1170               "Interceptors are experimental. If you want to use them, please "
1171               "get in touch with the project maintainers "
1172               "(https://perfetto.dev/docs/contributing/"
1173               "getting-started#community).");
1174           return;
1175         }
1176         interceptors_.emplace_back();
1177         RegisteredInterceptor& interceptor = interceptors_.back();
1178         interceptor.descriptor = descriptor;
1179         interceptor.factory = factory;
1180         interceptor.tls_factory = tls_factory;
1181         interceptor.packet_callback = packet_callback;
1182       });
1183 }
1184 
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1185 void TracingMuxerImpl::ActivateTriggers(
1186     const std::vector<std::string>& triggers,
1187     uint32_t ttl_ms) {
1188   base::TimeMillis expire_time =
1189       base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1190   task_runner_->PostTask([this, triggers, expire_time] {
1191     for (RegisteredProducerBackend& backend : producer_backends_) {
1192       if (backend.producer->connected_) {
1193         backend.producer->service_->ActivateTriggers(triggers);
1194       } else {
1195         for (const std::string& trigger : triggers) {
1196           backend.producer->on_connect_triggers_.emplace_back(trigger,
1197                                                               expire_time);
1198         }
1199       }
1200     }
1201   });
1202 }
1203 
1204 // Checks if there is any matching startup tracing data source instance for a
1205 // new SetupDataSource call. If so, moves the data source to this tracing
1206 // session (and its target buffer) and returns true, otherwise returns false.
MaybeAdoptStartupTracingInDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,const std::vector<RegisteredDataSource> & data_sources)1207 static bool MaybeAdoptStartupTracingInDataSource(
1208     TracingBackendId backend_id,
1209     uint32_t backend_connection_id,
1210     DataSourceInstanceID instance_id,
1211     const DataSourceConfig& cfg,
1212     const std::vector<RegisteredDataSource>& data_sources) {
1213   uint64_t startup_config_hash = ComputeStartupConfigHash(cfg);
1214 
1215   for (const auto& rds : data_sources) {
1216     DataSourceStaticState* static_state = rds.static_state;
1217     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1218       auto* internal_state = static_state->TryGet(i);
1219 
1220       // TODO(eseckler): Instead of comparing config_hashes here, should we ask
1221       // the data source instance for a compat check of the config?
1222       if (internal_state &&
1223           internal_state->startup_target_buffer_reservation.load(
1224               std::memory_order_relaxed) &&
1225           internal_state->data_source_instance_id == 0 &&
1226           internal_state->backend_id == backend_id &&
1227           internal_state->backend_connection_id == backend_connection_id &&
1228           internal_state->startup_config_hash == startup_config_hash) {
1229         PERFETTO_DLOG("Setting up data source %" PRIu64
1230                       " %s by adopting it from a startup tracing session",
1231                       instance_id, cfg.name().c_str());
1232 
1233         std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
1234         // Set the associations. The actual takeover happens in
1235         // StartDataSource().
1236         internal_state->data_source_instance_id = instance_id;
1237         internal_state->buffer_id =
1238             static_cast<internal::BufferId>(cfg.target_buffer());
1239         internal_state->config_hash = ComputeConfigHash(cfg);
1240 
1241         // TODO(eseckler): Should the data souce config provided by the service
1242         // be allowed to specify additional interceptors / additional data
1243         // source params?
1244 
1245         return true;
1246       }
1247     }
1248   }
1249   return false;
1250 }
1251 
1252 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1253 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1254                                        uint32_t backend_connection_id,
1255                                        DataSourceInstanceID instance_id,
1256                                        const DataSourceConfig& cfg) {
1257   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1258                 cfg.name().c_str());
1259   PERFETTO_DCHECK_THREAD(thread_checker_);
1260 
1261   // First check if there is any matching startup tracing data source instance.
1262   if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1263                                            instance_id, cfg, data_sources_)) {
1264     return;
1265   }
1266   uint64_t config_hash = ComputeConfigHash(cfg);
1267 
1268   for (const auto& rds : data_sources_) {
1269     if (rds.descriptor.name() != cfg.name())
1270       continue;
1271     DataSourceStaticState& static_state = *rds.static_state;
1272 
1273     // If this data source is already active for this exact config, don't start
1274     // another instance. This happens when we have several data sources with the
1275     // same name, in which case the service sends one SetupDataSource event for
1276     // each one. Since we can't map which event maps to which data source, we
1277     // ensure each event only starts one data source instance.
1278     // TODO(skyostil): Register a unique id with each data source to the service
1279     // to disambiguate.
1280     bool active_for_config = false;
1281     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1282       if (!static_state.TryGet(i))
1283         continue;
1284       auto* internal_state =
1285           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1286       if (internal_state->backend_id == backend_id &&
1287           internal_state->config_hash == config_hash) {
1288         active_for_config = true;
1289         break;
1290       }
1291     }
1292     if (active_for_config) {
1293       PERFETTO_DLOG(
1294           "Data source %s is already active with this config, skipping",
1295           cfg.name().c_str());
1296       continue;
1297     }
1298 
1299     SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1300                         cfg, config_hash, /*startup_config_hash=*/0,
1301                         /*startup_session_id=*/0);
1302     return;
1303   }
1304 }
1305 
SetupDataSourceImpl(const RegisteredDataSource & rds,TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,uint64_t config_hash,uint64_t startup_config_hash,TracingSessionGlobalID startup_session_id)1306 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
1307     const RegisteredDataSource& rds,
1308     TracingBackendId backend_id,
1309     uint32_t backend_connection_id,
1310     DataSourceInstanceID instance_id,
1311     const DataSourceConfig& cfg,
1312     uint64_t config_hash,
1313     uint64_t startup_config_hash,
1314     TracingSessionGlobalID startup_session_id) {
1315   PERFETTO_DCHECK_THREAD(thread_checker_);
1316   DataSourceStaticState& static_state = *rds.static_state;
1317 
1318   // If any bit is set in `static_state.valid_instances` then at least one
1319   // other instance of data source is running.
1320   if (!rds.supports_multiple_instances &&
1321       static_state.valid_instances.load(std::memory_order_acquire) != 0) {
1322     PERFETTO_ELOG(
1323         "Failed to setup data source because some another instance of this "
1324         "data source is already active");
1325     return FindDataSourceRes();
1326   }
1327 
1328   for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1329     // Find a free slot.
1330     if (static_state.TryGet(i))
1331       continue;
1332 
1333     auto* internal_state =
1334         reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1335     std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
1336     static_assert(
1337         std::is_same<decltype(internal_state->data_source_instance_id),
1338                      DataSourceInstanceID>::value,
1339         "data_source_instance_id type mismatch");
1340     internal_state->muxer_id_for_testing = muxer_id_for_testing_;
1341 
1342     if (startup_session_id) {
1343       RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1344       uint16_t& last_reservation =
1345           backend.producer->last_startup_target_buffer_reservation_;
1346       if (last_reservation == std::numeric_limits<uint16_t>::max()) {
1347         PERFETTO_ELOG(
1348             "Startup buffer reservations exhausted, dropping data source");
1349         return FindDataSourceRes();
1350       }
1351       internal_state->startup_target_buffer_reservation.store(
1352           ++last_reservation, std::memory_order_relaxed);
1353     } else {
1354       internal_state->startup_target_buffer_reservation.store(
1355           0, std::memory_order_relaxed);
1356     }
1357 
1358     internal_state->backend_id = backend_id;
1359     internal_state->backend_connection_id = backend_connection_id;
1360     internal_state->data_source_instance_id = instance_id;
1361     internal_state->buffer_id =
1362         static_cast<internal::BufferId>(cfg.target_buffer());
1363     internal_state->config_hash = config_hash;
1364     internal_state->startup_session_id = startup_session_id;
1365     internal_state->startup_config_hash = startup_config_hash;
1366     internal_state->data_source = rds.factory();
1367     internal_state->interceptor = nullptr;
1368     internal_state->interceptor_id = 0;
1369 
1370     if (cfg.has_interceptor_config()) {
1371       for (size_t j = 0; j < interceptors_.size(); j++) {
1372         if (cfg.interceptor_config().name() ==
1373             interceptors_[j].descriptor.name()) {
1374           PERFETTO_DLOG("Intercepting data source %" PRIu64
1375                         " \"%s\" into \"%s\"",
1376                         instance_id, cfg.name().c_str(),
1377                         cfg.interceptor_config().name().c_str());
1378           internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
1379           internal_state->interceptor = interceptors_[j].factory();
1380           internal_state->interceptor->OnSetup({cfg});
1381           break;
1382         }
1383       }
1384       if (!internal_state->interceptor_id) {
1385         PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
1386                       cfg.interceptor_config().name().c_str());
1387       }
1388     }
1389 
1390     // This must be made at the end. See matching acquire-load in
1391     // DataSource::Trace().
1392     static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
1393 
1394     DataSourceBase::SetupArgs setup_args;
1395     setup_args.config = &cfg;
1396     setup_args.internal_instance_index = i;
1397 
1398     if (!rds.requires_callbacks_under_lock)
1399       lock.unlock();
1400     internal_state->data_source->OnSetup(setup_args);
1401 
1402     return FindDataSourceRes(&static_state, internal_state, i,
1403                              rds.requires_callbacks_under_lock);
1404   }
1405   PERFETTO_ELOG(
1406       "Maximum number of data source instances exhausted. "
1407       "Dropping data source %" PRIu64,
1408       instance_id);
1409   return FindDataSourceRes();
1410 }
1411 
1412 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1413 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1414                                        DataSourceInstanceID instance_id) {
1415   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1416   PERFETTO_DCHECK_THREAD(thread_checker_);
1417 
1418   auto ds = FindDataSource(backend_id, instance_id);
1419   if (!ds) {
1420     PERFETTO_ELOG("Could not find data source to start");
1421     return;
1422   }
1423 
1424   // Check if the data source was already started for startup tracing.
1425   uint16_t startup_reservation =
1426       ds.internal_state->startup_target_buffer_reservation.load(
1427           std::memory_order_relaxed);
1428   if (startup_reservation) {
1429     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1430     TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
1431     auto session_it = std::find_if(
1432         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1433         [session_id](const RegisteredStartupSession& session) {
1434           return session.session_id == session_id;
1435         });
1436     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1437 
1438     if (session_it->is_aborting) {
1439       PERFETTO_DLOG("Data source %" PRIu64
1440                     " was already aborted for startup tracing, not starting it",
1441                     instance_id);
1442       return;
1443     }
1444 
1445     PERFETTO_DLOG(
1446         "Data source %" PRIu64
1447         " was already started for startup tracing, binding its target buffer",
1448         instance_id);
1449 
1450     backend.producer->service_->MaybeSharedMemoryArbiter()
1451         ->BindStartupTargetBuffer(startup_reservation,
1452                                   ds.internal_state->buffer_id);
1453 
1454     // The reservation ID can be used even after binding it, so there's no need
1455     // for any barriers here - we just need atomicity.
1456     ds.internal_state->startup_target_buffer_reservation.store(
1457         0, std::memory_order_relaxed);
1458 
1459     // TODO(eseckler): Should we reset incremental state at this point, or
1460     // notify the data source some other way?
1461 
1462     // The session should not have been fully bound yet (or aborted).
1463     PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
1464 
1465     session_it->num_unbound_data_sources--;
1466     if (session_it->num_unbound_data_sources == 0) {
1467       if (session_it->on_adopted)
1468         task_runner_->PostTask(session_it->on_adopted);
1469       backend.startup_sessions.erase(session_it);
1470     }
1471     return;
1472   }
1473 
1474   StartDataSourceImpl(ds);
1475 }
1476 
StartDataSourceImpl(const FindDataSourceRes & ds)1477 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1478   PERFETTO_DCHECK_THREAD(thread_checker_);
1479 
1480   DataSourceBase::StartArgs start_args{};
1481   start_args.internal_instance_index = ds.instance_idx;
1482 
1483   std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1484   if (ds.internal_state->interceptor)
1485     ds.internal_state->interceptor->OnStart({});
1486   ds.internal_state->trace_lambda_enabled.store(true,
1487                                                 std::memory_order_relaxed);
1488   PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1489 
1490   if (!ds.requires_callbacks_under_lock)
1491     lock.unlock();
1492   ds.internal_state->data_source->OnStart(start_args);
1493 }
1494 
1495 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1496 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1497     TracingBackendId backend_id,
1498     DataSourceInstanceID instance_id) {
1499   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1500   PERFETTO_DCHECK_THREAD(thread_checker_);
1501 
1502   auto ds = FindDataSource(backend_id, instance_id);
1503   if (!ds) {
1504     PERFETTO_ELOG("Could not find data source to stop");
1505     return;
1506   }
1507 
1508   StopDataSource_AsyncBeginImpl(ds);
1509 }
1510 
StopDataSource_AsyncBeginImpl(const FindDataSourceRes & ds)1511 void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
1512     const FindDataSourceRes& ds) {
1513   TracingBackendId backend_id = ds.internal_state->backend_id;
1514   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1515   DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;
1516 
1517   StopArgsImpl stop_args{};
1518   stop_args.internal_instance_index = ds.instance_idx;
1519   stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
1520                                   instance_id, ds] {
1521     // TracingMuxerImpl is long lived, capturing |this| is okay.
1522     // The notification closure can be moved out of the StopArgs by the
1523     // embedder to handle stop asynchronously. The embedder might then
1524     // call the closure on a different thread than the current one, hence
1525     // this nested PostTask().
1526     task_runner_->PostTask(
1527         [this, backend_id, backend_connection_id, instance_id, ds] {
1528           StopDataSource_AsyncEnd(backend_id, backend_connection_id,
1529                                   instance_id, ds);
1530         });
1531   };
1532 
1533   {
1534     std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1535     if (ds.internal_state->interceptor)
1536       ds.internal_state->interceptor->OnStop({});
1537 
1538     if (!ds.requires_callbacks_under_lock)
1539       lock.unlock();
1540     ds.internal_state->data_source->OnStop(stop_args);
1541   }
1542 
1543   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1544   // async closure here. In theory we could avoid the PostTask and call
1545   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1546   // divergencies between the deferred-stop vs non-deferred-stop code paths.
1547   if (stop_args.async_stop_closure)
1548     std::move(stop_args.async_stop_closure)();
1549 }
1550 
StopDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds)1551 void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
1552                                                uint32_t backend_connection_id,
1553                                                DataSourceInstanceID instance_id,
1554                                                const FindDataSourceRes& ds) {
1555   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1556   PERFETTO_DCHECK_THREAD(thread_checker_);
1557 
1558   // Check that the data source instance is still active and was not modified
1559   // while it was being stopped.
1560   if (!ds.static_state->TryGet(ds.instance_idx) ||
1561       ds.internal_state->backend_id != backend_id ||
1562       ds.internal_state->backend_connection_id != backend_connection_id ||
1563       ds.internal_state->data_source_instance_id != instance_id) {
1564     PERFETTO_ELOG(
1565         "Async stop of data source %" PRIu64
1566         " failed. This might be due to calling the async_stop_closure twice.",
1567         instance_id);
1568     return;
1569   }
1570 
1571   const uint32_t mask = ~(1 << ds.instance_idx);
1572   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1573 
1574   // Take the mutex to prevent that the data source is in the middle of
1575   // a Trace() execution where it called GetDataSourceLocked() while we
1576   // destroy it.
1577   uint16_t startup_buffer_reservation;
1578   TracingSessionGlobalID startup_session_id;
1579   {
1580     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1581     ds.internal_state->trace_lambda_enabled.store(false,
1582                                                   std::memory_order_relaxed);
1583     ds.internal_state->data_source.reset();
1584     ds.internal_state->interceptor.reset();
1585     startup_buffer_reservation =
1586         ds.internal_state->startup_target_buffer_reservation.load(
1587             std::memory_order_relaxed);
1588     startup_session_id = ds.internal_state->startup_session_id;
1589   }
1590 
1591   // The other fields of internal_state are deliberately *not* cleared.
1592   // See races-related comments of DataSource::Trace().
1593 
1594   TracingMuxer::generation_++;
1595 
1596   // |producer_backends_| is append-only, Backend instances are always valid.
1597   PERFETTO_CHECK(backend_id < producer_backends_.size());
1598   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1599   ProducerImpl* producer = backend.producer.get();
1600   if (!producer)
1601     return;
1602 
1603   // If the data source instance still has a startup buffer reservation, it was
1604   // only active for startup tracing and never started by the service. Discard
1605   // the startup buffer reservation.
1606   if (startup_buffer_reservation) {
1607     PERFETTO_DCHECK(startup_session_id);
1608 
1609     if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
1610       producer->service_->MaybeSharedMemoryArbiter()
1611           ->AbortStartupTracingForReservation(startup_buffer_reservation);
1612     }
1613 
1614     auto session_it = std::find_if(
1615         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1616         [startup_session_id](const RegisteredStartupSession& session) {
1617           return session.session_id == startup_session_id;
1618         });
1619 
1620     // Session should not be removed until abortion of all data source instances
1621     // is complete.
1622     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1623 
1624     session_it->num_aborting_data_sources--;
1625     if (session_it->num_aborting_data_sources == 0) {
1626       if (session_it->on_aborted)
1627         task_runner_->PostTask(session_it->on_aborted);
1628 
1629       backend.startup_sessions.erase(session_it);
1630     }
1631   }
1632 
1633   if (producer->connected_) {
1634     // Flush any commits that might have been batched by SharedMemoryArbiter.
1635     producer->service_->MaybeSharedMemoryArbiter()
1636         ->FlushPendingCommitDataRequests();
1637     if (instance_id)
1638       producer->service_->NotifyDataSourceStopped(instance_id);
1639   }
1640   producer->SweepDeadServices();
1641 }
1642 
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1643 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1644     TracingBackendId backend_id,
1645     DataSourceInstanceID instance_id) {
1646   PERFETTO_DCHECK_THREAD(thread_checker_);
1647   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1648                 instance_id);
1649   auto ds = FindDataSource(backend_id, instance_id);
1650   if (!ds) {
1651     PERFETTO_ELOG("Could not find data source to clear incremental state for");
1652     return;
1653   }
1654 
1655   DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1656   clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1657   {
1658     std::unique_lock<std::recursive_mutex> lock;
1659     if (ds.requires_callbacks_under_lock)
1660       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1661     ds.internal_state->data_source->WillClearIncrementalState(
1662         clear_incremental_state_args);
1663   }
1664 
1665   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1666   // the incremental state should be cleared.
1667   ds.static_state->incremental_state_generation.fetch_add(
1668       1, std::memory_order_relaxed);
1669 }
1670 
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id)1671 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1672     TracingBackendId backend_id,
1673     DataSourceInstanceID instance_id,
1674     FlushRequestID flush_id) {
1675   PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1676   auto ds = FindDataSource(backend_id, instance_id);
1677   if (!ds) {
1678     PERFETTO_ELOG("Could not find data source to flush");
1679     return true;
1680   }
1681 
1682   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1683 
1684   FlushArgsImpl flush_args;
1685   flush_args.internal_instance_index = ds.instance_idx;
1686   flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1687                                     instance_id, ds, flush_id] {
1688     // TracingMuxerImpl is long lived, capturing |this| is okay.
1689     // The notification closure can be moved out of the StopArgs by the
1690     // embedder to handle stop asynchronously. The embedder might then
1691     // call the closure on a different thread than the current one, hence
1692     // this nested PostTask().
1693     task_runner_->PostTask(
1694         [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1695           FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1696                                    instance_id, ds, flush_id);
1697         });
1698   };
1699   {
1700     std::unique_lock<std::recursive_mutex> lock;
1701     if (ds.requires_callbacks_under_lock)
1702       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1703     ds.internal_state->data_source->OnFlush(flush_args);
1704   }
1705 
1706   // |async_flush_closure| is moved out of |flush_args| if the producer
1707   // requested to handle the flush asynchronously.
1708   bool handled = static_cast<bool>(flush_args.async_flush_closure);
1709   return handled;
1710 }
1711 
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1712 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1713     TracingBackendId backend_id,
1714     uint32_t backend_connection_id,
1715     DataSourceInstanceID instance_id,
1716     const FindDataSourceRes& ds,
1717     FlushRequestID flush_id) {
1718   PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1719   PERFETTO_DCHECK_THREAD(thread_checker_);
1720 
1721   // Check that the data source instance is still active and was not modified
1722   // while it was being flushed.
1723   if (!ds.static_state->TryGet(ds.instance_idx) ||
1724       ds.internal_state->backend_id != backend_id ||
1725       ds.internal_state->backend_connection_id != backend_connection_id ||
1726       ds.internal_state->data_source_instance_id != instance_id) {
1727     PERFETTO_ELOG("Async flush of data source %" PRIu64
1728                   " failed. This might be due to the data source being stopped "
1729                   "in the meantime",
1730                   instance_id);
1731     return;
1732   }
1733 
1734   // |producer_backends_| is append-only, Backend instances are always valid.
1735   PERFETTO_CHECK(backend_id < producer_backends_.size());
1736   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1737 
1738   ProducerImpl* producer = backend.producer.get();
1739   if (!producer)
1740     return;
1741 
1742   if (producer->connected_) {
1743     producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1744   }
1745 }
1746 
SyncProducersForTesting()1747 void TracingMuxerImpl::SyncProducersForTesting() {
1748   std::mutex mutex;
1749   std::condition_variable cv;
1750 
1751   // IPC-based producers don't report connection errors explicitly for each
1752   // command, but instead with an asynchronous callback
1753   // (ProducerImpl::OnDisconnected). This means that the sync command below
1754   // may have completed but failed to reach the service because of a
1755   // disconnection, but we can't tell until the disconnection message comes
1756   // through. To guard against this, we run two whole rounds of sync round-trips
1757   // before returning; the first one will detect any disconnected producers and
1758   // the second one will ensure any reconnections have completed and all data
1759   // sources are registered in the service again.
1760   for (size_t i = 0; i < 2; i++) {
1761     size_t countdown = std::numeric_limits<size_t>::max();
1762     task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1763       {
1764         std::unique_lock<std::mutex> countdown_lock(mutex);
1765         countdown = producer_backends_.size();
1766       }
1767       for (auto& backend : producer_backends_) {
1768         auto* producer = backend.producer.get();
1769         producer->service_->Sync([&mutex, &cv, &countdown] {
1770           std::unique_lock<std::mutex> countdown_lock(mutex);
1771           countdown--;
1772           cv.notify_one();
1773         });
1774       }
1775     });
1776 
1777     {
1778       std::unique_lock<std::mutex> countdown_lock(mutex);
1779       cv.wait(countdown_lock, [&countdown] { return !countdown; });
1780     }
1781   }
1782 
1783   // Check that all producers are indeed connected.
1784   bool done = false;
1785   bool all_producers_connected = true;
1786   task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1787     for (auto& backend : producer_backends_)
1788       all_producers_connected &= backend.producer->connected_;
1789     std::unique_lock<std::mutex> lock(mutex);
1790     done = true;
1791     cv.notify_one();
1792   });
1793 
1794   {
1795     std::unique_lock<std::mutex> lock(mutex);
1796     cv.wait(lock, [&done] { return done; });
1797   }
1798   PERFETTO_DCHECK(all_producers_connected);
1799 }
1800 
DestroyStoppedTraceWritersForCurrentThread()1801 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1802   // Iterate across all possible data source types.
1803   auto cur_generation = generation_.load(std::memory_order_acquire);
1804   auto* root_tls = GetOrCreateTracingTLS();
1805 
1806   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1807     // |tls| has a vector of per-data-source-instance thread-local state.
1808     DataSourceStaticState* static_state = tls.static_state;
1809     if (!static_state)
1810       return;  // Slot not used.
1811 
1812     // Iterate across all possible instances for this data source.
1813     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1814       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1815       if (!ds_tls.trace_writer)
1816         continue;
1817 
1818       DataSourceState* ds_state = static_state->TryGet(inst);
1819       if (ds_state &&
1820           ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1821           ds_state->backend_id == ds_tls.backend_id &&
1822           ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1823           ds_state->startup_target_buffer_reservation.load(
1824               std::memory_order_relaxed) ==
1825               ds_tls.startup_target_buffer_reservation &&
1826           ds_state->buffer_id == ds_tls.buffer_id &&
1827           ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1828         continue;
1829       }
1830 
1831       // The DataSource instance has been destroyed or recycled.
1832       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
1833     }
1834   };
1835 
1836   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1837     // |tls| has a vector of per-data-source-instance thread-local state.
1838     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1839     destroy_stopped_instances(tls);
1840   }
1841   destroy_stopped_instances(root_tls->track_event_tls);
1842   root_tls->generation = cur_generation;
1843 }
1844 
1845 // Called both when a new data source is registered or when a new backend
1846 // connects. In both cases we want to be sure we reflected the data source
1847 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1848 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1849   PERFETTO_DCHECK_THREAD(thread_checker_);
1850   for (RegisteredDataSource& rds : data_sources_) {
1851     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1852   }
1853 }
1854 
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1855 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1856                                                      bool is_changed) {
1857   PERFETTO_DCHECK_THREAD(thread_checker_);
1858   for (RegisteredProducerBackend& backend : producer_backends_) {
1859     // We cannot call RegisterDataSource on the backend before it connects.
1860     if (!backend.producer->connected_)
1861       continue;
1862 
1863     PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1864     bool is_registered = backend.producer->registered_data_sources_.test(
1865         rds.static_state->index);
1866     if (is_registered && !is_changed)
1867       continue;
1868 
1869     rds.descriptor.set_will_notify_on_start(true);
1870     rds.descriptor.set_will_notify_on_stop(true);
1871     rds.descriptor.set_handles_incremental_state_clear(true);
1872     rds.descriptor.set_id(rds.static_state->id);
1873     if (is_registered) {
1874       backend.producer->service_->UpdateDataSource(rds.descriptor);
1875     } else {
1876       backend.producer->service_->RegisterDataSource(rds.descriptor);
1877     }
1878     backend.producer->registered_data_sources_.set(rds.static_state->index);
1879   }
1880 }
1881 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1882 void TracingMuxerImpl::SetupTracingSession(
1883     TracingSessionGlobalID session_id,
1884     const std::shared_ptr<TraceConfig>& trace_config,
1885     base::ScopedFile trace_fd) {
1886   PERFETTO_DCHECK_THREAD(thread_checker_);
1887   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1888 
1889   auto* consumer = FindConsumer(session_id);
1890   if (!consumer)
1891     return;
1892 
1893   consumer->trace_config_ = trace_config;
1894   if (trace_fd)
1895     consumer->trace_fd_ = std::move(trace_fd);
1896 
1897   if (!consumer->connected_)
1898     return;
1899 
1900   // Only used in the deferred start mode.
1901   if (trace_config->deferred_start()) {
1902     consumer->service_->EnableTracing(*trace_config,
1903                                       std::move(consumer->trace_fd_));
1904   }
1905 }
1906 
StartTracingSession(TracingSessionGlobalID session_id)1907 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1908   PERFETTO_DCHECK_THREAD(thread_checker_);
1909 
1910   auto* consumer = FindConsumer(session_id);
1911 
1912   if (!consumer)
1913     return;
1914 
1915   if (!consumer->trace_config_) {
1916     PERFETTO_ELOG("Must call Setup(config) first");
1917     return;
1918   }
1919 
1920   if (!consumer->connected_) {
1921     consumer->start_pending_ = true;
1922     return;
1923   }
1924 
1925   consumer->start_pending_ = false;
1926   if (consumer->trace_config_->deferred_start()) {
1927     consumer->service_->StartTracing();
1928   } else {
1929     consumer->service_->EnableTracing(*consumer->trace_config_,
1930                                       std::move(consumer->trace_fd_));
1931   }
1932 
1933   // TODO implement support for the deferred-start + fast-triggering case.
1934 }
1935 
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1936 void TracingMuxerImpl::ChangeTracingSessionConfig(
1937     TracingSessionGlobalID session_id,
1938     const TraceConfig& trace_config) {
1939   PERFETTO_DCHECK_THREAD(thread_checker_);
1940 
1941   auto* consumer = FindConsumer(session_id);
1942 
1943   if (!consumer)
1944     return;
1945 
1946   if (!consumer->trace_config_) {
1947     // Changing the config is only supported for started sessions.
1948     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1949     return;
1950   }
1951 
1952   consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1953   if (consumer->connected_)
1954     consumer->service_->ChangeTraceConfig(trace_config);
1955 }
1956 
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1957 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1958                                            uint32_t timeout_ms,
1959                                            std::function<void(bool)> callback) {
1960   PERFETTO_DCHECK_THREAD(thread_checker_);
1961   auto* consumer = FindConsumer(session_id);
1962   if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1963       !consumer->trace_config_) {
1964     PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1965     std::move(callback)(false);
1966     return;
1967   }
1968 
1969   consumer->service_->Flush(timeout_ms, std::move(callback));
1970 }
1971 
StopTracingSession(TracingSessionGlobalID session_id)1972 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1973   PERFETTO_DCHECK_THREAD(thread_checker_);
1974   auto* consumer = FindConsumer(session_id);
1975   if (!consumer)
1976     return;
1977 
1978   if (consumer->start_pending_) {
1979     // If the session hasn't started yet, wait until it does before stopping.
1980     consumer->stop_pending_ = true;
1981     return;
1982   }
1983 
1984   consumer->stop_pending_ = false;
1985   if (consumer->stopped_) {
1986     // If the session was already stopped (e.g., it failed to start), don't try
1987     // stopping again.
1988     consumer->NotifyStopComplete();
1989   } else if (!consumer->trace_config_) {
1990     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1991     return;
1992   } else {
1993     consumer->service_->DisableTracing();
1994   }
1995 
1996   consumer->trace_config_.reset();
1997 }
1998 
DestroyTracingSession(TracingSessionGlobalID session_id)1999 void TracingMuxerImpl::DestroyTracingSession(
2000     TracingSessionGlobalID session_id) {
2001   PERFETTO_DCHECK_THREAD(thread_checker_);
2002   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2003     // We need to find the consumer (if any) and call Disconnect as we destroy
2004     // the tracing session. We can't call Disconnect() inside this for loop
2005     // because in the in-process case this will end up to a synchronous call to
2006     // OnConsumerDisconnect which will invalidate all the iterators to
2007     // |backend.consumers|.
2008     ConsumerImpl* consumer = nullptr;
2009     for (auto& con : backend.consumers) {
2010       if (con->session_id_ == session_id) {
2011         consumer = con.get();
2012         break;
2013       }
2014     }
2015     if (consumer) {
2016       // We broke out of the loop above on the assumption that each backend will
2017       // only have a single consumer per session. This DCHECK ensures that
2018       // this is the case.
2019       PERFETTO_DCHECK(
2020           std::count_if(backend.consumers.begin(), backend.consumers.end(),
2021                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2022                           return con->session_id_ == session_id;
2023                         }) == 1u);
2024       consumer->Disconnect();
2025     }
2026   }
2027 }
2028 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2029 void TracingMuxerImpl::ReadTracingSessionData(
2030     TracingSessionGlobalID session_id,
2031     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2032   PERFETTO_DCHECK_THREAD(thread_checker_);
2033   auto* consumer = FindConsumer(session_id);
2034   if (!consumer) {
2035     // TODO(skyostil): Signal an error to the user.
2036     TracingSession::ReadTraceCallbackArgs callback_arg{};
2037     callback(callback_arg);
2038     return;
2039   }
2040   PERFETTO_DCHECK(!consumer->read_trace_callback_);
2041   consumer->read_trace_callback_ = std::move(callback);
2042   consumer->service_->ReadBuffers();
2043 }
2044 
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2045 void TracingMuxerImpl::GetTraceStats(
2046     TracingSessionGlobalID session_id,
2047     TracingSession::GetTraceStatsCallback callback) {
2048   PERFETTO_DCHECK_THREAD(thread_checker_);
2049   auto* consumer = FindConsumer(session_id);
2050   if (!consumer) {
2051     TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2052     callback_arg.success = false;
2053     callback(std::move(callback_arg));
2054     return;
2055   }
2056   PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2057   consumer->get_trace_stats_callback_ = std::move(callback);
2058   if (!consumer->connected_) {
2059     consumer->get_trace_stats_pending_ = true;
2060     return;
2061   }
2062   consumer->get_trace_stats_pending_ = false;
2063   consumer->service_->GetTraceStats();
2064 }
2065 
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2066 void TracingMuxerImpl::QueryServiceState(
2067     TracingSessionGlobalID session_id,
2068     TracingSession::QueryServiceStateCallback callback) {
2069   PERFETTO_DCHECK_THREAD(thread_checker_);
2070   auto* consumer = FindConsumer(session_id);
2071   if (!consumer) {
2072     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2073     callback_arg.success = false;
2074     callback(std::move(callback_arg));
2075     return;
2076   }
2077   PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2078   if (!consumer->connected_) {
2079     consumer->query_service_state_callback_ = std::move(callback);
2080     return;
2081   }
2082   auto callback_wrapper = [callback](bool success,
2083                                      protos::gen::TracingServiceState state) {
2084     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2085     callback_arg.success = success;
2086     callback_arg.service_state_data = state.SerializeAsArray();
2087     callback(std::move(callback_arg));
2088   };
2089   consumer->service_->QueryServiceState(std::move(callback_wrapper));
2090 }
2091 
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2092 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2093     uint32_t batch_commits_duration_ms,
2094     BackendType backend_type) {
2095   for (RegisteredProducerBackend& backend : producer_backends_) {
2096     if (backend.producer && backend.producer->connected_ &&
2097         backend.type == backend_type) {
2098       backend.producer->service_->MaybeSharedMemoryArbiter()
2099           ->SetBatchCommitsDuration(batch_commits_duration_ms);
2100     }
2101   }
2102 }
2103 
EnableDirectSMBPatchingForTesting(BackendType backend_type)2104 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2105     BackendType backend_type) {
2106   for (RegisteredProducerBackend& backend : producer_backends_) {
2107     if (backend.producer && backend.producer->connected_ &&
2108         backend.type == backend_type &&
2109         !backend.producer->service_->MaybeSharedMemoryArbiter()
2110              ->EnableDirectSMBPatching()) {
2111       return false;
2112     }
2113   }
2114   return true;
2115 }
2116 
FindConsumer(TracingSessionGlobalID session_id)2117 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2118     TracingSessionGlobalID session_id) {
2119   PERFETTO_DCHECK_THREAD(thread_checker_);
2120   return FindConsumerAndBackend(session_id).first;
2121 }
2122 
2123 std::pair<TracingMuxerImpl::ConsumerImpl*,
2124           TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2125 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2126   PERFETTO_DCHECK_THREAD(thread_checker_);
2127   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2128     for (auto& consumer : backend.consumers) {
2129       if (consumer->session_id_ == session_id) {
2130         return {consumer.get(), &backend};
2131       }
2132     }
2133   }
2134   return {nullptr, nullptr};
2135 }
2136 
InitializeConsumer(TracingSessionGlobalID session_id)2137 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2138   PERFETTO_DCHECK_THREAD(thread_checker_);
2139 
2140   auto res = FindConsumerAndBackend(session_id);
2141   if (!res.first || !res.second)
2142     return;
2143   TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2144   RegisteredConsumerBackend& backend = *res.second;
2145 
2146   TracingBackend::ConnectConsumerArgs conn_args;
2147   conn_args.consumer = consumer;
2148   conn_args.task_runner = task_runner_.get();
2149   consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2150 }
2151 
OnConsumerDisconnected(ConsumerImpl * consumer)2152 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2153   PERFETTO_DCHECK_THREAD(thread_checker_);
2154   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2155     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2156       return con.get() == consumer;
2157     };
2158     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2159                                            backend.consumers.end(), pred),
2160                             backend.consumers.end());
2161   }
2162 }
2163 
SetMaxProducerReconnectionsForTesting(uint32_t count)2164 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2165   max_producer_reconnections_.store(count);
2166 }
2167 
OnProducerDisconnected(ProducerImpl * producer)2168 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2169   PERFETTO_DCHECK_THREAD(thread_checker_);
2170   for (RegisteredProducerBackend& backend : producer_backends_) {
2171     if (backend.producer.get() != producer)
2172       continue;
2173 
2174     // The tracing service is disconnected. It does not make sense to keep
2175     // tracing (we wouldn't be able to commit). On reconnection, the tracing
2176     // service will restart the data sources.
2177     for (const auto& rds : data_sources_) {
2178       DataSourceStaticState* static_state = rds.static_state;
2179       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2180         auto* internal_state = static_state->TryGet(i);
2181         if (internal_state && internal_state->backend_id == backend.id &&
2182             internal_state->backend_connection_id ==
2183                 backend.producer->connection_id_.load(
2184                     std::memory_order_relaxed)) {
2185           StopDataSource_AsyncBeginImpl(
2186               FindDataSourceRes(static_state, internal_state, i,
2187                                 rds.requires_callbacks_under_lock));
2188         }
2189       }
2190     }
2191 
2192     // Try reconnecting the disconnected producer. If the connection succeeds,
2193     // all the data sources will be automatically re-registered.
2194     if (producer->connection_id_.load(std::memory_order_relaxed) >
2195         max_producer_reconnections_.load()) {
2196       // Avoid reconnecting a failing producer too many times. Instead we just
2197       // leak the producer instead of trying to avoid further complicating
2198       // cross-thread trace writer creation.
2199       PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2200       continue;
2201     }
2202 
2203     backend.producer->Initialize(
2204         backend.backend->ConnectProducer(backend.producer_conn_args));
2205     // Don't use producer-provided SMBs for the next connection unless startup
2206     // tracing requires it again.
2207     backend.producer_conn_args.use_producer_provided_smb = false;
2208   }
2209 }
2210 
SweepDeadBackends()2211 void TracingMuxerImpl::SweepDeadBackends() {
2212   PERFETTO_DCHECK_THREAD(thread_checker_);
2213   for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2214     auto next_it = it;
2215     next_it++;
2216     if (it->producer->SweepDeadServices())
2217       dead_backends_.erase(it);
2218     it = next_it;
2219   }
2220 }
2221 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2222 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2223     TracingBackendId backend_id,
2224     DataSourceInstanceID instance_id) {
2225   PERFETTO_DCHECK_THREAD(thread_checker_);
2226   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2227   for (const auto& rds : data_sources_) {
2228     DataSourceStaticState* static_state = rds.static_state;
2229     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2230       auto* internal_state = static_state->TryGet(i);
2231       if (internal_state && internal_state->backend_id == backend_id &&
2232           internal_state->backend_connection_id ==
2233               backend.producer->connection_id_.load(
2234                   std::memory_order_relaxed) &&
2235           internal_state->data_source_instance_id == instance_id) {
2236         return FindDataSourceRes(static_state, internal_state, i,
2237                                  rds.requires_callbacks_under_lock);
2238       }
2239     }
2240   }
2241   return FindDataSourceRes();
2242 }
2243 
2244 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2245 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2246     DataSourceStaticState* static_state,
2247     uint32_t data_source_instance_index,
2248     DataSourceState* data_source,
2249     BufferExhaustedPolicy buffer_exhausted_policy) {
2250   if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2251     // If the session is being intercepted, return a heap-backed trace writer
2252     // instead. This is safe because all the data given to the interceptor is
2253     // either thread-local (|instance_index|), statically allocated
2254     // (|static_state|) or constant after initialization (|interceptor|). Access
2255     // to the interceptor instance itself through |data_source| is protected by
2256     // a statically allocated lock (similarly to the data source instance).
2257     auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2258     return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2259         interceptor.tls_factory(static_state, data_source_instance_index),
2260         interceptor.packet_callback, static_state, data_source_instance_index));
2261   }
2262   ProducerImpl* producer =
2263       FindProducerBackendById(data_source->backend_id)->producer.get();
2264   // Atomically load the current service endpoint. We keep the pointer as a
2265   // shared pointer on the stack to guard against it from being concurrently
2266   // modified on the thread by ProducerImpl::Initialize() swapping in a
2267   // reconnected service on the muxer task runner thread.
2268   //
2269   // The endpoint may also be concurrently modified by SweepDeadServices()
2270   // clearing out old disconnected services. We guard against that by
2271   // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2272   // shutdown has started, the trace writer created below will be a null one
2273   // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2274   //
2275   // We use an atomic pointer instead of holding a lock because
2276   // CreateTraceWriter posts tasks under the hood.
2277   std::shared_ptr<ProducerEndpoint> service =
2278       std::atomic_load(&producer->service_);
2279 
2280   // The service may have been disconnected and reconnected concurrently after
2281   // the data source was enabled, in which case we may not have an arbiter, or
2282   // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2283   // Instead, early-out now. A relaxed load is fine here because the atomic_load
2284   // above ensures that the |service| isn't newer.
2285   if (producer->connection_id_.load(std::memory_order_relaxed) !=
2286       data_source->backend_connection_id) {
2287     return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2288   }
2289 
2290   // We just need a relaxed atomic read here: We can use the reservation ID even
2291   // after the buffer was bound, we just need to be sure to read it atomically.
2292   uint16_t startup_buffer_reservation =
2293       data_source->startup_target_buffer_reservation.load(
2294           std::memory_order_relaxed);
2295   if (startup_buffer_reservation) {
2296     return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2297         startup_buffer_reservation);
2298   }
2299   return service->CreateTraceWriter(data_source->buffer_id,
2300                                     buffer_exhausted_policy);
2301 }
2302 
2303 // This is called via the public API Tracing::NewTrace().
2304 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2305 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2306     BackendType requested_backend_type,
2307     TracingConsumerBackend* (*system_backend_factory)()) {
2308   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2309 
2310   // |backend_type| can only specify one backend, not an OR-ed mask.
2311   PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2312 
2313   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2314   task_runner_->PostTask([this, requested_backend_type, session_id,
2315                           system_backend_factory] {
2316     if (requested_backend_type == kSystemBackend && system_backend_factory &&
2317         !FindConsumerBackendByType(kSystemBackend)) {
2318       AddConsumerBackend(system_backend_factory(), kSystemBackend);
2319     }
2320     for (RegisteredConsumerBackend& backend : consumer_backends_) {
2321       if (requested_backend_type && backend.type &&
2322           backend.type != requested_backend_type) {
2323         continue;
2324       }
2325 
2326       // Create the consumer now, even if we have to ask the embedder below, so
2327       // that any other tasks executing after this one can find the consumer and
2328       // change its pending attributes.
2329       backend.consumers.emplace_back(
2330           new ConsumerImpl(this, backend.type, session_id));
2331 
2332       // The last registered backend in |consumer_backends_| is the unsupported
2333       // backend without a valid type.
2334       if (!backend.type) {
2335         PERFETTO_ELOG(
2336             "No tracing backend ready for type=%d, consumer will disconnect",
2337             requested_backend_type);
2338         InitializeConsumer(session_id);
2339         return;
2340       }
2341 
2342       // Check if the embedder wants to be asked for permission before
2343       // connecting the consumer.
2344       if (!policy_) {
2345         InitializeConsumer(session_id);
2346         return;
2347       }
2348 
2349       BackendType type = backend.type;
2350       TracingPolicy::ShouldAllowConsumerSessionArgs args;
2351       args.backend_type = backend.type;
2352       args.result_callback = [this, type, session_id](bool allow) {
2353         task_runner_->PostTask([this, type, session_id, allow] {
2354           if (allow) {
2355             InitializeConsumer(session_id);
2356             return;
2357           }
2358 
2359           PERFETTO_ELOG(
2360               "Consumer session for backend type type=%d forbidden, "
2361               "consumer will disconnect",
2362               type);
2363 
2364           auto* consumer = FindConsumer(session_id);
2365           if (!consumer)
2366             return;
2367 
2368           consumer->OnDisconnect();
2369         });
2370       };
2371       policy_->ShouldAllowConsumerSession(args);
2372       return;
2373     }
2374     PERFETTO_DFATAL("Not reached");
2375   });
2376 
2377   return std::unique_ptr<TracingSession>(
2378       new TracingSessionImpl(this, session_id, requested_backend_type));
2379 }
2380 
2381 // static
2382 // This is called via the public API Tracing::SetupStartupTracing().
2383 // Can be called from any thread.
2384 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2385 TracingMuxerImpl::CreateStartupTracingSession(
2386     const TraceConfig& config,
2387     Tracing::SetupStartupTracingOpts opts) {
2388   BackendType backend_type = opts.backend;
2389   // |backend_type| can only specify one backend, not an OR-ed mask.
2390   PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2391   // The in-process backend doesn't support startup tracing.
2392   PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2393 
2394   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2395 
2396   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2397   task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2398     for (RegisteredProducerBackend& backend : producer_backends_) {
2399       if (backend_type && backend.type && backend.type != backend_type) {
2400         continue;
2401       }
2402 
2403       TracingBackendId backend_id = backend.id;
2404 
2405       // The last registered backend in |producer_backends_| is the unsupported
2406       // backend without a valid type.
2407       if (!backend.type) {
2408         PERFETTO_ELOG(
2409             "No tracing backend initialized for type=%d, startup tracing "
2410             "failed",
2411             backend_type);
2412         if (opts.on_setup)
2413           opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2414               0 /* num_data_sources_started */});
2415         return;
2416       }
2417 
2418       if (!backend.producer->service_ ||
2419           !backend.producer->service_->shared_memory()) {
2420         // If we unsuccessfully attempted to use a producer-provided SMB in the
2421         // past, don't try again.
2422         if (backend.producer->producer_provided_smb_failed_) {
2423           PERFETTO_ELOG(
2424               "Backend %zu doesn't seem to support producer-provided "
2425               "SMBs, startup tracing failed",
2426               backend_id);
2427           if (opts.on_setup)
2428             opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2429                 0 /* num_data_sources_started */});
2430           return;
2431         }
2432 
2433         PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2434                       backend_id);
2435         backend.producer_conn_args.use_producer_provided_smb = true;
2436         backend.producer->service_->Disconnect();  // Causes a reconnect.
2437         PERFETTO_DCHECK(backend.producer->service_ &&
2438                         backend.producer->service_->MaybeSharedMemoryArbiter());
2439       }
2440 
2441       RegisteredStartupSession session;
2442       session.session_id = session_id;
2443       session.on_aborted = opts.on_aborted;
2444       session.on_adopted = opts.on_adopted;
2445 
2446       for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2447         // Find all matching data sources and start one instance of each.
2448         for (const auto& rds : data_sources_) {
2449           if (rds.descriptor.name() != ds_cfg.config().name())
2450             continue;
2451 
2452           PERFETTO_DLOG(
2453               "Setting up data source %s for startup tracing with target "
2454               "buffer reservation %" PRIi32,
2455               rds.descriptor.name().c_str(),
2456               backend.producer->last_startup_target_buffer_reservation_ + 1u);
2457           auto ds = SetupDataSourceImpl(
2458               rds, backend_id,
2459               backend.producer->connection_id_.load(std::memory_order_relaxed),
2460               /*instance_id=*/0, ds_cfg.config(),
2461               ComputeConfigHash(ds_cfg.config()),
2462               ComputeStartupConfigHash(ds_cfg.config()),
2463               /*startup_session_id=*/session_id);
2464           if (ds) {
2465             StartDataSourceImpl(ds);
2466             session.num_unbound_data_sources++;
2467           }
2468         }
2469       }
2470 
2471       int num_ds = session.num_unbound_data_sources;
2472       auto on_setup = opts.on_setup;
2473       if (on_setup) {
2474         backend.producer->OnStartupTracingSetup();
2475         task_runner_->PostTask([on_setup, num_ds] {
2476           on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2477         });
2478       }
2479 
2480       if (num_ds > 0) {
2481         backend.startup_sessions.push_back(std::move(session));
2482 
2483         if (opts.timeout_ms > 0) {
2484           task_runner_->PostDelayedTask(
2485               [this, session_id, backend_type] {
2486                 AbortStartupTracingSession(session_id, backend_type);
2487               },
2488               opts.timeout_ms);
2489         }
2490       }
2491       return;
2492     }
2493     PERFETTO_DFATAL("Invalid startup tracing session backend");
2494   });
2495 
2496   return std::unique_ptr<StartupTracingSession>(
2497       new StartupTracingSessionImpl(this, session_id, backend_type));
2498 }
2499 
2500 // Must not be called from the SDK's internal thread.
2501 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2502 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2503     const TraceConfig& config,
2504     Tracing::SetupStartupTracingOpts opts) {
2505   auto previous_on_setup = std::move(opts.on_setup);
2506   PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2507   base::WaitableEvent event;
2508   // It is safe to capture by reference because once on_setup is called only
2509   // once before this method returns.
2510   opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2511     if (previous_on_setup) {
2512       previous_on_setup(std::move(args));
2513     }
2514     event.Notify();
2515   };
2516   auto session = CreateStartupTracingSession(config, std::move(opts));
2517   event.Wait();
2518   return session;
2519 }
2520 
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2521 void TracingMuxerImpl::AbortStartupTracingSession(
2522     TracingSessionGlobalID session_id,
2523     BackendType backend_type) {
2524   PERFETTO_DCHECK_THREAD(thread_checker_);
2525 
2526   for (RegisteredProducerBackend& backend : producer_backends_) {
2527     if (backend_type != backend.type)
2528       continue;
2529 
2530     auto session_it = std::find_if(
2531         backend.startup_sessions.begin(), backend.startup_sessions.end(),
2532         [session_id](const RegisteredStartupSession& session) {
2533           return session.session_id == session_id;
2534         });
2535 
2536     // The startup session may have already been aborted or fully adopted.
2537     if (session_it == backend.startup_sessions.end())
2538       return;
2539     if (session_it->is_aborting)
2540       return;
2541 
2542     session_it->is_aborting = true;
2543 
2544     // Iterate all data sources and abort them if they weren't adopted yet.
2545     for (const auto& rds : data_sources_) {
2546       DataSourceStaticState* static_state = rds.static_state;
2547       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2548         auto* internal_state = static_state->TryGet(i);
2549         if (internal_state &&
2550             internal_state->startup_target_buffer_reservation.load(
2551                 std::memory_order_relaxed) &&
2552             internal_state->data_source_instance_id == 0 &&
2553             internal_state->startup_session_id == session_id) {
2554           PERFETTO_DLOG(
2555               "Aborting startup tracing for data source %s (target buffer "
2556               "reservation %" PRIu16 ")",
2557               rds.descriptor.name().c_str(),
2558               internal_state->startup_target_buffer_reservation.load(
2559                   std::memory_order_relaxed));
2560 
2561           // Abort the instance asynchronously by stopping it. From this point
2562           // onwards, the service will not be able to adopt it via
2563           // StartDataSource().
2564           session_it->num_aborting_data_sources++;
2565           StopDataSource_AsyncBeginImpl(
2566               FindDataSourceRes(static_state, internal_state, i,
2567                                 rds.requires_callbacks_under_lock));
2568         }
2569       }
2570     }
2571 
2572     // If we did everything right, we should have aborted all still-unbound data
2573     // source instances.
2574     PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2575                     session_it->num_aborting_data_sources);
2576 
2577     if (session_it->num_aborting_data_sources == 0) {
2578       if (session_it->on_aborted)
2579         task_runner_->PostTask(session_it->on_aborted);
2580 
2581       backend.startup_sessions.erase(session_it);
2582     }
2583     return;
2584   }
2585   // We might reach here in tests because when we start a trace, we post the
2586   // Task(AbortStartupTrace, delay=timeout). When we do
2587   // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2588   // kill those delayed tasks because TaskRunner doesn't have support for
2589   // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2590   // to wait for the completion of all the scheduled tasks (apart from
2591   // deleting the TaskRunner) and we want to avoid doing that because we need
2592   // a long running TaskRunner in muxer.
2593   PERFETTO_DLOG("Invalid startup tracing session backend");
2594 }
2595 
InitializeInstance(const TracingInitArgs & args)2596 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2597   if (instance_ != TracingMuxerFake::Get()) {
2598     // The tracing muxer was already initialized. We might need to initialize
2599     // additional backends that were not configured earlier.
2600     auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2601     muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2602     return;
2603   }
2604   // If we previously had a TracingMuxerImpl instance which was reset,
2605   // reinitialize and reuse it instead of trying to create a new one. See
2606   // ResetForTesting().
2607   if (g_prev_instance) {
2608     auto* muxer = g_prev_instance;
2609     g_prev_instance = nullptr;
2610     instance_ = muxer;
2611     muxer->task_runner_->PostTask([muxer, args] {
2612       muxer->Initialize(args);
2613       muxer->AddBackends(args);
2614     });
2615   } else {
2616     new TracingMuxerImpl(args);
2617   }
2618 }
2619 
2620 // static
ResetForTesting()2621 void TracingMuxerImpl::ResetForTesting() {
2622   // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2623   // various objects make that a non-starter. In particular:
2624   //
2625   // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2626   //    a reference back to ProducerImpl::service_.
2627   //
2628   // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2629   //
2630   // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2631   //    turn depends on TracingMuxerImpl itself.
2632   //
2633   // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2634   // threads have dropped their TraceWriters. Since we can't really ask the
2635   // caller to guarantee this, we'll instead reset enough of the muxer's state
2636   // so that it can be reinitialized later and ensure all necessary objects from
2637   // the old state remain alive until all references have gone away.
2638   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2639 
2640   base::WaitableEvent reset_done;
2641   auto do_reset = [muxer, &reset_done] {
2642     muxer->DestroyStoppedTraceWritersForCurrentThread();
2643     // Unregister all data sources so they don't interfere with any future
2644     // tracing sessions.
2645     for (RegisteredDataSource& rds : muxer->data_sources_) {
2646       for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2647         if (!backend.producer->service_ || !backend.producer->connected_)
2648           continue;
2649         backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2650       }
2651     }
2652     for (auto& backend : muxer->consumer_backends_) {
2653       // Check that no consumer session is currently active on any backend.
2654       for (auto& consumer : backend.consumers)
2655         PERFETTO_CHECK(!consumer->service_);
2656     }
2657     for (auto& backend : muxer->producer_backends_) {
2658       backend.producer->muxer_ = nullptr;
2659       backend.producer->DisposeConnection();
2660       muxer->dead_backends_.push_back(std::move(backend));
2661     }
2662     muxer->consumer_backends_.clear();
2663     muxer->producer_backends_.clear();
2664     muxer->interceptors_.clear();
2665 
2666     for (auto& ds : muxer->data_sources_) {
2667       ds.static_state->ResetForTesting();
2668     }
2669 
2670     muxer->data_sources_.clear();
2671     muxer->next_data_source_index_ = 0;
2672 
2673     // Free all backends without active trace writers or other inbound
2674     // references. Note that even if all the backends get swept, the muxer still
2675     // needs to stay around since |task_runner_| is assumed to be long-lived.
2676     muxer->SweepDeadBackends();
2677 
2678     // Make sure we eventually discard any per-thread trace writers from the
2679     // previous instance.
2680     muxer->muxer_id_for_testing_++;
2681 
2682     g_prev_instance = muxer;
2683     instance_ = TracingMuxerFake::Get();
2684 
2685     // Call the user provided cleanups on the muxer thread.
2686     for (auto& cb : muxer->reset_callbacks_) {
2687       cb();
2688     }
2689 
2690     reset_done.Notify();
2691   };
2692 
2693   // Some tests run the muxer and the test on the same thread. In these cases,
2694   // we can reset synchronously.
2695   if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2696     do_reset();
2697   } else {
2698     muxer->DestroyStoppedTraceWritersForCurrentThread();
2699     muxer->task_runner_->PostTask(std::move(do_reset));
2700     reset_done.Wait();
2701     // Call the user provided cleanups also on this thread.
2702     for (auto& cb : muxer->reset_callbacks_) {
2703       cb();
2704     }
2705   }
2706   muxer->reset_callbacks_.clear();
2707 }
2708 
2709 // static
Shutdown()2710 void TracingMuxerImpl::Shutdown() {
2711   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2712 
2713   // Shutting down on the muxer thread would lead to a deadlock.
2714   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2715   muxer->DestroyStoppedTraceWritersForCurrentThread();
2716 
2717   std::unique_ptr<base::TaskRunner> owned_task_runner(
2718       muxer->task_runner_.get());
2719   base::WaitableEvent shutdown_done;
2720   owned_task_runner->PostTask([muxer, &shutdown_done] {
2721     // Check that no consumer session is currently active on any backend.
2722     // Producers will be automatically disconnected as a part of deleting the
2723     // muxer below.
2724     for (auto& backend : muxer->consumer_backends_) {
2725       for (auto& consumer : backend.consumers) {
2726         PERFETTO_CHECK(!consumer->service_);
2727       }
2728     }
2729     // Make sure no trace writers are lingering around on the muxer thread. Note
2730     // that we can't do this for any arbitrary thread in the process; it is the
2731     // caller's responsibility to clean them up before shutting down Perfetto.
2732     muxer->DestroyStoppedTraceWritersForCurrentThread();
2733     // The task runner must be deleted outside the muxer thread. This is done by
2734     // `owned_task_runner` above.
2735     muxer->task_runner_.release();
2736     auto* platform = muxer->platform_;
2737     delete muxer;
2738     instance_ = TracingMuxerFake::Get();
2739     platform->Shutdown();
2740     shutdown_done.Notify();
2741   });
2742   shutdown_done.Wait();
2743 }
2744 
AppendResetForTestingCallback(std::function<void ()> cb)2745 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2746   reset_callbacks_.push_back(std::move(cb));
2747 }
2748 
2749 TracingMuxer::~TracingMuxer() = default;
2750 
2751 static_assert(std::is_same<internal::BufferId, BufferID>::value,
2752               "public's BufferId and tracing/core's BufferID diverged");
2753 
2754 }  // namespace internal
2755 }  // namespace perfetto
2756