1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/base/time.h"
28 #include "perfetto/ext/base/hash.h"
29 #include "perfetto/ext/base/thread_checker.h"
30 #include "perfetto/ext/base/waitable_event.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/ext/tracing/core/trace_packet.h"
33 #include "perfetto/ext/tracing/core/trace_stats.h"
34 #include "perfetto/ext/tracing/core/trace_writer.h"
35 #include "perfetto/ext/tracing/core/tracing_service.h"
36 #include "perfetto/tracing/buffer_exhausted_policy.h"
37 #include "perfetto/tracing/core/data_source_config.h"
38 #include "perfetto/tracing/core/tracing_service_state.h"
39 #include "perfetto/tracing/data_source.h"
40 #include "perfetto/tracing/internal/data_source_internal.h"
41 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
42 #include "perfetto/tracing/internal/tracing_backend_fake.h"
43 #include "perfetto/tracing/trace_writer_base.h"
44 #include "perfetto/tracing/tracing.h"
45 #include "perfetto/tracing/tracing_backend.h"
46
47 #include "protos/perfetto/config/interceptor_config.gen.h"
48
49 #include "src/tracing/internal/tracing_muxer_fake.h"
50
51 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
52 #include <io.h> // For dup()
53 #else
54 #include <unistd.h> // For dup()
55 #endif
56
57 namespace perfetto {
58 namespace internal {
59
60 namespace {
61
62 // A task runner which prevents calls to DataSource::Trace() while an operation
63 // is in progress. Used to guard against unexpected re-entrancy where the
64 // user-provided task runner implementation tries to enter a trace point under
65 // the hood.
66 class NonReentrantTaskRunner : public base::TaskRunner {
67 public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)68 NonReentrantTaskRunner(TracingMuxer* muxer,
69 std::unique_ptr<base::TaskRunner> task_runner)
70 : muxer_(muxer), task_runner_(std::move(task_runner)) {}
71
72 // base::TaskRunner implementation.
PostTask(std::function<void ()> task)73 void PostTask(std::function<void()> task) override {
74 CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
75 }
76
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)77 void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
78 CallWithGuard(
79 [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
80 }
81
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)82 void AddFileDescriptorWatch(base::PlatformHandle fd,
83 std::function<void()> callback) override {
84 CallWithGuard(
85 [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
86 }
87
RemoveFileDescriptorWatch(base::PlatformHandle fd)88 void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
89 CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
90 }
91
RunsTasksOnCurrentThread() const92 bool RunsTasksOnCurrentThread() const override {
93 bool result;
94 CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
95 return result;
96 }
97
98 private:
99 template <typename T>
CallWithGuard(T lambda) const100 void CallWithGuard(T lambda) const {
101 auto* root_tls = muxer_->GetOrCreateTracingTLS();
102 if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
103 lambda();
104 return;
105 }
106 ScopedReentrancyAnnotator scoped_annotator(*root_tls);
107 lambda();
108 }
109
110 TracingMuxer* const muxer_;
111 std::unique_ptr<base::TaskRunner> task_runner_;
112 };
113
114 class StopArgsImpl : public DataSourceBase::StopArgs {
115 public:
HandleStopAsynchronously() const116 std::function<void()> HandleStopAsynchronously() const override {
117 auto closure = std::move(async_stop_closure);
118 async_stop_closure = std::function<void()>();
119 return closure;
120 }
121
122 mutable std::function<void()> async_stop_closure;
123 };
124
ComputeConfigHash(const DataSourceConfig & config)125 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
126 base::Hash hasher;
127 std::string config_bytes = config.SerializeAsString();
128 hasher.Update(config_bytes.data(), config_bytes.size());
129 return hasher.digest();
130 }
131
// Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
// Value-initialized to nullptr; local to this translation unit.
static TracingMuxerImpl* g_prev_instance{};
134
135 } // namespace
136
137 // ----- Begin of TracingMuxerImpl::ProducerImpl
// Records the owning muxer, the id of the backend this producer belongs to and
// the shared-memory batch-commit duration (in ms) that is applied later in
// OnTracingSetup(). No connection is established here; see Initialize().
TracingMuxerImpl::ProducerImpl::ProducerImpl(
    TracingMuxerImpl* muxer,
    TracingBackendId backend_id,
    uint32_t shmem_batch_commits_duration_ms)
    : muxer_(muxer),
      backend_id_(backend_id),
      shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
145
// Clears |muxer_| so that callbacks which test it for null (for example
// OnDisconnect()) bail out if they are invoked while this object is being
// destroyed.
TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
  muxer_ = nullptr;
}
149
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)150 void TracingMuxerImpl::ProducerImpl::Initialize(
151 std::unique_ptr<ProducerEndpoint> endpoint) {
152 PERFETTO_DCHECK_THREAD(thread_checker_);
153 PERFETTO_DCHECK(!connected_);
154 connection_id_++;
155
156 // Adopt the endpoint into a shared pointer so that we can safely share it
157 // across threads that create trace writers. The custom deleter function
158 // ensures that the endpoint is always destroyed on the muxer's thread. (Note
159 // that |task_runner| is assumed to outlive tracing sessions on all threads.)
160 auto* task_runner = muxer_->task_runner_.get();
161 auto deleter = [task_runner](ProducerEndpoint* e) {
162 if (task_runner->RunsTasksOnCurrentThread()) {
163 delete e;
164 return;
165 }
166 task_runner->PostTask([e] { delete e; });
167 };
168 std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
169 // This atomic store is needed because another thread might be concurrently
170 // creating a trace writer using the previous (disconnected) |service_|. See
171 // CreateTraceWriter().
172 std::atomic_store(&service_, std::move(service));
173 // Don't try to use the service here since it may not have connected yet. See
174 // OnConnect().
175 }
176
// Called once the producer endpoint is connected. Marks the producer as
// connected and asks the muxer to update the data sources on all backends.
void TracingMuxerImpl::ProducerImpl::OnConnect() {
  PERFETTO_DLOG("Producer connected");
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!connected_);
  connected_ = true;
  muxer_->UpdateDataSourcesOnAllBackends();
}
184
// Called when the connection to the service is lost. Clears the connection
// state (connected flag, registered data sources, current service) and then
// notifies the muxer so that it can attempt a reconnection.
void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // If we're being destroyed, bail out.
  if (!muxer_)
    return;
  connected_ = false;
  // Active data sources for this producer will be stopped by
  // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
  // will have a different connection id (even before it has finished
  // connecting).
  registered_data_sources_.reset();
  DisposeConnection();

  // Try reconnecting the producer.
  muxer_->OnProducerDisconnected(this);
}
201
DisposeConnection()202 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
203 // Keep the old service around as a dead connection in case it has active
204 // trace writers. If any tracing sessions were created, we can't clear
205 // |service_| here because other threads may be concurrently creating new
206 // trace writers. Any reconnection attempt will atomically swap the new
207 // service in place of the old one.
208 if (did_setup_tracing_) {
209 dead_services_.push_back(service_);
210 } else {
211 service_.reset();
212 }
213 }
214
OnTracingSetup()215 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
216 PERFETTO_DCHECK_THREAD(thread_checker_);
217 did_setup_tracing_ = true;
218 service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
219 shmem_batch_commits_duration_ms_);
220 }
221
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)222 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
223 DataSourceInstanceID id,
224 const DataSourceConfig& cfg) {
225 PERFETTO_DCHECK_THREAD(thread_checker_);
226 if (!muxer_)
227 return;
228 muxer_->SetupDataSource(backend_id_, connection_id_, id, cfg);
229 }
230
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)231 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
232 const DataSourceConfig&) {
233 PERFETTO_DCHECK_THREAD(thread_checker_);
234 if (!muxer_)
235 return;
236 muxer_->StartDataSource(backend_id_, id);
237 service_->NotifyDataSourceStarted(id);
238 }
239
StopDataSource(DataSourceInstanceID id)240 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
241 PERFETTO_DCHECK_THREAD(thread_checker_);
242 if (!muxer_)
243 return;
244 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
245 }
246
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)247 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
248 const DataSourceInstanceID*,
249 size_t) {
250 // Flush is not plumbed for now, we just ack straight away.
251 PERFETTO_DCHECK_THREAD(thread_checker_);
252 service_->NotifyFlushComplete(flush_id);
253 }
254
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)255 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
256 const DataSourceInstanceID* instances,
257 size_t instance_count) {
258 PERFETTO_DCHECK_THREAD(thread_checker_);
259 if (!muxer_)
260 return;
261 for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
262 muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
263 }
264 }
265
SweepDeadServices()266 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
267 PERFETTO_DCHECK_THREAD(thread_checker_);
268 auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
269 auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
270 return !arbiter || arbiter->TryShutdown();
271 };
272 for (auto it = dead_services_.begin(); it != dead_services_.end();) {
273 auto next_it = it;
274 next_it++;
275 if (is_unused(*it)) {
276 dead_services_.erase(it);
277 }
278 it = next_it;
279 }
280 return dead_services_.empty();
281 }
282
283 // ----- End of TracingMuxerImpl::ProducerImpl methods.
284
285 // ----- Begin of TracingMuxerImpl::ConsumerImpl
// Records the owning muxer, the type and id of the backend this consumer talks
// to, and the global id of the tracing session it represents. The service
// endpoint is attached later, in Initialize().
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
                                             BackendType backend_type,
                                             TracingBackendId backend_id,
                                             TracingSessionGlobalID session_id)
    : muxer_(muxer),
      backend_type_(backend_type),
      backend_id_(backend_id),
      session_id_(session_id) {}
