• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23 
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/base/time.h"
28 #include "perfetto/ext/base/hash.h"
29 #include "perfetto/ext/base/thread_checker.h"
30 #include "perfetto/ext/base/waitable_event.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/ext/tracing/core/trace_packet.h"
33 #include "perfetto/ext/tracing/core/trace_stats.h"
34 #include "perfetto/ext/tracing/core/trace_writer.h"
35 #include "perfetto/ext/tracing/core/tracing_service.h"
36 #include "perfetto/tracing/buffer_exhausted_policy.h"
37 #include "perfetto/tracing/core/data_source_config.h"
38 #include "perfetto/tracing/core/tracing_service_state.h"
39 #include "perfetto/tracing/data_source.h"
40 #include "perfetto/tracing/internal/data_source_internal.h"
41 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
42 #include "perfetto/tracing/internal/tracing_backend_fake.h"
43 #include "perfetto/tracing/trace_writer_base.h"
44 #include "perfetto/tracing/tracing.h"
45 #include "perfetto/tracing/tracing_backend.h"
46 
47 #include "protos/perfetto/config/interceptor_config.gen.h"
48 
49 #include "src/tracing/internal/tracing_muxer_fake.h"
50 
51 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
52 #include <io.h>  // For dup()
53 #else
54 #include <unistd.h>  // For dup()
55 #endif
56 
57 namespace perfetto {
58 namespace internal {
59 
60 namespace {
61 
62 // A task runner which prevents calls to DataSource::Trace() while an operation
63 // is in progress. Used to guard against unexpected re-entrancy where the
64 // user-provided task runner implementation tries to enter a trace point under
65 // the hood.
66 class NonReentrantTaskRunner : public base::TaskRunner {
67  public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)68   NonReentrantTaskRunner(TracingMuxer* muxer,
69                          std::unique_ptr<base::TaskRunner> task_runner)
70       : muxer_(muxer), task_runner_(std::move(task_runner)) {}
71 
72   // base::TaskRunner implementation.
PostTask(std::function<void ()> task)73   void PostTask(std::function<void()> task) override {
74     CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
75   }
76 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)77   void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
78     CallWithGuard(
79         [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
80   }
81 
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)82   void AddFileDescriptorWatch(base::PlatformHandle fd,
83                               std::function<void()> callback) override {
84     CallWithGuard(
85         [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
86   }
87 
RemoveFileDescriptorWatch(base::PlatformHandle fd)88   void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
89     CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
90   }
91 
RunsTasksOnCurrentThread() const92   bool RunsTasksOnCurrentThread() const override {
93     bool result;
94     CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
95     return result;
96   }
97 
98  private:
99   template <typename T>
CallWithGuard(T lambda) const100   void CallWithGuard(T lambda) const {
101     auto* root_tls = muxer_->GetOrCreateTracingTLS();
102     if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
103       lambda();
104       return;
105     }
106     ScopedReentrancyAnnotator scoped_annotator(*root_tls);
107     lambda();
108   }
109 
110   TracingMuxer* const muxer_;
111   std::unique_ptr<base::TaskRunner> task_runner_;
112 };
113 
114 class StopArgsImpl : public DataSourceBase::StopArgs {
115  public:
HandleStopAsynchronously() const116   std::function<void()> HandleStopAsynchronously() const override {
117     auto closure = std::move(async_stop_closure);
118     async_stop_closure = std::function<void()>();
119     return closure;
120   }
121 
122   mutable std::function<void()> async_stop_closure;
123 };
124 
ComputeConfigHash(const DataSourceConfig & config)125 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
126   base::Hash hasher;
127   std::string config_bytes = config.SerializeAsString();
128   hasher.Update(config_bytes.data(), config_bytes.size());
129   return hasher.digest();
130 }
131 
132 // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
133 static TracingMuxerImpl* g_prev_instance{};
134 
135 }  // namespace
136 
137 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms)138 TracingMuxerImpl::ProducerImpl::ProducerImpl(
139     TracingMuxerImpl* muxer,
140     TracingBackendId backend_id,
141     uint32_t shmem_batch_commits_duration_ms)
142     : muxer_(muxer),
143       backend_id_(backend_id),
144       shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
145 
~ProducerImpl()146 TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
147   muxer_ = nullptr;
148 }
149 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)150 void TracingMuxerImpl::ProducerImpl::Initialize(
151     std::unique_ptr<ProducerEndpoint> endpoint) {
152   PERFETTO_DCHECK_THREAD(thread_checker_);
153   PERFETTO_DCHECK(!connected_);
154   connection_id_++;
155 
156   // Adopt the endpoint into a shared pointer so that we can safely share it
157   // across threads that create trace writers. The custom deleter function
158   // ensures that the endpoint is always destroyed on the muxer's thread. (Note
159   // that |task_runner| is assumed to outlive tracing sessions on all threads.)
160   auto* task_runner = muxer_->task_runner_.get();
161   auto deleter = [task_runner](ProducerEndpoint* e) {
162     if (task_runner->RunsTasksOnCurrentThread()) {
163       delete e;
164       return;
165     }
166     task_runner->PostTask([e] { delete e; });
167   };
168   std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
169   // This atomic store is needed because another thread might be concurrently
170   // creating a trace writer using the previous (disconnected) |service_|. See
171   // CreateTraceWriter().
172   std::atomic_store(&service_, std::move(service));
173   // Don't try to use the service here since it may not have connected yet. See
174   // OnConnect().
175 }
176 
OnConnect()177 void TracingMuxerImpl::ProducerImpl::OnConnect() {
178   PERFETTO_DLOG("Producer connected");
179   PERFETTO_DCHECK_THREAD(thread_checker_);
180   PERFETTO_DCHECK(!connected_);
181   connected_ = true;
182   muxer_->UpdateDataSourcesOnAllBackends();
183 }
184 
OnDisconnect()185 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
186   PERFETTO_DCHECK_THREAD(thread_checker_);
187   // If we're being destroyed, bail out.
188   if (!muxer_)
189     return;
190   connected_ = false;
191   // Active data sources for this producer will be stopped by
192   // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
193   // will have a different connection id (even before it has finished
194   // connecting).
195   registered_data_sources_.reset();
196   DisposeConnection();
197 
198   // Try reconnecting the producer.
199   muxer_->OnProducerDisconnected(this);
200 }
201 
DisposeConnection()202 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
203   // Keep the old service around as a dead connection in case it has active
204   // trace writers. If any tracing sessions were created, we can't clear
205   // |service_| here because other threads may be concurrently creating new
206   // trace writers. Any reconnection attempt will atomically swap the new
207   // service in place of the old one.
208   if (did_setup_tracing_) {
209     dead_services_.push_back(service_);
210   } else {
211     service_.reset();
212   }
213 }
214 
OnTracingSetup()215 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
216   PERFETTO_DCHECK_THREAD(thread_checker_);
217   did_setup_tracing_ = true;
218   service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
219       shmem_batch_commits_duration_ms_);
220 }
221 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)222 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
223     DataSourceInstanceID id,
224     const DataSourceConfig& cfg) {
225   PERFETTO_DCHECK_THREAD(thread_checker_);
226   if (!muxer_)
227     return;
228   muxer_->SetupDataSource(backend_id_, connection_id_, id, cfg);
229 }
230 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)231 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
232                                                      const DataSourceConfig&) {
233   PERFETTO_DCHECK_THREAD(thread_checker_);
234   if (!muxer_)
235     return;
236   muxer_->StartDataSource(backend_id_, id);
237   service_->NotifyDataSourceStarted(id);
238 }
239 
StopDataSource(DataSourceInstanceID id)240 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
241   PERFETTO_DCHECK_THREAD(thread_checker_);
242   if (!muxer_)
243     return;
244   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
245 }
246 
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)247 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
248                                            const DataSourceInstanceID*,
249                                            size_t) {
250   // Flush is not plumbed for now, we just ack straight away.
251   PERFETTO_DCHECK_THREAD(thread_checker_);
252   service_->NotifyFlushComplete(flush_id);
253 }
254 
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)255 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
256     const DataSourceInstanceID* instances,
257     size_t instance_count) {
258   PERFETTO_DCHECK_THREAD(thread_checker_);
259   if (!muxer_)
260     return;
261   for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
262     muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
263   }
264 }
265 
SweepDeadServices()266 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
267   PERFETTO_DCHECK_THREAD(thread_checker_);
268   auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
269     auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
270     return !arbiter || arbiter->TryShutdown();
271   };
272   for (auto it = dead_services_.begin(); it != dead_services_.end();) {
273     auto next_it = it;
274     next_it++;
275     if (is_unused(*it)) {
276       dead_services_.erase(it);
277     }
278     it = next_it;
279   }
280   return dead_services_.empty();
281 }
282 
283 // ----- End of TracingMuxerImpl::ProducerImpl methods.
284 
285 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingBackendId backend_id,TracingSessionGlobalID session_id)286 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
287                                              BackendType backend_type,
288                                              TracingBackendId backend_id,
289                                              TracingSessionGlobalID session_id)
290     : muxer_(muxer),
291       backend_type_(backend_type),
292       backend_id_(backend_id),
293       session_id_(session_id) {}
294 
~ConsumerImpl()295 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
296   muxer_ = nullptr;
297 }
298 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)299 void TracingMuxerImpl::ConsumerImpl::Initialize(
300     std::unique_ptr<ConsumerEndpoint> endpoint) {
301   PERFETTO_DCHECK_THREAD(thread_checker_);
302   service_ = std::move(endpoint);
303   // Don't try to use the service here since it may not have connected yet. See
304   // OnConnect().
305 }
306 
OnConnect()307 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
308   PERFETTO_DCHECK_THREAD(thread_checker_);
309   PERFETTO_DCHECK(!connected_);
310   connected_ = true;
311 
312   // Observe data source instance events so we get notified when tracing starts.
313   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
314                           ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
315 
316   // If the API client configured and started tracing before we connected,
317   // tell the backend about it now.
318   if (trace_config_)
319     muxer_->SetupTracingSession(session_id_, trace_config_);
320   if (start_pending_)
321     muxer_->StartTracingSession(session_id_);
322   if (get_trace_stats_pending_) {
323     auto callback = std::move(get_trace_stats_callback_);
324     get_trace_stats_callback_ = nullptr;
325     muxer_->GetTraceStats(session_id_, std::move(callback));
326   }
327   if (query_service_state_callback_) {
328     auto callback = std::move(query_service_state_callback_);
329     query_service_state_callback_ = nullptr;
330     muxer_->QueryServiceState(session_id_, std::move(callback));
331   }
332   if (stop_pending_)
333     muxer_->StopTracingSession(session_id_);
334 }
335 
OnDisconnect()336 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
337   PERFETTO_DCHECK_THREAD(thread_checker_);
338   // If we're being destroyed, bail out.
339   if (!muxer_)
340     return;
341 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
342   if (!connected_ && backend_type_ == kSystemBackend) {
343     PERFETTO_ELOG(
344         "Unable to connect to the system tracing service as a consumer. On "
345         "Android, use the \"perfetto\" command line tool instead to start "
346         "system-wide tracing sessions");
347   }
348 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
349 
350   // Notify the client about disconnection.
351   NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
352 
353   // Make sure the client doesn't hang in a blocking start/stop because of the
354   // disconnection.
355   NotifyStartComplete();
356   NotifyStopComplete();
357 
358   // It shouldn't be necessary to call StopTracingSession. If we get this call
359   // it means that the service did shutdown before us, so there is no point
360   // trying it to ask it to stop the session. We should just remember to cleanup
361   // the consumer vector.
362   connected_ = false;
363 
364   // Notify the muxer that it is safe to destroy |this|. This is needed because
365   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
366   // access until OnDisconnect() is called.
367   muxer_->OnConsumerDisconnected(this);
368 }
369 
Disconnect()370 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
371   // This is weird and deserves a comment.
372   //
373   // When we called the ConnectConsumer method on the service it returns
374   // us a ConsumerEndpoint which we stored in |service_|, however this
375   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
376   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
377   // the ConsumerImpl pointer has to be valid until the
378   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
379   // ConsumerEndpoint |service_|. Eventually this will call
380   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
381   // call the destructor of |this|.
382   service_.reset();
383 }
384 
OnTracingDisabled(const std::string & error)385 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
386     const std::string& error) {
387   PERFETTO_DCHECK_THREAD(thread_checker_);
388   PERFETTO_DCHECK(!stopped_);
389   stopped_ = true;
390 
391   if (!error.empty())
392     NotifyError(TracingError{TracingError::kTracingFailed, error});
393 
394   // If we're still waiting for the start event, fire it now. This may happen if
395   // there are no active data sources in the session.
396   NotifyStartComplete();
397   NotifyStopComplete();
398 }
399 
NotifyStartComplete()400 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
401   PERFETTO_DCHECK_THREAD(thread_checker_);
402   if (start_complete_callback_) {
403     muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
404     start_complete_callback_ = nullptr;
405   }
406   if (blocking_start_complete_callback_) {
407     muxer_->task_runner_->PostTask(
408         std::move(blocking_start_complete_callback_));
409     blocking_start_complete_callback_ = nullptr;
410   }
411 }
412 
NotifyError(const TracingError & error)413 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
414   PERFETTO_DCHECK_THREAD(thread_checker_);
415   if (error_callback_) {
416     muxer_->task_runner_->PostTask(
417         std::bind(std::move(error_callback_), error));
418   }
419 }
420 
NotifyStopComplete()421 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
422   PERFETTO_DCHECK_THREAD(thread_checker_);
423   if (stop_complete_callback_) {
424     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
425     stop_complete_callback_ = nullptr;
426   }
427   if (blocking_stop_complete_callback_) {
428     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
429     blocking_stop_complete_callback_ = nullptr;
430   }
431 }
432 
OnTraceData(std::vector<TracePacket> packets,bool has_more)433 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
434     std::vector<TracePacket> packets,
435     bool has_more) {
436   PERFETTO_DCHECK_THREAD(thread_checker_);
437   if (!read_trace_callback_)
438     return;
439 
440   size_t capacity = 0;
441   for (const auto& packet : packets) {
442     // 16 is an over-estimation of the proto preamble size
443     capacity += packet.size() + 16;
444   }
445 
446   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
447   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
448   buf->reserve(capacity);
449   for (auto& packet : packets) {
450     char* start;
451     size_t size;
452     std::tie(start, size) = packet.GetProtoPreamble();
453     buf->insert(buf->end(), start, start + size);
454     for (auto& slice : packet.slices()) {
455       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
456       buf->insert(buf->end(), slice_data, slice_data + slice.size);
457     }
458   }
459 
460   auto callback = read_trace_callback_;
461   muxer_->task_runner_->PostTask([callback, buf, has_more] {
462     TracingSession::ReadTraceCallbackArgs callback_arg{};
463     callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
464     callback_arg.size = buf->size();
465     callback_arg.has_more = has_more;
466     callback(callback_arg);
467   });
468 
469   if (!has_more)
470     read_trace_callback_ = nullptr;
471 }
472 
OnObservableEvents(const ObservableEvents & events)473 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
474     const ObservableEvents& events) {
475   if (events.instance_state_changes_size()) {
476     for (const auto& state_change : events.instance_state_changes()) {
477       DataSourceHandle handle{state_change.producer_name(),
478                               state_change.data_source_name()};
479       data_source_states_[handle] =
480           state_change.state() ==
481           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
482     }
483   }
484 
485   if (events.instance_state_changes_size() ||
486       events.all_data_sources_started()) {
487     // Data sources are first reported as being stopped before starting, so once
488     // all the data sources we know about have started we can declare tracing
489     // begun. In the case where there are no matching data sources for the
490     // session, the service will report the all_data_sources_started() event
491     // without adding any instances (only since Android S / Perfetto v10.0).
492     if (start_complete_callback_ || blocking_start_complete_callback_) {
493       bool all_data_sources_started = std::all_of(
494           data_source_states_.cbegin(), data_source_states_.cend(),
495           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
496       if (all_data_sources_started)
497         NotifyStartComplete();
498     }
499   }
500 }
501 
OnTraceStats(bool success,const TraceStats & trace_stats)502 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
503     bool success,
504     const TraceStats& trace_stats) {
505   if (!get_trace_stats_callback_)
506     return;
507   TracingSession::GetTraceStatsCallbackArgs callback_arg{};
508   callback_arg.success = success;
509   callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
510   muxer_->task_runner_->PostTask(
511       std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
512   get_trace_stats_callback_ = nullptr;
513 }
514 
515 // The callbacks below are not used.
OnDetach(bool)516 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)517 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
518 // ----- End of TracingMuxerImpl::ConsumerImpl
519 
520 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
521 
522 // TracingSessionImpl is the RAII object returned to API clients when they
523 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
524 // tracing.
525 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)526 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
527     TracingMuxerImpl* muxer,
528     TracingSessionGlobalID session_id,
529     BackendType backend_type)
530     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
531 
532 // Can be destroyed from any thread.
~TracingSessionImpl()533 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
534   auto* muxer = muxer_;
535   auto session_id = session_id_;
536   muxer->task_runner_->PostTask(
537       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
538 }
539 
540 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)541 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
542                                                  int fd) {
543   auto* muxer = muxer_;
544   auto session_id = session_id_;
545   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
546   if (fd >= 0) {
547     base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
548 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
549     if (backend_type_ != kInProcessBackend) {
550       PERFETTO_FATAL(
551           "Passing a file descriptor to TracingSession::Setup() is only "
552           "supported with the kInProcessBackend on Windows. Use "
553           "TracingSession::ReadTrace() instead");
554     }
555 #endif
556     trace_config->set_write_into_file(true);
557     fd = dup(fd);
558   }
559   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
560     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
561   });
562 }
563 
564 // Can be called from any thread.
Start()565 void TracingMuxerImpl::TracingSessionImpl::Start() {
566   auto* muxer = muxer_;
567   auto session_id = session_id_;
568   muxer->task_runner_->PostTask(
569       [muxer, session_id] { muxer->StartTracingSession(session_id); });
570 }
571 
572 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)573 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
574     const TraceConfig& cfg) {
575   auto* muxer = muxer_;
576   auto session_id = session_id_;
577   muxer->task_runner_->PostTask([muxer, session_id, cfg] {
578     muxer->ChangeTracingSessionConfig(session_id, cfg);
579   });
580 }
581 
582 // Can be called from any thread except the service thread.
StartBlocking()583 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
584   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
585   auto* muxer = muxer_;
586   auto session_id = session_id_;
587   base::WaitableEvent tracing_started;
588   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
589     auto* consumer = muxer->FindConsumer(session_id);
590     if (!consumer) {
591       // TODO(skyostil): Signal an error to the user.
592       tracing_started.Notify();
593       return;
594     }
595     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
596     consumer->blocking_start_complete_callback_ = [&] {
597       tracing_started.Notify();
598     };
599     muxer->StartTracingSession(session_id);
600   });
601   tracing_started.Wait();
602 }
603 
604 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)605 void TracingMuxerImpl::TracingSessionImpl::Flush(
606     std::function<void(bool)> user_callback,
607     uint32_t timeout_ms) {
608   auto* muxer = muxer_;
609   auto session_id = session_id_;
610   muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
611     auto* consumer = muxer->FindConsumer(session_id);
612     if (!consumer) {
613       std::move(user_callback)(false);
614       return;
615     }
616     muxer->FlushTracingSession(session_id, timeout_ms,
617                                std::move(user_callback));
618   });
619 }
620 
621 // Can be called from any thread.
Stop()622 void TracingMuxerImpl::TracingSessionImpl::Stop() {
623   auto* muxer = muxer_;
624   auto session_id = session_id_;
625   muxer->task_runner_->PostTask(
626       [muxer, session_id] { muxer->StopTracingSession(session_id); });
627 }
628 
629 // Can be called from any thread except the service thread.
StopBlocking()630 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
631   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
632   auto* muxer = muxer_;
633   auto session_id = session_id_;
634   base::WaitableEvent tracing_stopped;
635   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
636     auto* consumer = muxer->FindConsumer(session_id);
637     if (!consumer) {
638       // TODO(skyostil): Signal an error to the user.
639       tracing_stopped.Notify();
640       return;
641     }
642     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
643     consumer->blocking_stop_complete_callback_ = [&] {
644       tracing_stopped.Notify();
645     };
646     muxer->StopTracingSession(session_id);
647   });
648   tracing_stopped.Wait();
649 }
650 
651 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)652 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
653   auto* muxer = muxer_;
654   auto session_id = session_id_;
655   muxer->task_runner_->PostTask([muxer, session_id, cb] {
656     muxer->ReadTracingSessionData(session_id, std::move(cb));
657   });
658 }
659 
660 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)661 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
662     std::function<void()> cb) {
663   auto* muxer = muxer_;
664   auto session_id = session_id_;
665   muxer->task_runner_->PostTask([muxer, session_id, cb] {
666     auto* consumer = muxer->FindConsumer(session_id);
667     if (!consumer)
668       return;
669     consumer->start_complete_callback_ = cb;
670   });
671 }
672 
673 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)674 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
675     std::function<void(TracingError)> cb) {
676   auto* muxer = muxer_;
677   auto session_id = session_id_;
678   muxer->task_runner_->PostTask([muxer, session_id, cb] {
679     auto* consumer = muxer->FindConsumer(session_id);
680     if (!consumer) {
681       // Notify the client about concurrent disconnection of the session.
682       if (cb)
683         cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
684       return;
685     }
686     consumer->error_callback_ = cb;
687   });
688 }
689 
690 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)691 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
692     std::function<void()> cb) {
693   auto* muxer = muxer_;
694   auto session_id = session_id_;
695   muxer->task_runner_->PostTask([muxer, session_id, cb] {
696     auto* consumer = muxer->FindConsumer(session_id);
697     if (!consumer)
698       return;
699     consumer->stop_complete_callback_ = cb;
700   });
701 }
702 
703 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)704 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
705     GetTraceStatsCallback cb) {
706   auto* muxer = muxer_;
707   auto session_id = session_id_;
708   muxer->task_runner_->PostTask([muxer, session_id, cb] {
709     muxer->GetTraceStats(session_id, std::move(cb));
710   });
711 }
712 
713 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)714 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
715     QueryServiceStateCallback cb) {
716   auto* muxer = muxer_;
717   auto session_id = session_id_;
718   muxer->task_runner_->PostTask([muxer, session_id, cb] {
719     muxer->QueryServiceState(session_id, std::move(cb));
720   });
721 }
722 
723 // ----- End of TracingMuxerImpl::TracingSessionImpl
724 
725 // static
726 TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
727 
728 // This is called by perfetto::Tracing::Initialize().
729 // Can be called on any thread. Typically, but not necessarily, that will be
730 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)731 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
732     : TracingMuxer(args.platform ? args.platform
733                                  : Platform::GetDefaultPlatform()) {
734   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
735   instance_ = this;
736 
737   // Create the thread where muxer, producers and service will live.
738   Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
739   task_runner_.reset(new NonReentrantTaskRunner(
740       this, platform_->CreateTaskRunner(std::move(tr_args))));
741 
742   // Run the initializer on that thread.
743   task_runner_->PostTask([this, args] { Initialize(args); });
744 }
745 
Initialize(const TracingInitArgs & args)746 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
747   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.
748 
749   policy_ = args.tracing_policy;
750 
751   auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
752     if (!backend) {
753       // We skip the log in release builds because the *_backend_fake.cc code
754       // has already an ELOG before returning a nullptr.
755       PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
756       return;
757     }
758     TracingBackendId backend_id = backends_.size();
759     backends_.emplace_back();
760     RegisteredBackend& rb = backends_.back();
761     rb.backend = backend;
762     rb.id = backend_id;
763     rb.type = type;
764     rb.producer.reset(new ProducerImpl(this, backend_id,
765                                        args.shmem_batch_commits_duration_ms));
766     rb.producer_conn_args.producer = rb.producer.get();
767     rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
768     rb.producer_conn_args.task_runner = task_runner_.get();
769     rb.producer_conn_args.shmem_size_hint_bytes =
770         args.shmem_size_hint_kb * 1024;
771     rb.producer_conn_args.shmem_page_size_hint_bytes =
772         args.shmem_page_size_hint_kb * 1024;
773     rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
774   };
775 
776   if (args.backends & kSystemBackend) {
777     PERFETTO_CHECK(args.system_backend_factory_);
778     add_backend(args.system_backend_factory_(), kSystemBackend);
779   }
780 
781   if (args.backends & kInProcessBackend) {
782     PERFETTO_CHECK(args.in_process_backend_factory_);
783     add_backend(args.in_process_backend_factory_(), kInProcessBackend);
784   }
785 
786   if (args.backends & kCustomBackend) {
787     PERFETTO_CHECK(args.custom_backend);
788     add_backend(args.custom_backend, kCustomBackend);
789   }
790 
791   if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
792     PERFETTO_FATAL("Unsupported tracing backend type");
793   }
794 
795   // Fallback backend for consumer creation for an unsupported backend type.
796   // This backend simply fails any attempt to start a tracing session.
797   // NOTE: This backend instance has to be added last.
798   add_backend(internal::TracingBackendFake::GetInstance(),
799               BackendType::kUnspecifiedBackend);
800 }
801 
802 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)803 bool TracingMuxerImpl::RegisterDataSource(
804     const DataSourceDescriptor& descriptor,
805     DataSourceFactory factory,
806     DataSourceStaticState* static_state) {
807   // Ignore repeated registrations.
808   if (static_state->index != kMaxDataSources)
809     return true;
810 
811   uint32_t new_index = next_data_source_index_++;
812   if (new_index >= kMaxDataSources) {
813     PERFETTO_DLOG(
814         "RegisterDataSource failed: too many data sources already registered");
815     return false;
816   }
817 
818   // Initialize the static state.
819   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
820                 "instances[] size mismatch");
821   for (size_t i = 0; i < static_state->instances.size(); i++)
822     new (&static_state->instances[i]) DataSourceState{};
823 
824   static_state->index = new_index;
825 
826   // Generate a semi-unique id for this data source.
827   base::Hash hash;
828   hash.Update(reinterpret_cast<intptr_t>(static_state));
829   hash.Update(base::GetWallTimeNs().count());
830   static_state->id = hash.digest() ? hash.digest() : 1;
831 
832   task_runner_->PostTask([this, descriptor, factory, static_state] {
833     data_sources_.emplace_back();
834     RegisteredDataSource& rds = data_sources_.back();
835     rds.descriptor = descriptor;
836     rds.factory = factory;
837     rds.static_state = static_state;
838 
839     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
840   });
841   return true;
842 }
843 
844 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)845 void TracingMuxerImpl::UpdateDataSourceDescriptor(
846     const DataSourceDescriptor& descriptor,
847     const DataSourceStaticState* static_state) {
848   task_runner_->PostTask([this, descriptor, static_state] {
849     for (auto& rds : data_sources_) {
850       if (rds.static_state == static_state) {
851         PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
852         rds.descriptor = descriptor;
853         rds.descriptor.set_id(static_state->id);
854         UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
855         return;
856       }
857     }
858   });
859 }
860 
861 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)862 void TracingMuxerImpl::RegisterInterceptor(
863     const InterceptorDescriptor& descriptor,
864     InterceptorFactory factory,
865     InterceptorBase::TLSFactory tls_factory,
866     InterceptorBase::TracePacketCallback packet_callback) {
867   task_runner_->PostTask(
868       [this, descriptor, factory, tls_factory, packet_callback] {
869         // Ignore repeated registrations.
870         for (const auto& interceptor : interceptors_) {
871           if (interceptor.descriptor.name() == descriptor.name()) {
872             PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
873             PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
874             return;
875           }
876         }
877         // Only allow certain interceptors for now.
878         if (descriptor.name() != "test_interceptor" &&
879             descriptor.name() != "console") {
880           PERFETTO_ELOG(
881               "Interceptors are experimental. If you want to use them, please "
882               "get in touch with the project maintainers "
883               "(https://perfetto.dev/docs/contributing/"
884               "getting-started#community).");
885           return;
886         }
887         interceptors_.emplace_back();
888         RegisteredInterceptor& interceptor = interceptors_.back();
889         interceptor.descriptor = descriptor;
890         interceptor.factory = factory;
891         interceptor.tls_factory = tls_factory;
892         interceptor.packet_callback = packet_callback;
893       });
894 }
895 
896 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)897 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
898                                        uint32_t backend_connection_id,
899                                        DataSourceInstanceID instance_id,
900                                        const DataSourceConfig& cfg) {
901   PERFETTO_DCHECK_THREAD(thread_checker_);
902   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
903                 cfg.name().c_str());
904   uint64_t config_hash = ComputeConfigHash(cfg);
905 
906   for (const auto& rds : data_sources_) {
907     if (rds.descriptor.name() != cfg.name())
908       continue;
909     DataSourceStaticState& static_state = *rds.static_state;
910 
911     // If this data source is already active for this exact config, don't start
912     // another instance. This happens when we have several data sources with the
913     // same name, in which case the service sends one SetupDataSource event for
914     // each one. Since we can't map which event maps to which data source, we
915     // ensure each event only starts one data source instance.
916     // TODO(skyostil): Register a unique id with each data source to the service
917     // to disambiguate.
918     bool active_for_config = false;
919     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
920       if (!static_state.TryGet(i))
921         continue;
922       auto* internal_state =
923           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
924       if (internal_state->backend_id == backend_id &&
925           internal_state->config_hash == config_hash) {
926         active_for_config = true;
927         break;
928       }
929     }
930     if (active_for_config) {
931       PERFETTO_DLOG(
932           "Data source %s is already active with this config, skipping",
933           cfg.name().c_str());
934       continue;
935     }
936 
937     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
938       // Find a free slot.
939       if (static_state.TryGet(i))
940         continue;
941 
942       auto* internal_state =
943           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
944       std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
945       static_assert(
946           std::is_same<decltype(internal_state->data_source_instance_id),
947                        DataSourceInstanceID>::value,
948           "data_source_instance_id type mismatch");
949       internal_state->muxer_id_for_testing = muxer_id_for_testing_;
950       internal_state->backend_id = backend_id;
951       internal_state->backend_connection_id = backend_connection_id;
952       internal_state->data_source_instance_id = instance_id;
953       internal_state->buffer_id =
954           static_cast<internal::BufferId>(cfg.target_buffer());
955       internal_state->config_hash = config_hash;
956       internal_state->data_source = rds.factory();
957       internal_state->interceptor = nullptr;
958       internal_state->interceptor_id = 0;
959 
960       if (cfg.has_interceptor_config()) {
961         for (size_t j = 0; j < interceptors_.size(); j++) {
962           if (cfg.interceptor_config().name() ==
963               interceptors_[j].descriptor.name()) {
964             PERFETTO_DLOG("Intercepting data source %" PRIu64
965                           " \"%s\" into \"%s\"",
966                           instance_id, cfg.name().c_str(),
967                           cfg.interceptor_config().name().c_str());
968             internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
969             internal_state->interceptor = interceptors_[j].factory();
970             internal_state->interceptor->OnSetup({cfg});
971             break;
972           }
973         }
974         if (!internal_state->interceptor_id) {
975           PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
976                         cfg.interceptor_config().name().c_str());
977         }
978       }
979 
980       // This must be made at the end. See matching acquire-load in
981       // DataSource::Trace().
982       static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
983 
984       DataSourceBase::SetupArgs setup_args;
985       setup_args.config = &cfg;
986       setup_args.internal_instance_index = i;
987       internal_state->data_source->OnSetup(setup_args);
988       return;
989     }
990     PERFETTO_ELOG(
991         "Maximum number of data source instances exhausted. "
992         "Dropping data source %" PRIu64,
993         instance_id);
994     break;
995   }
996 }
997 
998 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)999 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1000                                        DataSourceInstanceID instance_id) {
1001   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1002   PERFETTO_DCHECK_THREAD(thread_checker_);
1003 
1004   auto ds = FindDataSource(backend_id, instance_id);
1005   if (!ds) {
1006     PERFETTO_ELOG("Could not find data source to start");
1007     return;
1008   }
1009 
1010   DataSourceBase::StartArgs start_args{};
1011   start_args.internal_instance_index = ds.instance_idx;
1012 
1013   std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1014   if (ds.internal_state->interceptor)
1015     ds.internal_state->interceptor->OnStart({});
1016   ds.internal_state->trace_lambda_enabled = true;
1017   ds.internal_state->data_source->OnStart(start_args);
1018 }
1019 
1020 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1021 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1022     TracingBackendId backend_id,
1023     DataSourceInstanceID instance_id) {
1024   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1025   PERFETTO_DCHECK_THREAD(thread_checker_);
1026 
1027   auto ds = FindDataSource(backend_id, instance_id);
1028   if (!ds) {
1029     PERFETTO_ELOG("Could not find data source to stop");
1030     return;
1031   }
1032 
1033   StopArgsImpl stop_args{};
1034   stop_args.internal_instance_index = ds.instance_idx;
1035   stop_args.async_stop_closure = [this, backend_id, instance_id] {
1036     // TracingMuxerImpl is long lived, capturing |this| is okay.
1037     // The notification closure can be moved out of the StopArgs by the
1038     // embedder to handle stop asynchronously. The embedder might then
1039     // call the closure on a different thread than the current one, hence
1040     // this nested PostTask().
1041     task_runner_->PostTask([this, backend_id, instance_id] {
1042       StopDataSource_AsyncEnd(backend_id, instance_id);
1043     });
1044   };
1045 
1046   {
1047     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1048     if (ds.internal_state->interceptor)
1049       ds.internal_state->interceptor->OnStop({});
1050     ds.internal_state->data_source->OnStop(stop_args);
1051   }
1052 
1053   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1054   // async closure here. In theory we could avoid the PostTask and call
1055   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1056   // divergencies between the deferred-stop vs non-deferred-stop code paths.
1057   if (stop_args.async_stop_closure)
1058     std::move(stop_args.async_stop_closure)();
1059 }
1060 
StopDataSource_AsyncEnd(TracingBackendId backend_id,DataSourceInstanceID instance_id)1061 void TracingMuxerImpl::StopDataSource_AsyncEnd(
1062     TracingBackendId backend_id,
1063     DataSourceInstanceID instance_id) {
1064   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1065   PERFETTO_DCHECK_THREAD(thread_checker_);
1066 
1067   auto ds = FindDataSource(backend_id, instance_id);
1068   if (!ds) {
1069     PERFETTO_ELOG(
1070         "Async stop of data source %" PRIu64
1071         " failed. This might be due to calling the async_stop_closure twice.",
1072         instance_id);
1073     return;
1074   }
1075 
1076   const uint32_t mask = ~(1 << ds.instance_idx);
1077   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1078 
1079   // Take the mutex to prevent that the data source is in the middle of
1080   // a Trace() execution where it called GetDataSourceLocked() while we
1081   // destroy it.
1082   {
1083     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1084     ds.internal_state->trace_lambda_enabled = false;
1085     ds.internal_state->data_source.reset();
1086     ds.internal_state->interceptor.reset();
1087   }
1088 
1089   // The other fields of internal_state are deliberately *not* cleared.
1090   // See races-related comments of DataSource::Trace().
1091 
1092   TracingMuxer::generation_++;
1093 
1094   // |backends_| is append-only, Backend instances are always valid.
1095   PERFETTO_CHECK(backend_id < backends_.size());
1096   ProducerImpl* producer = backends_[backend_id].producer.get();
1097   if (!producer)
1098     return;
1099   if (producer->connected_) {
1100     // Flush any commits that might have been batched by SharedMemoryArbiter.
1101     producer->service_->MaybeSharedMemoryArbiter()
1102         ->FlushPendingCommitDataRequests();
1103     producer->service_->NotifyDataSourceStopped(instance_id);
1104   }
1105   producer->SweepDeadServices();
1106 }
1107 
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1108 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1109     TracingBackendId backend_id,
1110     DataSourceInstanceID instance_id) {
1111   PERFETTO_DCHECK_THREAD(thread_checker_);
1112   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1113                 instance_id);
1114   auto ds = FindDataSource(backend_id, instance_id);
1115   if (!ds) {
1116     PERFETTO_ELOG("Could not find data source to clear incremental state for");
1117     return;
1118   }
1119   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1120   // the incremental state should be cleared.
1121   ds.static_state->incremental_state_generation.fetch_add(
1122       1, std::memory_order_relaxed);
1123 }
1124 
SyncProducersForTesting()1125 void TracingMuxerImpl::SyncProducersForTesting() {
1126   std::mutex mutex;
1127   std::condition_variable cv;
1128 
1129   // IPC-based producers don't report connection errors explicitly for each
1130   // command, but instead with an asynchronous callback
1131   // (ProducerImpl::OnDisconnected). This means that the sync command below
1132   // may have completed but failed to reach the service because of a
1133   // disconnection, but we can't tell until the disconnection message comes
1134   // through. To guard against this, we run two whole rounds of sync round-trips
1135   // before returning; the first one will detect any disconnected producers and
1136   // the second one will ensure any reconnections have completed and all data
1137   // sources are registered in the service again.
1138   for (size_t i = 0; i < 2; i++) {
1139     size_t countdown = std::numeric_limits<size_t>::max();
1140     task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1141       {
1142         std::unique_lock<std::mutex> countdown_lock(mutex);
1143         countdown = backends_.size();
1144       }
1145       for (auto& backend : backends_) {
1146         auto* producer = backend.producer.get();
1147         producer->service_->Sync([&mutex, &cv, &countdown] {
1148           std::unique_lock<std::mutex> countdown_lock(mutex);
1149           countdown--;
1150           cv.notify_one();
1151         });
1152       }
1153     });
1154 
1155     {
1156       std::unique_lock<std::mutex> countdown_lock(mutex);
1157       cv.wait(countdown_lock, [&countdown] { return !countdown; });
1158     }
1159   }
1160 
1161   // Check that all producers are indeed connected.
1162   bool done = false;
1163   bool all_producers_connected = true;
1164   task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1165     for (auto& backend : backends_)
1166       all_producers_connected &= backend.producer->connected_;
1167     std::unique_lock<std::mutex> lock(mutex);
1168     done = true;
1169     cv.notify_one();
1170   });
1171 
1172   {
1173     std::unique_lock<std::mutex> lock(mutex);
1174     cv.wait(lock, [&done] { return done; });
1175   }
1176   PERFETTO_DCHECK(all_producers_connected);
1177 }
1178 
DestroyStoppedTraceWritersForCurrentThread()1179 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1180   // Iterate across all possible data source types.
1181   auto cur_generation = generation_.load(std::memory_order_acquire);
1182   auto* root_tls = GetOrCreateTracingTLS();
1183 
1184   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1185     // |tls| has a vector of per-data-source-instance thread-local state.
1186     DataSourceStaticState* static_state = tls.static_state;
1187     if (!static_state)
1188       return;  // Slot not used.
1189 
1190     // Iterate across all possible instances for this data source.
1191     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1192       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1193       if (!ds_tls.trace_writer)
1194         continue;
1195 
1196       DataSourceState* ds_state = static_state->TryGet(inst);
1197       if (ds_state &&
1198           ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1199           ds_state->backend_id == ds_tls.backend_id &&
1200           ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1201           ds_state->buffer_id == ds_tls.buffer_id &&
1202           ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1203         continue;
1204       }
1205 
1206       // The DataSource instance has been destroyed or recycled.
1207       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
1208     }
1209   };
1210 
1211   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1212     // |tls| has a vector of per-data-source-instance thread-local state.
1213     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1214     destroy_stopped_instances(tls);
1215   }
1216   destroy_stopped_instances(root_tls->track_event_tls);
1217   root_tls->generation = cur_generation;
1218 }
1219 
1220 // Called both when a new data source is registered or when a new backend
1221 // connects. In both cases we want to be sure we reflected the data source
1222 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1223 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1224   PERFETTO_DCHECK_THREAD(thread_checker_);
1225   for (RegisteredDataSource& rds : data_sources_) {
1226     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1227   }
1228 }
1229 
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1230 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1231                                                      bool is_changed) {
1232   PERFETTO_DCHECK_THREAD(thread_checker_);
1233   for (RegisteredBackend& backend : backends_) {
1234     // We cannot call RegisterDataSource on the backend before it connects.
1235     if (!backend.producer->connected_)
1236       continue;
1237 
1238     PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1239     bool is_registered = backend.producer->registered_data_sources_.test(
1240         rds.static_state->index);
1241     if (is_registered && !is_changed)
1242       continue;
1243 
1244     rds.descriptor.set_will_notify_on_start(true);
1245     rds.descriptor.set_will_notify_on_stop(true);
1246     rds.descriptor.set_handles_incremental_state_clear(true);
1247     rds.descriptor.set_id(rds.static_state->id);
1248     if (is_registered) {
1249       backend.producer->service_->UpdateDataSource(rds.descriptor);
1250     } else {
1251       backend.producer->service_->RegisterDataSource(rds.descriptor);
1252     }
1253     backend.producer->registered_data_sources_.set(rds.static_state->index);
1254   }
1255 }
1256 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1257 void TracingMuxerImpl::SetupTracingSession(
1258     TracingSessionGlobalID session_id,
1259     const std::shared_ptr<TraceConfig>& trace_config,
1260     base::ScopedFile trace_fd) {
1261   PERFETTO_DCHECK_THREAD(thread_checker_);
1262   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1263 
1264   auto* consumer = FindConsumer(session_id);
1265   if (!consumer)
1266     return;
1267 
1268   consumer->trace_config_ = trace_config;
1269   if (trace_fd)
1270     consumer->trace_fd_ = std::move(trace_fd);
1271 
1272   if (!consumer->connected_)
1273     return;
1274 
1275   // Only used in the deferred start mode.
1276   if (trace_config->deferred_start()) {
1277     consumer->service_->EnableTracing(*trace_config,
1278                                       std::move(consumer->trace_fd_));
1279   }
1280 }
1281 
StartTracingSession(TracingSessionGlobalID session_id)1282 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1283   PERFETTO_DCHECK_THREAD(thread_checker_);
1284 
1285   auto* consumer = FindConsumer(session_id);
1286 
1287   if (!consumer)
1288     return;
1289 
1290   if (!consumer->trace_config_) {
1291     PERFETTO_ELOG("Must call Setup(config) first");
1292     return;
1293   }
1294 
1295   if (!consumer->connected_) {
1296     consumer->start_pending_ = true;
1297     return;
1298   }
1299 
1300   consumer->start_pending_ = false;
1301   if (consumer->trace_config_->deferred_start()) {
1302     consumer->service_->StartTracing();
1303   } else {
1304     consumer->service_->EnableTracing(*consumer->trace_config_,
1305                                       std::move(consumer->trace_fd_));
1306   }
1307 
1308   // TODO implement support for the deferred-start + fast-triggering case.
1309 }
1310 
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1311 void TracingMuxerImpl::ChangeTracingSessionConfig(
1312     TracingSessionGlobalID session_id,
1313     const TraceConfig& trace_config) {
1314   PERFETTO_DCHECK_THREAD(thread_checker_);
1315 
1316   auto* consumer = FindConsumer(session_id);
1317 
1318   if (!consumer)
1319     return;
1320 
1321   if (!consumer->trace_config_) {
1322     // Changing the config is only supported for started sessions.
1323     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1324     return;
1325   }
1326 
1327   consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1328   if (consumer->connected_)
1329     consumer->service_->ChangeTraceConfig(trace_config);
1330 }
1331 
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1332 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1333                                            uint32_t timeout_ms,
1334                                            std::function<void(bool)> callback) {
1335   PERFETTO_DCHECK_THREAD(thread_checker_);
1336   auto* consumer = FindConsumer(session_id);
1337   if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1338       !consumer->trace_config_) {
1339     PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1340     std::move(callback)(false);
1341     return;
1342   }
1343 
1344   consumer->service_->Flush(timeout_ms, std::move(callback));
1345 }
1346 
StopTracingSession(TracingSessionGlobalID session_id)1347 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1348   PERFETTO_DCHECK_THREAD(thread_checker_);
1349   auto* consumer = FindConsumer(session_id);
1350   if (!consumer)
1351     return;
1352 
1353   if (consumer->start_pending_) {
1354     // If the session hasn't started yet, wait until it does before stopping.
1355     consumer->stop_pending_ = true;
1356     return;
1357   }
1358 
1359   consumer->stop_pending_ = false;
1360   if (consumer->stopped_) {
1361     // If the session was already stopped (e.g., it failed to start), don't try
1362     // stopping again.
1363     consumer->NotifyStopComplete();
1364   } else if (!consumer->trace_config_) {
1365     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1366     return;
1367   } else {
1368     consumer->service_->DisableTracing();
1369   }
1370 
1371   consumer->trace_config_.reset();
1372 }
1373 
DestroyTracingSession(TracingSessionGlobalID session_id)1374 void TracingMuxerImpl::DestroyTracingSession(
1375     TracingSessionGlobalID session_id) {
1376   PERFETTO_DCHECK_THREAD(thread_checker_);
1377   for (RegisteredBackend& backend : backends_) {
1378     // We need to find the consumer (if any) and call Disconnect as we destroy
1379     // the tracing session. We can't call Disconnect() inside this for loop
1380     // because in the in-process case this will end up to a synchronous call to
1381     // OnConsumerDisconnect which will invalidate all the iterators to
1382     // |backend.consumers|.
1383     ConsumerImpl* consumer = nullptr;
1384     for (auto& con : backend.consumers) {
1385       if (con->session_id_ == session_id) {
1386         consumer = con.get();
1387         break;
1388       }
1389     }
1390     if (consumer) {
1391       // We broke out of the loop above on the assumption that each backend will
1392       // only have a single consumer per session. This DCHECK ensures that
1393       // this is the case.
1394       PERFETTO_DCHECK(
1395           std::count_if(backend.consumers.begin(), backend.consumers.end(),
1396                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
1397                           return con->session_id_ == session_id;
1398                         }) == 1u);
1399       consumer->Disconnect();
1400     }
1401   }
1402 }
1403 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)1404 void TracingMuxerImpl::ReadTracingSessionData(
1405     TracingSessionGlobalID session_id,
1406     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
1407   PERFETTO_DCHECK_THREAD(thread_checker_);
1408   auto* consumer = FindConsumer(session_id);
1409   if (!consumer) {
1410     // TODO(skyostil): Signal an error to the user.
1411     TracingSession::ReadTraceCallbackArgs callback_arg{};
1412     callback(callback_arg);
1413     return;
1414   }
1415   PERFETTO_DCHECK(!consumer->read_trace_callback_);
1416   consumer->read_trace_callback_ = std::move(callback);
1417   consumer->service_->ReadBuffers();
1418 }
1419 
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)1420 void TracingMuxerImpl::GetTraceStats(
1421     TracingSessionGlobalID session_id,
1422     TracingSession::GetTraceStatsCallback callback) {
1423   PERFETTO_DCHECK_THREAD(thread_checker_);
1424   auto* consumer = FindConsumer(session_id);
1425   if (!consumer) {
1426     TracingSession::GetTraceStatsCallbackArgs callback_arg{};
1427     callback_arg.success = false;
1428     callback(std::move(callback_arg));
1429     return;
1430   }
1431   PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
1432   consumer->get_trace_stats_callback_ = std::move(callback);
1433   if (!consumer->connected_) {
1434     consumer->get_trace_stats_pending_ = true;
1435     return;
1436   }
1437   consumer->get_trace_stats_pending_ = false;
1438   consumer->service_->GetTraceStats();
1439 }
1440 
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)1441 void TracingMuxerImpl::QueryServiceState(
1442     TracingSessionGlobalID session_id,
1443     TracingSession::QueryServiceStateCallback callback) {
1444   PERFETTO_DCHECK_THREAD(thread_checker_);
1445   auto* consumer = FindConsumer(session_id);
1446   if (!consumer) {
1447     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1448     callback_arg.success = false;
1449     callback(std::move(callback_arg));
1450     return;
1451   }
1452   PERFETTO_DCHECK(!consumer->query_service_state_callback_);
1453   if (!consumer->connected_) {
1454     consumer->query_service_state_callback_ = std::move(callback);
1455     return;
1456   }
1457   auto callback_wrapper = [callback](bool success,
1458                                      protos::gen::TracingServiceState state) {
1459     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1460     callback_arg.success = success;
1461     callback_arg.service_state_data = state.SerializeAsArray();
1462     callback(std::move(callback_arg));
1463   };
1464   consumer->service_->QueryServiceState(std::move(callback_wrapper));
1465 }
1466 
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)1467 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
1468     uint32_t batch_commits_duration_ms,
1469     BackendType backend_type) {
1470   for (RegisteredBackend& backend : backends_) {
1471     if (backend.producer && backend.producer->connected_ &&
1472         backend.type == backend_type) {
1473       backend.producer->service_->MaybeSharedMemoryArbiter()
1474           ->SetBatchCommitsDuration(batch_commits_duration_ms);
1475     }
1476   }
1477 }
1478 
EnableDirectSMBPatchingForTesting(BackendType backend_type)1479 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
1480     BackendType backend_type) {
1481   for (RegisteredBackend& backend : backends_) {
1482     if (backend.producer && backend.producer->connected_ &&
1483         backend.type == backend_type &&
1484         !backend.producer->service_->MaybeSharedMemoryArbiter()
1485              ->EnableDirectSMBPatching()) {
1486       return false;
1487     }
1488   }
1489   return true;
1490 }
1491 
FindConsumer(TracingSessionGlobalID session_id)1492 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
1493     TracingSessionGlobalID session_id) {
1494   PERFETTO_DCHECK_THREAD(thread_checker_);
1495   for (RegisteredBackend& backend : backends_) {
1496     for (auto& consumer : backend.consumers) {
1497       if (consumer->session_id_ == session_id) {
1498         return consumer.get();
1499       }
1500     }
1501   }
1502   return nullptr;
1503 }
1504 
InitializeConsumer(TracingSessionGlobalID session_id)1505 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
1506   PERFETTO_DCHECK_THREAD(thread_checker_);
1507 
1508   auto* consumer = FindConsumer(session_id);
1509   if (!consumer)
1510     return;
1511 
1512   TracingBackendId backend_id = consumer->backend_id_;
1513   // |backends_| is append-only, Backend instances are always valid.
1514   PERFETTO_CHECK(backend_id < backends_.size());
1515   RegisteredBackend& backend = backends_[backend_id];
1516 
1517   TracingBackend::ConnectConsumerArgs conn_args;
1518   conn_args.consumer = consumer;
1519   conn_args.task_runner = task_runner_.get();
1520   consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
1521 }
1522 
OnConsumerDisconnected(ConsumerImpl * consumer)1523 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
1524   PERFETTO_DCHECK_THREAD(thread_checker_);
1525   for (RegisteredBackend& backend : backends_) {
1526     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
1527       return con.get() == consumer;
1528     };
1529     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
1530                                            backend.consumers.end(), pred),
1531                             backend.consumers.end());
1532   }
1533 }
1534 
SetMaxProducerReconnectionsForTesting(uint32_t count)1535 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
1536   max_producer_reconnections_.store(count);
1537 }
1538 
OnProducerDisconnected(ProducerImpl * producer)1539 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
1540   PERFETTO_DCHECK_THREAD(thread_checker_);
1541   for (RegisteredBackend& backend : backends_) {
1542     if (backend.producer.get() != producer)
1543       continue;
1544     // Try reconnecting the disconnected producer. If the connection succeeds,
1545     // all the data sources will be automatically re-registered.
1546     if (producer->connection_id_ > max_producer_reconnections_.load()) {
1547       // Avoid reconnecting a failing producer too many times. Instead we just
1548       // leak the producer instead of trying to avoid further complicating
1549       // cross-thread trace writer creation.
1550       PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
1551       continue;
1552     }
1553     backend.producer->Initialize(
1554         backend.backend->ConnectProducer(backend.producer_conn_args));
1555   }
1556 
1557   // Increment the generation counter to atomically ensure that:
1558   // 1. Old trace writers from the severed connection eventually get cleaned up
1559   //    by DestroyStoppedTraceWritersForCurrentThread().
1560   // 2. No new trace writers can be created for the SharedMemoryArbiter from the
1561   //    old connection.
1562   TracingMuxer::generation_++;
1563 }
1564 
SweepDeadBackends()1565 void TracingMuxerImpl::SweepDeadBackends() {
1566   PERFETTO_DCHECK_THREAD(thread_checker_);
1567   for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
1568     auto next_it = it;
1569     next_it++;
1570     if (it->producer->SweepDeadServices())
1571       dead_backends_.erase(it);
1572     it = next_it;
1573   }
1574 }
1575 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1576 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
1577     TracingBackendId backend_id,
1578     DataSourceInstanceID instance_id) {
1579   PERFETTO_DCHECK_THREAD(thread_checker_);
1580   for (const auto& rds : data_sources_) {
1581     DataSourceStaticState* static_state = rds.static_state;
1582     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1583       auto* internal_state = static_state->TryGet(i);
1584       if (internal_state && internal_state->backend_id == backend_id &&
1585           internal_state->data_source_instance_id == instance_id) {
1586         return FindDataSourceRes(static_state, internal_state, i);
1587       }
1588     }
1589   }
1590   return FindDataSourceRes();
1591 }
1592 
1593 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)1594 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
1595     DataSourceStaticState* static_state,
1596     uint32_t data_source_instance_index,
1597     DataSourceState* data_source,
1598     BufferExhaustedPolicy buffer_exhausted_policy) {
1599   if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
1600     // If the session is being intercepted, return a heap-backed trace writer
1601     // instead. This is safe because all the data given to the interceptor is
1602     // either thread-local (|instance_index|), statically allocated
1603     // (|static_state|) or constant after initialization (|interceptor|). Access
1604     // to the interceptor instance itself through |data_source| is protected by
1605     // a statically allocated lock (similarly to the data source instance).
1606     auto& interceptor = interceptors_[data_source->interceptor_id - 1];
1607     return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
1608         interceptor.tls_factory(static_state, data_source_instance_index),
1609         interceptor.packet_callback, static_state, data_source_instance_index));
1610   }
1611   ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
1612   // Atomically load the current service endpoint. We keep the pointer as a
1613   // shared pointer on the stack to guard against it from being concurrently
1614   // modified on the thread by ProducerImpl::Initialize() swapping in a
1615   // reconnected service on the muxer task runner thread.
1616   //
1617   // The endpoint may also be concurrently modified by SweepDeadServices()
1618   // clearing out old disconnected services. We guard against that by
1619   // SharedMemoryArbiter keeping track of any outstanding trace writers. After
1620   // shutdown has started, the trace writer created below will be a null one
1621   // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
1622   //
1623   // We use an atomic pointer instead of holding a lock because
1624   // CreateTraceWriter posts tasks under the hood.
1625   std::shared_ptr<ProducerEndpoint> service =
1626       std::atomic_load(&producer->service_);
1627   return service->CreateTraceWriter(data_source->buffer_id,
1628                                     buffer_exhausted_policy);
1629 }
1630 
1631 // This is called via the public API Tracing::NewTrace().
1632 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type)1633 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
1634     BackendType requested_backend_type) {
1635   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
1636 
1637   // |backend_type| can only specify one backend, not an OR-ed mask.
1638   PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
1639 
1640   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
1641   task_runner_->PostTask([this, requested_backend_type, session_id] {
1642     for (RegisteredBackend& backend : backends_) {
1643       if (requested_backend_type && backend.type &&
1644           backend.type != requested_backend_type) {
1645         continue;
1646       }
1647 
1648       TracingBackendId backend_id = backend.id;
1649 
1650       // Create the consumer now, even if we have to ask the embedder below, so
1651       // that any other tasks executing after this one can find the consumer and
1652       // change its pending attributes.
1653       backend.consumers.emplace_back(
1654           new ConsumerImpl(this, backend.type, backend.id, session_id));
1655 
1656       // The last registered backend in |backends_| is the unsupported backend
1657       // without a valid type.
1658       if (!backend.type) {
1659         PERFETTO_ELOG(
1660             "No tracing backend ready for type=%d, consumer will disconnect",
1661             requested_backend_type);
1662         InitializeConsumer(session_id);
1663         return;
1664       }
1665 
1666       // Check if the embedder wants to be asked for permission before
1667       // connecting the consumer.
1668       if (!policy_) {
1669         InitializeConsumer(session_id);
1670         return;
1671       }
1672 
1673       TracingPolicy::ShouldAllowConsumerSessionArgs args;
1674       args.backend_type = backend.type;
1675       args.result_callback = [this, backend_id, session_id](bool allow) {
1676         task_runner_->PostTask([this, backend_id, session_id, allow] {
1677           if (allow) {
1678             InitializeConsumer(session_id);
1679             return;
1680           }
1681 
1682           PERFETTO_ELOG(
1683               "Consumer session for backend type type=%d forbidden, "
1684               "consumer will disconnect",
1685               backends_[backend_id].type);
1686 
1687           auto* consumer = FindConsumer(session_id);
1688           if (!consumer)
1689             return;
1690 
1691           consumer->OnDisconnect();
1692         });
1693       };
1694       policy_->ShouldAllowConsumerSession(args);
1695       return;
1696     }
1697     PERFETTO_DFATAL("Not reached");
1698   });
1699 
1700   return std::unique_ptr<TracingSession>(
1701       new TracingSessionImpl(this, session_id, requested_backend_type));
1702 }
1703 
1704 // static
InitializeInstance(const TracingInitArgs & args)1705 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
1706   if (instance_ != TracingMuxerFake::Get())
1707     PERFETTO_FATAL("Tracing already initialized");
1708   // If we previously had a TracingMuxerImpl instance which was reset,
1709   // reinitialize and reuse it instead of trying to create a new one. See
1710   // ResetForTesting().
1711   if (g_prev_instance) {
1712     auto* muxer = g_prev_instance;
1713     g_prev_instance = nullptr;
1714     instance_ = muxer;
1715     muxer->task_runner_->PostTask([muxer, args] { muxer->Initialize(args); });
1716   } else {
1717     new TracingMuxerImpl(args);
1718   }
1719 }
1720 
1721 // static
ResetForTesting()1722 void TracingMuxerImpl::ResetForTesting() {
1723   // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
1724   // various objects make that a non-starter. In particular:
1725   //
1726   // 1) Any thread that has entered a trace event has a TraceWriter, which holds
1727   //    a reference back to ProducerImpl::service_.
1728   //
1729   // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
1730   //
1731   // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
1732   //    turn depends on TracingMuxerImpl itself.
1733   //
1734   // Because of this, it's not safe to deallocate TracingMuxerImpl until all
1735   // threads have dropped their TraceWriters. Since we can't really ask the
1736   // caller to guarantee this, we'll instead reset enough of the muxer's state
1737   // so that it can be reinitialized later and ensure all necessary objects from
1738   // the old state remain alive until all references have gone away.
1739   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
1740 
1741   base::WaitableEvent reset_done;
1742   auto do_reset = [muxer, &reset_done] {
1743     // Unregister all data sources so they don't interfere with any future
1744     // tracing sessions.
1745     for (RegisteredDataSource& rds : muxer->data_sources_) {
1746       for (RegisteredBackend& backend : muxer->backends_) {
1747         if (!backend.producer->service_)
1748           continue;
1749         backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
1750       }
1751     }
1752     for (auto& backend : muxer->backends_) {
1753       // Check that no consumer session is currently active on any backend.
1754       for (auto& consumer : backend.consumers)
1755         PERFETTO_CHECK(!consumer->service_);
1756       backend.producer->muxer_ = nullptr;
1757       backend.producer->DisposeConnection();
1758       muxer->dead_backends_.push_back(std::move(backend));
1759     }
1760     muxer->backends_.clear();
1761     muxer->interceptors_.clear();
1762 
1763     for (auto& ds : muxer->data_sources_) {
1764       ds.static_state->~DataSourceStaticState();
1765       new (ds.static_state) DataSourceStaticState{};
1766     }
1767     muxer->data_sources_.clear();
1768     muxer->next_data_source_index_ = 0;
1769 
1770     // Free all backends without active trace writers or other inbound
1771     // references. Note that even if all the backends get swept, the muxer still
1772     // needs to stay around since |task_runner_| is assumed to be long-lived.
1773     muxer->SweepDeadBackends();
1774 
1775     // Make sure we eventually discard any per-thread trace writers from the
1776     // previous instance.
1777     muxer->muxer_id_for_testing_++;
1778 
1779     g_prev_instance = muxer;
1780     instance_ = TracingMuxerFake::Get();
1781     reset_done.Notify();
1782   };
1783 
1784   // Some tests run the muxer and the test on the same thread. In these cases,
1785   // we can reset synchronously.
1786   if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
1787     do_reset();
1788   } else {
1789     muxer->task_runner_->PostTask(std::move(do_reset));
1790     reset_done.Wait();
1791   }
1792 }
1793 
1794 TracingMuxer::~TracingMuxer() = default;
1795 
1796 static_assert(std::is_same<internal::BufferId, BufferID>::value,
1797               "public's BufferId and tracing/core's BufferID diverged");
1798 
1799 }  // namespace internal
1800 }  // namespace perfetto
1801