1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/ext/base/hash.h"
28 #include "perfetto/ext/base/thread_checker.h"
29 #include "perfetto/ext/base/waitable_event.h"
30 #include "perfetto/ext/tracing/core/trace_packet.h"
31 #include "perfetto/ext/tracing/core/trace_writer.h"
32 #include "perfetto/ext/tracing/core/tracing_service.h"
33 #include "perfetto/tracing/buffer_exhausted_policy.h"
34 #include "perfetto/tracing/core/data_source_config.h"
35 #include "perfetto/tracing/data_source.h"
36 #include "perfetto/tracing/internal/data_source_internal.h"
37 #include "perfetto/tracing/trace_writer_base.h"
38 #include "perfetto/tracing/tracing.h"
39 #include "perfetto/tracing/tracing_backend.h"
40
41 namespace perfetto {
42 namespace internal {
43
44 namespace {
45
46 class StopArgsImpl : public DataSourceBase::StopArgs {
47 public:
HandleStopAsynchronously() const48 std::function<void()> HandleStopAsynchronously() const override {
49 auto closure = std::move(async_stop_closure);
50 async_stop_closure = std::function<void()>();
51 return closure;
52 }
53
54 mutable std::function<void()> async_stop_closure;
55 };
56
ComputeConfigHash(const DataSourceConfig & config)57 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
58 base::Hash hasher;
59 std::string config_bytes = config.SerializeAsString();
60 hasher.Update(config_bytes.data(), config_bytes.size());
61 return hasher.digest();
62 }
63
64 } // namespace
65
66 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id)67 TracingMuxerImpl::ProducerImpl::ProducerImpl(TracingMuxerImpl* muxer,
68 TracingBackendId backend_id)
69 : muxer_(muxer), backend_id_(backend_id) {}
70 TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
71
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)72 void TracingMuxerImpl::ProducerImpl::Initialize(
73 std::unique_ptr<ProducerEndpoint> endpoint) {
74 service_ = std::move(endpoint);
75 }
76
OnConnect()77 void TracingMuxerImpl::ProducerImpl::OnConnect() {
78 PERFETTO_DLOG("Producer connected");
79 PERFETTO_DCHECK_THREAD(thread_checker_);
80 PERFETTO_DCHECK(!connected_);
81 connected_ = true;
82 muxer_->UpdateDataSourcesOnAllBackends();
83 }
84
OnDisconnect()85 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
86 PERFETTO_DCHECK_THREAD(thread_checker_);
87 connected_ = false;
88 // TODO: handle more graceful.
89 PERFETTO_ELOG("Cannot connect to traced. Is it running?");
90 }
91
OnTracingSetup()92 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
93 PERFETTO_DCHECK_THREAD(thread_checker_);
94 }
95
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)96 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
97 DataSourceInstanceID id,
98 const DataSourceConfig& cfg) {
99 PERFETTO_DCHECK_THREAD(thread_checker_);
100 muxer_->SetupDataSource(backend_id_, id, cfg);
101 }
102
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)103 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
104 const DataSourceConfig&) {
105 PERFETTO_DCHECK_THREAD(thread_checker_);
106 muxer_->StartDataSource(backend_id_, id);
107 service_->NotifyDataSourceStarted(id);
108 }
109
StopDataSource(DataSourceInstanceID id)110 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
111 PERFETTO_DCHECK_THREAD(thread_checker_);
112 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
113 }
114
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)115 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
116 const DataSourceInstanceID*,
117 size_t) {
118 // Flush is not plumbed for now, we just ack straight away.
119 PERFETTO_DCHECK_THREAD(thread_checker_);
120 service_->NotifyFlushComplete(flush_id);
121 }
122
ClearIncrementalState(const DataSourceInstanceID *,size_t)123 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
124 const DataSourceInstanceID*,
125 size_t) {
126 PERFETTO_DCHECK_THREAD(thread_checker_);
127 // TODO(skyostil): Mark each affected data source's incremental state as
128 // needing to be cleared.
129 }
130 // ----- End of TracingMuxerImpl::ProducerImpl methods.
131
132 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,TracingSessionGlobalID session_id)133 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
134 TracingBackendId backend_id,
135 TracingSessionGlobalID session_id)
136 : muxer_(muxer), backend_id_(backend_id), session_id_(session_id) {}
137
138 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
139
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)140 void TracingMuxerImpl::ConsumerImpl::Initialize(
141 std::unique_ptr<ConsumerEndpoint> endpoint) {
142 PERFETTO_DCHECK_THREAD(thread_checker_);
143 service_ = std::move(endpoint);
144 // Observe data source instance events so we get notified when tracing starts.
145 service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
146 }
147
OnConnect()148 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
149 PERFETTO_DCHECK_THREAD(thread_checker_);
150 PERFETTO_DCHECK(!connected_);
151 connected_ = true;
152
153 // If the API client configured and started tracing before we connected,
154 // tell the backend about it now.
155 if (trace_config_) {
156 muxer_->SetupTracingSession(session_id_, trace_config_);
157 if (start_pending_)
158 muxer_->StartTracingSession(session_id_);
159 if (stop_pending_)
160 muxer_->StopTracingSession(session_id_);
161 }
162 }
163
OnDisconnect()164 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
165 PERFETTO_DCHECK_THREAD(thread_checker_);
166 // It shouldn't be necessary to call StopTracingSession. If we get this call
167 // it means that the service did shutdown before us, so there is no point
168 // trying it to ask it to stop the session. We should just remember to cleanup
169 // the consumer vector.
170 connected_ = false;
171
172 // TODO notify the client somehow.
173
174 // Notify the muxer that it is safe to destroy |this|. This is needed because
175 // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
176 // access until OnDisconnect() is called.
177 muxer_->OnConsumerDisconnected(this);
178 }
179
Disconnect()180 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
181 // This is weird and deserves a comment.
182 //
183 // When we called the ConnectConsumer method on the service it returns
184 // us a ConsumerEndpoint which we stored in |service_|, however this
185 // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
186 // |this|. Part of the API contract to TracingService::ConnectConsumer is that
187 // the ConsumerImpl pointer has to be valid until the
188 // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
189 // ConsumerEndpoint |service_|. Eventually this will call
190 // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
191 // call the destructor of |this|.
192 service_.reset();
193 }
194
OnTracingDisabled()195 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled() {
196 PERFETTO_DCHECK_THREAD(thread_checker_);
197 PERFETTO_DCHECK(!stopped_);
198 stopped_ = true;
199 // If we're still waiting for the start event, fire it now. This may happen if
200 // there are no active data sources in the session.
201 NotifyStartComplete();
202 NotifyStopComplete();
203 }
204
NotifyStartComplete()205 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
206 PERFETTO_DCHECK_THREAD(thread_checker_);
207 if (blocking_start_complete_callback_) {
208 muxer_->task_runner_->PostTask(
209 std::move(blocking_start_complete_callback_));
210 blocking_start_complete_callback_ = nullptr;
211 }
212 }
213
NotifyStopComplete()214 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
215 PERFETTO_DCHECK_THREAD(thread_checker_);
216 if (stop_complete_callback_) {
217 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
218 stop_complete_callback_ = nullptr;
219 }
220 if (blocking_stop_complete_callback_) {
221 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
222 blocking_stop_complete_callback_ = nullptr;
223 }
224 }
225
OnTraceData(std::vector<TracePacket> packets,bool has_more)226 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
227 std::vector<TracePacket> packets,
228 bool has_more) {
229 PERFETTO_DCHECK_THREAD(thread_checker_);
230 if (!read_trace_callback_)
231 return;
232
233 size_t capacity = 0;
234 for (const auto& packet : packets) {
235 // 16 is an over-estimation of the proto preamble size
236 capacity += packet.size() + 16;
237 }
238
239 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
240 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
241 buf->reserve(capacity);
242 for (auto& packet : packets) {
243 char* start;
244 size_t size;
245 std::tie(start, size) = packet.GetProtoPreamble();
246 buf->insert(buf->end(), start, start + size);
247 for (auto& slice : packet.slices()) {
248 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
249 buf->insert(buf->end(), slice_data, slice_data + slice.size);
250 }
251 }
252
253 auto callback = read_trace_callback_;
254 muxer_->task_runner_->PostTask([callback, buf, has_more] {
255 TracingSession::ReadTraceCallbackArgs callback_arg{};
256 callback_arg.data = buf->size() ? &(*buf)[0] : nullptr;
257 callback_arg.size = buf->size();
258 callback_arg.has_more = has_more;
259 callback(callback_arg);
260 });
261
262 if (!has_more)
263 read_trace_callback_ = nullptr;
264 }
265
OnObservableEvents(const ObservableEvents & events)266 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
267 const ObservableEvents& events) {
268 if (events.instance_state_changes_size()) {
269 for (const auto& state_change : events.instance_state_changes()) {
270 DataSourceHandle handle{state_change.producer_name(),
271 state_change.data_source_name()};
272 data_source_states_[handle] =
273 state_change.state() ==
274 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
275 }
276 // Data sources are first reported as being stopped before starting, so once
277 // all the data sources we know about have started we can declare tracing
278 // begun.
279 if (blocking_start_complete_callback_) {
280 bool all_data_sources_started = std::all_of(
281 data_source_states_.cbegin(), data_source_states_.cend(),
282 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
283 if (all_data_sources_started)
284 NotifyStartComplete();
285 }
286 }
287 }
288
289 // The callbacks below are not used.
OnDetach(bool)290 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)291 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
OnTraceStats(bool,const TraceStats &)292 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(bool, const TraceStats&) {}
293 // ----- End of TracingMuxerImpl::ConsumerImpl
294
295 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
296
297 // TracingSessionImpl is the RAII object returned to API clients when they
298 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
299 // tracing.
300
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id)301 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
302 TracingMuxerImpl* muxer,
303 TracingSessionGlobalID session_id)
304 : muxer_(muxer), session_id_(session_id) {}
305
306 // Can be destroyed from any thread.
~TracingSessionImpl()307 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
308 auto* muxer = muxer_;
309 auto session_id = session_id_;
310 muxer->task_runner_->PostTask(
311 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
312 }
313
314 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)315 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
316 int fd) {
317 auto* muxer = muxer_;
318 auto session_id = session_id_;
319 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
320 if (fd >= 0) {
321 trace_config->set_write_into_file(true);
322 fd = dup(fd);
323 }
324 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
325 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
326 });
327 }
328
329 // Can be called from any thread.
Start()330 void TracingMuxerImpl::TracingSessionImpl::Start() {
331 auto* muxer = muxer_;
332 auto session_id = session_id_;
333 muxer->task_runner_->PostTask(
334 [muxer, session_id] { muxer->StartTracingSession(session_id); });
335 }
336
337 // Can be called from any thread except the service thread.
StartBlocking()338 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
339 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
340 auto* muxer = muxer_;
341 auto session_id = session_id_;
342 base::WaitableEvent tracing_started;
343 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
344 auto* consumer = muxer->FindConsumer(session_id);
345 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
346 consumer->blocking_start_complete_callback_ = [&] {
347 tracing_started.Notify();
348 };
349 muxer->StartTracingSession(session_id);
350 });
351 tracing_started.Wait();
352 }
353
354 // Can be called from any thread.
Stop()355 void TracingMuxerImpl::TracingSessionImpl::Stop() {
356 auto* muxer = muxer_;
357 auto session_id = session_id_;
358 muxer->task_runner_->PostTask(
359 [muxer, session_id] { muxer->StopTracingSession(session_id); });
360 }
361
362 // Can be called from any thread except the service thread.
StopBlocking()363 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
364 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
365 auto* muxer = muxer_;
366 auto session_id = session_id_;
367 base::WaitableEvent tracing_stopped;
368 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
369 auto* consumer = muxer->FindConsumer(session_id);
370 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
371 consumer->blocking_stop_complete_callback_ = [&] {
372 tracing_stopped.Notify();
373 };
374 muxer->StopTracingSession(session_id);
375 });
376 tracing_stopped.Wait();
377 }
378
379 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)380 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
381 auto* muxer = muxer_;
382 auto session_id = session_id_;
383 muxer->task_runner_->PostTask([muxer, session_id, cb] {
384 muxer->ReadTracingSessionData(session_id, std::move(cb));
385 });
386 }
387
388 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)389 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
390 std::function<void()> cb) {
391 auto* muxer = muxer_;
392 auto session_id = session_id_;
393 muxer->task_runner_->PostTask([muxer, session_id, cb] {
394 auto* consumer = muxer->FindConsumer(session_id);
395 consumer->stop_complete_callback_ = cb;
396 });
397 }
398 // ----- End of TracingMuxerImpl::TracingSessionImpl
399
400 // static
401 TracingMuxer* TracingMuxer::instance_ = nullptr;
402
403 // This is called by perfetto::Tracing::Initialize().
404 // Can be called on any thread. Typically, but not necessarily, that will be
405 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)406 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
407 : TracingMuxer(args.platform ? args.platform
408 : Platform::GetDefaultPlatform()) {
409 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
410
411 // Create the thread where muxer, producers and service will live.
412 task_runner_ = platform_->CreateTaskRunner({});
413
414 // Run the initializer on that thread.
415 task_runner_->PostTask([this, args] { Initialize(args); });
416 }
417
Initialize(const TracingInitArgs & args)418 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
419 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
420
421 auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
422 if (!backend) {
423 // We skip the log in release builds because the *_backend_fake.cc code
424 // has already an ELOG before returning a nullptr.
425 PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
426 return;
427 }
428 TracingBackendId backend_id = backends_.size();
429 backends_.emplace_back();
430 RegisteredBackend& rb = backends_.back();
431 rb.backend = backend;
432 rb.id = backend_id;
433 rb.type = type;
434 rb.producer.reset(new ProducerImpl(this, backend_id));
435 TracingBackend::ConnectProducerArgs conn_args;
436 conn_args.producer = rb.producer.get();
437 conn_args.producer_name = platform_->GetCurrentProcessName();
438 conn_args.task_runner = task_runner_.get();
439 conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
440 conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024;
441 rb.producer->Initialize(rb.backend->ConnectProducer(conn_args));
442 };
443
444 if (args.backends & kSystemBackend) {
445 PERFETTO_CHECK(args.system_backend_factory_);
446 add_backend(args.system_backend_factory_(), kSystemBackend);
447 }
448
449 if (args.backends & kInProcessBackend) {
450 PERFETTO_CHECK(args.in_process_backend_factory_);
451 add_backend(args.in_process_backend_factory_(), kInProcessBackend);
452 }
453
454 if (args.backends & kCustomBackend) {
455 PERFETTO_CHECK(args.custom_backend);
456 add_backend(args.custom_backend, kCustomBackend);
457 }
458
459 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
460 PERFETTO_FATAL("Unsupported tracing backend type");
461 }
462 }
463
464 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)465 bool TracingMuxerImpl::RegisterDataSource(
466 const DataSourceDescriptor& descriptor,
467 DataSourceFactory factory,
468 DataSourceStaticState* static_state) {
469 // Ignore repeated registrations.
470 if (static_state->index != kMaxDataSources)
471 return true;
472
473 static std::atomic<uint32_t> last_id{};
474 uint32_t new_index = last_id++;
475 if (new_index >= kMaxDataSources) {
476 PERFETTO_DLOG(
477 "RegisterDataSource failed: too many data sources already registered");
478 return false;
479 }
480
481 // Initialize the static state.
482 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
483 "instances[] size mismatch");
484 for (size_t i = 0; i < static_state->instances.size(); i++)
485 new (&static_state->instances[i]) DataSourceState{};
486
487 static_state->index = new_index;
488
489 task_runner_->PostTask([this, descriptor, factory, static_state] {
490 data_sources_.emplace_back();
491 RegisteredDataSource& rds = data_sources_.back();
492 rds.descriptor = descriptor;
493 rds.factory = factory;
494 rds.static_state = static_state;
495 UpdateDataSourcesOnAllBackends();
496 });
497 return true;
498 }
499
500 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)501 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
502 DataSourceInstanceID instance_id,
503 const DataSourceConfig& cfg) {
504 PERFETTO_DCHECK_THREAD(thread_checker_);
505 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
506 cfg.name().c_str());
507 uint64_t config_hash = ComputeConfigHash(cfg);
508
509 for (const auto& rds : data_sources_) {
510 if (rds.descriptor.name() != cfg.name())
511 continue;
512 DataSourceStaticState& static_state = *rds.static_state;
513
514 // If this data source is already active for this exact config, don't start
515 // another instance. This happens when we have several data sources with the
516 // same name, in which case the service sends one SetupDataSource event for
517 // each one. Since we can't map which event maps to which data source, we
518 // ensure each event only starts one data source instance.
519 // TODO(skyostil): Register a unique id with each data source to the service
520 // to disambiguate.
521 bool active_for_config = false;
522 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
523 if (!static_state.TryGet(i))
524 continue;
525 auto* internal_state =
526 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
527 if (internal_state->backend_id == backend_id &&
528 internal_state->config_hash == config_hash) {
529 active_for_config = true;
530 break;
531 }
532 }
533 if (active_for_config) {
534 PERFETTO_DLOG(
535 "Data source %s is already active with this config, skipping",
536 cfg.name().c_str());
537 continue;
538 }
539
540 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
541 // Find a free slot.
542 if (static_state.TryGet(i))
543 continue;
544
545 auto* internal_state =
546 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
547 std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
548 static_assert(
549 std::is_same<decltype(internal_state->data_source_instance_id),
550 DataSourceInstanceID>::value,
551 "data_source_instance_id type mismatch");
552 internal_state->backend_id = backend_id;
553 internal_state->data_source_instance_id = instance_id;
554 internal_state->buffer_id =
555 static_cast<internal::BufferId>(cfg.target_buffer());
556 internal_state->config_hash = config_hash;
557 internal_state->data_source = rds.factory();
558
559 // This must be made at the end. See matching acquire-load in
560 // DataSource::Trace().
561 static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
562
563 DataSourceBase::SetupArgs setup_args;
564 setup_args.config = &cfg;
565 setup_args.internal_instance_index = i;
566 internal_state->data_source->OnSetup(setup_args);
567 return;
568 }
569 PERFETTO_ELOG(
570 "Maximum number of data source instances exhausted. "
571 "Dropping data source %" PRIu64,
572 instance_id);
573 break;
574 }
575 }
576
577 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)578 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
579 DataSourceInstanceID instance_id) {
580 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
581 PERFETTO_DCHECK_THREAD(thread_checker_);
582
583 auto ds = FindDataSource(backend_id, instance_id);
584 if (!ds) {
585 PERFETTO_ELOG("Could not find data source to start");
586 return;
587 }
588
589 DataSourceBase::StartArgs start_args{};
590 start_args.internal_instance_index = ds.instance_idx;
591
592 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
593 ds.internal_state->trace_lambda_enabled = true;
594 ds.internal_state->data_source->OnStart(start_args);
595 }
596
597 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)598 void TracingMuxerImpl::StopDataSource_AsyncBegin(
599 TracingBackendId backend_id,
600 DataSourceInstanceID instance_id) {
601 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
602 PERFETTO_DCHECK_THREAD(thread_checker_);
603
604 auto ds = FindDataSource(backend_id, instance_id);
605 if (!ds) {
606 PERFETTO_ELOG("Could not find data source to stop");
607 return;
608 }
609
610 StopArgsImpl stop_args{};
611 stop_args.internal_instance_index = ds.instance_idx;
612 stop_args.async_stop_closure = [this, backend_id, instance_id] {
613 // TracingMuxerImpl is long lived, capturing |this| is okay.
614 // The notification closure can be moved out of the StopArgs by the
615 // embedder to handle stop asynchronously. The embedder might then
616 // call the closure on a different thread than the current one, hence
617 // this nested PostTask().
618 task_runner_->PostTask([this, backend_id, instance_id] {
619 StopDataSource_AsyncEnd(backend_id, instance_id);
620 });
621 };
622
623 {
624 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
625 ds.internal_state->data_source->OnStop(stop_args);
626 }
627
628 // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
629 // async closure here. In theory we could avoid the PostTask and call
630 // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
631 // divergencies between the deferred-stop vs non-deferred-stop code paths.
632 if (stop_args.async_stop_closure)
633 std::move(stop_args.async_stop_closure)();
634 }
635
StopDataSource_AsyncEnd(TracingBackendId backend_id,DataSourceInstanceID instance_id)636 void TracingMuxerImpl::StopDataSource_AsyncEnd(
637 TracingBackendId backend_id,
638 DataSourceInstanceID instance_id) {
639 PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
640 PERFETTO_DCHECK_THREAD(thread_checker_);
641
642 auto ds = FindDataSource(backend_id, instance_id);
643 if (!ds) {
644 PERFETTO_ELOG(
645 "Async stop of data source %" PRIu64
646 " failed. This might be due to calling the async_stop_closure twice.",
647 instance_id);
648 return;
649 }
650
651 const uint32_t mask = ~(1 << ds.instance_idx);
652 ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
653
654 // Take the mutex to prevent that the data source is in the middle of
655 // a Trace() execution where it called GetDataSourceLocked() while we
656 // destroy it.
657 {
658 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
659 ds.internal_state->trace_lambda_enabled = false;
660 ds.internal_state->data_source.reset();
661 }
662
663 // The other fields of internal_state are deliberately *not* cleared.
664 // See races-related comments of DataSource::Trace().
665
666 TracingMuxer::generation_++;
667
668 // |backends_| is append-only, Backend instances are always valid.
669 PERFETTO_CHECK(backend_id < backends_.size());
670 ProducerImpl* producer = backends_[backend_id].producer.get();
671 if (producer && producer->connected_)
672 producer->service_->NotifyDataSourceStopped(instance_id);
673 }
674
DestroyStoppedTraceWritersForCurrentThread()675 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
676 // Iterate across all possible data source types.
677 auto cur_generation = generation_.load(std::memory_order_acquire);
678 auto* root_tls = GetOrCreateTracingTLS();
679
680 auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
681 // |tls| has a vector of per-data-source-instance thread-local state.
682 DataSourceStaticState* static_state = tls.static_state;
683 if (!static_state)
684 return; // Slot not used.
685
686 // Iterate across all possible instances for this data source.
687 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
688 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
689 if (!ds_tls.trace_writer)
690 continue;
691
692 DataSourceState* ds_state = static_state->TryGet(inst);
693 if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
694 ds_state->buffer_id == ds_tls.buffer_id) {
695 continue;
696 }
697
698 // The DataSource instance has been destroyed or recycled.
699 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
700 }
701 };
702
703 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
704 // |tls| has a vector of per-data-source-instance thread-local state.
705 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
706 destroy_stopped_instances(tls);
707 }
708 destroy_stopped_instances(root_tls->track_event_tls);
709 root_tls->generation = cur_generation;
710 }
711
712 // Called both when a new data source is registered or when a new backend
713 // connects. In both cases we want to be sure we reflected the data source
714 // registrations on the backends.
UpdateDataSourcesOnAllBackends()715 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
716 PERFETTO_DCHECK_THREAD(thread_checker_);
717 for (RegisteredDataSource& rds : data_sources_) {
718 for (RegisteredBackend& backend : backends_) {
719 // We cannot call RegisterDataSource on the backend before it connects.
720 if (!backend.producer->connected_)
721 continue;
722
723 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSourceInstances);
724 if (backend.producer->registered_data_sources_.test(
725 rds.static_state->index))
726 continue;
727
728 rds.descriptor.set_will_notify_on_start(true);
729 rds.descriptor.set_will_notify_on_stop(true);
730 backend.producer->service_->RegisterDataSource(rds.descriptor);
731 backend.producer->registered_data_sources_.set(rds.static_state->index);
732 }
733 }
734 }
735
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)736 void TracingMuxerImpl::SetupTracingSession(
737 TracingSessionGlobalID session_id,
738 const std::shared_ptr<TraceConfig>& trace_config,
739 base::ScopedFile trace_fd) {
740 PERFETTO_DCHECK_THREAD(thread_checker_);
741 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
742
743 auto* consumer = FindConsumer(session_id);
744 if (!consumer)
745 return;
746
747 consumer->trace_config_ = trace_config;
748 if (trace_fd)
749 consumer->trace_fd_ = std::move(trace_fd);
750
751 if (!consumer->connected_)
752 return;
753
754 // Only used in the deferred start mode.
755 if (trace_config->deferred_start()) {
756 consumer->service_->EnableTracing(*trace_config,
757 std::move(consumer->trace_fd_));
758 }
759 }
760
StartTracingSession(TracingSessionGlobalID session_id)761 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
762 PERFETTO_DCHECK_THREAD(thread_checker_);
763
764 auto* consumer = FindConsumer(session_id);
765
766 if (!consumer)
767 return;
768
769 if (!consumer->trace_config_) {
770 PERFETTO_ELOG("Must call Setup(config) first");
771 return;
772 }
773
774 if (!consumer->connected_) {
775 consumer->start_pending_ = true;
776 return;
777 }
778
779 consumer->start_pending_ = false;
780 if (consumer->trace_config_->deferred_start()) {
781 consumer->service_->StartTracing();
782 } else {
783 consumer->service_->EnableTracing(*consumer->trace_config_,
784 std::move(consumer->trace_fd_));
785 }
786
787 // TODO implement support for the deferred-start + fast-triggering case.
788 }
789
StopTracingSession(TracingSessionGlobalID session_id)790 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
791 PERFETTO_DCHECK_THREAD(thread_checker_);
792 auto* consumer = FindConsumer(session_id);
793 if (!consumer)
794 return;
795
796 if (consumer->start_pending_) {
797 // If the session hasn't started yet, wait until it does before stopping.
798 consumer->stop_pending_ = true;
799 return;
800 }
801
802 consumer->stop_pending_ = false;
803 if (consumer->stopped_) {
804 // If the session was already stopped (e.g., it failed to start), don't try
805 // stopping again.
806 consumer->NotifyStopComplete();
807 } else if (!consumer->trace_config_) {
808 PERFETTO_ELOG("Must call Setup(config) and Start() first");
809 return;
810 } else {
811 consumer->service_->DisableTracing();
812 }
813
814 consumer->trace_config_.reset();
815 }
816
DestroyTracingSession(TracingSessionGlobalID session_id)817 void TracingMuxerImpl::DestroyTracingSession(
818 TracingSessionGlobalID session_id) {
819 PERFETTO_DCHECK_THREAD(thread_checker_);
820 for (RegisteredBackend& backend : backends_) {
821 // We need to find the consumer (if any) and call Disconnect as we destroy
822 // the tracing session. We can't call Disconnect() inside this for loop
823 // because in the in-process case this will end up to a synchronous call to
824 // OnConsumerDisconnect which will invalidate all the iterators to
825 // |backend.consumers|.
826 ConsumerImpl* consumer = nullptr;
827 for (auto& con : backend.consumers) {
828 if (con->session_id_ == session_id) {
829 consumer = con.get();
830 break;
831 }
832 }
833 if (consumer) {
834 // We broke out of the loop above on the assumption that each backend will
835 // only have a single consumer per session. This DCHECK ensures that
836 // this is the case.
837 PERFETTO_DCHECK(
838 std::count_if(backend.consumers.begin(), backend.consumers.end(),
839 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
840 return con->session_id_ == session_id;
841 }) == 1u);
842 consumer->Disconnect();
843 }
844 }
845 }
846
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)847 void TracingMuxerImpl::ReadTracingSessionData(
848 TracingSessionGlobalID session_id,
849 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
850 PERFETTO_DCHECK_THREAD(thread_checker_);
851 auto* consumer = FindConsumer(session_id);
852 if (!consumer)
853 return;
854 PERFETTO_DCHECK(!consumer->read_trace_callback_);
855 consumer->read_trace_callback_ = std::move(callback);
856 consumer->service_->ReadBuffers();
857 }
858
FindConsumer(TracingSessionGlobalID session_id)859 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
860 TracingSessionGlobalID session_id) {
861 PERFETTO_DCHECK_THREAD(thread_checker_);
862 for (RegisteredBackend& backend : backends_) {
863 for (auto& consumer : backend.consumers) {
864 if (consumer->session_id_ == session_id) {
865 PERFETTO_DCHECK(consumer->service_);
866 return consumer.get();
867 }
868 }
869 }
870 return nullptr;
871 }
872
OnConsumerDisconnected(ConsumerImpl * consumer)873 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
874 PERFETTO_DCHECK_THREAD(thread_checker_);
875 for (RegisteredBackend& backend : backends_) {
876 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
877 return con.get() == consumer;
878 };
879 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
880 backend.consumers.end(), pred),
881 backend.consumers.end());
882 }
883 }
884
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)885 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
886 TracingBackendId backend_id,
887 DataSourceInstanceID instance_id) {
888 PERFETTO_DCHECK_THREAD(thread_checker_);
889 for (const auto& rds : data_sources_) {
890 DataSourceStaticState* static_state = rds.static_state;
891 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
892 auto* internal_state = static_state->TryGet(i);
893 if (internal_state && internal_state->backend_id == backend_id &&
894 internal_state->data_source_instance_id == instance_id) {
895 return FindDataSourceRes(static_state, internal_state, i);
896 }
897 }
898 }
899 return FindDataSourceRes();
900 }
901
902 // Can be called from any thread.
CreateTraceWriter(DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)903 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
904 DataSourceState* data_source,
905 BufferExhaustedPolicy buffer_exhausted_policy) {
906 ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
907 return producer->service_->CreateTraceWriter(data_source->buffer_id,
908 buffer_exhausted_policy);
909 }
910
911 // This is called via the public API Tracing::NewTrace().
912 // Can be called from any thread.
CreateTracingSession(BackendType backend_type)913 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
914 BackendType backend_type) {
915 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
916
917 // |backend_type| can only specify one backend, not an OR-ed mask.
918 PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
919
920 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
921 task_runner_->PostTask([this, backend_type, session_id] {
922 for (RegisteredBackend& backend : backends_) {
923 if (backend_type && backend.type != backend_type)
924 continue;
925
926 backend.consumers.emplace_back(
927 new ConsumerImpl(this, backend.id, session_id));
928 auto& consumer = backend.consumers.back();
929 TracingBackend::ConnectConsumerArgs conn_args;
930 conn_args.consumer = consumer.get();
931 conn_args.task_runner = task_runner_.get();
932 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
933 return;
934 }
935 PERFETTO_ELOG(
936 "Cannot create tracing session, no tracing backend ready for type=%d",
937 backend_type);
938 });
939
940 return std::unique_ptr<TracingSession>(
941 new TracingSessionImpl(this, session_id));
942 }
943
InitializeInstance(const TracingInitArgs & args)944 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
945 if (instance_)
946 PERFETTO_FATAL("Tracing already initialized");
947 instance_ = new TracingMuxerImpl(args);
948 }
949
950 TracingMuxer::~TracingMuxer() = default;
951
952 static_assert(std::is_same<internal::BufferId, BufferID>::value,
953 "public's BufferId and tracing/core's BufferID diverged");
954
955 } // namespace internal
956 } // namespace perfetto
957