294
// Clears |muxer_| so that OnDisconnect(), which tests it for null, bails out
// if it is invoked while this object is being destroyed.
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
  muxer_ = nullptr;
}
298
// Takes ownership of the consumer endpoint and stores it in |service_|.
void TracingMuxerImpl::ConsumerImpl::Initialize(
    std::unique_ptr<ConsumerEndpoint> endpoint) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_ = std::move(endpoint);
  // Don't try to use the service here since it may not have connected yet. See
  // OnConnect().
}
306
OnConnect()307 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
308 PERFETTO_DCHECK_THREAD(thread_checker_);
309 PERFETTO_DCHECK(!connected_);
310 connected_ = true;
311
312 // Observe data source instance events so we get notified when tracing starts.
313 service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
314 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
315
316 // If the API client configured and started tracing before we connected,
317 // tell the backend about it now.
318 if (trace_config_)
319 muxer_->SetupTracingSession(session_id_, trace_config_);
320 if (start_pending_)
321 muxer_->StartTracingSession(session_id_);
322 if (get_trace_stats_pending_) {
323 auto callback = std::move(get_trace_stats_callback_);
324 get_trace_stats_callback_ = nullptr;
325 muxer_->GetTraceStats(session_id_, std::move(callback));
326 }
327 if (query_service_state_callback_) {
328 auto callback = std::move(query_service_state_callback_);
329 query_service_state_callback_ = nullptr;
330 muxer_->QueryServiceState(session_id_, std::move(callback));
331 }
332 if (stop_pending_)
333 muxer_->StopTracingSession(session_id_);
334 }
335
// Called when the connection to the service is lost (or could not be
// established). Reports the disconnection to the client, releases any pending
// start/stop completion callbacks and finally tells the muxer that this
// consumer may be destroyed.
void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // If we're being destroyed, bail out.
  if (!muxer_)
    return;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // |connected_| still being false here means the connection never succeeded.
  if (!connected_ && backend_type_ == kSystemBackend) {
    PERFETTO_ELOG(
        "Unable to connect to the system tracing service as a consumer. On "
        "Android, use the \"perfetto\" command line tool instead to start "
        "system-wide tracing sessions");
  }
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)

  // Notify the client about disconnection.
  NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});

  // Make sure the client doesn't hang in a blocking start/stop because of the
  // disconnection.
  NotifyStartComplete();
  NotifyStopComplete();

  // It shouldn't be necessary to call StopTracingSession. If we get this call
  // it means that the service did shutdown before us, so there is no point
  // trying it to ask it to stop the session. We should just remember to cleanup
  // the consumer vector.
  connected_ = false;

  // Notify the muxer that it is safe to destroy |this|. This is needed because
  // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
  // access until OnDisconnect() is called.
  muxer_->OnConsumerDisconnected(this);
}
369
// Starts tearing down the connection by destroying the ConsumerEndpoint held
// in |service_|.
void TracingMuxerImpl::ConsumerImpl::Disconnect() {
  // This is weird and deserves a comment.
  //
  // When we called the ConnectConsumer method on the service it returns
  // us a ConsumerEndpoint which we stored in |service_|, however this
  // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
  // |this|. Part of the API contract to TracingService::ConnectConsumer is that
  // the ConsumerImpl pointer has to be valid until the
  // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
  // ConsumerEndpoint |service_|. Eventually this will call
  // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
  // call the destructor of |this|.
  service_.reset();
}
384
OnTracingDisabled(const std::string & error)385 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
386 const std::string& error) {
387 PERFETTO_DCHECK_THREAD(thread_checker_);
388 PERFETTO_DCHECK(!stopped_);
389 stopped_ = true;
390
391 if (!error.empty())
392 NotifyError(TracingError{TracingError::kTracingFailed, error});
393
394 // If we're still waiting for the start event, fire it now. This may happen if
395 // there are no active data sources in the session.
396 NotifyStartComplete();
397 NotifyStopComplete();
398 }
399
NotifyStartComplete()400 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
401 PERFETTO_DCHECK_THREAD(thread_checker_);
402 if (start_complete_callback_) {
403 muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
404 start_complete_callback_ = nullptr;
405 }
406 if (blocking_start_complete_callback_) {
407 muxer_->task_runner_->PostTask(
408 std::move(blocking_start_complete_callback_));
409 blocking_start_complete_callback_ = nullptr;
410 }
411 }
412
NotifyError(const TracingError & error)413 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
414 PERFETTO_DCHECK_THREAD(thread_checker_);
415 if (error_callback_) {
416 muxer_->task_runner_->PostTask(
417 std::bind(std::move(error_callback_), error));
418 }
419 }
420
NotifyStopComplete()421 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
422 PERFETTO_DCHECK_THREAD(thread_checker_);
423 if (stop_complete_callback_) {
424 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
425 stop_complete_callback_ = nullptr;
426 }
427 if (blocking_stop_complete_callback_) {
428 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
429 blocking_stop_complete_callback_ = nullptr;
430 }
431 }
432
OnTraceData(std::vector<TracePacket> packets,bool has_more)433 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
434 std::vector<TracePacket> packets,
435 bool has_more) {
436 PERFETTO_DCHECK_THREAD(thread_checker_);
437 if (!read_trace_callback_)
438 return;
439
440 size_t capacity = 0;
441 for (const auto& packet : packets) {
442 // 16 is an over-estimation of the proto preamble size
443 capacity += packet.size() + 16;
444 }
445
446 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
447 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
448 buf->reserve(capacity);
449 for (auto& packet : packets) {
450 char* start;
451 size_t size;
452 std::tie(start, size) = packet.GetProtoPreamble();
453 buf->insert(buf->end(), start, start + size);
454 for (auto& slice : packet.slices()) {
455 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
456 buf->insert(buf->end(), slice_data, slice_data + slice.size);
457 }
458 }
459
460 auto callback = read_trace_callback_;
461 muxer_->task_runner_->PostTask([callback, buf, has_more] {
462 TracingSession::ReadTraceCallbackArgs callback_arg{};
463 callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
464 callback_arg.size = buf->size();
465 callback_arg.has_more = has_more;
466 callback(callback_arg);
467 });
468
469 if (!has_more)
470 read_trace_callback_ = nullptr;
471 }
472
OnObservableEvents(const ObservableEvents & events)473 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
474 const ObservableEvents& events) {
475 if (events.instance_state_changes_size()) {
476 for (const auto& state_change : events.instance_state_changes()) {
477 DataSourceHandle handle{state_change.producer_name(),
478 state_change.data_source_name()};
479 data_source_states_[handle] =
480 state_change.state() ==
481 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
482 }
483 }
484
485 if (events.instance_state_changes_size() ||
486 events.all_data_sources_started()) {
487 // Data sources are first reported as being stopped before starting, so once
488 // all the data sources we know about have started we can declare tracing
489 // begun. In the case where there are no matching data sources for the
490 // session, the service will report the all_data_sources_started() event
491 // without adding any instances (only since Android S / Perfetto v10.0).
492 if (start_complete_callback_ || blocking_start_complete_callback_) {
493 bool all_data_sources_started = std::all_of(
494 data_source_states_.cbegin(), data_source_states_.cend(),
495 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
496 if (all_data_sources_started)
497 NotifyStartComplete();
498 }
499 }
500 }
501
OnTraceStats(bool success,const TraceStats & trace_stats)502 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
503 bool success,
504 const TraceStats& trace_stats) {
505 if (!get_trace_stats_callback_)
506 return;
507 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
508 callback_arg.success = success;
509 callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
510 muxer_->task_runner_->PostTask(
511 std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
512 get_trace_stats_callback_ = nullptr;
513 }
514
// The callbacks below are not used: both are intentionally empty and ignore
// their arguments.
void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
518 // ----- End of TracingMuxerImpl::ConsumerImpl
519
520 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
521
522 // TracingSessionImpl is the RAII object returned to API clients when they
523 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
524 // tracing.
525
// Records the muxer that owns the session state, the global id of the session
// this handle refers to and the type of backend it was created for. All
// operations on the session are forwarded to the muxer's task runner.
TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
    TracingMuxerImpl* muxer,
    TracingSessionGlobalID session_id,
    BackendType backend_type)
    : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
