• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23 
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/ext/base/hash.h"
28 #include "perfetto/ext/base/thread_checker.h"
29 #include "perfetto/ext/base/waitable_event.h"
30 #include "perfetto/ext/tracing/core/trace_packet.h"
31 #include "perfetto/ext/tracing/core/trace_writer.h"
32 #include "perfetto/ext/tracing/core/tracing_service.h"
33 #include "perfetto/tracing/buffer_exhausted_policy.h"
34 #include "perfetto/tracing/core/data_source_config.h"
35 #include "perfetto/tracing/data_source.h"
36 #include "perfetto/tracing/internal/data_source_internal.h"
37 #include "perfetto/tracing/trace_writer_base.h"
38 #include "perfetto/tracing/tracing.h"
39 #include "perfetto/tracing/tracing_backend.h"
40 
41 namespace perfetto {
42 namespace internal {
43 
44 namespace {
45 
46 class StopArgsImpl : public DataSourceBase::StopArgs {
47  public:
HandleStopAsynchronously() const48   std::function<void()> HandleStopAsynchronously() const override {
49     auto closure = std::move(async_stop_closure);
50     async_stop_closure = std::function<void()>();
51     return closure;
52   }
53 
54   mutable std::function<void()> async_stop_closure;
55 };
56 
ComputeConfigHash(const DataSourceConfig & config)57 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
58   base::Hash hasher;
59   std::string config_bytes = config.SerializeAsString();
60   hasher.Update(config_bytes.data(), config_bytes.size());
61   return hasher.digest();
62 }
63 
64 }  // namespace
65 
66 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id)67 TracingMuxerImpl::ProducerImpl::ProducerImpl(TracingMuxerImpl* muxer,
68                                              TracingBackendId backend_id)
69     : muxer_(muxer), backend_id_(backend_id) {}
70 TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
71 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)72 void TracingMuxerImpl::ProducerImpl::Initialize(
73     std::unique_ptr<ProducerEndpoint> endpoint) {
74   service_ = std::move(endpoint);
75 }
76 
OnConnect()77 void TracingMuxerImpl::ProducerImpl::OnConnect() {
78   PERFETTO_DLOG("Producer connected");
79   PERFETTO_DCHECK_THREAD(thread_checker_);
80   PERFETTO_DCHECK(!connected_);
81   connected_ = true;
82   muxer_->UpdateDataSourcesOnAllBackends();
83 }
84 
OnDisconnect()85 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
86   PERFETTO_DCHECK_THREAD(thread_checker_);
87   connected_ = false;
88   // TODO: handle more graceful.
89   PERFETTO_ELOG("Cannot connect to traced. Is it running?");
90 }
91 
OnTracingSetup()92 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
93   PERFETTO_DCHECK_THREAD(thread_checker_);
94 }
95 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)96 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
97     DataSourceInstanceID id,
98     const DataSourceConfig& cfg) {
99   PERFETTO_DCHECK_THREAD(thread_checker_);
100   muxer_->SetupDataSource(backend_id_, id, cfg);
101 }
102 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)103 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
104                                                      const DataSourceConfig&) {
105   PERFETTO_DCHECK_THREAD(thread_checker_);
106   muxer_->StartDataSource(backend_id_, id);
107   service_->NotifyDataSourceStarted(id);
108 }
109 
StopDataSource(DataSourceInstanceID id)110 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
111   PERFETTO_DCHECK_THREAD(thread_checker_);
112   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
113 }
114 
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)115 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
116                                            const DataSourceInstanceID*,
117                                            size_t) {
118   // Flush is not plumbed for now, we just ack straight away.
119   PERFETTO_DCHECK_THREAD(thread_checker_);
120   service_->NotifyFlushComplete(flush_id);
121 }
122 
ClearIncrementalState(const DataSourceInstanceID *,size_t)123 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
124     const DataSourceInstanceID*,
125     size_t) {
126   PERFETTO_DCHECK_THREAD(thread_checker_);
127   // TODO(skyostil): Mark each affected data source's incremental state as
128   // needing to be cleared.
129 }
130 // ----- End of TracingMuxerImpl::ProducerImpl methods.
131 
132 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,TracingSessionGlobalID session_id)133 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
134                                              TracingBackendId backend_id,
135                                              TracingSessionGlobalID session_id)
136     : muxer_(muxer), backend_id_(backend_id), session_id_(session_id) {}
137 
138 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
139 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)140 void TracingMuxerImpl::ConsumerImpl::Initialize(
141     std::unique_ptr<ConsumerEndpoint> endpoint) {
142   PERFETTO_DCHECK_THREAD(thread_checker_);
143   service_ = std::move(endpoint);
144   // Observe data source instance events so we get notified when tracing starts.
145   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
146 }
147 
OnConnect()148 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
149   PERFETTO_DCHECK_THREAD(thread_checker_);
150   PERFETTO_DCHECK(!connected_);
151   connected_ = true;
152 
153   // If the API client configured and started tracing before we connected,
154   // tell the backend about it now.
155   if (trace_config_) {
156     muxer_->SetupTracingSession(session_id_, trace_config_);
157     if (start_pending_)
158       muxer_->StartTracingSession(session_id_);
159     if (stop_pending_)
160       muxer_->StopTracingSession(session_id_);
161   }
162 }
163 
OnDisconnect()164 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
165   PERFETTO_DCHECK_THREAD(thread_checker_);
166   // It shouldn't be necessary to call StopTracingSession. If we get this call
167   // it means that the service did shutdown before us, so there is no point
168   // trying it to ask it to stop the session. We should just remember to cleanup
169   // the consumer vector.
170   connected_ = false;
171 
172   // TODO notify the client somehow.
173 
174   // Notify the muxer that it is safe to destroy |this|. This is needed because
175   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
176   // access until OnDisconnect() is called.
177   muxer_->OnConsumerDisconnected(this);
178 }
179 
Disconnect()180 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
181   // This is weird and deserves a comment.
182   //
183   // When we called the ConnectConsumer method on the service it returns
184   // us a ConsumerEndpoint which we stored in |service_|, however this
185   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
186   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
187   // the ConsumerImpl pointer has to be valid until the
188   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
189   // ConsumerEndpoint |service_|. Eventually this will call
190   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
191   // call the destructor of |this|.
192   service_.reset();
193 }
194 
OnTracingDisabled()195 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled() {
196   PERFETTO_DCHECK_THREAD(thread_checker_);
197   PERFETTO_DCHECK(!stopped_);
198   stopped_ = true;
199   // If we're still waiting for the start event, fire it now. This may happen if
200   // there are no active data sources in the session.
201   NotifyStartComplete();
202   NotifyStopComplete();
203 }
204 
NotifyStartComplete()205 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
206   PERFETTO_DCHECK_THREAD(thread_checker_);
207   if (blocking_start_complete_callback_) {
208     muxer_->task_runner_->PostTask(
209         std::move(blocking_start_complete_callback_));
210     blocking_start_complete_callback_ = nullptr;
211   }
212 }
213 
NotifyStopComplete()214 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
215   PERFETTO_DCHECK_THREAD(thread_checker_);
216   if (stop_complete_callback_) {
217     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
218     stop_complete_callback_ = nullptr;
219   }
220   if (blocking_stop_complete_callback_) {
221     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
222     blocking_stop_complete_callback_ = nullptr;
223   }
224 }
225 
OnTraceData(std::vector<TracePacket> packets,bool has_more)226 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
227     std::vector<TracePacket> packets,
228     bool has_more) {
229   PERFETTO_DCHECK_THREAD(thread_checker_);
230   if (!read_trace_callback_)
231     return;
232 
233   size_t capacity = 0;
234   for (const auto& packet : packets) {
235     // 16 is an over-estimation of the proto preamble size
236     capacity += packet.size() + 16;
237   }
238 
239   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
240   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
241   buf->reserve(capacity);
242   for (auto& packet : packets) {
243     char* start;
244     size_t size;
245     std::tie(start, size) = packet.GetProtoPreamble();
246     buf->insert(buf->end(), start, start + size);
247     for (auto& slice : packet.slices()) {
248       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
249       buf->insert(buf->end(), slice_data, slice_data + slice.size);
250     }
251   }
252 
253   auto callback = read_trace_callback_;
254   muxer_->task_runner_->PostTask([callback, buf, has_more] {
255     TracingSession::ReadTraceCallbackArgs callback_arg{};
256     callback_arg.data = buf->size() ? &(*buf)[0] : nullptr;
257     callback_arg.size = buf->size();
258     callback_arg.has_more = has_more;
259     callback(callback_arg);
260   });
261 
262   if (!has_more)
263     read_trace_callback_ = nullptr;
264 }
265 
OnObservableEvents(const ObservableEvents & events)266 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
267     const ObservableEvents& events) {
268   if (events.instance_state_changes_size()) {
269     for (const auto& state_change : events.instance_state_changes()) {
270       DataSourceHandle handle{state_change.producer_name(),
271                               state_change.data_source_name()};
272       data_source_states_[handle] =
273           state_change.state() ==
274           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
275     }
276     // Data sources are first reported as being stopped before starting, so once
277     // all the data sources we know about have started we can declare tracing
278     // begun.
279     if (blocking_start_complete_callback_) {
280       bool all_data_sources_started = std::all_of(
281           data_source_states_.cbegin(), data_source_states_.cend(),
282           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
283       if (all_data_sources_started)
284         NotifyStartComplete();
285     }
286   }
287 }
288 
289 // The callbacks below are not used.
OnDetach(bool)290 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)291 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
OnTraceStats(bool,const TraceStats &)292 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(bool, const TraceStats&) {}
293 // ----- End of TracingMuxerImpl::ConsumerImpl
294 
295 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
296 
297 // TracingSessionImpl is the RAII object returned to API clients when they
298 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
299 // tracing.
300 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id)301 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
302     TracingMuxerImpl* muxer,
303     TracingSessionGlobalID session_id)
304     : muxer_(muxer), session_id_(session_id) {}
305 
306 // Can be destroyed from any thread.
~TracingSessionImpl()307 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
308   auto* muxer = muxer_;
309   auto session_id = session_id_;
310   muxer->task_runner_->PostTask(
311       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
312 }
313 
314 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)315 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
316                                                  int fd) {
317   auto* muxer = muxer_;
318   auto session_id = session_id_;
319   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
320   if (fd >= 0) {
321     trace_config->set_write_into_file(true);
322     fd = dup(fd);
323   }
324   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
325     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
326   });
327 }
328 
329 // Can be called from any thread.
Start()330 void TracingMuxerImpl::TracingSessionImpl::Start() {
331   auto* muxer = muxer_;
332   auto session_id = session_id_;
333   muxer->task_runner_->PostTask(
334       [muxer, session_id] { muxer->StartTracingSession(session_id); });
335 }
336 
337 // Can be called from any thread except the service thread.
StartBlocking()338 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
339   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
340   auto* muxer = muxer_;
341   auto session_id = session_id_;
342   base::WaitableEvent tracing_started;
343   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
344     auto* consumer = muxer->FindConsumer(session_id);
345     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
346     consumer->blocking_start_complete_callback_ = [&] {
347       tracing_started.Notify();
348     };
349     muxer->StartTracingSession(session_id);
350   });
351   tracing_started.Wait();
352 }
353 
354 // Can be called from any thread.
Stop()355 void TracingMuxerImpl::TracingSessionImpl::Stop() {
356   auto* muxer = muxer_;
357   auto session_id = session_id_;
358   muxer->task_runner_->PostTask(
359       [muxer, session_id] { muxer->StopTracingSession(session_id); });
360 }
361 
362 // Can be called from any thread except the service thread.
StopBlocking()363 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
364   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
365   auto* muxer = muxer_;
366   auto session_id = session_id_;
367   base::WaitableEvent tracing_stopped;
368   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
369     auto* consumer = muxer->FindConsumer(session_id);
370     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
371     consumer->blocking_stop_complete_callback_ = [&] {
372       tracing_stopped.Notify();
373     };
374     muxer->StopTracingSession(session_id);
375   });
376   tracing_stopped.Wait();
377 }
378 
379 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)380 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
381   auto* muxer = muxer_;
382   auto session_id = session_id_;
383   muxer->task_runner_->PostTask([muxer, session_id, cb] {
384     muxer->ReadTracingSessionData(session_id, std::move(cb));
385   });
386 }
387 
388 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)389 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
390     std::function<void()> cb) {
391   auto* muxer = muxer_;
392   auto session_id = session_id_;
393   muxer->task_runner_->PostTask([muxer, session_id, cb] {
394     auto* consumer = muxer->FindConsumer(session_id);
395     consumer->stop_complete_callback_ = cb;
396   });
397 }
398 // ----- End of TracingMuxerImpl::TracingSessionImpl
399 
400 // static
401 TracingMuxer* TracingMuxer::instance_ = nullptr;
402 
403 // This is called by perfetto::Tracing::Initialize().
404 // Can be called on any thread. Typically, but not necessarily, that will be
405 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)406 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
407     : TracingMuxer(args.platform ? args.platform
408                                  : Platform::GetDefaultPlatform()) {
409   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
410 
411   // Create the thread where muxer, producers and service will live.
412   task_runner_ = platform_->CreateTaskRunner({});
413 
414   // Run the initializer on that thread.
415   task_runner_->PostTask([this, args] { Initialize(args); });
416 }
417 
Initialize(const TracingInitArgs & args)418 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
419   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.
420 
421   auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
422     if (!backend) {
423       // We skip the log in release builds because the *_backend_fake.cc code
424       // has already an ELOG before returning a nullptr.
425       PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
426       return;
427     }
428     TracingBackendId backend_id = backends_.size();
429     backends_.emplace_back();
430     RegisteredBackend& rb = backends_.back();
431     rb.backend = backend;
432     rb.id = backend_id;
433     rb.type = type;
434     rb.producer.reset(new ProducerImpl(this, backend_id));
435     TracingBackend::ConnectProducerArgs conn_args;
436     conn_args.producer = rb.producer.get();
437     conn_args.producer_name = platform_->GetCurrentProcessName();
438     conn_args.task_runner = task_runner_.get();
439     conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
440     conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024;
441     rb.producer->Initialize(rb.backend->ConnectProducer(conn_args));
442   };
443 
444   if (args.backends & kSystemBackend) {
445     PERFETTO_CHECK(args.system_backend_factory_);
446     add_backend(args.system_backend_factory_(), kSystemBackend);
447   }
448 
449   if (args.backends & kInProcessBackend) {
450     PERFETTO_CHECK(args.in_process_backend_factory_);
451     add_backend(args.in_process_backend_factory_(), kInProcessBackend);
452   }
453 
454   if (args.backends & kCustomBackend) {
455     PERFETTO_CHECK(args.custom_backend);
456     add_backend(args.custom_backend, kCustomBackend);
457   }
458 
459   if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
460     PERFETTO_FATAL("Unsupported tracing backend type");
461   }
462 }
463 
464 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)465 bool TracingMuxerImpl::RegisterDataSource(
466     const DataSourceDescriptor& descriptor,
467     DataSourceFactory factory,
468     DataSourceStaticState* static_state) {
469   // Ignore repeated registrations.
470   if (static_state->index != kMaxDataSources)
471     return true;
472 
473   static std::atomic<uint32_t> last_id{};
474   uint32_t new_index = last_id++;
475   if (new_index >= kMaxDataSources) {
476     PERFETTO_DLOG(
477         "RegisterDataSource failed: too many data sources already registered");
478     return false;
479   }
480 
481   // Initialize the static state.
482   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
483                 "instances[] size mismatch");
484   for (size_t i = 0; i < static_state->instances.size(); i++)
485     new (&static_state->instances[i]) DataSourceState{};
486 
487   static_state->index = new_index;
488 
489   task_runner_->PostTask([this, descriptor, factory, static_state] {
490     data_sources_.emplace_back();
491     RegisteredDataSource& rds = data_sources_.back();
492     rds.descriptor = descriptor;
493     rds.factory = factory;
494     rds.static_state = static_state;
495     UpdateDataSourcesOnAllBackends();
496   });
497   return true;
498 }
499 
500 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)501 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
502                                        DataSourceInstanceID instance_id,
503                                        const DataSourceConfig& cfg) {
504   PERFETTO_DCHECK_THREAD(thread_checker_);
505   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
506                 cfg.name().c_str());
507   uint64_t config_hash = ComputeConfigHash(cfg);
508 
509   for (const auto& rds : data_sources_) {
510     if (rds.descriptor.name() != cfg.name())
511       continue;
512     DataSourceStaticState& static_state = *rds.static_state;
513 
514     // If this data source is already active for this exact config, don't start
515     // another instance. This happens when we have several data sources with the
516     // same name, in which case the service sends one SetupDataSource event for
517     // each one. Since we can't map which event maps to which data source, we
518     // ensure each event only starts one data source instance.
519     // TODO(skyostil): Register a unique id with each data source to the service
520     // to disambiguate.
521     bool active_for_config = false;
522     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
523       if (!static_state.TryGet(i))
524         continue;
525       auto* internal_state =
526           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
527       if (internal_state->backend_id == backend_id &&
528           internal_state->config_hash == config_hash) {
529         active_for_config = true;
530         break;
531       }
532     }
533     if (active_for_config) {
534       PERFETTO_DLOG(
535           "Data source %s is already active with this config, skipping",
536           cfg.name().c_str());
537       continue;
538     }
539 
540     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
541       // Find a free slot.
542       if (static_state.TryGet(i))
543         continue;
544 
545       auto* internal_state =
546           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
547       std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
548       static_assert(
549           std::is_same<decltype(internal_state->data_source_instance_id),
550                        DataSourceInstanceID>::value,
551           "data_source_instance_id type mismatch");
552       internal_state->backend_id = backend_id;
553       internal_state->data_source_instance_id = instance_id;
554       internal_state->buffer_id =
555           static_cast<internal::BufferId>(cfg.target_buffer());
556       internal_state->config_hash = config_hash;
557       internal_state->data_source = rds.factory();
558 
559       // This must be made at the end. See matching acquire-load in
560       // DataSource::Trace().
561       static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
562 
563       DataSourceBase::SetupArgs setup_args;
564       setup_args.config = &cfg;
565       setup_args.internal_instance_index = i;
566       internal_state->data_source->OnSetup(setup_args);
567       return;
568     }
569     PERFETTO_ELOG(
570         "Maximum number of data source instances exhausted. "
571         "Dropping data source %" PRIu64,
572         instance_id);
573     break;
574   }
575 }
576 
577 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)578 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
579                                        DataSourceInstanceID instance_id) {
580   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
581   PERFETTO_DCHECK_THREAD(thread_checker_);
582 
583   auto ds = FindDataSource(backend_id, instance_id);
584   if (!ds) {
585     PERFETTO_ELOG("Could not find data source to start");
586     return;
587   }
588 
589   DataSourceBase::StartArgs start_args{};
590   start_args.internal_instance_index = ds.instance_idx;
591 
592   std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
593   ds.internal_state->trace_lambda_enabled = true;
594   ds.internal_state->data_source->OnStart(start_args);
595 }
596 
597 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)598 void TracingMuxerImpl::StopDataSource_AsyncBegin(
599     TracingBackendId backend_id,
600     DataSourceInstanceID instance_id) {
601   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
602   PERFETTO_DCHECK_THREAD(thread_checker_);
603 
604   auto ds = FindDataSource(backend_id, instance_id);
605   if (!ds) {
606     PERFETTO_ELOG("Could not find data source to stop");
607     return;
608   }
609 
610   StopArgsImpl stop_args{};
611   stop_args.internal_instance_index = ds.instance_idx;
612   stop_args.async_stop_closure = [this, backend_id, instance_id] {
613     // TracingMuxerImpl is long lived, capturing |this| is okay.
614     // The notification closure can be moved out of the StopArgs by the
615     // embedder to handle stop asynchronously. The embedder might then
616     // call the closure on a different thread than the current one, hence
617     // this nested PostTask().
618     task_runner_->PostTask([this, backend_id, instance_id] {
619       StopDataSource_AsyncEnd(backend_id, instance_id);
620     });
621   };
622 
623   {
624     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
625     ds.internal_state->data_source->OnStop(stop_args);
626   }
627 
628   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
629   // async closure here. In theory we could avoid the PostTask and call
630   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
631   // divergencies between the deferred-stop vs non-deferred-stop code paths.
632   if (stop_args.async_stop_closure)
633     std::move(stop_args.async_stop_closure)();
634 }
635 
StopDataSource_AsyncEnd(TracingBackendId backend_id,DataSourceInstanceID instance_id)636 void TracingMuxerImpl::StopDataSource_AsyncEnd(
637     TracingBackendId backend_id,
638     DataSourceInstanceID instance_id) {
639   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
640   PERFETTO_DCHECK_THREAD(thread_checker_);
641 
642   auto ds = FindDataSource(backend_id, instance_id);
643   if (!ds) {
644     PERFETTO_ELOG(
645         "Async stop of data source %" PRIu64
646         " failed. This might be due to calling the async_stop_closure twice.",
647         instance_id);
648     return;
649   }
650 
651   const uint32_t mask = ~(1 << ds.instance_idx);
652   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
653 
654   // Take the mutex to prevent that the data source is in the middle of
655   // a Trace() execution where it called GetDataSourceLocked() while we
656   // destroy it.
657   {
658     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
659     ds.internal_state->trace_lambda_enabled = false;
660     ds.internal_state->data_source.reset();
661   }
662 
663   // The other fields of internal_state are deliberately *not* cleared.
664   // See races-related comments of DataSource::Trace().
665 
666   TracingMuxer::generation_++;
667 
668   // |backends_| is append-only, Backend instances are always valid.
669   PERFETTO_CHECK(backend_id < backends_.size());
670   ProducerImpl* producer = backends_[backend_id].producer.get();
671   if (producer && producer->connected_)
672     producer->service_->NotifyDataSourceStopped(instance_id);
673 }
674 
DestroyStoppedTraceWritersForCurrentThread()675 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
676   // Iterate across all possible data source types.
677   auto cur_generation = generation_.load(std::memory_order_acquire);
678   auto* root_tls = GetOrCreateTracingTLS();
679 
680   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
681     // |tls| has a vector of per-data-source-instance thread-local state.
682     DataSourceStaticState* static_state = tls.static_state;
683     if (!static_state)
684       return;  // Slot not used.
685 
686     // Iterate across all possible instances for this data source.
687     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
688       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
689       if (!ds_tls.trace_writer)
690         continue;
691 
692       DataSourceState* ds_state = static_state->TryGet(inst);
693       if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
694           ds_state->buffer_id == ds_tls.buffer_id) {
695         continue;
696       }
697 
698       // The DataSource instance has been destroyed or recycled.
699       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
700     }
701   };
702 
703   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
704     // |tls| has a vector of per-data-source-instance thread-local state.
705     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
706     destroy_stopped_instances(tls);
707   }
708   destroy_stopped_instances(root_tls->track_event_tls);
709   root_tls->generation = cur_generation;
710 }
711 
712 // Called both when a new data source is registered or when a new backend
713 // connects. In both cases we want to be sure we reflected the data source
714 // registrations on the backends.
UpdateDataSourcesOnAllBackends()715 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
716   PERFETTO_DCHECK_THREAD(thread_checker_);
717   for (RegisteredDataSource& rds : data_sources_) {
718     for (RegisteredBackend& backend : backends_) {
719       // We cannot call RegisterDataSource on the backend before it connects.
720       if (!backend.producer->connected_)
721         continue;
722 
723       PERFETTO_DCHECK(rds.static_state->index < kMaxDataSourceInstances);
724       if (backend.producer->registered_data_sources_.test(
725               rds.static_state->index))
726         continue;
727 
728       rds.descriptor.set_will_notify_on_start(true);
729       rds.descriptor.set_will_notify_on_stop(true);
730       backend.producer->service_->RegisterDataSource(rds.descriptor);
731       backend.producer->registered_data_sources_.set(rds.static_state->index);
732     }
733   }
734 }
735 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)736 void TracingMuxerImpl::SetupTracingSession(
737     TracingSessionGlobalID session_id,
738     const std::shared_ptr<TraceConfig>& trace_config,
739     base::ScopedFile trace_fd) {
740   PERFETTO_DCHECK_THREAD(thread_checker_);
741   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
742 
743   auto* consumer = FindConsumer(session_id);
744   if (!consumer)
745     return;
746 
747   consumer->trace_config_ = trace_config;
748   if (trace_fd)
749     consumer->trace_fd_ = std::move(trace_fd);
750 
751   if (!consumer->connected_)
752     return;
753 
754   // Only used in the deferred start mode.
755   if (trace_config->deferred_start()) {
756     consumer->service_->EnableTracing(*trace_config,
757                                       std::move(consumer->trace_fd_));
758   }
759 }
760 
StartTracingSession(TracingSessionGlobalID session_id)761 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
762   PERFETTO_DCHECK_THREAD(thread_checker_);
763 
764   auto* consumer = FindConsumer(session_id);
765 
766   if (!consumer)
767     return;
768 
769   if (!consumer->trace_config_) {
770     PERFETTO_ELOG("Must call Setup(config) first");
771     return;
772   }
773 
774   if (!consumer->connected_) {
775     consumer->start_pending_ = true;
776     return;
777   }
778 
779   consumer->start_pending_ = false;
780   if (consumer->trace_config_->deferred_start()) {
781     consumer->service_->StartTracing();
782   } else {
783     consumer->service_->EnableTracing(*consumer->trace_config_,
784                                       std::move(consumer->trace_fd_));
785   }
786 
787   // TODO implement support for the deferred-start + fast-triggering case.
788 }
789 
StopTracingSession(TracingSessionGlobalID session_id)790 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
791   PERFETTO_DCHECK_THREAD(thread_checker_);
792   auto* consumer = FindConsumer(session_id);
793   if (!consumer)
794     return;
795 
796   if (consumer->start_pending_) {
797     // If the session hasn't started yet, wait until it does before stopping.
798     consumer->stop_pending_ = true;
799     return;
800   }
801 
802   consumer->stop_pending_ = false;
803   if (consumer->stopped_) {
804     // If the session was already stopped (e.g., it failed to start), don't try
805     // stopping again.
806     consumer->NotifyStopComplete();
807   } else if (!consumer->trace_config_) {
808     PERFETTO_ELOG("Must call Setup(config) and Start() first");
809     return;
810   } else {
811     consumer->service_->DisableTracing();
812   }
813 
814   consumer->trace_config_.reset();
815 }
816 
DestroyTracingSession(TracingSessionGlobalID session_id)817 void TracingMuxerImpl::DestroyTracingSession(
818     TracingSessionGlobalID session_id) {
819   PERFETTO_DCHECK_THREAD(thread_checker_);
820   for (RegisteredBackend& backend : backends_) {
821     // We need to find the consumer (if any) and call Disconnect as we destroy
822     // the tracing session. We can't call Disconnect() inside this for loop
823     // because in the in-process case this will end up to a synchronous call to
824     // OnConsumerDisconnect which will invalidate all the iterators to
825     // |backend.consumers|.
826     ConsumerImpl* consumer = nullptr;
827     for (auto& con : backend.consumers) {
828       if (con->session_id_ == session_id) {
829         consumer = con.get();
830         break;
831       }
832     }
833     if (consumer) {
834       // We broke out of the loop above on the assumption that each backend will
835       // only have a single consumer per session. This DCHECK ensures that
836       // this is the case.
837       PERFETTO_DCHECK(
838           std::count_if(backend.consumers.begin(), backend.consumers.end(),
839                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
840                           return con->session_id_ == session_id;
841                         }) == 1u);
842       consumer->Disconnect();
843     }
844   }
845 }
846 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)847 void TracingMuxerImpl::ReadTracingSessionData(
848     TracingSessionGlobalID session_id,
849     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
850   PERFETTO_DCHECK_THREAD(thread_checker_);
851   auto* consumer = FindConsumer(session_id);
852   if (!consumer)
853     return;
854   PERFETTO_DCHECK(!consumer->read_trace_callback_);
855   consumer->read_trace_callback_ = std::move(callback);
856   consumer->service_->ReadBuffers();
857 }
858 
FindConsumer(TracingSessionGlobalID session_id)859 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
860     TracingSessionGlobalID session_id) {
861   PERFETTO_DCHECK_THREAD(thread_checker_);
862   for (RegisteredBackend& backend : backends_) {
863     for (auto& consumer : backend.consumers) {
864       if (consumer->session_id_ == session_id) {
865         PERFETTO_DCHECK(consumer->service_);
866         return consumer.get();
867       }
868     }
869   }
870   return nullptr;
871 }
872 
OnConsumerDisconnected(ConsumerImpl * consumer)873 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
874   PERFETTO_DCHECK_THREAD(thread_checker_);
875   for (RegisteredBackend& backend : backends_) {
876     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
877       return con.get() == consumer;
878     };
879     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
880                                            backend.consumers.end(), pred),
881                             backend.consumers.end());
882   }
883 }
884 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)885 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
886     TracingBackendId backend_id,
887     DataSourceInstanceID instance_id) {
888   PERFETTO_DCHECK_THREAD(thread_checker_);
889   for (const auto& rds : data_sources_) {
890     DataSourceStaticState* static_state = rds.static_state;
891     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
892       auto* internal_state = static_state->TryGet(i);
893       if (internal_state && internal_state->backend_id == backend_id &&
894           internal_state->data_source_instance_id == instance_id) {
895         return FindDataSourceRes(static_state, internal_state, i);
896       }
897     }
898   }
899   return FindDataSourceRes();
900 }
901 
902 // Can be called from any thread.
CreateTraceWriter(DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)903 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
904     DataSourceState* data_source,
905     BufferExhaustedPolicy buffer_exhausted_policy) {
906   ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
907   return producer->service_->CreateTraceWriter(data_source->buffer_id,
908                                                buffer_exhausted_policy);
909 }
910 
911 // This is called via the public API Tracing::NewTrace().
912 // Can be called from any thread.
CreateTracingSession(BackendType backend_type)913 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
914     BackendType backend_type) {
915   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
916 
917   // |backend_type| can only specify one backend, not an OR-ed mask.
918   PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
919 
920   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
921   task_runner_->PostTask([this, backend_type, session_id] {
922     for (RegisteredBackend& backend : backends_) {
923       if (backend_type && backend.type != backend_type)
924         continue;
925 
926       backend.consumers.emplace_back(
927           new ConsumerImpl(this, backend.id, session_id));
928       auto& consumer = backend.consumers.back();
929       TracingBackend::ConnectConsumerArgs conn_args;
930       conn_args.consumer = consumer.get();
931       conn_args.task_runner = task_runner_.get();
932       consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
933       return;
934     }
935     PERFETTO_ELOG(
936         "Cannot create tracing session, no tracing backend ready for type=%d",
937         backend_type);
938   });
939 
940   return std::unique_ptr<TracingSession>(
941       new TracingSessionImpl(this, session_id));
942 }
943 
InitializeInstance(const TracingInitArgs & args)944 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
945   if (instance_)
946     PERFETTO_FATAL("Tracing already initialized");
947   instance_ = new TracingMuxerImpl(args);
948 }
949 
950 TracingMuxer::~TracingMuxer() = default;
951 
952 static_assert(std::is_same<internal::BufferId, BufferID>::value,
953               "public's BufferId and tracing/core's BufferID diverged");
954 
955 }  // namespace internal
956 }  // namespace perfetto
957