• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24 
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49 
50 #include "protos/perfetto/config/interceptor_config.gen.h"
51 
52 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
53 #include <io.h>  // For dup()
54 #else
55 #include <unistd.h>  // For dup()
56 #endif
57 
58 namespace perfetto {
59 namespace internal {
60 
61 namespace {
62 
63 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
64 
65 // A task runner which prevents calls to DataSource::Trace() while an operation
66 // is in progress. Used to guard against unexpected re-entrancy where the
67 // user-provided task runner implementation tries to enter a trace point under
68 // the hood.
69 class NonReentrantTaskRunner : public base::TaskRunner {
70  public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)71   NonReentrantTaskRunner(TracingMuxer* muxer,
72                          std::unique_ptr<base::TaskRunner> task_runner)
73       : muxer_(muxer), task_runner_(std::move(task_runner)) {}
74 
75   // base::TaskRunner implementation.
PostTask(std::function<void ()> task)76   void PostTask(std::function<void()> task) override {
77     CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
78   }
79 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)80   void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
81     CallWithGuard(
82         [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
83   }
84 
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)85   void AddFileDescriptorWatch(base::PlatformHandle fd,
86                               std::function<void()> callback) override {
87     CallWithGuard(
88         [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
89   }
90 
RemoveFileDescriptorWatch(base::PlatformHandle fd)91   void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
92     CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
93   }
94 
RunsTasksOnCurrentThread() const95   bool RunsTasksOnCurrentThread() const override {
96     bool result;
97     CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
98     return result;
99   }
100 
101  private:
102   template <typename T>
CallWithGuard(T lambda) const103   void CallWithGuard(T lambda) const {
104     auto* root_tls = muxer_->GetOrCreateTracingTLS();
105     if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
106       lambda();
107       return;
108     }
109     ScopedReentrancyAnnotator scoped_annotator(*root_tls);
110     lambda();
111   }
112 
113   TracingMuxer* const muxer_;
114   std::unique_ptr<base::TaskRunner> task_runner_;
115 };
116 
117 class StopArgsImpl : public DataSourceBase::StopArgs {
118  public:
HandleStopAsynchronously() const119   std::function<void()> HandleStopAsynchronously() const override {
120     auto closure = std::move(async_stop_closure);
121     async_stop_closure = std::function<void()>();
122     return closure;
123   }
124 
125   mutable std::function<void()> async_stop_closure;
126 };
127 
128 class FlushArgsImpl : public DataSourceBase::FlushArgs {
129  public:
HandleFlushAsynchronously() const130   std::function<void()> HandleFlushAsynchronously() const override {
131     auto closure = std::move(async_flush_closure);
132     async_flush_closure = std::function<void()>();
133     return closure;
134   }
135 
136   mutable std::function<void()> async_flush_closure;
137 };
138 
139 // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
140 static TracingMuxerImpl* g_prev_instance{};
141 
142 template <typename RegisteredBackend>
143 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anonab6d44540111::CompareBackendByType144   static int BackendTypePriority(BackendType type) {
145     switch (type) {
146       case kSystemBackend:
147         return 0;
148       case kInProcessBackend:
149         return 1;
150       case kCustomBackend:
151         return 2;
152       // The UnspecifiedBackend has the highest priority so that
153       // TracingBackendFake is the last one on the backend lists.
154       case kUnspecifiedBackend:
155         break;
156     }
157     return 3;
158   }
operator ()perfetto::internal::__anonab6d44540111::CompareBackendByType159   bool operator()(BackendType type, const RegisteredBackend& b) {
160     return BackendTypePriority(type) < BackendTypePriority(b.type);
161   }
162 };
163 
164 }  // namespace
165 
166 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms,bool shmem_direct_patching_enabled)167 TracingMuxerImpl::ProducerImpl::ProducerImpl(
168     TracingMuxerImpl* muxer,
169     TracingBackendId backend_id,
170     uint32_t shmem_batch_commits_duration_ms,
171     bool shmem_direct_patching_enabled)
172     : muxer_(muxer),
173       backend_id_(backend_id),
174       shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms),
175       shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {}
176 
~ProducerImpl()177 TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
178   muxer_ = nullptr;
179 }
180 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)181 void TracingMuxerImpl::ProducerImpl::Initialize(
182     std::unique_ptr<ProducerEndpoint> endpoint) {
183   PERFETTO_DCHECK_THREAD(thread_checker_);
184   PERFETTO_DCHECK(!connected_);
185   connection_id_.fetch_add(1, std::memory_order_relaxed);
186   is_producer_provided_smb_ = endpoint->shared_memory();
187   last_startup_target_buffer_reservation_ = 0;
188 
189   // Adopt the endpoint into a shared pointer so that we can safely share it
190   // across threads that create trace writers. The custom deleter function
191   // ensures that the endpoint is always destroyed on the muxer's thread. (Note
192   // that |task_runner| is assumed to outlive tracing sessions on all threads.)
193   auto* task_runner = muxer_->task_runner_.get();
194   auto deleter = [task_runner](ProducerEndpoint* e) {
195     if (task_runner->RunsTasksOnCurrentThread()) {
196       delete e;
197       return;
198     }
199     task_runner->PostTask([e] { delete e; });
200   };
201   std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
202   // This atomic store is needed because another thread might be concurrently
203   // creating a trace writer using the previous (disconnected) |service_|. See
204   // CreateTraceWriter().
205   std::atomic_store(&service_, std::move(service));
206   // Don't try to use the service here since it may not have connected yet. See
207   // OnConnect().
208 }
209 
OnConnect()210 void TracingMuxerImpl::ProducerImpl::OnConnect() {
211   PERFETTO_DLOG("Producer connected");
212   PERFETTO_DCHECK_THREAD(thread_checker_);
213   PERFETTO_DCHECK(!connected_);
214   if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
215     PERFETTO_ELOG(
216         "The service likely doesn't support producer-provided SMBs. Preventing "
217         "future attempts to use producer-provided SMB again with this "
218         "backend.");
219     producer_provided_smb_failed_ = true;
220     // Will call OnDisconnect() and cause a reconnect without producer-provided
221     // SMB.
222     service_->Disconnect();
223     return;
224   }
225   connected_ = true;
226   muxer_->UpdateDataSourcesOnAllBackends();
227   SendOnConnectTriggers();
228 }
229 
OnDisconnect()230 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
231   PERFETTO_DCHECK_THREAD(thread_checker_);
232   // If we're being destroyed, bail out.
233   if (!muxer_)
234     return;
235   connected_ = false;
236   // Active data sources for this producer will be stopped by
237   // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
238   // will have a different connection id (even before it has finished
239   // connecting).
240   registered_data_sources_.reset();
241   DisposeConnection();
242 
243   // Try reconnecting the producer.
244   muxer_->OnProducerDisconnected(this);
245 }
246 
DisposeConnection()247 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
248   // Keep the old service around as a dead connection in case it has active
249   // trace writers. If any tracing sessions were created, we can't clear
250   // |service_| here because other threads may be concurrently creating new
251   // trace writers. Any reconnection attempt will atomically swap the new
252   // service in place of the old one.
253   if (did_setup_tracing_ || did_setup_startup_tracing_) {
254     dead_services_.push_back(service_);
255   } else {
256     service_.reset();
257   }
258 }
259 
OnTracingSetup()260 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
261   PERFETTO_DCHECK_THREAD(thread_checker_);
262   did_setup_tracing_ = true;
263   service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
264       shmem_batch_commits_duration_ms_);
265   if (shmem_direct_patching_enabled_) {
266     service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching();
267   }
268 }
269 
OnStartupTracingSetup()270 void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
271   PERFETTO_DCHECK_THREAD(thread_checker_);
272   did_setup_startup_tracing_ = true;
273 }
274 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)275 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
276     DataSourceInstanceID id,
277     const DataSourceConfig& cfg) {
278   PERFETTO_DCHECK_THREAD(thread_checker_);
279   if (!muxer_)
280     return;
281   muxer_->SetupDataSource(
282       backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
283 }
284 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)285 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
286                                                      const DataSourceConfig&) {
287   PERFETTO_DCHECK_THREAD(thread_checker_);
288   if (!muxer_)
289     return;
290   muxer_->StartDataSource(backend_id_, id);
291   service_->NotifyDataSourceStarted(id);
292 }
293 
StopDataSource(DataSourceInstanceID id)294 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
295   PERFETTO_DCHECK_THREAD(thread_checker_);
296   if (!muxer_)
297     return;
298   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
299 }
300 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count,FlushFlags flush_flags)301 void TracingMuxerImpl::ProducerImpl::Flush(
302     FlushRequestID flush_id,
303     const DataSourceInstanceID* instances,
304     size_t instance_count,
305     FlushFlags flush_flags) {
306   PERFETTO_DCHECK_THREAD(thread_checker_);
307   bool all_handled = true;
308   if (muxer_) {
309     for (size_t i = 0; i < instance_count; i++) {
310       DataSourceInstanceID ds_id = instances[i];
311       bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id,
312                                                         flush_id, flush_flags);
313       if (!handled) {
314         pending_flushes_[flush_id].insert(ds_id);
315         all_handled = false;
316       }
317     }
318   }
319 
320   if (all_handled) {
321     service_->NotifyFlushComplete(flush_id);
322   }
323 }
324 
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)325 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
326     const DataSourceInstanceID* instances,
327     size_t instance_count) {
328   PERFETTO_DCHECK_THREAD(thread_checker_);
329   if (!muxer_)
330     return;
331   for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
332     muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
333   }
334 }
335 
SweepDeadServices()336 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
337   PERFETTO_DCHECK_THREAD(thread_checker_);
338   auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
339     auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
340     return !arbiter || arbiter->TryShutdown();
341   };
342   for (auto it = dead_services_.begin(); it != dead_services_.end();) {
343     auto next_it = it;
344     next_it++;
345     if (is_unused(*it)) {
346       dead_services_.erase(it);
347     }
348     it = next_it;
349   }
350   return dead_services_.empty();
351 }
352 
SendOnConnectTriggers()353 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
354   PERFETTO_DCHECK_THREAD(thread_checker_);
355   base::TimeMillis now = base::GetWallTimeMs();
356   std::vector<std::string> triggers;
357   while (!on_connect_triggers_.empty()) {
358     // Skip if we passed TTL.
359     if (on_connect_triggers_.front().second > now) {
360       triggers.push_back(std::move(on_connect_triggers_.front().first));
361     }
362     on_connect_triggers_.pop_front();
363   }
364   if (!triggers.empty()) {
365     service_->ActivateTriggers(triggers);
366   }
367 }
368 
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)369 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
370     DataSourceInstanceID ds_id,
371     FlushRequestID flush_id) {
372   if (!connected_) {
373     return;
374   }
375 
376   {
377     auto it = pending_flushes_.find(flush_id);
378     if (it == pending_flushes_.end()) {
379       return;
380     }
381     std::set<DataSourceInstanceID>& ds_ids = it->second;
382     ds_ids.erase(ds_id);
383   }
384 
385   std::optional<DataSourceInstanceID> biggest_flush_id;
386   for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
387     if (it->second.empty()) {
388       biggest_flush_id = it->first;
389       it = pending_flushes_.erase(it);
390     } else {
391       break;
392     }
393   }
394 
395   if (biggest_flush_id) {
396     service_->NotifyFlushComplete(*biggest_flush_id);
397   }
398 }
399 
400 // ----- End of TracingMuxerImpl::ProducerImpl methods.
401 
402 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingSessionGlobalID session_id)403 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
404                                              BackendType backend_type,
405                                              TracingSessionGlobalID session_id)
406     : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
407 
~ConsumerImpl()408 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
409   muxer_ = nullptr;
410 }
411 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)412 void TracingMuxerImpl::ConsumerImpl::Initialize(
413     std::unique_ptr<ConsumerEndpoint> endpoint) {
414   PERFETTO_DCHECK_THREAD(thread_checker_);
415   service_ = std::move(endpoint);
416   // Don't try to use the service here since it may not have connected yet. See
417   // OnConnect().
418 }
419 
OnConnect()420 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
421   PERFETTO_DCHECK_THREAD(thread_checker_);
422   PERFETTO_DCHECK(!connected_);
423   connected_ = true;
424 
425   // Observe data source instance events so we get notified when tracing starts.
426   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
427                           ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
428 
429   // If the API client configured and started tracing before we connected,
430   // tell the backend about it now.
431   if (trace_config_)
432     muxer_->SetupTracingSession(session_id_, trace_config_);
433   if (start_pending_)
434     muxer_->StartTracingSession(session_id_);
435   if (get_trace_stats_pending_) {
436     auto callback = std::move(get_trace_stats_callback_);
437     get_trace_stats_callback_ = nullptr;
438     muxer_->GetTraceStats(session_id_, std::move(callback));
439   }
440   if (query_service_state_callback_) {
441     auto callback = std::move(query_service_state_callback_);
442     query_service_state_callback_ = nullptr;
443     muxer_->QueryServiceState(session_id_, std::move(callback));
444   }
445   if (stop_pending_)
446     muxer_->StopTracingSession(session_id_);
447 }
448 
OnDisconnect()449 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
450   PERFETTO_DCHECK_THREAD(thread_checker_);
451   // If we're being destroyed, bail out.
452   if (!muxer_)
453     return;
454 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
455   if (!connected_ && backend_type_ == kSystemBackend) {
456     PERFETTO_ELOG(
457         "Unable to connect to the system tracing service as a consumer. On "
458         "Android, use the \"perfetto\" command line tool instead to start "
459         "system-wide tracing sessions");
460   }
461 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
462 
463   // Notify the client about disconnection.
464   NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
465 
466   // Make sure the client doesn't hang in a blocking start/stop because of the
467   // disconnection.
468   NotifyStartComplete();
469   NotifyStopComplete();
470 
471   // It shouldn't be necessary to call StopTracingSession. If we get this call
472   // it means that the service did shutdown before us, so there is no point
473   // trying it to ask it to stop the session. We should just remember to cleanup
474   // the consumer vector.
475   connected_ = false;
476 
477   // Notify the muxer that it is safe to destroy |this|. This is needed because
478   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
479   // access until OnDisconnect() is called.
480   muxer_->OnConsumerDisconnected(this);
481 }
482 
Disconnect()483 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
484   // This is weird and deserves a comment.
485   //
486   // When we called the ConnectConsumer method on the service it returns
487   // us a ConsumerEndpoint which we stored in |service_|, however this
488   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
489   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
490   // the ConsumerImpl pointer has to be valid until the
491   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
492   // ConsumerEndpoint |service_|. Eventually this will call
493   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
494   // call the destructor of |this|.
495   service_.reset();
496 }
497 
OnTracingDisabled(const std::string & error)498 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
499     const std::string& error) {
500   PERFETTO_DCHECK_THREAD(thread_checker_);
501   PERFETTO_DCHECK(!stopped_);
502   stopped_ = true;
503 
504   if (!error.empty())
505     NotifyError(TracingError{TracingError::kTracingFailed, error});
506 
507   // If we're still waiting for the start event, fire it now. This may happen if
508   // there are no active data sources in the session.
509   NotifyStartComplete();
510   NotifyStopComplete();
511 }
512 
NotifyStartComplete()513 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
514   PERFETTO_DCHECK_THREAD(thread_checker_);
515   if (start_complete_callback_) {
516     muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
517     start_complete_callback_ = nullptr;
518   }
519   if (blocking_start_complete_callback_) {
520     muxer_->task_runner_->PostTask(
521         std::move(blocking_start_complete_callback_));
522     blocking_start_complete_callback_ = nullptr;
523   }
524 }
525 
NotifyError(const TracingError & error)526 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
527   PERFETTO_DCHECK_THREAD(thread_checker_);
528   if (error_callback_) {
529     muxer_->task_runner_->PostTask(
530         std::bind(std::move(error_callback_), error));
531   }
532 }
533 
NotifyStopComplete()534 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
535   PERFETTO_DCHECK_THREAD(thread_checker_);
536   if (stop_complete_callback_) {
537     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
538     stop_complete_callback_ = nullptr;
539   }
540   if (blocking_stop_complete_callback_) {
541     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
542     blocking_stop_complete_callback_ = nullptr;
543   }
544 }
545 
OnTraceData(std::vector<TracePacket> packets,bool has_more)546 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
547     std::vector<TracePacket> packets,
548     bool has_more) {
549   PERFETTO_DCHECK_THREAD(thread_checker_);
550   if (!read_trace_callback_)
551     return;
552 
553   size_t capacity = 0;
554   for (const auto& packet : packets) {
555     // 16 is an over-estimation of the proto preamble size
556     capacity += packet.size() + 16;
557   }
558 
559   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
560   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
561   buf->reserve(capacity);
562   for (auto& packet : packets) {
563     char* start;
564     size_t size;
565     std::tie(start, size) = packet.GetProtoPreamble();
566     buf->insert(buf->end(), start, start + size);
567     for (auto& slice : packet.slices()) {
568       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
569       buf->insert(buf->end(), slice_data, slice_data + slice.size);
570     }
571   }
572 
573   auto callback = read_trace_callback_;
574   muxer_->task_runner_->PostTask([callback, buf, has_more] {
575     TracingSession::ReadTraceCallbackArgs callback_arg{};
576     callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
577     callback_arg.size = buf->size();
578     callback_arg.has_more = has_more;
579     callback(callback_arg);
580   });
581 
582   if (!has_more)
583     read_trace_callback_ = nullptr;
584 }
585 
OnObservableEvents(const ObservableEvents & events)586 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
587     const ObservableEvents& events) {
588   if (events.instance_state_changes_size()) {
589     for (const auto& state_change : events.instance_state_changes()) {
590       DataSourceHandle handle{state_change.producer_name(),
591                               state_change.data_source_name()};
592       data_source_states_[handle] =
593           state_change.state() ==
594           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
595     }
596   }
597 
598   if (events.instance_state_changes_size() ||
599       events.all_data_sources_started()) {
600     // Data sources are first reported as being stopped before starting, so once
601     // all the data sources we know about have started we can declare tracing
602     // begun. In the case where there are no matching data sources for the
603     // session, the service will report the all_data_sources_started() event
604     // without adding any instances (only since Android S / Perfetto v10.0).
605     if (start_complete_callback_ || blocking_start_complete_callback_) {
606       bool all_data_sources_started = std::all_of(
607           data_source_states_.cbegin(), data_source_states_.cend(),
608           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
609       if (all_data_sources_started)
610         NotifyStartComplete();
611     }
612   }
613 }
614 
OnSessionCloned(const OnSessionClonedArgs &)615 void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
616     const OnSessionClonedArgs&) {
617   // CloneSession is not exposed in the SDK. This should never happen.
618   PERFETTO_DCHECK(false);
619 }
620 
OnTraceStats(bool success,const TraceStats & trace_stats)621 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
622     bool success,
623     const TraceStats& trace_stats) {
624   if (!get_trace_stats_callback_)
625     return;
626   TracingSession::GetTraceStatsCallbackArgs callback_arg{};
627   callback_arg.success = success;
628   callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
629   muxer_->task_runner_->PostTask(
630       std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
631   get_trace_stats_callback_ = nullptr;
632 }
633 
634 // The callbacks below are not used.
OnDetach(bool)635 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)636 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
637 // ----- End of TracingMuxerImpl::ConsumerImpl
638 
639 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
640 
641 // TracingSessionImpl is the RAII object returned to API clients when they
642 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
643 // tracing.
644 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)645 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
646     TracingMuxerImpl* muxer,
647     TracingSessionGlobalID session_id,
648     BackendType backend_type)
649     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
650 
651 // Can be destroyed from any thread.
~TracingSessionImpl()652 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
653   auto* muxer = muxer_;
654   auto session_id = session_id_;
655   muxer->task_runner_->PostTask(
656       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
657 }
658 
659 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)660 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
661                                                  int fd) {
662   auto* muxer = muxer_;
663   auto session_id = session_id_;
664   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
665   if (fd >= 0) {
666     base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
667 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
668     if (backend_type_ != kInProcessBackend) {
669       PERFETTO_FATAL(
670           "Passing a file descriptor to TracingSession::Setup() is only "
671           "supported with the kInProcessBackend on Windows. Use "
672           "TracingSession::ReadTrace() instead");
673     }
674 #endif
675     trace_config->set_write_into_file(true);
676     fd = dup(fd);
677   }
678   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
679     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
680   });
681 }
682 
683 // Can be called from any thread.
Start()684 void TracingMuxerImpl::TracingSessionImpl::Start() {
685   auto* muxer = muxer_;
686   auto session_id = session_id_;
687   muxer->task_runner_->PostTask(
688       [muxer, session_id] { muxer->StartTracingSession(session_id); });
689 }
690 
691 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)692 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
693     const TraceConfig& cfg) {
694   auto* muxer = muxer_;
695   auto session_id = session_id_;
696   muxer->task_runner_->PostTask([muxer, session_id, cfg] {
697     muxer->ChangeTracingSessionConfig(session_id, cfg);
698   });
699 }
700 
701 // Can be called from any thread except the service thread.
StartBlocking()702 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
703   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
704   auto* muxer = muxer_;
705   auto session_id = session_id_;
706   base::WaitableEvent tracing_started;
707   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
708     auto* consumer = muxer->FindConsumer(session_id);
709     if (!consumer) {
710       // TODO(skyostil): Signal an error to the user.
711       tracing_started.Notify();
712       return;
713     }
714     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
715     consumer->blocking_start_complete_callback_ = [&] {
716       tracing_started.Notify();
717     };
718     muxer->StartTracingSession(session_id);
719   });
720   tracing_started.Wait();
721 }
722 
723 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)724 void TracingMuxerImpl::TracingSessionImpl::Flush(
725     std::function<void(bool)> user_callback,
726     uint32_t timeout_ms) {
727   auto* muxer = muxer_;
728   auto session_id = session_id_;
729   muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
730     auto* consumer = muxer->FindConsumer(session_id);
731     if (!consumer) {
732       std::move(user_callback)(false);
733       return;
734     }
735     muxer->FlushTracingSession(session_id, timeout_ms,
736                                std::move(user_callback));
737   });
738 }
739 
740 // Can be called from any thread.
Stop()741 void TracingMuxerImpl::TracingSessionImpl::Stop() {
742   auto* muxer = muxer_;
743   auto session_id = session_id_;
744   muxer->task_runner_->PostTask(
745       [muxer, session_id] { muxer->StopTracingSession(session_id); });
746 }
747 
748 // Can be called from any thread except the service thread.
StopBlocking()749 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
750   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
751   auto* muxer = muxer_;
752   auto session_id = session_id_;
753   base::WaitableEvent tracing_stopped;
754   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
755     auto* consumer = muxer->FindConsumer(session_id);
756     if (!consumer) {
757       // TODO(skyostil): Signal an error to the user.
758       tracing_stopped.Notify();
759       return;
760     }
761     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
762     consumer->blocking_stop_complete_callback_ = [&] {
763       tracing_stopped.Notify();
764     };
765     muxer->StopTracingSession(session_id);
766   });
767   tracing_stopped.Wait();
768 }
769 
770 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)771 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
772   auto* muxer = muxer_;
773   auto session_id = session_id_;
774   muxer->task_runner_->PostTask([muxer, session_id, cb] {
775     muxer->ReadTracingSessionData(session_id, std::move(cb));
776   });
777 }
778 
779 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)780 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
781     std::function<void()> cb) {
782   auto* muxer = muxer_;
783   auto session_id = session_id_;
784   muxer->task_runner_->PostTask([muxer, session_id, cb] {
785     auto* consumer = muxer->FindConsumer(session_id);
786     if (!consumer)
787       return;
788     consumer->start_complete_callback_ = cb;
789   });
790 }
791 
792 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)793 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
794     std::function<void(TracingError)> cb) {
795   auto* muxer = muxer_;
796   auto session_id = session_id_;
797   muxer->task_runner_->PostTask([muxer, session_id, cb] {
798     auto* consumer = muxer->FindConsumer(session_id);
799     if (!consumer) {
800       // Notify the client about concurrent disconnection of the session.
801       if (cb)
802         cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
803       return;
804     }
805     consumer->error_callback_ = cb;
806   });
807 }
808 
809 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)810 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
811     std::function<void()> cb) {
812   auto* muxer = muxer_;
813   auto session_id = session_id_;
814   muxer->task_runner_->PostTask([muxer, session_id, cb] {
815     auto* consumer = muxer->FindConsumer(session_id);
816     if (!consumer)
817       return;
818     consumer->stop_complete_callback_ = cb;
819   });
820 }
821 
822 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)823 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
824     GetTraceStatsCallback cb) {
825   auto* muxer = muxer_;
826   auto session_id = session_id_;
827   muxer->task_runner_->PostTask([muxer, session_id, cb] {
828     muxer->GetTraceStats(session_id, std::move(cb));
829   });
830 }
831 
832 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)833 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
834     QueryServiceStateCallback cb) {
835   auto* muxer = muxer_;
836   auto session_id = session_id_;
837   muxer->task_runner_->PostTask([muxer, session_id, cb] {
838     muxer->QueryServiceState(session_id, std::move(cb));
839   });
840 }
841 
842 // ----- End of TracingMuxerImpl::TracingSessionImpl
843 
844 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
845 
StartupTracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)846 TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
847     TracingMuxerImpl* muxer,
848     TracingSessionGlobalID session_id,
849     BackendType backend_type)
850     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
851 
852 // Can be destroyed from any thread.
853 TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
854     default;
855 
Abort()856 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
857   auto* muxer = muxer_;
858   auto session_id = session_id_;
859   auto backend_type = backend_type_;
860   muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
861     muxer->AbortStartupTracingSession(session_id, backend_type);
862   });
863 }
864 
865 // Must not be called from the SDK's internal thread.
AbortBlocking()866 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
867   auto* muxer = muxer_;
868   auto session_id = session_id_;
869   auto backend_type = backend_type_;
870   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
871   base::WaitableEvent event;
872   muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
873     muxer->AbortStartupTracingSession(session_id, backend_type);
874     event.Notify();
875   });
876   event.Wait();
877 }
878 
879 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
880 
881 // static
882 TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
883 
884 // This is called by perfetto::Tracing::Initialize().
885 // Can be called on any thread. Typically, but not necessarily, that will be
886 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)887 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
888     : TracingMuxer(args.platform ? args.platform
889                                  : Platform::GetDefaultPlatform()) {
890   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
891   instance_ = this;
892 
893   // Create the thread where muxer, producers and service will live.
894   Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
895   task_runner_.reset(new NonReentrantTaskRunner(
896       this, platform_->CreateTaskRunner(std::move(tr_args))));
897 
898   // Run the initializer on that thread.
899   task_runner_->PostTask([this, args] {
900     Initialize(args);
901     AddBackends(args);
902   });
903 }
904 
Initialize(const TracingInitArgs & args)905 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
906   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.
907 
908   policy_ = args.tracing_policy;
909   supports_multiple_data_source_instances_ =
910       args.supports_multiple_data_source_instances;
911 
912   // Fallback backend for producer creation for an unsupported backend type.
913   PERFETTO_CHECK(producer_backends_.empty());
914   AddProducerBackend(internal::TracingBackendFake::GetInstance(),
915                      BackendType::kUnspecifiedBackend, args);
916   // Fallback backend for consumer creation for an unsupported backend type.
917   // This backend simply fails any attempt to start a tracing session.
918   PERFETTO_CHECK(consumer_backends_.empty());
919   AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
920                      BackendType::kUnspecifiedBackend);
921 }
922 
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)923 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
924                                           BackendType type) {
925   if (!backend) {
926     // We skip the log in release builds because the *_backend_fake.cc code
927     // has already an ELOG before returning a nullptr.
928     PERFETTO_DLOG("Consumer backend creation failed, type %d",
929                   static_cast<int>(type));
930     return;
931   }
932   // Keep the backends sorted by type.
933   auto it =
934       std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
935                        type, CompareBackendByType<RegisteredConsumerBackend>());
936   it = consumer_backends_.emplace(it);
937 
938   RegisteredConsumerBackend& rb = *it;
939   rb.backend = backend;
940   rb.type = type;
941 }
942 
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)943 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
944                                           BackendType type,
945                                           const TracingInitArgs& args) {
946   if (!backend) {
947     // We skip the log in release builds because the *_backend_fake.cc code
948     // has already an ELOG before returning a nullptr.
949     PERFETTO_DLOG("Producer backend creation failed, type %d",
950                   static_cast<int>(type));
951     return;
952   }
953   TracingBackendId backend_id = producer_backends_.size();
954   // Keep the backends sorted by type.
955   auto it =
956       std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
957                        type, CompareBackendByType<RegisteredProducerBackend>());
958   it = producer_backends_.emplace(it);
959 
960   RegisteredProducerBackend& rb = *it;
961   rb.backend = backend;
962   rb.id = backend_id;
963   rb.type = type;
964   rb.producer.reset(new ProducerImpl(this, backend_id,
965                                      args.shmem_batch_commits_duration_ms,
966                                      args.shmem_direct_patching_enabled));
967   rb.producer_conn_args.producer = rb.producer.get();
968   rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
969   rb.producer_conn_args.task_runner = task_runner_.get();
970   rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
971   rb.producer_conn_args.shmem_page_size_hint_bytes =
972       args.shmem_page_size_hint_kb * 1024;
973   rb.producer_conn_args.create_socket_async = args.create_socket_async;
974   rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
975 }
976 
977 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)978 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
979   for (RegisteredProducerBackend& b : producer_backends_) {
980     if (b.id == id) {
981       return &b;
982     }
983   }
984   return nullptr;
985 }
986 
987 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)988 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
989   for (RegisteredProducerBackend& b : producer_backends_) {
990     if (b.type == type) {
991       return &b;
992     }
993   }
994   return nullptr;
995 }
996 
997 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)998 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
999   for (RegisteredConsumerBackend& b : consumer_backends_) {
1000     if (b.type == type) {
1001       return &b;
1002     }
1003   }
1004   return nullptr;
1005 }
1006 
AddBackends(const TracingInitArgs & args)1007 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1008   if (args.backends & kSystemBackend) {
1009     PERFETTO_CHECK(args.system_producer_backend_factory_);
1010     if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1011       AddProducerBackend(args.system_producer_backend_factory_(),
1012                          kSystemBackend, args);
1013     }
1014     if (args.enable_system_consumer) {
1015       PERFETTO_CHECK(args.system_consumer_backend_factory_);
1016       if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1017         AddConsumerBackend(args.system_consumer_backend_factory_(),
1018                            kSystemBackend);
1019       }
1020     }
1021   }
1022 
1023   if (args.backends & kInProcessBackend) {
1024     TracingBackend* b = nullptr;
1025     if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1026       if (!b) {
1027         PERFETTO_CHECK(args.in_process_backend_factory_);
1028         b = args.in_process_backend_factory_();
1029       }
1030       AddProducerBackend(b, kInProcessBackend, args);
1031     }
1032     if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1033       if (!b) {
1034         PERFETTO_CHECK(args.in_process_backend_factory_);
1035         b = args.in_process_backend_factory_();
1036       }
1037       AddConsumerBackend(b, kInProcessBackend);
1038     }
1039   }
1040 
1041   if (args.backends & kCustomBackend) {
1042     PERFETTO_CHECK(args.custom_backend);
1043     if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1044       AddProducerBackend(args.custom_backend, kCustomBackend, args);
1045     }
1046     if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1047       AddConsumerBackend(args.custom_backend, kCustomBackend);
1048     }
1049   }
1050 
1051   if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1052     PERFETTO_FATAL("Unsupported tracing backend type");
1053   }
1054 }
1055 
1056 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,bool no_flush,DataSourceStaticState * static_state)1057 bool TracingMuxerImpl::RegisterDataSource(
1058     const DataSourceDescriptor& descriptor,
1059     DataSourceFactory factory,
1060     DataSourceParams params,
1061     bool no_flush,
1062     DataSourceStaticState* static_state) {
1063   // Ignore repeated registrations.
1064   if (static_state->index != kMaxDataSources)
1065     return true;
1066 
1067   uint32_t new_index = next_data_source_index_++;
1068   if (new_index >= kMaxDataSources) {
1069     PERFETTO_DLOG(
1070         "RegisterDataSource failed: too many data sources already registered");
1071     return false;
1072   }
1073 
1074   // Initialize the static state.
1075   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1076                 "instances[] size mismatch");
1077   for (size_t i = 0; i < static_state->instances.size(); i++)
1078     new (&static_state->instances[i]) DataSourceState{};
1079 
1080   static_state->index = new_index;
1081 
1082   // Generate a semi-unique id for this data source.
1083   base::Hasher hash;
1084   hash.Update(reinterpret_cast<intptr_t>(static_state));
1085   hash.Update(base::GetWallTimeNs().count());
1086   static_state->id = hash.digest() ? hash.digest() : 1;
1087 
1088   task_runner_->PostTask([this, descriptor, factory, static_state, params,
1089                           no_flush] {
1090     data_sources_.emplace_back();
1091     RegisteredDataSource& rds = data_sources_.back();
1092     rds.descriptor = descriptor;
1093     rds.factory = factory;
1094     rds.supports_multiple_instances =
1095         supports_multiple_data_source_instances_ &&
1096         params.supports_multiple_instances;
1097     rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1098     rds.static_state = static_state;
1099     rds.no_flush = no_flush;
1100 
1101     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1102   });
1103   return true;
1104 }
1105 
1106 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1107 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1108     const DataSourceDescriptor& descriptor,
1109     const DataSourceStaticState* static_state) {
1110   task_runner_->PostTask([this, descriptor, static_state] {
1111     for (auto& rds : data_sources_) {
1112       if (rds.static_state == static_state) {
1113         PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1114         rds.descriptor = descriptor;
1115         rds.descriptor.set_id(static_state->id);
1116         UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1117         return;
1118       }
1119     }
1120   });
1121 }
1122 
1123 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1124 void TracingMuxerImpl::RegisterInterceptor(
1125     const InterceptorDescriptor& descriptor,
1126     InterceptorFactory factory,
1127     InterceptorBase::TLSFactory tls_factory,
1128     InterceptorBase::TracePacketCallback packet_callback) {
1129   task_runner_->PostTask([this, descriptor, factory, tls_factory,
1130                           packet_callback] {
1131     // Ignore repeated registrations.
1132     for (const auto& interceptor : interceptors_) {
1133       if (interceptor.descriptor.name() == descriptor.name()) {
1134         PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1135         PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1136         return;
1137       }
1138     }
1139     // Only allow certain interceptors for now.
1140     if (descriptor.name() != "test_interceptor" &&
1141         descriptor.name() != "console" && descriptor.name() != "etwexport") {
1142       PERFETTO_ELOG(
1143           "Interceptors are experimental. If you want to use them, please "
1144           "get in touch with the project maintainers "
1145           "(https://perfetto.dev/docs/contributing/"
1146           "getting-started#community).");
1147       return;
1148     }
1149     interceptors_.emplace_back();
1150     RegisteredInterceptor& interceptor = interceptors_.back();
1151     interceptor.descriptor = descriptor;
1152     interceptor.factory = factory;
1153     interceptor.tls_factory = tls_factory;
1154     interceptor.packet_callback = packet_callback;
1155   });
1156 }
1157 
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1158 void TracingMuxerImpl::ActivateTriggers(
1159     const std::vector<std::string>& triggers,
1160     uint32_t ttl_ms) {
1161   base::TimeMillis expire_time =
1162       base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1163   task_runner_->PostTask([this, triggers, expire_time] {
1164     for (RegisteredProducerBackend& backend : producer_backends_) {
1165       if (backend.producer->connected_) {
1166         backend.producer->service_->ActivateTriggers(triggers);
1167       } else {
1168         for (const std::string& trigger : triggers) {
1169           backend.producer->on_connect_triggers_.emplace_back(trigger,
1170                                                               expire_time);
1171         }
1172       }
1173     }
1174   });
1175 }
1176 
1177 // Checks if there is any matching startup tracing data source instance for a
1178 // new SetupDataSource call. If so, moves the data source to this tracing
1179 // session (and its target buffer) and returns true, otherwise returns false.
MaybeAdoptStartupTracingInDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,const std::vector<RegisteredDataSource> & data_sources)1180 static bool MaybeAdoptStartupTracingInDataSource(
1181     TracingBackendId backend_id,
1182     uint32_t backend_connection_id,
1183     DataSourceInstanceID instance_id,
1184     const DataSourceConfig& cfg,
1185     const std::vector<RegisteredDataSource>& data_sources) {
1186   for (const auto& rds : data_sources) {
1187     DataSourceStaticState* static_state = rds.static_state;
1188     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1189       auto* internal_state = static_state->TryGet(i);
1190 
1191       if (internal_state &&
1192           internal_state->startup_target_buffer_reservation.load(
1193               std::memory_order_relaxed) &&
1194           internal_state->data_source_instance_id == 0 &&
1195           internal_state->backend_id == backend_id &&
1196           internal_state->backend_connection_id == backend_connection_id &&
1197           internal_state->config &&
1198           internal_state->data_source->CanAdoptStartupSession(
1199               *internal_state->config, cfg)) {
1200         PERFETTO_DLOG("Setting up data source %" PRIu64
1201                       " %s by adopting it from a startup tracing session",
1202                       instance_id, cfg.name().c_str());
1203 
1204         std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
1205         // Set the associations. The actual takeover happens in
1206         // StartDataSource().
1207         internal_state->data_source_instance_id = instance_id;
1208         internal_state->buffer_id =
1209             static_cast<internal::BufferId>(cfg.target_buffer());
1210         internal_state->config.reset(new DataSourceConfig(cfg));
1211 
1212         // TODO(eseckler): Should the data souce config provided by the service
1213         // be allowed to specify additional interceptors / additional data
1214         // source params?
1215 
1216         return true;
1217       }
1218     }
1219   }
1220   return false;
1221 }
1222 
1223 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1224 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1225                                        uint32_t backend_connection_id,
1226                                        DataSourceInstanceID instance_id,
1227                                        const DataSourceConfig& cfg) {
1228   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1229                 cfg.name().c_str());
1230   PERFETTO_DCHECK_THREAD(thread_checker_);
1231 
1232   // First check if there is any matching startup tracing data source instance.
1233   if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1234                                            instance_id, cfg, data_sources_)) {
1235     return;
1236   }
1237 
1238   for (const auto& rds : data_sources_) {
1239     if (rds.descriptor.name() != cfg.name())
1240       continue;
1241     DataSourceStaticState& static_state = *rds.static_state;
1242 
1243     // If this data source is already active for this exact config, don't start
1244     // another instance. This happens when we have several data sources with the
1245     // same name, in which case the service sends one SetupDataSource event for
1246     // each one. Since we can't map which event maps to which data source, we
1247     // ensure each event only starts one data source instance.
1248     // TODO(skyostil): Register a unique id with each data source to the service
1249     // to disambiguate.
1250     bool active_for_config = false;
1251     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1252       if (!static_state.TryGet(i))
1253         continue;
1254       auto* internal_state =
1255           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1256       if (internal_state->backend_id == backend_id &&
1257           internal_state->backend_connection_id == backend_connection_id &&
1258           internal_state->config && *internal_state->config == cfg) {
1259         active_for_config = true;
1260         break;
1261       }
1262     }
1263     if (active_for_config) {
1264       PERFETTO_DLOG(
1265           "Data source %s is already active with this config, skipping",
1266           cfg.name().c_str());
1267       continue;
1268     }
1269 
1270     SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1271                         cfg, /*startup_session_id=*/0);
1272     return;
1273   }
1274 }
1275 
SetupDataSourceImpl(const RegisteredDataSource & rds,TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,TracingSessionGlobalID startup_session_id)1276 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
1277     const RegisteredDataSource& rds,
1278     TracingBackendId backend_id,
1279     uint32_t backend_connection_id,
1280     DataSourceInstanceID instance_id,
1281     const DataSourceConfig& cfg,
1282     TracingSessionGlobalID startup_session_id) {
1283   PERFETTO_DCHECK_THREAD(thread_checker_);
1284   DataSourceStaticState& static_state = *rds.static_state;
1285 
1286   // If any bit is set in `static_state.valid_instances` then at least one
1287   // other instance of data source is running.
1288   if (!rds.supports_multiple_instances &&
1289       static_state.valid_instances.load(std::memory_order_acquire) != 0) {
1290     PERFETTO_ELOG(
1291         "Failed to setup data source because some another instance of this "
1292         "data source is already active");
1293     return FindDataSourceRes();
1294   }
1295 
1296   for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1297     // Find a free slot.
1298     if (static_state.TryGet(i))
1299       continue;
1300 
1301     auto* internal_state =
1302         reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1303     std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
1304     static_assert(
1305         std::is_same<decltype(internal_state->data_source_instance_id),
1306                      DataSourceInstanceID>::value,
1307         "data_source_instance_id type mismatch");
1308     internal_state->muxer_id_for_testing = muxer_id_for_testing_;
1309     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1310 
1311     if (startup_session_id) {
1312       uint16_t& last_reservation =
1313           backend.producer->last_startup_target_buffer_reservation_;
1314       if (last_reservation == std::numeric_limits<uint16_t>::max()) {
1315         PERFETTO_ELOG(
1316             "Startup buffer reservations exhausted, dropping data source");
1317         return FindDataSourceRes();
1318       }
1319       internal_state->startup_target_buffer_reservation.store(
1320           ++last_reservation, std::memory_order_relaxed);
1321     } else {
1322       internal_state->startup_target_buffer_reservation.store(
1323           0, std::memory_order_relaxed);
1324     }
1325 
1326     internal_state->backend_id = backend_id;
1327     internal_state->backend_connection_id = backend_connection_id;
1328     internal_state->data_source_instance_id = instance_id;
1329     internal_state->buffer_id =
1330         static_cast<internal::BufferId>(cfg.target_buffer());
1331     internal_state->config.reset(new DataSourceConfig(cfg));
1332     internal_state->startup_session_id = startup_session_id;
1333     internal_state->data_source = rds.factory();
1334     internal_state->interceptor = nullptr;
1335     internal_state->interceptor_id = 0;
1336 
1337     if (cfg.has_interceptor_config()) {
1338       for (size_t j = 0; j < interceptors_.size(); j++) {
1339         if (cfg.interceptor_config().name() ==
1340             interceptors_[j].descriptor.name()) {
1341           PERFETTO_DLOG("Intercepting data source %" PRIu64
1342                         " \"%s\" into \"%s\"",
1343                         instance_id, cfg.name().c_str(),
1344                         cfg.interceptor_config().name().c_str());
1345           internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
1346           internal_state->interceptor = interceptors_[j].factory();
1347           internal_state->interceptor->OnSetup({cfg});
1348           break;
1349         }
1350       }
1351       if (!internal_state->interceptor_id) {
1352         PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
1353                       cfg.interceptor_config().name().c_str());
1354       }
1355     }
1356 
1357     // This must be made at the end. See matching acquire-load in
1358     // DataSource::Trace().
1359     static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
1360 
1361     DataSourceBase::SetupArgs setup_args;
1362     setup_args.config = &cfg;
1363     setup_args.backend_type = backend.type;
1364     setup_args.internal_instance_index = i;
1365 
1366     if (!rds.requires_callbacks_under_lock)
1367       lock.unlock();
1368     internal_state->data_source->OnSetup(setup_args);
1369 
1370     return FindDataSourceRes(&static_state, internal_state, i,
1371                              rds.requires_callbacks_under_lock);
1372   }
1373   PERFETTO_ELOG(
1374       "Maximum number of data source instances exhausted. "
1375       "Dropping data source %" PRIu64,
1376       instance_id);
1377   return FindDataSourceRes();
1378 }
1379 
1380 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1381 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1382                                        DataSourceInstanceID instance_id) {
1383   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1384   PERFETTO_DCHECK_THREAD(thread_checker_);
1385 
1386   auto ds = FindDataSource(backend_id, instance_id);
1387   if (!ds) {
1388     PERFETTO_ELOG("Could not find data source to start");
1389     return;
1390   }
1391 
1392   // Check if the data source was already started for startup tracing.
1393   uint16_t startup_reservation =
1394       ds.internal_state->startup_target_buffer_reservation.load(
1395           std::memory_order_relaxed);
1396   if (startup_reservation) {
1397     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1398     TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
1399     auto session_it = std::find_if(
1400         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1401         [session_id](const RegisteredStartupSession& session) {
1402           return session.session_id == session_id;
1403         });
1404     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1405 
1406     if (session_it->is_aborting) {
1407       PERFETTO_DLOG("Data source %" PRIu64
1408                     " was already aborted for startup tracing, not starting it",
1409                     instance_id);
1410       return;
1411     }
1412 
1413     PERFETTO_DLOG(
1414         "Data source %" PRIu64
1415         " was already started for startup tracing, binding its target buffer",
1416         instance_id);
1417 
1418     backend.producer->service_->MaybeSharedMemoryArbiter()
1419         ->BindStartupTargetBuffer(startup_reservation,
1420                                   ds.internal_state->buffer_id);
1421 
1422     // The reservation ID can be used even after binding it, so there's no need
1423     // for any barriers here - we just need atomicity.
1424     ds.internal_state->startup_target_buffer_reservation.store(
1425         0, std::memory_order_relaxed);
1426 
1427     // TODO(eseckler): Should we reset incremental state at this point, or
1428     // notify the data source some other way?
1429 
1430     // The session should not have been fully bound yet (or aborted).
1431     PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
1432 
1433     session_it->num_unbound_data_sources--;
1434     if (session_it->num_unbound_data_sources == 0) {
1435       if (session_it->on_adopted)
1436         task_runner_->PostTask(session_it->on_adopted);
1437       backend.startup_sessions.erase(session_it);
1438     }
1439     return;
1440   }
1441 
1442   StartDataSourceImpl(ds);
1443 }
1444 
StartDataSourceImpl(const FindDataSourceRes & ds)1445 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1446   PERFETTO_DCHECK_THREAD(thread_checker_);
1447 
1448   DataSourceBase::StartArgs start_args{};
1449   start_args.internal_instance_index = ds.instance_idx;
1450 
1451   std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1452   if (ds.internal_state->interceptor)
1453     ds.internal_state->interceptor->OnStart({});
1454   ds.internal_state->trace_lambda_enabled.store(true,
1455                                                 std::memory_order_relaxed);
1456   PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1457 
1458   if (!ds.requires_callbacks_under_lock)
1459     lock.unlock();
1460   ds.internal_state->data_source->OnStart(start_args);
1461 }
1462 
1463 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1464 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1465     TracingBackendId backend_id,
1466     DataSourceInstanceID instance_id) {
1467   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1468   PERFETTO_DCHECK_THREAD(thread_checker_);
1469 
1470   auto ds = FindDataSource(backend_id, instance_id);
1471   if (!ds) {
1472     PERFETTO_ELOG("Could not find data source to stop");
1473     return;
1474   }
1475 
1476   StopDataSource_AsyncBeginImpl(ds);
1477 }
1478 
StopDataSource_AsyncBeginImpl(const FindDataSourceRes & ds)1479 void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
1480     const FindDataSourceRes& ds) {
1481   TracingBackendId backend_id = ds.internal_state->backend_id;
1482   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1483   DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;
1484 
1485   StopArgsImpl stop_args{};
1486   stop_args.internal_instance_index = ds.instance_idx;
1487   stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
1488                                   instance_id, ds] {
1489     // TracingMuxerImpl is long lived, capturing |this| is okay.
1490     // The notification closure can be moved out of the StopArgs by the
1491     // embedder to handle stop asynchronously. The embedder might then
1492     // call the closure on a different thread than the current one, hence
1493     // this nested PostTask().
1494     task_runner_->PostTask(
1495         [this, backend_id, backend_connection_id, instance_id, ds] {
1496           StopDataSource_AsyncEnd(backend_id, backend_connection_id,
1497                                   instance_id, ds);
1498         });
1499   };
1500 
1501   {
1502     std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1503 
1504     // Don't call OnStop again if the datasource is already stopping.
1505     if (ds.internal_state->async_stop_in_progress)
1506       return;
1507     ds.internal_state->async_stop_in_progress = true;
1508 
1509     if (ds.internal_state->interceptor)
1510       ds.internal_state->interceptor->OnStop({});
1511 
1512     if (!ds.requires_callbacks_under_lock)
1513       lock.unlock();
1514     ds.internal_state->data_source->OnStop(stop_args);
1515   }
1516 
1517   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1518   // async closure here. In theory we could avoid the PostTask and call
1519   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1520   // divergencies between the deferred-stop vs non-deferred-stop code paths.
1521   if (stop_args.async_stop_closure)
1522     std::move(stop_args.async_stop_closure)();
1523 }
1524 
StopDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds)1525 void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
1526                                                uint32_t backend_connection_id,
1527                                                DataSourceInstanceID instance_id,
1528                                                const FindDataSourceRes& ds) {
1529   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1530   PERFETTO_DCHECK_THREAD(thread_checker_);
1531 
1532   // Check that the data source instance is still active and was not modified
1533   // while it was being stopped.
1534   if (!ds.static_state->TryGet(ds.instance_idx) ||
1535       ds.internal_state->backend_id != backend_id ||
1536       ds.internal_state->backend_connection_id != backend_connection_id ||
1537       ds.internal_state->data_source_instance_id != instance_id) {
1538     PERFETTO_ELOG(
1539         "Async stop of data source %" PRIu64
1540         " failed. This might be due to calling the async_stop_closure twice.",
1541         instance_id);
1542     return;
1543   }
1544 
1545   const uint32_t mask = ~(1 << ds.instance_idx);
1546   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1547 
1548   // Take the mutex to prevent that the data source is in the middle of
1549   // a Trace() execution where it called GetDataSourceLocked() while we
1550   // destroy it.
1551   uint16_t startup_buffer_reservation;
1552   TracingSessionGlobalID startup_session_id;
1553   {
1554     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1555     ds.internal_state->trace_lambda_enabled.store(false,
1556                                                   std::memory_order_relaxed);
1557     ds.internal_state->data_source.reset();
1558     ds.internal_state->interceptor.reset();
1559     ds.internal_state->config.reset();
1560     ds.internal_state->async_stop_in_progress = false;
1561     startup_buffer_reservation =
1562         ds.internal_state->startup_target_buffer_reservation.load(
1563             std::memory_order_relaxed);
1564     startup_session_id = ds.internal_state->startup_session_id;
1565   }
1566 
1567   // The other fields of internal_state are deliberately *not* cleared.
1568   // See races-related comments of DataSource::Trace().
1569 
1570   TracingMuxer::generation_++;
1571 
1572   // |producer_backends_| is append-only, Backend instances are always valid.
1573   PERFETTO_CHECK(backend_id < producer_backends_.size());
1574   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1575   ProducerImpl* producer = backend.producer.get();
1576   if (!producer)
1577     return;
1578 
1579   // If the data source instance still has a startup buffer reservation, it was
1580   // only active for startup tracing and never started by the service. Discard
1581   // the startup buffer reservation.
1582   if (startup_buffer_reservation) {
1583     PERFETTO_DCHECK(startup_session_id);
1584 
1585     if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
1586       producer->service_->MaybeSharedMemoryArbiter()
1587           ->AbortStartupTracingForReservation(startup_buffer_reservation);
1588     }
1589 
1590     auto session_it = std::find_if(
1591         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1592         [startup_session_id](const RegisteredStartupSession& session) {
1593           return session.session_id == startup_session_id;
1594         });
1595 
1596     // Session should not be removed until abortion of all data source instances
1597     // is complete.
1598     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1599 
1600     session_it->num_aborting_data_sources--;
1601     if (session_it->num_aborting_data_sources == 0) {
1602       if (session_it->on_aborted)
1603         task_runner_->PostTask(session_it->on_aborted);
1604 
1605       backend.startup_sessions.erase(session_it);
1606     }
1607   }
1608 
1609   if (producer->connected_ &&
1610       backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1611           backend_connection_id) {
1612     // Flush any commits that might have been batched by SharedMemoryArbiter.
1613     producer->service_->MaybeSharedMemoryArbiter()
1614         ->FlushPendingCommitDataRequests();
1615     if (instance_id)
1616       producer->service_->NotifyDataSourceStopped(instance_id);
1617   }
1618   producer->SweepDeadServices();
1619 }
1620 
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1621 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1622     TracingBackendId backend_id,
1623     DataSourceInstanceID instance_id) {
1624   PERFETTO_DCHECK_THREAD(thread_checker_);
1625   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1626                 instance_id);
1627   auto ds = FindDataSource(backend_id, instance_id);
1628   if (!ds) {
1629     PERFETTO_ELOG("Could not find data source to clear incremental state for");
1630     return;
1631   }
1632 
1633   DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1634   clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1635   {
1636     std::unique_lock<std::recursive_mutex> lock;
1637     if (ds.requires_callbacks_under_lock)
1638       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1639     ds.internal_state->data_source->WillClearIncrementalState(
1640         clear_incremental_state_args);
1641   }
1642 
1643   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1644   // the incremental state should be cleared.
1645   ds.static_state->incremental_state_generation.fetch_add(
1646       1, std::memory_order_relaxed);
1647 }
1648 
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id,FlushFlags flush_flags)1649 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1650     TracingBackendId backend_id,
1651     DataSourceInstanceID instance_id,
1652     FlushRequestID flush_id,
1653     FlushFlags flush_flags) {
1654   PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1655   auto ds = FindDataSource(backend_id, instance_id);
1656   if (!ds) {
1657     PERFETTO_ELOG("Could not find data source to flush");
1658     return true;
1659   }
1660 
1661   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1662 
1663   FlushArgsImpl flush_args;
1664   flush_args.flush_flags = flush_flags;
1665   flush_args.internal_instance_index = ds.instance_idx;
1666   flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1667                                     instance_id, ds, flush_id] {
1668     // TracingMuxerImpl is long lived, capturing |this| is okay.
1669     // The notification closure can be moved out of the StopArgs by the
1670     // embedder to handle stop asynchronously. The embedder might then
1671     // call the closure on a different thread than the current one, hence
1672     // this nested PostTask().
1673     task_runner_->PostTask(
1674         [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1675           FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1676                                    instance_id, ds, flush_id);
1677         });
1678   };
1679   {
1680     std::unique_lock<std::recursive_mutex> lock;
1681     if (ds.requires_callbacks_under_lock)
1682       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1683     ds.internal_state->data_source->OnFlush(flush_args);
1684   }
1685 
1686   // |async_flush_closure| is moved out of |flush_args| if the producer
1687   // requested to handle the flush asynchronously.
1688   bool handled = static_cast<bool>(flush_args.async_flush_closure);
1689   return handled;
1690 }
1691 
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1692 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1693     TracingBackendId backend_id,
1694     uint32_t backend_connection_id,
1695     DataSourceInstanceID instance_id,
1696     const FindDataSourceRes& ds,
1697     FlushRequestID flush_id) {
1698   PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1699   PERFETTO_DCHECK_THREAD(thread_checker_);
1700 
1701   // Check that the data source instance is still active and was not modified
1702   // while it was being flushed.
1703   if (!ds.static_state->TryGet(ds.instance_idx) ||
1704       ds.internal_state->backend_id != backend_id ||
1705       ds.internal_state->backend_connection_id != backend_connection_id ||
1706       ds.internal_state->data_source_instance_id != instance_id) {
1707     PERFETTO_ELOG("Async flush of data source %" PRIu64
1708                   " failed. This might be due to the data source being stopped "
1709                   "in the meantime",
1710                   instance_id);
1711     return;
1712   }
1713 
1714   // |producer_backends_| is append-only, Backend instances are always valid.
1715   PERFETTO_CHECK(backend_id < producer_backends_.size());
1716   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1717 
1718   ProducerImpl* producer = backend.producer.get();
1719   if (!producer)
1720     return;
1721 
1722   // If the tracing service disconnects and reconnects while a data source is
1723   // handling a flush request, there's no point is sending the flush reply to
1724   // the newly reconnected producer.
1725   if (producer->connected_ &&
1726       backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1727           backend_connection_id) {
1728     producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1729   }
1730 }
1731 
SyncProducersForTesting()1732 void TracingMuxerImpl::SyncProducersForTesting() {
1733   std::mutex mutex;
1734   std::condition_variable cv;
1735 
1736   // IPC-based producers don't report connection errors explicitly for each
1737   // command, but instead with an asynchronous callback
1738   // (ProducerImpl::OnDisconnected). This means that the sync command below
1739   // may have completed but failed to reach the service because of a
1740   // disconnection, but we can't tell until the disconnection message comes
1741   // through. To guard against this, we run two whole rounds of sync round-trips
1742   // before returning; the first one will detect any disconnected producers and
1743   // the second one will ensure any reconnections have completed and all data
1744   // sources are registered in the service again.
1745   for (size_t i = 0; i < 2; i++) {
1746     size_t countdown = std::numeric_limits<size_t>::max();
1747     task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1748       {
1749         std::unique_lock<std::mutex> countdown_lock(mutex);
1750         countdown = producer_backends_.size();
1751       }
1752       for (auto& backend : producer_backends_) {
1753         auto* producer = backend.producer.get();
1754         producer->service_->Sync([&mutex, &cv, &countdown] {
1755           std::unique_lock<std::mutex> countdown_lock(mutex);
1756           countdown--;
1757           cv.notify_one();
1758         });
1759       }
1760     });
1761 
1762     {
1763       std::unique_lock<std::mutex> countdown_lock(mutex);
1764       cv.wait(countdown_lock, [&countdown] { return !countdown; });
1765     }
1766   }
1767 
1768   // Check that all producers are indeed connected.
1769   bool done = false;
1770   bool all_producers_connected = true;
1771   task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1772     for (auto& backend : producer_backends_)
1773       all_producers_connected &= backend.producer->connected_;
1774     std::unique_lock<std::mutex> lock(mutex);
1775     done = true;
1776     cv.notify_one();
1777   });
1778 
1779   {
1780     std::unique_lock<std::mutex> lock(mutex);
1781     cv.wait(lock, [&done] { return done; });
1782   }
1783   PERFETTO_DCHECK(all_producers_connected);
1784 }
1785 
DestroyStoppedTraceWritersForCurrentThread()1786 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1787   // Iterate across all possible data source types.
1788   auto cur_generation = generation_.load(std::memory_order_acquire);
1789   auto* root_tls = GetOrCreateTracingTLS();
1790 
1791   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1792     // |tls| has a vector of per-data-source-instance thread-local state.
1793     DataSourceStaticState* static_state = tls.static_state;
1794     if (!static_state)
1795       return;  // Slot not used.
1796 
1797     // Iterate across all possible instances for this data source.
1798     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1799       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1800       if (!ds_tls.trace_writer)
1801         continue;
1802 
1803       DataSourceState* ds_state = static_state->TryGet(inst);
1804       if (ds_state &&
1805           ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1806           ds_state->backend_id == ds_tls.backend_id &&
1807           ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1808           ds_state->startup_target_buffer_reservation.load(
1809               std::memory_order_relaxed) ==
1810               ds_tls.startup_target_buffer_reservation &&
1811           ds_state->buffer_id == ds_tls.buffer_id &&
1812           ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1813         continue;
1814       }
1815 
1816       // The DataSource instance has been destroyed or recycled.
1817       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
1818     }
1819   };
1820 
1821   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1822     // |tls| has a vector of per-data-source-instance thread-local state.
1823     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1824     destroy_stopped_instances(tls);
1825   }
1826   destroy_stopped_instances(root_tls->track_event_tls);
1827   root_tls->generation = cur_generation;
1828 }
1829 
1830 // Called both when a new data source is registered or when a new backend
1831 // connects. In both cases we want to be sure we reflected the data source
1832 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1833 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1834   PERFETTO_DCHECK_THREAD(thread_checker_);
1835   for (RegisteredDataSource& rds : data_sources_) {
1836     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1837   }
1838 }
1839 
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1840 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1841                                                      bool is_changed) {
1842   PERFETTO_DCHECK_THREAD(thread_checker_);
1843   for (RegisteredProducerBackend& backend : producer_backends_) {
1844     // We cannot call RegisterDataSource on the backend before it connects.
1845     if (!backend.producer->connected_)
1846       continue;
1847 
1848     PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1849     bool is_registered = backend.producer->registered_data_sources_.test(
1850         rds.static_state->index);
1851     if (is_registered && !is_changed)
1852       continue;
1853 
1854     if (!rds.descriptor.no_flush()) {
1855       rds.descriptor.set_no_flush(rds.no_flush);
1856     }
1857     rds.descriptor.set_will_notify_on_start(true);
1858     if (!rds.descriptor.has_will_notify_on_stop()) {
1859       rds.descriptor.set_will_notify_on_stop(true);
1860     }
1861 
1862     rds.descriptor.set_handles_incremental_state_clear(true);
1863     rds.descriptor.set_id(rds.static_state->id);
1864     if (is_registered) {
1865       backend.producer->service_->UpdateDataSource(rds.descriptor);
1866     } else {
1867       backend.producer->service_->RegisterDataSource(rds.descriptor);
1868     }
1869     backend.producer->registered_data_sources_.set(rds.static_state->index);
1870   }
1871 }
1872 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1873 void TracingMuxerImpl::SetupTracingSession(
1874     TracingSessionGlobalID session_id,
1875     const std::shared_ptr<TraceConfig>& trace_config,
1876     base::ScopedFile trace_fd) {
1877   PERFETTO_DCHECK_THREAD(thread_checker_);
1878   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1879 
1880   auto* consumer = FindConsumer(session_id);
1881   if (!consumer)
1882     return;
1883 
1884   consumer->trace_config_ = trace_config;
1885   if (trace_fd)
1886     consumer->trace_fd_ = std::move(trace_fd);
1887 
1888   if (!consumer->connected_)
1889     return;
1890 
1891   // Only used in the deferred start mode.
1892   if (trace_config->deferred_start()) {
1893     consumer->service_->EnableTracing(*trace_config,
1894                                       std::move(consumer->trace_fd_));
1895   }
1896 }
1897 
StartTracingSession(TracingSessionGlobalID session_id)1898 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1899   PERFETTO_DCHECK_THREAD(thread_checker_);
1900 
1901   auto* consumer = FindConsumer(session_id);
1902 
1903   if (!consumer)
1904     return;
1905 
1906   if (!consumer->trace_config_) {
1907     PERFETTO_ELOG("Must call Setup(config) first");
1908     return;
1909   }
1910 
1911   if (!consumer->connected_) {
1912     consumer->start_pending_ = true;
1913     return;
1914   }
1915 
1916   consumer->start_pending_ = false;
1917   if (consumer->trace_config_->deferred_start()) {
1918     consumer->service_->StartTracing();
1919   } else {
1920     consumer->service_->EnableTracing(*consumer->trace_config_,
1921                                       std::move(consumer->trace_fd_));
1922   }
1923 
1924   // TODO implement support for the deferred-start + fast-triggering case.
1925 }
1926 
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1927 void TracingMuxerImpl::ChangeTracingSessionConfig(
1928     TracingSessionGlobalID session_id,
1929     const TraceConfig& trace_config) {
1930   PERFETTO_DCHECK_THREAD(thread_checker_);
1931 
1932   auto* consumer = FindConsumer(session_id);
1933 
1934   if (!consumer)
1935     return;
1936 
1937   if (!consumer->trace_config_) {
1938     // Changing the config is only supported for started sessions.
1939     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1940     return;
1941   }
1942 
1943   consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1944   if (consumer->connected_)
1945     consumer->service_->ChangeTraceConfig(trace_config);
1946 }
1947 
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1948 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1949                                            uint32_t timeout_ms,
1950                                            std::function<void(bool)> callback) {
1951   PERFETTO_DCHECK_THREAD(thread_checker_);
1952   auto* consumer = FindConsumer(session_id);
1953   if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1954       !consumer->trace_config_) {
1955     PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1956     std::move(callback)(false);
1957     return;
1958   }
1959 
1960   // For now we don't want to expose the flush reason to the consumer-side SDK
1961   // users to avoid misuses until there is a strong need.
1962   consumer->service_->Flush(timeout_ms, std::move(callback),
1963                             FlushFlags(FlushFlags::Initiator::kConsumerSdk,
1964                                        FlushFlags::Reason::kExplicit));
1965 }
1966 
StopTracingSession(TracingSessionGlobalID session_id)1967 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1968   PERFETTO_DCHECK_THREAD(thread_checker_);
1969   auto* consumer = FindConsumer(session_id);
1970   if (!consumer)
1971     return;
1972 
1973   if (consumer->start_pending_) {
1974     // If the session hasn't started yet, wait until it does before stopping.
1975     consumer->stop_pending_ = true;
1976     return;
1977   }
1978 
1979   consumer->stop_pending_ = false;
1980   if (consumer->stopped_) {
1981     // If the session was already stopped (e.g., it failed to start), don't try
1982     // stopping again.
1983     consumer->NotifyStopComplete();
1984   } else if (!consumer->trace_config_) {
1985     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1986     return;
1987   } else {
1988     consumer->service_->DisableTracing();
1989   }
1990 
1991   consumer->trace_config_.reset();
1992 }
1993 
DestroyTracingSession(TracingSessionGlobalID session_id)1994 void TracingMuxerImpl::DestroyTracingSession(
1995     TracingSessionGlobalID session_id) {
1996   PERFETTO_DCHECK_THREAD(thread_checker_);
1997   for (RegisteredConsumerBackend& backend : consumer_backends_) {
1998     // We need to find the consumer (if any) and call Disconnect as we destroy
1999     // the tracing session. We can't call Disconnect() inside this for loop
2000     // because in the in-process case this will end up to a synchronous call to
2001     // OnConsumerDisconnect which will invalidate all the iterators to
2002     // |backend.consumers|.
2003     ConsumerImpl* consumer = nullptr;
2004     for (auto& con : backend.consumers) {
2005       if (con->session_id_ == session_id) {
2006         consumer = con.get();
2007         break;
2008       }
2009     }
2010     if (consumer) {
2011       // We broke out of the loop above on the assumption that each backend will
2012       // only have a single consumer per session. This DCHECK ensures that
2013       // this is the case.
2014       PERFETTO_DCHECK(
2015           std::count_if(backend.consumers.begin(), backend.consumers.end(),
2016                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2017                           return con->session_id_ == session_id;
2018                         }) == 1u);
2019       consumer->Disconnect();
2020     }
2021   }
2022 }
2023 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2024 void TracingMuxerImpl::ReadTracingSessionData(
2025     TracingSessionGlobalID session_id,
2026     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2027   PERFETTO_DCHECK_THREAD(thread_checker_);
2028   auto* consumer = FindConsumer(session_id);
2029   if (!consumer) {
2030     // TODO(skyostil): Signal an error to the user.
2031     TracingSession::ReadTraceCallbackArgs callback_arg{};
2032     callback(callback_arg);
2033     return;
2034   }
2035   PERFETTO_DCHECK(!consumer->read_trace_callback_);
2036   consumer->read_trace_callback_ = std::move(callback);
2037   consumer->service_->ReadBuffers();
2038 }
2039 
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2040 void TracingMuxerImpl::GetTraceStats(
2041     TracingSessionGlobalID session_id,
2042     TracingSession::GetTraceStatsCallback callback) {
2043   PERFETTO_DCHECK_THREAD(thread_checker_);
2044   auto* consumer = FindConsumer(session_id);
2045   if (!consumer) {
2046     TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2047     callback_arg.success = false;
2048     callback(std::move(callback_arg));
2049     return;
2050   }
2051   PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2052   consumer->get_trace_stats_callback_ = std::move(callback);
2053   if (!consumer->connected_) {
2054     consumer->get_trace_stats_pending_ = true;
2055     return;
2056   }
2057   consumer->get_trace_stats_pending_ = false;
2058   consumer->service_->GetTraceStats();
2059 }
2060 
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2061 void TracingMuxerImpl::QueryServiceState(
2062     TracingSessionGlobalID session_id,
2063     TracingSession::QueryServiceStateCallback callback) {
2064   PERFETTO_DCHECK_THREAD(thread_checker_);
2065   auto* consumer = FindConsumer(session_id);
2066   if (!consumer) {
2067     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2068     callback_arg.success = false;
2069     callback(std::move(callback_arg));
2070     return;
2071   }
2072   PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2073   if (!consumer->connected_) {
2074     consumer->query_service_state_callback_ = std::move(callback);
2075     return;
2076   }
2077   auto callback_wrapper = [callback](bool success,
2078                                      protos::gen::TracingServiceState state) {
2079     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2080     callback_arg.success = success;
2081     callback_arg.service_state_data = state.SerializeAsArray();
2082     callback(std::move(callback_arg));
2083   };
2084   consumer->service_->QueryServiceState({}, std::move(callback_wrapper));
2085 }
2086 
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2087 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2088     uint32_t batch_commits_duration_ms,
2089     BackendType backend_type) {
2090   for (RegisteredProducerBackend& backend : producer_backends_) {
2091     if (backend.producer && backend.producer->connected_ &&
2092         backend.type == backend_type) {
2093       backend.producer->service_->MaybeSharedMemoryArbiter()
2094           ->SetBatchCommitsDuration(batch_commits_duration_ms);
2095     }
2096   }
2097 }
2098 
EnableDirectSMBPatchingForTesting(BackendType backend_type)2099 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2100     BackendType backend_type) {
2101   for (RegisteredProducerBackend& backend : producer_backends_) {
2102     if (backend.producer && backend.producer->connected_ &&
2103         backend.type == backend_type &&
2104         !backend.producer->service_->MaybeSharedMemoryArbiter()
2105              ->EnableDirectSMBPatching()) {
2106       return false;
2107     }
2108   }
2109   return true;
2110 }
2111 
FindConsumer(TracingSessionGlobalID session_id)2112 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2113     TracingSessionGlobalID session_id) {
2114   PERFETTO_DCHECK_THREAD(thread_checker_);
2115   return FindConsumerAndBackend(session_id).first;
2116 }
2117 
2118 std::pair<TracingMuxerImpl::ConsumerImpl*,
2119           TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2120 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2121   PERFETTO_DCHECK_THREAD(thread_checker_);
2122   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2123     for (auto& consumer : backend.consumers) {
2124       if (consumer->session_id_ == session_id) {
2125         return {consumer.get(), &backend};
2126       }
2127     }
2128   }
2129   return {nullptr, nullptr};
2130 }
2131 
InitializeConsumer(TracingSessionGlobalID session_id)2132 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2133   PERFETTO_DCHECK_THREAD(thread_checker_);
2134 
2135   auto res = FindConsumerAndBackend(session_id);
2136   if (!res.first || !res.second)
2137     return;
2138   TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2139   RegisteredConsumerBackend& backend = *res.second;
2140 
2141   TracingBackend::ConnectConsumerArgs conn_args;
2142   conn_args.consumer = consumer;
2143   conn_args.task_runner = task_runner_.get();
2144   consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2145 }
2146 
OnConsumerDisconnected(ConsumerImpl * consumer)2147 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2148   PERFETTO_DCHECK_THREAD(thread_checker_);
2149   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2150     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2151       return con.get() == consumer;
2152     };
2153     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2154                                            backend.consumers.end(), pred),
2155                             backend.consumers.end());
2156   }
2157 }
2158 
SetMaxProducerReconnectionsForTesting(uint32_t count)2159 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2160   max_producer_reconnections_.store(count);
2161 }
2162 
OnProducerDisconnected(ProducerImpl * producer)2163 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2164   PERFETTO_DCHECK_THREAD(thread_checker_);
2165   for (RegisteredProducerBackend& backend : producer_backends_) {
2166     if (backend.producer.get() != producer)
2167       continue;
2168 
2169     // The tracing service is disconnected. It does not make sense to keep
2170     // tracing (we wouldn't be able to commit). On reconnection, the tracing
2171     // service will restart the data sources.
2172     for (const auto& rds : data_sources_) {
2173       DataSourceStaticState* static_state = rds.static_state;
2174       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2175         auto* internal_state = static_state->TryGet(i);
2176         if (internal_state && internal_state->backend_id == backend.id &&
2177             internal_state->backend_connection_id ==
2178                 backend.producer->connection_id_.load(
2179                     std::memory_order_relaxed)) {
2180           StopDataSource_AsyncBeginImpl(
2181               FindDataSourceRes(static_state, internal_state, i,
2182                                 rds.requires_callbacks_under_lock));
2183         }
2184       }
2185     }
2186 
2187     // Try reconnecting the disconnected producer. If the connection succeeds,
2188     // all the data sources will be automatically re-registered.
2189     if (producer->connection_id_.load(std::memory_order_relaxed) >
2190         max_producer_reconnections_.load()) {
2191       // Avoid reconnecting a failing producer too many times. Instead we just
2192       // leak the producer instead of trying to avoid further complicating
2193       // cross-thread trace writer creation.
2194       PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2195       continue;
2196     }
2197 
2198     backend.producer->Initialize(
2199         backend.backend->ConnectProducer(backend.producer_conn_args));
2200     // Don't use producer-provided SMBs for the next connection unless startup
2201     // tracing requires it again.
2202     backend.producer_conn_args.use_producer_provided_smb = false;
2203   }
2204 }
2205 
SweepDeadBackends()2206 void TracingMuxerImpl::SweepDeadBackends() {
2207   PERFETTO_DCHECK_THREAD(thread_checker_);
2208   for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2209     auto next_it = it;
2210     next_it++;
2211     if (it->producer->SweepDeadServices())
2212       dead_backends_.erase(it);
2213     it = next_it;
2214   }
2215 }
2216 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2217 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2218     TracingBackendId backend_id,
2219     DataSourceInstanceID instance_id) {
2220   PERFETTO_DCHECK_THREAD(thread_checker_);
2221   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2222   for (const auto& rds : data_sources_) {
2223     DataSourceStaticState* static_state = rds.static_state;
2224     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2225       auto* internal_state = static_state->TryGet(i);
2226       if (internal_state && internal_state->backend_id == backend_id &&
2227           internal_state->backend_connection_id ==
2228               backend.producer->connection_id_.load(
2229                   std::memory_order_relaxed) &&
2230           internal_state->data_source_instance_id == instance_id) {
2231         return FindDataSourceRes(static_state, internal_state, i,
2232                                  rds.requires_callbacks_under_lock);
2233       }
2234     }
2235   }
2236   return FindDataSourceRes();
2237 }
2238 
2239 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2240 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2241     DataSourceStaticState* static_state,
2242     uint32_t data_source_instance_index,
2243     DataSourceState* data_source,
2244     BufferExhaustedPolicy buffer_exhausted_policy) {
2245   if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2246     // If the session is being intercepted, return a heap-backed trace writer
2247     // instead. This is safe because all the data given to the interceptor is
2248     // either thread-local (|instance_index|), statically allocated
2249     // (|static_state|) or constant after initialization (|interceptor|). Access
2250     // to the interceptor instance itself through |data_source| is protected by
2251     // a statically allocated lock (similarly to the data source instance).
2252     auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2253     return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2254         interceptor.tls_factory(static_state, data_source_instance_index),
2255         interceptor.packet_callback, static_state, data_source_instance_index));
2256   }
2257   ProducerImpl* producer =
2258       FindProducerBackendById(data_source->backend_id)->producer.get();
2259   // Atomically load the current service endpoint. We keep the pointer as a
2260   // shared pointer on the stack to guard against it from being concurrently
2261   // modified on the thread by ProducerImpl::Initialize() swapping in a
2262   // reconnected service on the muxer task runner thread.
2263   //
2264   // The endpoint may also be concurrently modified by SweepDeadServices()
2265   // clearing out old disconnected services. We guard against that by
2266   // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2267   // shutdown has started, the trace writer created below will be a null one
2268   // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2269   //
2270   // We use an atomic pointer instead of holding a lock because
2271   // CreateTraceWriter posts tasks under the hood.
2272   std::shared_ptr<ProducerEndpoint> service =
2273       std::atomic_load(&producer->service_);
2274 
2275   // The service may have been disconnected and reconnected concurrently after
2276   // the data source was enabled, in which case we may not have an arbiter, or
2277   // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2278   // Instead, early-out now. A relaxed load is fine here because the atomic_load
2279   // above ensures that the |service| isn't newer.
2280   if (producer->connection_id_.load(std::memory_order_relaxed) !=
2281       data_source->backend_connection_id) {
2282     return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2283   }
2284 
2285   // We just need a relaxed atomic read here: We can use the reservation ID even
2286   // after the buffer was bound, we just need to be sure to read it atomically.
2287   uint16_t startup_buffer_reservation =
2288       data_source->startup_target_buffer_reservation.load(
2289           std::memory_order_relaxed);
2290   if (startup_buffer_reservation) {
2291     return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2292         startup_buffer_reservation);
2293   }
2294   return service->CreateTraceWriter(data_source->buffer_id,
2295                                     buffer_exhausted_policy);
2296 }
2297 
2298 // This is called via the public API Tracing::NewTrace().
2299 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2300 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2301     BackendType requested_backend_type,
2302     TracingConsumerBackend* (*system_backend_factory)()) {
2303   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2304 
2305   // |backend_type| can only specify one backend, not an OR-ed mask.
2306   PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2307 
2308   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2309   task_runner_->PostTask([this, requested_backend_type, session_id,
2310                           system_backend_factory] {
2311     if (requested_backend_type == kSystemBackend && system_backend_factory &&
2312         !FindConsumerBackendByType(kSystemBackend)) {
2313       AddConsumerBackend(system_backend_factory(), kSystemBackend);
2314     }
2315     for (RegisteredConsumerBackend& backend : consumer_backends_) {
2316       if (requested_backend_type && backend.type &&
2317           backend.type != requested_backend_type) {
2318         continue;
2319       }
2320 
2321       // Create the consumer now, even if we have to ask the embedder below, so
2322       // that any other tasks executing after this one can find the consumer and
2323       // change its pending attributes.
2324       backend.consumers.emplace_back(
2325           new ConsumerImpl(this, backend.type, session_id));
2326 
2327       // The last registered backend in |consumer_backends_| is the unsupported
2328       // backend without a valid type.
2329       if (!backend.type) {
2330         PERFETTO_ELOG(
2331             "No tracing backend ready for type=%d, consumer will disconnect",
2332             requested_backend_type);
2333         InitializeConsumer(session_id);
2334         return;
2335       }
2336 
2337       // Check if the embedder wants to be asked for permission before
2338       // connecting the consumer.
2339       if (!policy_) {
2340         InitializeConsumer(session_id);
2341         return;
2342       }
2343 
2344       BackendType type = backend.type;
2345       TracingPolicy::ShouldAllowConsumerSessionArgs args;
2346       args.backend_type = backend.type;
2347       args.result_callback = [this, type, session_id](bool allow) {
2348         task_runner_->PostTask([this, type, session_id, allow] {
2349           if (allow) {
2350             InitializeConsumer(session_id);
2351             return;
2352           }
2353 
2354           PERFETTO_ELOG(
2355               "Consumer session for backend type type=%d forbidden, "
2356               "consumer will disconnect",
2357               type);
2358 
2359           auto* consumer = FindConsumer(session_id);
2360           if (!consumer)
2361             return;
2362 
2363           consumer->OnDisconnect();
2364         });
2365       };
2366       policy_->ShouldAllowConsumerSession(args);
2367       return;
2368     }
2369     PERFETTO_DFATAL("Not reached");
2370   });
2371 
2372   return std::unique_ptr<TracingSession>(
2373       new TracingSessionImpl(this, session_id, requested_backend_type));
2374 }
2375 
2376 // static
2377 // This is called via the public API Tracing::SetupStartupTracing().
2378 // Can be called from any thread.
2379 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2380 TracingMuxerImpl::CreateStartupTracingSession(
2381     const TraceConfig& config,
2382     Tracing::SetupStartupTracingOpts opts) {
2383   BackendType backend_type = opts.backend;
2384   // |backend_type| can only specify one backend, not an OR-ed mask.
2385   PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2386   // The in-process backend doesn't support startup tracing.
2387   PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2388 
2389   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2390 
2391   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2392   task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2393     for (RegisteredProducerBackend& backend : producer_backends_) {
2394       if (backend_type && backend.type && backend.type != backend_type) {
2395         continue;
2396       }
2397 
2398       TracingBackendId backend_id = backend.id;
2399 
2400       // The last registered backend in |producer_backends_| is the unsupported
2401       // backend without a valid type.
2402       if (!backend.type) {
2403         PERFETTO_ELOG(
2404             "No tracing backend initialized for type=%d, startup tracing "
2405             "failed",
2406             backend_type);
2407         if (opts.on_setup)
2408           opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2409               0 /* num_data_sources_started */});
2410         return;
2411       }
2412 
2413       if (!backend.producer->service_ ||
2414           !backend.producer->service_->shared_memory()) {
2415         // If we unsuccessfully attempted to use a producer-provided SMB in the
2416         // past, don't try again.
2417         if (backend.producer->producer_provided_smb_failed_) {
2418           PERFETTO_ELOG(
2419               "Backend %zu doesn't seem to support producer-provided "
2420               "SMBs, startup tracing failed",
2421               backend_id);
2422           if (opts.on_setup)
2423             opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2424                 0 /* num_data_sources_started */});
2425           return;
2426         }
2427 
2428         PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2429                       backend_id);
2430         backend.producer_conn_args.use_producer_provided_smb = true;
2431         backend.producer->service_->Disconnect();  // Causes a reconnect.
2432         PERFETTO_DCHECK(backend.producer->service_ &&
2433                         backend.producer->service_->MaybeSharedMemoryArbiter());
2434       }
2435 
2436       RegisteredStartupSession session;
2437       session.session_id = session_id;
2438       session.on_aborted = opts.on_aborted;
2439       session.on_adopted = opts.on_adopted;
2440 
2441       for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2442         // Find all matching data sources and start one instance of each.
2443         for (const auto& rds : data_sources_) {
2444           if (rds.descriptor.name() != ds_cfg.config().name())
2445             continue;
2446 
2447           PERFETTO_DLOG(
2448               "Setting up data source %s for startup tracing with target "
2449               "buffer reservation %" PRIi32,
2450               rds.descriptor.name().c_str(),
2451               backend.producer->last_startup_target_buffer_reservation_ + 1u);
2452           auto ds = SetupDataSourceImpl(
2453               rds, backend_id,
2454               backend.producer->connection_id_.load(std::memory_order_relaxed),
2455               /*instance_id=*/0, ds_cfg.config(),
2456               /*startup_session_id=*/session_id);
2457           if (ds) {
2458             StartDataSourceImpl(ds);
2459             session.num_unbound_data_sources++;
2460           }
2461         }
2462       }
2463 
2464       int num_ds = session.num_unbound_data_sources;
2465       auto on_setup = opts.on_setup;
2466       if (on_setup) {
2467         backend.producer->OnStartupTracingSetup();
2468         task_runner_->PostTask([on_setup, num_ds] {
2469           on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2470         });
2471       }
2472 
2473       if (num_ds > 0) {
2474         backend.startup_sessions.push_back(std::move(session));
2475 
2476         if (opts.timeout_ms > 0) {
2477           task_runner_->PostDelayedTask(
2478               [this, session_id, backend_type] {
2479                 AbortStartupTracingSession(session_id, backend_type);
2480               },
2481               opts.timeout_ms);
2482         }
2483       }
2484       return;
2485     }
2486     PERFETTO_DFATAL("Invalid startup tracing session backend");
2487   });
2488 
2489   return std::unique_ptr<StartupTracingSession>(
2490       new StartupTracingSessionImpl(this, session_id, backend_type));
2491 }
2492 
2493 // Must not be called from the SDK's internal thread.
2494 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2495 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2496     const TraceConfig& config,
2497     Tracing::SetupStartupTracingOpts opts) {
2498   auto previous_on_setup = std::move(opts.on_setup);
2499   PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2500   base::WaitableEvent event;
2501   // It is safe to capture by reference because once on_setup is called only
2502   // once before this method returns.
2503   opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2504     if (previous_on_setup) {
2505       previous_on_setup(std::move(args));
2506     }
2507     event.Notify();
2508   };
2509   auto session = CreateStartupTracingSession(config, std::move(opts));
2510   event.Wait();
2511   return session;
2512 }
2513 
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2514 void TracingMuxerImpl::AbortStartupTracingSession(
2515     TracingSessionGlobalID session_id,
2516     BackendType backend_type) {
2517   PERFETTO_DCHECK_THREAD(thread_checker_);
2518 
2519   for (RegisteredProducerBackend& backend : producer_backends_) {
2520     if (backend_type != backend.type)
2521       continue;
2522 
2523     auto session_it = std::find_if(
2524         backend.startup_sessions.begin(), backend.startup_sessions.end(),
2525         [session_id](const RegisteredStartupSession& session) {
2526           return session.session_id == session_id;
2527         });
2528 
2529     // The startup session may have already been aborted or fully adopted.
2530     if (session_it == backend.startup_sessions.end())
2531       return;
2532     if (session_it->is_aborting)
2533       return;
2534 
2535     session_it->is_aborting = true;
2536 
2537     // Iterate all data sources and abort them if they weren't adopted yet.
2538     for (const auto& rds : data_sources_) {
2539       DataSourceStaticState* static_state = rds.static_state;
2540       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2541         auto* internal_state = static_state->TryGet(i);
2542         if (internal_state &&
2543             internal_state->startup_target_buffer_reservation.load(
2544                 std::memory_order_relaxed) &&
2545             internal_state->data_source_instance_id == 0 &&
2546             internal_state->startup_session_id == session_id) {
2547           PERFETTO_DLOG(
2548               "Aborting startup tracing for data source %s (target buffer "
2549               "reservation %" PRIu16 ")",
2550               rds.descriptor.name().c_str(),
2551               internal_state->startup_target_buffer_reservation.load(
2552                   std::memory_order_relaxed));
2553 
2554           // Abort the instance asynchronously by stopping it. From this point
2555           // onwards, the service will not be able to adopt it via
2556           // StartDataSource().
2557           session_it->num_aborting_data_sources++;
2558           StopDataSource_AsyncBeginImpl(
2559               FindDataSourceRes(static_state, internal_state, i,
2560                                 rds.requires_callbacks_under_lock));
2561         }
2562       }
2563     }
2564 
2565     // If we did everything right, we should have aborted all still-unbound data
2566     // source instances.
2567     PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2568                     session_it->num_aborting_data_sources);
2569 
2570     if (session_it->num_aborting_data_sources == 0) {
2571       if (session_it->on_aborted)
2572         task_runner_->PostTask(session_it->on_aborted);
2573 
2574       backend.startup_sessions.erase(session_it);
2575     }
2576     return;
2577   }
2578   // We might reach here in tests because when we start a trace, we post the
2579   // Task(AbortStartupTrace, delay=timeout). When we do
2580   // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2581   // kill those delayed tasks because TaskRunner doesn't have support for
2582   // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2583   // to wait for the completion of all the scheduled tasks (apart from
2584   // deleting the TaskRunner) and we want to avoid doing that because we need
2585   // a long running TaskRunner in muxer.
2586   PERFETTO_DLOG("Invalid startup tracing session backend");
2587 }
2588 
InitializeInstance(const TracingInitArgs & args)2589 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2590   if (instance_ != TracingMuxerFake::Get()) {
2591     // The tracing muxer was already initialized. We might need to initialize
2592     // additional backends that were not configured earlier.
2593     auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2594     muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2595     return;
2596   }
2597   // If we previously had a TracingMuxerImpl instance which was reset,
2598   // reinitialize and reuse it instead of trying to create a new one. See
2599   // ResetForTesting().
2600   if (g_prev_instance) {
2601     auto* muxer = g_prev_instance;
2602     g_prev_instance = nullptr;
2603     instance_ = muxer;
2604     muxer->task_runner_->PostTask([muxer, args] {
2605       muxer->Initialize(args);
2606       muxer->AddBackends(args);
2607     });
2608   } else {
2609     new TracingMuxerImpl(args);
2610   }
2611 }
2612 
2613 // static
ResetForTesting()2614 void TracingMuxerImpl::ResetForTesting() {
2615   // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2616   // various objects make that a non-starter. In particular:
2617   //
2618   // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2619   //    a reference back to ProducerImpl::service_.
2620   //
2621   // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2622   //
2623   // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2624   //    turn depends on TracingMuxerImpl itself.
2625   //
2626   // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2627   // threads have dropped their TraceWriters. Since we can't really ask the
2628   // caller to guarantee this, we'll instead reset enough of the muxer's state
2629   // so that it can be reinitialized later and ensure all necessary objects from
2630   // the old state remain alive until all references have gone away.
2631   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2632 
2633   base::WaitableEvent reset_done;
2634   auto do_reset = [muxer, &reset_done] {
2635     muxer->DestroyStoppedTraceWritersForCurrentThread();
2636     // Unregister all data sources so they don't interfere with any future
2637     // tracing sessions.
2638     for (RegisteredDataSource& rds : muxer->data_sources_) {
2639       for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2640         if (!backend.producer->service_ || !backend.producer->connected_)
2641           continue;
2642         backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2643       }
2644     }
2645     for (auto& backend : muxer->consumer_backends_) {
2646       // Check that no consumer session is currently active on any backend.
2647       for (auto& consumer : backend.consumers)
2648         PERFETTO_CHECK(!consumer->service_);
2649     }
2650     for (auto& backend : muxer->producer_backends_) {
2651       backend.producer->muxer_ = nullptr;
2652       backend.producer->DisposeConnection();
2653       muxer->dead_backends_.push_back(std::move(backend));
2654     }
2655     muxer->consumer_backends_.clear();
2656     muxer->producer_backends_.clear();
2657     muxer->interceptors_.clear();
2658 
2659     for (auto& ds : muxer->data_sources_) {
2660       ds.static_state->ResetForTesting();
2661     }
2662 
2663     muxer->data_sources_.clear();
2664     muxer->next_data_source_index_ = 0;
2665 
2666     // Free all backends without active trace writers or other inbound
2667     // references. Note that even if all the backends get swept, the muxer still
2668     // needs to stay around since |task_runner_| is assumed to be long-lived.
2669     muxer->SweepDeadBackends();
2670 
2671     // Make sure we eventually discard any per-thread trace writers from the
2672     // previous instance.
2673     muxer->muxer_id_for_testing_++;
2674 
2675     g_prev_instance = muxer;
2676     instance_ = TracingMuxerFake::Get();
2677 
2678     // Call the user provided cleanups on the muxer thread.
2679     for (auto& cb : muxer->reset_callbacks_) {
2680       cb();
2681     }
2682 
2683     reset_done.Notify();
2684   };
2685 
2686   // Some tests run the muxer and the test on the same thread. In these cases,
2687   // we can reset synchronously.
2688   if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2689     do_reset();
2690   } else {
2691     muxer->DestroyStoppedTraceWritersForCurrentThread();
2692     muxer->task_runner_->PostTask(std::move(do_reset));
2693     reset_done.Wait();
2694     // Call the user provided cleanups also on this thread.
2695     for (auto& cb : muxer->reset_callbacks_) {
2696       cb();
2697     }
2698   }
2699   muxer->reset_callbacks_.clear();
2700 }
2701 
2702 // static
Shutdown()2703 void TracingMuxerImpl::Shutdown() {
2704   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2705 
2706   // Shutting down on the muxer thread would lead to a deadlock.
2707   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2708   muxer->DestroyStoppedTraceWritersForCurrentThread();
2709 
2710   std::unique_ptr<base::TaskRunner> owned_task_runner(
2711       muxer->task_runner_.get());
2712   base::WaitableEvent shutdown_done;
2713   owned_task_runner->PostTask([muxer, &shutdown_done] {
2714     // Check that no consumer session is currently active on any backend.
2715     // Producers will be automatically disconnected as a part of deleting the
2716     // muxer below.
2717     for (auto& backend : muxer->consumer_backends_) {
2718       for (auto& consumer : backend.consumers) {
2719         PERFETTO_CHECK(!consumer->service_);
2720       }
2721     }
2722     // Make sure no trace writers are lingering around on the muxer thread. Note
2723     // that we can't do this for any arbitrary thread in the process; it is the
2724     // caller's responsibility to clean them up before shutting down Perfetto.
2725     muxer->DestroyStoppedTraceWritersForCurrentThread();
2726     // The task runner must be deleted outside the muxer thread. This is done by
2727     // `owned_task_runner` above.
2728     muxer->task_runner_.release();
2729     auto* platform = muxer->platform_;
2730     delete muxer;
2731     instance_ = TracingMuxerFake::Get();
2732     platform->Shutdown();
2733     shutdown_done.Notify();
2734   });
2735   shutdown_done.Wait();
2736 }
2737 
AppendResetForTestingCallback(std::function<void ()> cb)2738 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2739   reset_callbacks_.push_back(std::move(cb));
2740 }
2741 
2742 TracingMuxer::~TracingMuxer() = default;
2743 
2744 static_assert(std::is_same<internal::BufferId, BufferID>::value,
2745               "public's BufferId and tracing/core's BufferID diverged");
2746 
2747 }  // namespace internal
2748 }  // namespace perfetto
2749