531
532 // Can be destroyed from any thread.
~TracingSessionImpl()533 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
534 auto* muxer = muxer_;
535 auto session_id = session_id_;
536 muxer->task_runner_->PostTask(
537 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
538 }
539
540 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)541 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
542 int fd) {
543 auto* muxer = muxer_;
544 auto session_id = session_id_;
545 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
546 if (fd >= 0) {
547 base::ignore_result(backend_type_); // For -Wunused in the amalgamation.
548 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
549 if (backend_type_ != kInProcessBackend) {
550 PERFETTO_FATAL(
551 "Passing a file descriptor to TracingSession::Setup() is only "
552 "supported with the kInProcessBackend on Windows. Use "
553 "TracingSession::ReadTrace() instead");
554 }
555 #endif
556 trace_config->set_write_into_file(true);
557 fd = dup(fd);
558 }
559 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
560 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
561 });
562 }
563
564 // Can be called from any thread.
Start()565 void TracingMuxerImpl::TracingSessionImpl::Start() {
566 auto* muxer = muxer_;
567 auto session_id = session_id_;
568 muxer->task_runner_->PostTask(
569 [muxer, session_id] { muxer->StartTracingSession(session_id); });
570 }
571
572 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)573 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
574 const TraceConfig& cfg) {
575 auto* muxer = muxer_;
576 auto session_id = session_id_;
577 muxer->task_runner_->PostTask([muxer, session_id, cfg] {
578 muxer->ChangeTracingSessionConfig(session_id, cfg);
579 });
580 }
581
582 // Can be called from any thread except the service thread.
StartBlocking()583 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
584 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
585 auto* muxer = muxer_;
586 auto session_id = session_id_;
587 base::WaitableEvent tracing_started;
588 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
589 auto* consumer = muxer->FindConsumer(session_id);
590 if (!consumer) {
591 // TODO(skyostil): Signal an error to the user.
592 tracing_started.Notify();
593 return;
594 }
595 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
596 consumer->blocking_start_complete_callback_ = [&] {
597 tracing_started.Notify();
598 };
599 muxer->StartTracingSession(session_id);
600 });
601 tracing_started.Wait();
602 }
603
604 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)605 void TracingMuxerImpl::TracingSessionImpl::Flush(
606 std::function<void(bool)> user_callback,
607 uint32_t timeout_ms) {
608 auto* muxer = muxer_;
609 auto session_id = session_id_;
610 muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
611 auto* consumer = muxer->FindConsumer(session_id);
612 if (!consumer) {
613 std::move(user_callback)(false);
614 return;
615 }
616 muxer->FlushTracingSession(session_id, timeout_ms,
617 std::move(user_callback));
618 });
619 }
620
621 // Can be called from any thread.
Stop()622 void TracingMuxerImpl::TracingSessionImpl::Stop() {
623 auto* muxer = muxer_;
624 auto session_id = session_id_;
625 muxer->task_runner_->PostTask(
626 [muxer, session_id] { muxer->StopTracingSession(session_id); });
627 }
628
629 // Can be called from any thread except the service thread.
StopBlocking()630 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
631 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
632 auto* muxer = muxer_;
633 auto session_id = session_id_;
634 base::WaitableEvent tracing_stopped;
635 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
636 auto* consumer = muxer->FindConsumer(session_id);
637 if (!consumer) {
638 // TODO(skyostil): Signal an error to the user.
639 tracing_stopped.Notify();
640 return;
641 }
642 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
643 consumer->blocking_stop_complete_callback_ = [&] {
644 tracing_stopped.Notify();
645 };
646 muxer->StopTracingSession(session_id);
647 });
648 tracing_stopped.Wait();
649 }
650
651 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)652 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
653 auto* muxer = muxer_;
654 auto session_id = session_id_;
655 muxer->task_runner_->PostTask([muxer, session_id, cb] {
656 muxer->ReadTracingSessionData(session_id, std::move(cb));
657 });
658 }
659
660 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)661 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
662 std::function<void()> cb) {
663 auto* muxer = muxer_;
664 auto session_id = session_id_;
665 muxer->task_runner_->PostTask([muxer, session_id, cb] {
666 auto* consumer = muxer->FindConsumer(session_id);
667 if (!consumer)
668 return;
669 consumer->start_complete_callback_ = cb;
670 });
671 }
672
673 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)674 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
675 std::function<void(TracingError)> cb) {
676 auto* muxer = muxer_;
677 auto session_id = session_id_;
678 muxer->task_runner_->PostTask([muxer, session_id, cb] {
679 auto* consumer = muxer->FindConsumer(session_id);
680 if (!consumer) {
681 // Notify the client about concurrent disconnection of the session.
682 if (cb)
683 cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
684 return;
685 }
686 consumer->error_callback_ = cb;
687 });
688 }
689
690 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)691 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
692 std::function<void()> cb) {
693 auto* muxer = muxer_;
694 auto session_id = session_id_;
695 muxer->task_runner_->PostTask([muxer, session_id, cb] {
696 auto* consumer = muxer->FindConsumer(session_id);
697 if (!consumer)
698 return;
699 consumer->stop_complete_callback_ = cb;
700 });
701 }
702
703 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)704 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
705 GetTraceStatsCallback cb) {
706 auto* muxer = muxer_;
707 auto session_id = session_id_;
708 muxer->task_runner_->PostTask([muxer, session_id, cb] {
709 muxer->GetTraceStats(session_id, std::move(cb));
710 });
711 }
712
713 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)714 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
715 QueryServiceStateCallback cb) {
716 auto* muxer = muxer_;
717 auto session_id = session_id_;
718 muxer->task_runner_->PostTask([muxer, session_id, cb] {
719 muxer->QueryServiceState(session_id, std::move(cb));
720 });
721 }
722
723 // ----- End of TracingMuxerImpl::TracingSessionImpl
724
// static
// Starts out pointing at the fake muxer; the TracingMuxerImpl constructor
// replaces it with the real instance.
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
727
728 // This is called by perfetto::Tracing::Initialize().
729 // Can be called on any thread. Typically, but not necessarily, that will be
730 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)731 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
732 : TracingMuxer(args.platform ? args.platform
733 : Platform::GetDefaultPlatform()) {
734 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
735 instance_ = this;
736
737 // Create the thread where muxer, producers and service will live.
738 Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
739 task_runner_.reset(new NonReentrantTaskRunner(
740 this, platform_->CreateTaskRunner(std::move(tr_args))));
741
742 // Run the initializer on that thread.
743 task_runner_->PostTask([this, args] { Initialize(args); });
744 }
745
Initialize(const TracingInitArgs & args)746 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
747 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
748
749 policy_ = args.tracing_policy;
750
751 auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
752 if (!backend) {
753 // We skip the log in release builds because the *_backend_fake.cc code
754 // has already an ELOG before returning a nullptr.
755 PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
756 return;
757 }
758 TracingBackendId backend_id = backends_.size();
759 backends_.emplace_back();
760 RegisteredBackend& rb = backends_.back();
761 rb.backend = backend;
762 rb.id = backend_id;
763 rb.type = type;
764 rb.producer.reset(new ProducerImpl(this, backend_id,
765 args.shmem_batch_commits_duration_ms));
766 rb.producer_conn_args.producer = rb.producer.get();
767 rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
768 rb.producer_conn_args.task_runner = task_runner_.get();
769 rb.producer_conn_args.shmem_size_hint_bytes =
770 args.shmem_size_hint_kb * 1024;
771 rb.producer_conn_args.shmem_page_size_hint_bytes =
772 args.shmem_page_size_hint_kb * 1024;
773 rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
774 };
775
776 if (args.backends & kSystemBackend) {
777 PERFETTO_CHECK(args.system_backend_factory_);
778 add_backend(args.system_backend_factory_(), kSystemBackend);
779 }
780
781 if (args.backends & kInProcessBackend) {
782 PERFETTO_CHECK(args.in_process_backend_factory_);
783 add_backend(args.in_process_backend_factory_(), kInProcessBackend);
784 }
785
786 if (args.backends & kCustomBackend) {
787 PERFETTO_CHECK(args.custom_backend);
788 add_backend(args.custom_backend, kCustomBackend);
789 }
790
791 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
792 PERFETTO_FATAL("Unsupported tracing backend type");
793 }
794
795 // Fallback backend for consumer creation for an unsupported backend type.
796 // This backend simply fails any attempt to start a tracing session.
797 // NOTE: This backend instance has to be added last.
798 add_backend(internal::TracingBackendFake::GetInstance(),
799 BackendType::kUnspecifiedBackend);
800 }
801
802 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)803 bool TracingMuxerImpl::RegisterDataSource(
804 const DataSourceDescriptor& descriptor,
805 DataSourceFactory factory,
806 DataSourceStaticState* static_state) {
807 // Ignore repeated registrations.
808 if (static_state->index != kMaxDataSources)
809 return true;
810
811 uint32_t new_index = next_data_source_index_++;
812 if (new_index >= kMaxDataSources) {
813 PERFETTO_DLOG(
814 "RegisterDataSource failed: too many data sources already registered");
815 return false;
816 }
817
818 // Initialize the static state.
819 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
820 "instances[] size mismatch");
821 for (size_t i = 0; i < static_state->instances.size(); i++)
822 new (&static_state->instances[i]) DataSourceState{};
823
824 static_state->index = new_index;
825
826 // Generate a semi-unique id for this data source.
827 base::Hash hash;
828 hash.Update(reinterpret_cast<intptr_t>(static_state));
829 hash.Update(base::GetWallTimeNs().count());
830 static_state->id = hash.digest() ? hash.digest() : 1;
831
832 task_runner_->PostTask([this, descriptor, factory, static_state] {
833 data_sources_.emplace_back();
834 RegisteredDataSource& rds = data_sources_.back();
835 rds.descriptor = descriptor;
836 rds.factory = factory;
837 rds.static_state = static_state;
838
839 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
840 });
841 return true;
842 }
843
844 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)845 void TracingMuxerImpl::UpdateDataSourceDescriptor(
846 const DataSourceDescriptor& descriptor,
847 const DataSourceStaticState* static_state) {
848 task_runner_->PostTask([this, descriptor, static_state] {
849 for (auto& rds : data_sources_) {
850 if (rds.static_state == static_state) {
851 PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
852 rds.descriptor = descriptor;
853 rds.descriptor.set_id(static_state->id);
854 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
855 return;
856 }
857 }
858 });
859 }
860
861 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)862 void TracingMuxerImpl::RegisterInterceptor(
863 const InterceptorDescriptor& descriptor,
864 InterceptorFactory factory,
865 InterceptorBase::TLSFactory tls_factory,
866 InterceptorBase::TracePacketCallback packet_callback) {
867 task_runner_->PostTask(
868 [this, descriptor, factory, tls_factory, packet_callback] {
869 // Ignore repeated registrations.
870 for (const auto& interceptor : interceptors_) {
871 if (interceptor.descriptor.name() == descriptor.name()) {
872 PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
873 PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
874 return;
875 }
876 }
877 // Only allow certain interceptors for now.
878 if (descriptor.name() != "test_interceptor" &&
879 descriptor.name() != "console") {
880 PERFETTO_ELOG(
881 "Interceptors are experimental. If you want to use them, please "
882 "get in touch with the project maintainers "
883 "(https://perfetto.dev/docs/contributing/"
884 "getting-started#community).");
885 return;
886 }
887 interceptors_.emplace_back();
888 RegisteredInterceptor& interceptor = interceptors_.back();
889 interceptor.descriptor = descriptor;
890 interceptor.factory = factory;
891 interceptor.tls_factory = tls_factory;
892 interceptor.packet_callback = packet_callback;
893 });
894 }
895
896 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)897 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
898 uint32_t backend_connection_id,
899 DataSourceInstanceID instance_id,
900 const DataSourceConfig& cfg) {
901 PERFETTO_DCHECK_THREAD(thread_checker_);
902 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
903 cfg.name().c_str());
904 uint64_t config_hash = ComputeConfigHash(cfg);
905
906 for (const auto& rds : data_sources_) {
907 if (rds.descriptor.name() != cfg.name())
908 continue;
909 DataSourceStaticState& static_state = *rds.static_state;
910
911 // If this data source is already active for this exact config, don't start
912 // another instance. This happens when we have several data sources with the
913 // same name, in which case the service sends one SetupDataSource event for
914 // each one. Since we can't map which event maps to which data source, we
915 // ensure each event only starts one data source instance.
916 // TODO(skyostil): Register a unique id with each data source to the service
917 // to disambiguate.
918 bool active_for_config = false;
919 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
920 if (!static_state.TryGet(i))
921 continue;
922 auto* internal_state =
923 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
924 if (internal_state->backend_id == backend_id &&
925 internal_state->config_hash == config_hash) {
926 active_for_config = true;
927 break;
928 }
929 }
930 if (active_for_config) {
931 PERFETTO_DLOG(
932 "Data source %s is already active with this config, skipping",
933 cfg.name().c_str());
934 continue;
935 }
936
937 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
938 // Find a free slot.
939 if (static_state.TryGet(i))
940 continue;
941
942 auto* internal_state =
943 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
944 std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
945 static_assert(
946 std::is_same<decltype(internal_state->data_source_instance_id),
947 DataSourceInstanceID>::value,
948 "data_source_instance_id type mismatch");
949 internal_state->muxer_id_for_testing = muxer_id_for_testing_;
950 internal_state->backend_id = backend_id;
951 internal_state->backend_connection_id = backend_connection_id;
952 internal_state->data_source_instance_id = instance_id;
953 internal_state->buffer_id =
954 static_cast<internal::BufferId>(cfg.target_buffer());
955 internal_state->config_hash = config_hash;
956 internal_state->data_source = rds.factory();
957 internal_state->interceptor = nullptr;
958 internal_state->interceptor_id = 0;
959
960 if (cfg.has_interceptor_config()) {
961 for (size_t j = 0; j < interceptors_.size(); j++) {
962 if (cfg.interceptor_config().name() ==
963 interceptors_[j].descriptor.name()) {
964 PERFETTO_DLOG("Intercepting data source %" PRIu64
965 " \"%s\" into \"%s\"",
966 instance_id, cfg.name().c_str(),
967 cfg.interceptor_config().name().c_str());
968 internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
969 internal_state->interceptor = interceptors_[j].factory();
970 internal_state->interceptor->OnSetup({cfg});
971 break;
972 }
973 }
974 if (!internal_state->interceptor_id) {
975 PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
976 cfg.interceptor_config().name().c_str());
977 }
978 }
979
980 // This must be made at the end. See matching acquire-load in
981 // DataSource::Trace().
982 static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
983
984 DataSourceBase::SetupArgs setup_args;
985 setup_args.config = &cfg;
986 setup_args.internal_instance_index = i;
987 internal_state->data_source->OnSetup(setup_args);
988 return;
989 }
990 PERFETTO_ELOG(
991 "Maximum number of data source instances exhausted. "
992 "Dropping data source %" PRIu64,
993 instance_id);
994 break;
995 }
996 }
997
998 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)999 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1000 DataSourceInstanceID instance_id) {
1001 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1002 PERFETTO_DCHECK_THREAD(thread_checker_);
1003
1004 auto ds = FindDataSource(backend_id, instance_id);
1005 if (!ds) {
1006 PERFETTO_ELOG("Could not find data source to start");
1007 return;
1008 }
1009
1010 DataSourceBase::StartArgs start_args{};
1011 start_args.internal_instance_index = ds.instance_idx;
1012
1013 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1014 if (ds.internal_state->interceptor)
1015 ds.internal_state->interceptor->OnStart({});
1016 ds.internal_state->trace_lambda_enabled = true;
1017 ds.internal_state->data_source->OnStart(start_args);
1018 }
1019
1020 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1021 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1022 TracingBackendId backend_id,
1023 DataSourceInstanceID instance_id) {
1024 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1025 PERFETTO_DCHECK_THREAD(thread_checker_);
1026
1027 auto ds = FindDataSource(backend_id, instance_id);
1028 if (!ds) {
1029 PERFETTO_ELOG("Could not find data source to stop");
1030 return;
1031 }
1032
1033 StopArgsImpl stop_args{};
1034 stop_args.internal_instance_index = ds.instance_idx;
1035 stop_args.async_stop_closure = [this, backend_id, instance_id] {
1036 // TracingMuxerImpl is long lived, capturing |this| is okay.
1037 // The notification closure can be moved out of the StopArgs by the
1038 // embedder to handle stop asynchronously. The embedder might then
1039 // call the closure on a different thread than the current one, hence
1040 // this nested PostTask().
1041 task_runner_->PostTask([this, backend_id, instance_id] {
1042 StopDataSource_AsyncEnd(backend_id, instance_id);
1043 });
1044 };
1045
1046 {
1047 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1048 if (ds.internal_state->interceptor)
1049 ds.internal_state->interceptor->OnStop({});
1050 ds.internal_state->data_source->OnStop(stop_args);
1051 }
1052
1053 // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1054 // async closure here. In theory we could avoid the PostTask and call
1055 // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1056 // divergencies between the deferred-stop vs non-deferred-stop code paths.
1057 if (stop_args.async_stop_closure)
1058 std::move(stop_args.async_stop_closure)();
1059 }
1060
StopDataSource_AsyncEnd(TracingBackendId backend_id,DataSourceInstanceID instance_id)1061 void TracingMuxerImpl::StopDataSource_AsyncEnd(
1062 TracingBackendId backend_id,
1063 DataSourceInstanceID instance_id) {
1064 PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1065 PERFETTO_DCHECK_THREAD(thread_checker_);
1066
1067 auto ds = FindDataSource(backend_id, instance_id);
1068 if (!ds) {
1069 PERFETTO_ELOG(
1070 "Async stop of data source %" PRIu64
1071 " failed. This might be due to calling the async_stop_closure twice.",
1072 instance_id);
1073 return;
1074 }
1075
1076 const uint32_t mask = ~(1 << ds.instance_idx);
1077 ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1078
1079 // Take the mutex to prevent that the data source is in the middle of
1080 // a Trace() execution where it called GetDataSourceLocked() while we
1081 // destroy it.
1082 {
1083 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1084 ds.internal_state->trace_lambda_enabled = false;
1085 ds.internal_state->data_source.reset();
1086 ds.internal_state->interceptor.reset();
1087 }
1088
1089 // The other fields of internal_state are deliberately *not* cleared.
1090 // See races-related comments of DataSource::Trace().
1091
1092 TracingMuxer::generation_++;
1093
1094 // |backends_| is append-only, Backend instances are always valid.
1095 PERFETTO_CHECK(backend_id < backends_.size());
1096 ProducerImpl* producer = backends_[backend_id].producer.get();
1097 if (!producer)
1098 return;
1099 if (producer->connected_) {
1100 // Flush any commits that might have been batched by SharedMemoryArbiter.
1101 producer->service_->MaybeSharedMemoryArbiter()
1102 ->FlushPendingCommitDataRequests();
1103 producer->service_->NotifyDataSourceStopped(instance_id);
1104 }
1105 producer->SweepDeadServices();
1106 }
1107
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1108 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1109 TracingBackendId backend_id,
1110 DataSourceInstanceID instance_id) {
1111 PERFETTO_DCHECK_THREAD(thread_checker_);
1112 PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1113 instance_id);
1114 auto ds = FindDataSource(backend_id, instance_id);
1115 if (!ds) {
1116 PERFETTO_ELOG("Could not find data source to clear incremental state for");
1117 return;
1118 }
1119 // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1120 // the incremental state should be cleared.
1121 ds.static_state->incremental_state_generation.fetch_add(
1122 1, std::memory_order_relaxed);
1123 }
1124
SyncProducersForTesting()1125 void TracingMuxerImpl::SyncProducersForTesting() {
1126 std::mutex mutex;
1127 std::condition_variable cv;
1128
1129 // IPC-based producers don't report connection errors explicitly for each
1130 // command, but instead with an asynchronous callback
1131 // (ProducerImpl::OnDisconnected). This means that the sync command below
1132 // may have completed but failed to reach the service because of a
1133 // disconnection, but we can't tell until the disconnection message comes
1134 // through. To guard against this, we run two whole rounds of sync round-trips
1135 // before returning; the first one will detect any disconnected producers and
1136 // the second one will ensure any reconnections have completed and all data
1137 // sources are registered in the service again.
1138 for (size_t i = 0; i < 2; i++) {
1139 size_t countdown = std::numeric_limits<size_t>::max();
1140 task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1141 {
1142 std::unique_lock<std::mutex> countdown_lock(mutex);
1143 countdown = backends_.size();
1144 }
1145 for (auto& backend : backends_) {
1146 auto* producer = backend.producer.get();
1147 producer->service_->Sync([&mutex, &cv, &countdown] {
1148 std::unique_lock<std::mutex> countdown_lock(mutex);
1149 countdown--;
1150 cv.notify_one();
1151 });
1152 }
1153 });
1154
1155 {
1156 std::unique_lock<std::mutex> countdown_lock(mutex);
1157 cv.wait(countdown_lock, [&countdown] { return !countdown; });
1158 }
1159 }
1160
1161 // Check that all producers are indeed connected.
1162 bool done = false;
1163 bool all_producers_connected = true;
1164 task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1165 for (auto& backend : backends_)
1166 all_producers_connected &= backend.producer->connected_;
1167 std::unique_lock<std::mutex> lock(mutex);
1168 done = true;
1169 cv.notify_one();
1170 });
1171
1172 {
1173 std::unique_lock<std::mutex> lock(mutex);
1174 cv.wait(lock, [&done] { return done; });
1175 }
1176 PERFETTO_DCHECK(all_producers_connected);
1177 }
1178
DestroyStoppedTraceWritersForCurrentThread()1179 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1180 // Iterate across all possible data source types.
1181 auto cur_generation = generation_.load(std::memory_order_acquire);
1182 auto* root_tls = GetOrCreateTracingTLS();
1183
1184 auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1185 // |tls| has a vector of per-data-source-instance thread-local state.
1186 DataSourceStaticState* static_state = tls.static_state;
1187 if (!static_state)
1188 return; // Slot not used.
1189
1190 // Iterate across all possible instances for this data source.
1191 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1192 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1193 if (!ds_tls.trace_writer)
1194 continue;
1195
1196 DataSourceState* ds_state = static_state->TryGet(inst);
1197 if (ds_state &&
1198 ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1199 ds_state->backend_id == ds_tls.backend_id &&
1200 ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1201 ds_state->buffer_id == ds_tls.buffer_id &&
1202 ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1203 continue;
1204 }
1205
1206 // The DataSource instance has been destroyed or recycled.
1207 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
1208 }
1209 };
1210
1211 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1212 // |tls| has a vector of per-data-source-instance thread-local state.
1213 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1214 destroy_stopped_instances(tls);
1215 }
1216 destroy_stopped_instances(root_tls->track_event_tls);
1217 root_tls->generation = cur_generation;
1218 }
1219
1220 // Called both when a new data source is registered or when a new backend
1221 // connects. In both cases we want to be sure we reflected the data source
1222 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1223 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1224 PERFETTO_DCHECK_THREAD(thread_checker_);
1225 for (RegisteredDataSource& rds : data_sources_) {
1226 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1227 }
1228 }
1229
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1230 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1231 bool is_changed) {
1232 PERFETTO_DCHECK_THREAD(thread_checker_);
1233 for (RegisteredBackend& backend : backends_) {
1234 // We cannot call RegisterDataSource on the backend before it connects.
1235 if (!backend.producer->connected_)
1236 continue;
1237
1238 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1239 bool is_registered = backend.producer->registered_data_sources_.test(
1240 rds.static_state->index);
1241 if (is_registered && !is_changed)
1242 continue;
1243
1244 rds.descriptor.set_will_notify_on_start(true);
1245 rds.descriptor.set_will_notify_on_stop(true);
1246 rds.descriptor.set_handles_incremental_state_clear(true);
1247 rds.descriptor.set_id(rds.static_state->id);
1248 if (is_registered) {
1249 backend.producer->service_->UpdateDataSource(rds.descriptor);
1250 } else {
1251 backend.producer->service_->RegisterDataSource(rds.descriptor);
1252 }
1253 backend.producer->registered_data_sources_.set(rds.static_state->index);
1254 }
1255 }
1256
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1257 void TracingMuxerImpl::SetupTracingSession(
1258 TracingSessionGlobalID session_id,
1259 const std::shared_ptr<TraceConfig>& trace_config,
1260 base::ScopedFile trace_fd) {
1261 PERFETTO_DCHECK_THREAD(thread_checker_);
1262 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1263
1264 auto* consumer = FindConsumer(session_id);
1265 if (!consumer)
1266 return;
1267
1268 consumer->trace_config_ = trace_config;
1269 if (trace_fd)
1270 consumer->trace_fd_ = std::move(trace_fd);
1271
1272 if (!consumer->connected_)
1273 return;
1274
1275 // Only used in the deferred start mode.
1276 if (trace_config->deferred_start()) {
1277 consumer->service_->EnableTracing(*trace_config,
1278 std::move(consumer->trace_fd_));
1279 }
1280 }
1281
StartTracingSession(TracingSessionGlobalID session_id)1282 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1283 PERFETTO_DCHECK_THREAD(thread_checker_);
1284
1285 auto* consumer = FindConsumer(session_id);
1286
1287 if (!consumer)
1288 return;
1289
1290 if (!consumer->trace_config_) {
1291 PERFETTO_ELOG("Must call Setup(config) first");
1292 return;
1293 }
1294
1295 if (!consumer->connected_) {
1296 consumer->start_pending_ = true;
1297 return;
1298 }
1299
1300 consumer->start_pending_ = false;
1301 if (consumer->trace_config_->deferred_start()) {
1302 consumer->service_->StartTracing();
1303 } else {
1304 consumer->service_->EnableTracing(*consumer->trace_config_,
1305 std::move(consumer->trace_fd_));
1306 }
1307
1308 // TODO implement support for the deferred-start + fast-triggering case.
1309 }
1310
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1311 void TracingMuxerImpl::ChangeTracingSessionConfig(
1312 TracingSessionGlobalID session_id,
1313 const TraceConfig& trace_config) {
1314 PERFETTO_DCHECK_THREAD(thread_checker_);
1315
1316 auto* consumer = FindConsumer(session_id);
1317
1318 if (!consumer)
1319 return;
1320
1321 if (!consumer->trace_config_) {
1322 // Changing the config is only supported for started sessions.
1323 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1324 return;
1325 }
1326
1327 consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1328 if (consumer->connected_)
1329 consumer->service_->ChangeTraceConfig(trace_config);
1330 }
1331
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1332 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1333 uint32_t timeout_ms,
1334 std::function<void(bool)> callback) {
1335 PERFETTO_DCHECK_THREAD(thread_checker_);
1336 auto* consumer = FindConsumer(session_id);
1337 if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1338 !consumer->trace_config_) {
1339 PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1340 std::move(callback)(false);
1341 return;
1342 }
1343
1344 consumer->service_->Flush(timeout_ms, std::move(callback));
1345 }
1346
StopTracingSession(TracingSessionGlobalID session_id)1347 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1348 PERFETTO_DCHECK_THREAD(thread_checker_);
1349 auto* consumer = FindConsumer(session_id);
1350 if (!consumer)
1351 return;
1352
1353 if (consumer->start_pending_) {
1354 // If the session hasn't started yet, wait until it does before stopping.
1355 consumer->stop_pending_ = true;
1356 return;
1357 }
1358
1359 consumer->stop_pending_ = false;
1360 if (consumer->stopped_) {
1361 // If the session was already stopped (e.g., it failed to start), don't try
1362 // stopping again.
1363 consumer->NotifyStopComplete();
1364 } else if (!consumer->trace_config_) {
1365 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1366 return;
1367 } else {
1368 consumer->service_->DisableTracing();
1369 }
1370
1371 consumer->trace_config_.reset();
1372 }
1373
DestroyTracingSession(TracingSessionGlobalID session_id)1374 void TracingMuxerImpl::DestroyTracingSession(
1375 TracingSessionGlobalID session_id) {
1376 PERFETTO_DCHECK_THREAD(thread_checker_);
1377 for (RegisteredBackend& backend : backends_) {
1378 // We need to find the consumer (if any) and call Disconnect as we destroy
1379 // the tracing session. We can't call Disconnect() inside this for loop
1380 // because in the in-process case this will end up to a synchronous call to
1381 // OnConsumerDisconnect which will invalidate all the iterators to
1382 // |backend.consumers|.
1383 ConsumerImpl* consumer = nullptr;
1384 for (auto& con : backend.consumers) {
1385 if (con->session_id_ == session_id) {
1386 consumer = con.get();
1387 break;
1388 }
1389 }
1390 if (consumer) {
1391 // We broke out of the loop above on the assumption that each backend will
1392 // only have a single consumer per session. This DCHECK ensures that
1393 // this is the case.
1394 PERFETTO_DCHECK(
1395 std::count_if(backend.consumers.begin(), backend.consumers.end(),
1396 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
1397 return con->session_id_ == session_id;
1398 }) == 1u);
1399 consumer->Disconnect();
1400 }
1401 }
1402 }
1403
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)1404 void TracingMuxerImpl::ReadTracingSessionData(
1405 TracingSessionGlobalID session_id,
1406 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
1407 PERFETTO_DCHECK_THREAD(thread_checker_);
1408 auto* consumer = FindConsumer(session_id);
1409 if (!consumer) {
1410 // TODO(skyostil): Signal an error to the user.
1411 TracingSession::ReadTraceCallbackArgs callback_arg{};
1412 callback(callback_arg);
1413 return;
1414 }
1415 PERFETTO_DCHECK(!consumer->read_trace_callback_);
1416 consumer->read_trace_callback_ = std::move(callback);
1417 consumer->service_->ReadBuffers();
1418 }
1419
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)1420 void TracingMuxerImpl::GetTraceStats(
1421 TracingSessionGlobalID session_id,
1422 TracingSession::GetTraceStatsCallback callback) {
1423 PERFETTO_DCHECK_THREAD(thread_checker_);
1424 auto* consumer = FindConsumer(session_id);
1425 if (!consumer) {
1426 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
1427 callback_arg.success = false;
1428 callback(std::move(callback_arg));
1429 return;
1430 }
1431 PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
1432 consumer->get_trace_stats_callback_ = std::move(callback);
1433 if (!consumer->connected_) {
1434 consumer->get_trace_stats_pending_ = true;
1435 return;
1436 }
1437 consumer->get_trace_stats_pending_ = false;
1438 consumer->service_->GetTraceStats();
1439 }
1440
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)1441 void TracingMuxerImpl::QueryServiceState(
1442 TracingSessionGlobalID session_id,
1443 TracingSession::QueryServiceStateCallback callback) {
1444 PERFETTO_DCHECK_THREAD(thread_checker_);
1445 auto* consumer = FindConsumer(session_id);
1446 if (!consumer) {
1447 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1448 callback_arg.success = false;
1449 callback(std::move(callback_arg));
1450 return;
1451 }
1452 PERFETTO_DCHECK(!consumer->query_service_state_callback_);
1453 if (!consumer->connected_) {
1454 consumer->query_service_state_callback_ = std::move(callback);
1455 return;
1456 }
1457 auto callback_wrapper = [callback](bool success,
1458 protos::gen::TracingServiceState state) {
1459 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1460 callback_arg.success = success;
1461 callback_arg.service_state_data = state.SerializeAsArray();
1462 callback(std::move(callback_arg));
1463 };
1464 consumer->service_->QueryServiceState(std::move(callback_wrapper));
1465 }
1466
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)1467 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
1468 uint32_t batch_commits_duration_ms,
1469 BackendType backend_type) {
1470 for (RegisteredBackend& backend : backends_) {
1471 if (backend.producer && backend.producer->connected_ &&
1472 backend.type == backend_type) {
1473 backend.producer->service_->MaybeSharedMemoryArbiter()
1474 ->SetBatchCommitsDuration(batch_commits_duration_ms);
1475 }
1476 }
1477 }
1478
EnableDirectSMBPatchingForTesting(BackendType backend_type)1479 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
1480 BackendType backend_type) {
1481 for (RegisteredBackend& backend : backends_) {
1482 if (backend.producer && backend.producer->connected_ &&
1483 backend.type == backend_type &&
1484 !backend.producer->service_->MaybeSharedMemoryArbiter()
1485 ->EnableDirectSMBPatching()) {
1486 return false;
1487 }
1488 }
1489 return true;
1490 }
1491
FindConsumer(TracingSessionGlobalID session_id)1492 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
1493 TracingSessionGlobalID session_id) {
1494 PERFETTO_DCHECK_THREAD(thread_checker_);
1495 for (RegisteredBackend& backend : backends_) {
1496 for (auto& consumer : backend.consumers) {
1497 if (consumer->session_id_ == session_id) {
1498 return consumer.get();
1499 }
1500 }
1501 }
1502 return nullptr;
1503 }
1504
InitializeConsumer(TracingSessionGlobalID session_id)1505 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
1506 PERFETTO_DCHECK_THREAD(thread_checker_);
1507
1508 auto* consumer = FindConsumer(session_id);
1509 if (!consumer)
1510 return;
1511
1512 TracingBackendId backend_id = consumer->backend_id_;
1513 // |backends_| is append-only, Backend instances are always valid.
1514 PERFETTO_CHECK(backend_id < backends_.size());
1515 RegisteredBackend& backend = backends_[backend_id];
1516
1517 TracingBackend::ConnectConsumerArgs conn_args;
1518 conn_args.consumer = consumer;
1519 conn_args.task_runner = task_runner_.get();
1520 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
1521 }
1522
OnConsumerDisconnected(ConsumerImpl * consumer)1523 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
1524 PERFETTO_DCHECK_THREAD(thread_checker_);
1525 for (RegisteredBackend& backend : backends_) {
1526 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
1527 return con.get() == consumer;
1528 };
1529 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
1530 backend.consumers.end(), pred),
1531 backend.consumers.end());
1532 }
1533 }
1534
SetMaxProducerReconnectionsForTesting(uint32_t count)1535 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
1536 max_producer_reconnections_.store(count);
1537 }
1538
OnProducerDisconnected(ProducerImpl * producer)1539 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
1540 PERFETTO_DCHECK_THREAD(thread_checker_);
1541 for (RegisteredBackend& backend : backends_) {
1542 if (backend.producer.get() != producer)
1543 continue;
1544 // Try reconnecting the disconnected producer. If the connection succeeds,
1545 // all the data sources will be automatically re-registered.
1546 if (producer->connection_id_ > max_producer_reconnections_.load()) {
1547 // Avoid reconnecting a failing producer too many times. Instead we just
1548 // leak the producer instead of trying to avoid further complicating
1549 // cross-thread trace writer creation.
1550 PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
1551 continue;
1552 }
1553 backend.producer->Initialize(
1554 backend.backend->ConnectProducer(backend.producer_conn_args));
1555 }
1556
1557 // Increment the generation counter to atomically ensure that:
1558 // 1. Old trace writers from the severed connection eventually get cleaned up
1559 // by DestroyStoppedTraceWritersForCurrentThread().
1560 // 2. No new trace writers can be created for the SharedMemoryArbiter from the
1561 // old connection.
1562 TracingMuxer::generation_++;
1563 }
1564
SweepDeadBackends()1565 void TracingMuxerImpl::SweepDeadBackends() {
1566 PERFETTO_DCHECK_THREAD(thread_checker_);
1567 for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
1568 auto next_it = it;
1569 next_it++;
1570 if (it->producer->SweepDeadServices())
1571 dead_backends_.erase(it);
1572 it = next_it;
1573 }
1574 }
1575
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1576 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
1577 TracingBackendId backend_id,
1578 DataSourceInstanceID instance_id) {
1579 PERFETTO_DCHECK_THREAD(thread_checker_);
1580 for (const auto& rds : data_sources_) {
1581 DataSourceStaticState* static_state = rds.static_state;
1582 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1583 auto* internal_state = static_state->TryGet(i);
1584 if (internal_state && internal_state->backend_id == backend_id &&
1585 internal_state->data_source_instance_id == instance_id) {
1586 return FindDataSourceRes(static_state, internal_state, i);
1587 }
1588 }
1589 }
1590 return FindDataSourceRes();
1591 }
1592
1593 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)1594 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
1595 DataSourceStaticState* static_state,
1596 uint32_t data_source_instance_index,
1597 DataSourceState* data_source,
1598 BufferExhaustedPolicy buffer_exhausted_policy) {
1599 if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
1600 // If the session is being intercepted, return a heap-backed trace writer
1601 // instead. This is safe because all the data given to the interceptor is
1602 // either thread-local (|instance_index|), statically allocated
1603 // (|static_state|) or constant after initialization (|interceptor|). Access
1604 // to the interceptor instance itself through |data_source| is protected by
1605 // a statically allocated lock (similarly to the data source instance).
1606 auto& interceptor = interceptors_[data_source->interceptor_id - 1];
1607 return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
1608 interceptor.tls_factory(static_state, data_source_instance_index),
1609 interceptor.packet_callback, static_state, data_source_instance_index));
1610 }
1611 ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
1612 // Atomically load the current service endpoint. We keep the pointer as a
1613 // shared pointer on the stack to guard against it from being concurrently
1614 // modified on the thread by ProducerImpl::Initialize() swapping in a
1615 // reconnected service on the muxer task runner thread.
1616 //
1617 // The endpoint may also be concurrently modified by SweepDeadServices()
1618 // clearing out old disconnected services. We guard against that by
1619 // SharedMemoryArbiter keeping track of any outstanding trace writers. After
1620 // shutdown has started, the trace writer created below will be a null one
1621 // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
1622 //
1623 // We use an atomic pointer instead of holding a lock because
1624 // CreateTraceWriter posts tasks under the hood.
1625 std::shared_ptr<ProducerEndpoint> service =
1626 std::atomic_load(&producer->service_);
1627 return service->CreateTraceWriter(data_source->buffer_id,
1628 buffer_exhausted_policy);
1629 }
1630
1631 // This is called via the public API Tracing::NewTrace().
1632 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type)1633 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
1634 BackendType requested_backend_type) {
1635 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
1636
1637 // |backend_type| can only specify one backend, not an OR-ed mask.
1638 PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
1639
1640 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
1641 task_runner_->PostTask([this, requested_backend_type, session_id] {
1642 for (RegisteredBackend& backend : backends_) {
1643 if (requested_backend_type && backend.type &&
1644 backend.type != requested_backend_type) {
1645 continue;
1646 }
1647
1648 TracingBackendId backend_id = backend.id;
1649
1650 // Create the consumer now, even if we have to ask the embedder below, so
1651 // that any other tasks executing after this one can find the consumer and
1652 // change its pending attributes.
1653 backend.consumers.emplace_back(
1654 new ConsumerImpl(this, backend.type, backend.id, session_id));
1655
1656 // The last registered backend in |backends_| is the unsupported backend
1657 // without a valid type.
1658 if (!backend.type) {
1659 PERFETTO_ELOG(
1660 "No tracing backend ready for type=%d, consumer will disconnect",
1661 requested_backend_type);
1662 InitializeConsumer(session_id);
1663 return;
1664 }
1665
1666 // Check if the embedder wants to be asked for permission before
1667 // connecting the consumer.
1668 if (!policy_) {
1669 InitializeConsumer(session_id);
1670 return;
1671 }
1672
1673 TracingPolicy::ShouldAllowConsumerSessionArgs args;
1674 args.backend_type = backend.type;
1675 args.result_callback = [this, backend_id, session_id](bool allow) {
1676 task_runner_->PostTask([this, backend_id, session_id, allow] {
1677 if (allow) {
1678 InitializeConsumer(session_id);
1679 return;
1680 }
1681
1682 PERFETTO_ELOG(
1683 "Consumer session for backend type type=%d forbidden, "
1684 "consumer will disconnect",
1685 backends_[backend_id].type);
1686
1687 auto* consumer = FindConsumer(session_id);
1688 if (!consumer)
1689 return;
1690
1691 consumer->OnDisconnect();
1692 });
1693 };
1694 policy_->ShouldAllowConsumerSession(args);
1695 return;
1696 }
1697 PERFETTO_DFATAL("Not reached");
1698 });
1699
1700 return std::unique_ptr<TracingSession>(
1701 new TracingSessionImpl(this, session_id, requested_backend_type));
1702 }
1703
1704 // static
InitializeInstance(const TracingInitArgs & args)1705 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
1706 if (instance_ != TracingMuxerFake::Get())
1707 PERFETTO_FATAL("Tracing already initialized");
1708 // If we previously had a TracingMuxerImpl instance which was reset,
1709 // reinitialize and reuse it instead of trying to create a new one. See
1710 // ResetForTesting().
1711 if (g_prev_instance) {
1712 auto* muxer = g_prev_instance;
1713 g_prev_instance = nullptr;
1714 instance_ = muxer;
1715 muxer->task_runner_->PostTask([muxer, args] { muxer->Initialize(args); });
1716 } else {
1717 new TracingMuxerImpl(args);
1718 }
1719 }
1720
1721 // static
ResetForTesting()1722 void TracingMuxerImpl::ResetForTesting() {
1723 // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
1724 // various objects make that a non-starter. In particular:
1725 //
1726 // 1) Any thread that has entered a trace event has a TraceWriter, which holds
1727 // a reference back to ProducerImpl::service_.
1728 //
1729 // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
1730 //
1731 // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
1732 // turn depends on TracingMuxerImpl itself.
1733 //
1734 // Because of this, it's not safe to deallocate TracingMuxerImpl until all
1735 // threads have dropped their TraceWriters. Since we can't really ask the
1736 // caller to guarantee this, we'll instead reset enough of the muxer's state
1737 // so that it can be reinitialized later and ensure all necessary objects from
1738 // the old state remain alive until all references have gone away.
1739 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
1740
1741 base::WaitableEvent reset_done;
1742 auto do_reset = [muxer, &reset_done] {
1743 // Unregister all data sources so they don't interfere with any future
1744 // tracing sessions.
1745 for (RegisteredDataSource& rds : muxer->data_sources_) {
1746 for (RegisteredBackend& backend : muxer->backends_) {
1747 if (!backend.producer->service_)
1748 continue;
1749 backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
1750 }
1751 }
1752 for (auto& backend : muxer->backends_) {
1753 // Check that no consumer session is currently active on any backend.
1754 for (auto& consumer : backend.consumers)
1755 PERFETTO_CHECK(!consumer->service_);
1756 backend.producer->muxer_ = nullptr;
1757 backend.producer->DisposeConnection();
1758 muxer->dead_backends_.push_back(std::move(backend));
1759 }
1760 muxer->backends_.clear();
1761 muxer->interceptors_.clear();
1762
1763 for (auto& ds : muxer->data_sources_) {
1764 ds.static_state->~DataSourceStaticState();
1765 new (ds.static_state) DataSourceStaticState{};
1766 }
1767 muxer->data_sources_.clear();
1768 muxer->next_data_source_index_ = 0;
1769
1770 // Free all backends without active trace writers or other inbound
1771 // references. Note that even if all the backends get swept, the muxer still
1772 // needs to stay around since |task_runner_| is assumed to be long-lived.
1773 muxer->SweepDeadBackends();
1774
1775 // Make sure we eventually discard any per-thread trace writers from the
1776 // previous instance.
1777 muxer->muxer_id_for_testing_++;
1778
1779 g_prev_instance = muxer;
1780 instance_ = TracingMuxerFake::Get();
1781 reset_done.Notify();
1782 };
1783
1784 // Some tests run the muxer and the test on the same thread. In these cases,
1785 // we can reset synchronously.
1786 if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
1787 do_reset();
1788 } else {
1789 muxer->task_runner_->PostTask(std::move(do_reset));
1790 reset_done.Wait();
1791 }
1792 }
1793
// Out-of-line definition of the TracingMuxer destructor, using the
// compiler-generated behavior.
TracingMuxer::~TracingMuxer() = default;

// Compile-time guard: the public internal::BufferId alias and tracing/core's
// BufferID must stay the exact same type. The build fails with the message
// below if they ever diverge.
static_assert(std::is_same<internal::BufferId, BufferID>::value,
              "public's BufferId and tracing/core's BufferID diverged");
1798
1799 } // namespace internal
1800 } // namespace perfetto
1801