1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49
50 #include "protos/perfetto/config/interceptor_config.gen.h"
51
52 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
53 #include <io.h> // For dup()
54 #else
55 #include <unistd.h> // For dup()
56 #endif
57
58 namespace perfetto {
59 namespace internal {
60
61 namespace {
62
63 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
64
65 // A task runner which prevents calls to DataSource::Trace() while an operation
66 // is in progress. Used to guard against unexpected re-entrancy where the
67 // user-provided task runner implementation tries to enter a trace point under
68 // the hood.
69 class NonReentrantTaskRunner : public base::TaskRunner {
70 public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)71 NonReentrantTaskRunner(TracingMuxer* muxer,
72 std::unique_ptr<base::TaskRunner> task_runner)
73 : muxer_(muxer), task_runner_(std::move(task_runner)) {}
74
75 // base::TaskRunner implementation.
PostTask(std::function<void ()> task)76 void PostTask(std::function<void()> task) override {
77 CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
78 }
79
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)80 void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
81 CallWithGuard(
82 [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
83 }
84
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)85 void AddFileDescriptorWatch(base::PlatformHandle fd,
86 std::function<void()> callback) override {
87 CallWithGuard(
88 [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
89 }
90
RemoveFileDescriptorWatch(base::PlatformHandle fd)91 void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
92 CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
93 }
94
RunsTasksOnCurrentThread() const95 bool RunsTasksOnCurrentThread() const override {
96 bool result;
97 CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
98 return result;
99 }
100
101 private:
102 template <typename T>
CallWithGuard(T lambda) const103 void CallWithGuard(T lambda) const {
104 auto* root_tls = muxer_->GetOrCreateTracingTLS();
105 if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
106 lambda();
107 return;
108 }
109 ScopedReentrancyAnnotator scoped_annotator(*root_tls);
110 lambda();
111 }
112
113 TracingMuxer* const muxer_;
114 std::unique_ptr<base::TaskRunner> task_runner_;
115 };
116
117 class StopArgsImpl : public DataSourceBase::StopArgs {
118 public:
HandleStopAsynchronously() const119 std::function<void()> HandleStopAsynchronously() const override {
120 auto closure = std::move(async_stop_closure);
121 async_stop_closure = std::function<void()>();
122 return closure;
123 }
124
125 mutable std::function<void()> async_stop_closure;
126 };
127
128 class FlushArgsImpl : public DataSourceBase::FlushArgs {
129 public:
HandleFlushAsynchronously() const130 std::function<void()> HandleFlushAsynchronously() const override {
131 auto closure = std::move(async_flush_closure);
132 async_flush_closure = std::function<void()>();
133 return closure;
134 }
135
136 mutable std::function<void()> async_flush_closure;
137 };
138
// Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
// Value-initialized, i.e. starts out as nullptr.
static TracingMuxerImpl* g_prev_instance{};
141
142 template <typename RegisteredBackend>
143 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anonab6d44540111::CompareBackendByType144 static int BackendTypePriority(BackendType type) {
145 switch (type) {
146 case kSystemBackend:
147 return 0;
148 case kInProcessBackend:
149 return 1;
150 case kCustomBackend:
151 return 2;
152 // The UnspecifiedBackend has the highest priority so that
153 // TracingBackendFake is the last one on the backend lists.
154 case kUnspecifiedBackend:
155 break;
156 }
157 return 3;
158 }
operator ()perfetto::internal::__anonab6d44540111::CompareBackendByType159 bool operator()(BackendType type, const RegisteredBackend& b) {
160 return BackendTypePriority(type) < BackendTypePriority(b.type);
161 }
162 };
163
164 } // namespace
165
166 // ----- Begin of TracingMuxerImpl::ProducerImpl
// Records the owning muxer, the backend id and the shared-memory tuning
// parameters. No endpoint is attached here; that happens in Initialize().
TracingMuxerImpl::ProducerImpl::ProducerImpl(
    TracingMuxerImpl* muxer,
    TracingBackendId backend_id,
    uint32_t shmem_batch_commits_duration_ms,
    bool shmem_direct_patching_enabled)
    : muxer_(muxer),
      backend_id_(backend_id),
      shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms),
      shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {}
176
// Clears |muxer_| so that callbacks which check it (e.g. OnDisconnect()) bail
// out if they are invoked while this object is being destroyed.
TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
  muxer_ = nullptr;
}
180
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)181 void TracingMuxerImpl::ProducerImpl::Initialize(
182 std::unique_ptr<ProducerEndpoint> endpoint) {
183 PERFETTO_DCHECK_THREAD(thread_checker_);
184 PERFETTO_DCHECK(!connected_);
185 connection_id_.fetch_add(1, std::memory_order_relaxed);
186 is_producer_provided_smb_ = endpoint->shared_memory();
187 last_startup_target_buffer_reservation_ = 0;
188
189 // Adopt the endpoint into a shared pointer so that we can safely share it
190 // across threads that create trace writers. The custom deleter function
191 // ensures that the endpoint is always destroyed on the muxer's thread. (Note
192 // that |task_runner| is assumed to outlive tracing sessions on all threads.)
193 auto* task_runner = muxer_->task_runner_.get();
194 auto deleter = [task_runner](ProducerEndpoint* e) {
195 if (task_runner->RunsTasksOnCurrentThread()) {
196 delete e;
197 return;
198 }
199 task_runner->PostTask([e] { delete e; });
200 };
201 std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
202 // This atomic store is needed because another thread might be concurrently
203 // creating a trace writer using the previous (disconnected) |service_|. See
204 // CreateTraceWriter().
205 std::atomic_store(&service_, std::move(service));
206 // Don't try to use the service here since it may not have connected yet. See
207 // OnConnect().
208 }
209
OnConnect()210 void TracingMuxerImpl::ProducerImpl::OnConnect() {
211 PERFETTO_DLOG("Producer connected");
212 PERFETTO_DCHECK_THREAD(thread_checker_);
213 PERFETTO_DCHECK(!connected_);
214 if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
215 PERFETTO_ELOG(
216 "The service likely doesn't support producer-provided SMBs. Preventing "
217 "future attempts to use producer-provided SMB again with this "
218 "backend.");
219 producer_provided_smb_failed_ = true;
220 // Will call OnDisconnect() and cause a reconnect without producer-provided
221 // SMB.
222 service_->Disconnect();
223 return;
224 }
225 connected_ = true;
226 muxer_->UpdateDataSourcesOnAllBackends();
227 SendOnConnectTriggers();
228 }
229
OnDisconnect()230 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
231 PERFETTO_DCHECK_THREAD(thread_checker_);
232 // If we're being destroyed, bail out.
233 if (!muxer_)
234 return;
235 connected_ = false;
236 // Active data sources for this producer will be stopped by
237 // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
238 // will have a different connection id (even before it has finished
239 // connecting).
240 registered_data_sources_.reset();
241 DisposeConnection();
242
243 // Try reconnecting the producer.
244 muxer_->OnProducerDisconnected(this);
245 }
246
DisposeConnection()247 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
248 // Keep the old service around as a dead connection in case it has active
249 // trace writers. If any tracing sessions were created, we can't clear
250 // |service_| here because other threads may be concurrently creating new
251 // trace writers. Any reconnection attempt will atomically swap the new
252 // service in place of the old one.
253 if (did_setup_tracing_ || did_setup_startup_tracing_) {
254 dead_services_.push_back(service_);
255 } else {
256 service_.reset();
257 }
258 }
259
OnTracingSetup()260 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
261 PERFETTO_DCHECK_THREAD(thread_checker_);
262 did_setup_tracing_ = true;
263 service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
264 shmem_batch_commits_duration_ms_);
265 if (shmem_direct_patching_enabled_) {
266 service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching();
267 }
268 }
269
// Remembers that startup tracing was set up. DisposeConnection() reads this
// flag to decide whether the service must be kept alive after a disconnect.
void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  did_setup_startup_tracing_ = true;
}
274
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)275 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
276 DataSourceInstanceID id,
277 const DataSourceConfig& cfg) {
278 PERFETTO_DCHECK_THREAD(thread_checker_);
279 if (!muxer_)
280 return;
281 muxer_->SetupDataSource(
282 backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
283 }
284
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)285 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
286 const DataSourceConfig&) {
287 PERFETTO_DCHECK_THREAD(thread_checker_);
288 if (!muxer_)
289 return;
290 muxer_->StartDataSource(backend_id_, id);
291 service_->NotifyDataSourceStarted(id);
292 }
293
StopDataSource(DataSourceInstanceID id)294 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
295 PERFETTO_DCHECK_THREAD(thread_checker_);
296 if (!muxer_)
297 return;
298 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
299 }
300
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count,FlushFlags flush_flags)301 void TracingMuxerImpl::ProducerImpl::Flush(
302 FlushRequestID flush_id,
303 const DataSourceInstanceID* instances,
304 size_t instance_count,
305 FlushFlags flush_flags) {
306 PERFETTO_DCHECK_THREAD(thread_checker_);
307 bool all_handled = true;
308 if (muxer_) {
309 for (size_t i = 0; i < instance_count; i++) {
310 DataSourceInstanceID ds_id = instances[i];
311 bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id,
312 flush_id, flush_flags);
313 if (!handled) {
314 pending_flushes_[flush_id].insert(ds_id);
315 all_handled = false;
316 }
317 }
318 }
319
320 if (all_handled) {
321 service_->NotifyFlushComplete(flush_id);
322 }
323 }
324
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)325 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
326 const DataSourceInstanceID* instances,
327 size_t instance_count) {
328 PERFETTO_DCHECK_THREAD(thread_checker_);
329 if (!muxer_)
330 return;
331 for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
332 muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
333 }
334 }
335
SweepDeadServices()336 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
337 PERFETTO_DCHECK_THREAD(thread_checker_);
338 auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
339 auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
340 return !arbiter || arbiter->TryShutdown();
341 };
342 for (auto it = dead_services_.begin(); it != dead_services_.end();) {
343 auto next_it = it;
344 next_it++;
345 if (is_unused(*it)) {
346 dead_services_.erase(it);
347 }
348 it = next_it;
349 }
350 return dead_services_.empty();
351 }
352
SendOnConnectTriggers()353 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
354 PERFETTO_DCHECK_THREAD(thread_checker_);
355 base::TimeMillis now = base::GetWallTimeMs();
356 std::vector<std::string> triggers;
357 while (!on_connect_triggers_.empty()) {
358 // Skip if we passed TTL.
359 if (on_connect_triggers_.front().second > now) {
360 triggers.push_back(std::move(on_connect_triggers_.front().first));
361 }
362 on_connect_triggers_.pop_front();
363 }
364 if (!triggers.empty()) {
365 service_->ActivateTriggers(triggers);
366 }
367 }
368
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)369 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
370 DataSourceInstanceID ds_id,
371 FlushRequestID flush_id) {
372 if (!connected_) {
373 return;
374 }
375
376 {
377 auto it = pending_flushes_.find(flush_id);
378 if (it == pending_flushes_.end()) {
379 return;
380 }
381 std::set<DataSourceInstanceID>& ds_ids = it->second;
382 ds_ids.erase(ds_id);
383 }
384
385 std::optional<DataSourceInstanceID> biggest_flush_id;
386 for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
387 if (it->second.empty()) {
388 biggest_flush_id = it->first;
389 it = pending_flushes_.erase(it);
390 } else {
391 break;
392 }
393 }
394
395 if (biggest_flush_id) {
396 service_->NotifyFlushComplete(*biggest_flush_id);
397 }
398 }
399
400 // ----- End of TracingMuxerImpl::ProducerImpl methods.
401
402 // ----- Begin of TracingMuxerImpl::ConsumerImpl
// Records the owning muxer, the backend type and the global session id. The
// consumer endpoint is attached later through Initialize().
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
                                             BackendType backend_type,
                                             TracingSessionGlobalID session_id)
    : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
407
// Clears |muxer_| so that OnDisconnect() bails out if it is invoked while this
// object is being destroyed.
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
  muxer_ = nullptr;
}
411
// Takes ownership of the consumer endpoint. Anything that needs a live
// connection is deferred to OnConnect().
void TracingMuxerImpl::ConsumerImpl::Initialize(
    std::unique_ptr<ConsumerEndpoint> endpoint) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_ = std::move(endpoint);
  // Don't try to use the service here since it may not have connected yet. See
  // OnConnect().
}
419
OnConnect()420 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
421 PERFETTO_DCHECK_THREAD(thread_checker_);
422 PERFETTO_DCHECK(!connected_);
423 connected_ = true;
424
425 // Observe data source instance events so we get notified when tracing starts.
426 service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
427 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
428
429 // If the API client configured and started tracing before we connected,
430 // tell the backend about it now.
431 if (trace_config_)
432 muxer_->SetupTracingSession(session_id_, trace_config_);
433 if (start_pending_)
434 muxer_->StartTracingSession(session_id_);
435 if (get_trace_stats_pending_) {
436 auto callback = std::move(get_trace_stats_callback_);
437 get_trace_stats_callback_ = nullptr;
438 muxer_->GetTraceStats(session_id_, std::move(callback));
439 }
440 if (query_service_state_callback_) {
441 auto callback = std::move(query_service_state_callback_);
442 query_service_state_callback_ = nullptr;
443 muxer_->QueryServiceState(session_id_, std::move(callback));
444 }
445 if (stop_pending_)
446 muxer_->StopTracingSession(session_id_);
447 }
448
OnDisconnect()449 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
450 PERFETTO_DCHECK_THREAD(thread_checker_);
451 // If we're being destroyed, bail out.
452 if (!muxer_)
453 return;
454 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
455 if (!connected_ && backend_type_ == kSystemBackend) {
456 PERFETTO_ELOG(
457 "Unable to connect to the system tracing service as a consumer. On "
458 "Android, use the \"perfetto\" command line tool instead to start "
459 "system-wide tracing sessions");
460 }
461 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
462
463 // Notify the client about disconnection.
464 NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
465
466 // Make sure the client doesn't hang in a blocking start/stop because of the
467 // disconnection.
468 NotifyStartComplete();
469 NotifyStopComplete();
470
471 // It shouldn't be necessary to call StopTracingSession. If we get this call
472 // it means that the service did shutdown before us, so there is no point
473 // trying it to ask it to stop the session. We should just remember to cleanup
474 // the consumer vector.
475 connected_ = false;
476
477 // Notify the muxer that it is safe to destroy |this|. This is needed because
478 // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
479 // access until OnDisconnect() is called.
480 muxer_->OnConsumerDisconnected(this);
481 }
482
// Drops the consumer endpoint to initiate disconnection from the service.
void TracingMuxerImpl::ConsumerImpl::Disconnect() {
  // Why this only resets |service_|:
  //
  // Connecting the consumer to the service returned a ConsumerEndpoint, which
  // is stored in |service_|. That endpoint holds a pointer back to this
  // ConsumerImpl. Part of the API contract of TracingService::ConnectConsumer
  // is that the ConsumerImpl pointer has to stay valid until the
  // ConsumerImpl::OnDisconnect method is called. Therefore we only reset the
  // ConsumerEndpoint |service_| here. Eventually this leads to
  // ConsumerImpl::OnDisconnect being called, which informs the muxer that it
  // is safe to call the destructor of |this|.
  service_.reset();
}
497
OnTracingDisabled(const std::string & error)498 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
499 const std::string& error) {
500 PERFETTO_DCHECK_THREAD(thread_checker_);
501 PERFETTO_DCHECK(!stopped_);
502 stopped_ = true;
503
504 if (!error.empty())
505 NotifyError(TracingError{TracingError::kTracingFailed, error});
506
507 // If we're still waiting for the start event, fire it now. This may happen if
508 // there are no active data sources in the session.
509 NotifyStartComplete();
510 NotifyStopComplete();
511 }
512
NotifyStartComplete()513 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
514 PERFETTO_DCHECK_THREAD(thread_checker_);
515 if (start_complete_callback_) {
516 muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
517 start_complete_callback_ = nullptr;
518 }
519 if (blocking_start_complete_callback_) {
520 muxer_->task_runner_->PostTask(
521 std::move(blocking_start_complete_callback_));
522 blocking_start_complete_callback_ = nullptr;
523 }
524 }
525
NotifyError(const TracingError & error)526 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
527 PERFETTO_DCHECK_THREAD(thread_checker_);
528 if (error_callback_) {
529 muxer_->task_runner_->PostTask(
530 std::bind(std::move(error_callback_), error));
531 }
532 }
533
NotifyStopComplete()534 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
535 PERFETTO_DCHECK_THREAD(thread_checker_);
536 if (stop_complete_callback_) {
537 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
538 stop_complete_callback_ = nullptr;
539 }
540 if (blocking_stop_complete_callback_) {
541 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
542 blocking_stop_complete_callback_ = nullptr;
543 }
544 }
545
OnTraceData(std::vector<TracePacket> packets,bool has_more)546 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
547 std::vector<TracePacket> packets,
548 bool has_more) {
549 PERFETTO_DCHECK_THREAD(thread_checker_);
550 if (!read_trace_callback_)
551 return;
552
553 size_t capacity = 0;
554 for (const auto& packet : packets) {
555 // 16 is an over-estimation of the proto preamble size
556 capacity += packet.size() + 16;
557 }
558
559 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
560 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
561 buf->reserve(capacity);
562 for (auto& packet : packets) {
563 char* start;
564 size_t size;
565 std::tie(start, size) = packet.GetProtoPreamble();
566 buf->insert(buf->end(), start, start + size);
567 for (auto& slice : packet.slices()) {
568 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
569 buf->insert(buf->end(), slice_data, slice_data + slice.size);
570 }
571 }
572
573 auto callback = read_trace_callback_;
574 muxer_->task_runner_->PostTask([callback, buf, has_more] {
575 TracingSession::ReadTraceCallbackArgs callback_arg{};
576 callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
577 callback_arg.size = buf->size();
578 callback_arg.has_more = has_more;
579 callback(callback_arg);
580 });
581
582 if (!has_more)
583 read_trace_callback_ = nullptr;
584 }
585
OnObservableEvents(const ObservableEvents & events)586 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
587 const ObservableEvents& events) {
588 if (events.instance_state_changes_size()) {
589 for (const auto& state_change : events.instance_state_changes()) {
590 DataSourceHandle handle{state_change.producer_name(),
591 state_change.data_source_name()};
592 data_source_states_[handle] =
593 state_change.state() ==
594 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
595 }
596 }
597
598 if (events.instance_state_changes_size() ||
599 events.all_data_sources_started()) {
600 // Data sources are first reported as being stopped before starting, so once
601 // all the data sources we know about have started we can declare tracing
602 // begun. In the case where there are no matching data sources for the
603 // session, the service will report the all_data_sources_started() event
604 // without adding any instances (only since Android S / Perfetto v10.0).
605 if (start_complete_callback_ || blocking_start_complete_callback_) {
606 bool all_data_sources_started = std::all_of(
607 data_source_states_.cbegin(), data_source_states_.cend(),
608 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
609 if (all_data_sources_started)
610 NotifyStartComplete();
611 }
612 }
613 }
614
// Not expected to be called: trips a DCHECK if it ever is. The argument is
// ignored.
void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
    const OnSessionClonedArgs&) {
  // CloneSession is not exposed in the SDK. This should never happen.
  PERFETTO_DCHECK(false);
}
620
OnTraceStats(bool success,const TraceStats & trace_stats)621 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
622 bool success,
623 const TraceStats& trace_stats) {
624 if (!get_trace_stats_callback_)
625 return;
626 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
627 callback_arg.success = success;
628 callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
629 muxer_->task_runner_->PostTask(
630 std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
631 get_trace_stats_callback_ = nullptr;
632 }
633
// The callbacks below are not used: both are intentionally empty and ignore
// their arguments.
void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
637 // ----- End of TracingMuxerImpl::ConsumerImpl
638
639 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
640
641 // TracingSessionImpl is the RAII object returned to API clients when they
642 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
643 // tracing.
644
// Records the muxer, the global session id and the backend type. All the work
// is done by the methods below, which post tasks to the muxer's task runner.
TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
    TracingMuxerImpl* muxer,
    TracingSessionGlobalID session_id,
    BackendType backend_type)
    : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
650
651 // Can be destroyed from any thread.
~TracingSessionImpl()652 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
653 auto* muxer = muxer_;
654 auto session_id = session_id_;
655 muxer->task_runner_->PostTask(
656 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
657 }
658
659 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)660 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
661 int fd) {
662 auto* muxer = muxer_;
663 auto session_id = session_id_;
664 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
665 if (fd >= 0) {
666 base::ignore_result(backend_type_); // For -Wunused in the amalgamation.
667 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
668 if (backend_type_ != kInProcessBackend) {
669 PERFETTO_FATAL(
670 "Passing a file descriptor to TracingSession::Setup() is only "
671 "supported with the kInProcessBackend on Windows. Use "
672 "TracingSession::ReadTrace() instead");
673 }
674 #endif
675 trace_config->set_write_into_file(true);
676 fd = dup(fd);
677 }
678 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
679 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
680 });
681 }
682
683 // Can be called from any thread.
Start()684 void TracingMuxerImpl::TracingSessionImpl::Start() {
685 auto* muxer = muxer_;
686 auto session_id = session_id_;
687 muxer->task_runner_->PostTask(
688 [muxer, session_id] { muxer->StartTracingSession(session_id); });
689 }
690
691 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)692 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
693 const TraceConfig& cfg) {
694 auto* muxer = muxer_;
695 auto session_id = session_id_;
696 muxer->task_runner_->PostTask([muxer, session_id, cfg] {
697 muxer->ChangeTracingSessionConfig(session_id, cfg);
698 });
699 }
700
701 // Can be called from any thread except the service thread.
StartBlocking()702 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
703 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
704 auto* muxer = muxer_;
705 auto session_id = session_id_;
706 base::WaitableEvent tracing_started;
707 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
708 auto* consumer = muxer->FindConsumer(session_id);
709 if (!consumer) {
710 // TODO(skyostil): Signal an error to the user.
711 tracing_started.Notify();
712 return;
713 }
714 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
715 consumer->blocking_start_complete_callback_ = [&] {
716 tracing_started.Notify();
717 };
718 muxer->StartTracingSession(session_id);
719 });
720 tracing_started.Wait();
721 }
722
723 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)724 void TracingMuxerImpl::TracingSessionImpl::Flush(
725 std::function<void(bool)> user_callback,
726 uint32_t timeout_ms) {
727 auto* muxer = muxer_;
728 auto session_id = session_id_;
729 muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
730 auto* consumer = muxer->FindConsumer(session_id);
731 if (!consumer) {
732 std::move(user_callback)(false);
733 return;
734 }
735 muxer->FlushTracingSession(session_id, timeout_ms,
736 std::move(user_callback));
737 });
738 }
739
740 // Can be called from any thread.
Stop()741 void TracingMuxerImpl::TracingSessionImpl::Stop() {
742 auto* muxer = muxer_;
743 auto session_id = session_id_;
744 muxer->task_runner_->PostTask(
745 [muxer, session_id] { muxer->StopTracingSession(session_id); });
746 }
747
748 // Can be called from any thread except the service thread.
StopBlocking()749 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
750 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
751 auto* muxer = muxer_;
752 auto session_id = session_id_;
753 base::WaitableEvent tracing_stopped;
754 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
755 auto* consumer = muxer->FindConsumer(session_id);
756 if (!consumer) {
757 // TODO(skyostil): Signal an error to the user.
758 tracing_stopped.Notify();
759 return;
760 }
761 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
762 consumer->blocking_stop_complete_callback_ = [&] {
763 tracing_stopped.Notify();
764 };
765 muxer->StopTracingSession(session_id);
766 });
767 tracing_stopped.Wait();
768 }
769
770 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)771 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
772 auto* muxer = muxer_;
773 auto session_id = session_id_;
774 muxer->task_runner_->PostTask([muxer, session_id, cb] {
775 muxer->ReadTracingSessionData(session_id, std::move(cb));
776 });
777 }
778
779 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)780 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
781 std::function<void()> cb) {
782 auto* muxer = muxer_;
783 auto session_id = session_id_;
784 muxer->task_runner_->PostTask([muxer, session_id, cb] {
785 auto* consumer = muxer->FindConsumer(session_id);
786 if (!consumer)
787 return;
788 consumer->start_complete_callback_ = cb;
789 });
790 }
791
792 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)793 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
794 std::function<void(TracingError)> cb) {
795 auto* muxer = muxer_;
796 auto session_id = session_id_;
797 muxer->task_runner_->PostTask([muxer, session_id, cb] {
798 auto* consumer = muxer->FindConsumer(session_id);
799 if (!consumer) {
800 // Notify the client about concurrent disconnection of the session.
801 if (cb)
802 cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
803 return;
804 }
805 consumer->error_callback_ = cb;
806 });
807 }
808
809 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)810 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
811 std::function<void()> cb) {
812 auto* muxer = muxer_;
813 auto session_id = session_id_;
814 muxer->task_runner_->PostTask([muxer, session_id, cb] {
815 auto* consumer = muxer->FindConsumer(session_id);
816 if (!consumer)
817 return;
818 consumer->stop_complete_callback_ = cb;
819 });
820 }
821
822 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)823 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
824 GetTraceStatsCallback cb) {
825 auto* muxer = muxer_;
826 auto session_id = session_id_;
827 muxer->task_runner_->PostTask([muxer, session_id, cb] {
828 muxer->GetTraceStats(session_id, std::move(cb));
829 });
830 }
831
832 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)833 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
834 QueryServiceStateCallback cb) {
835 auto* muxer = muxer_;
836 auto session_id = session_id_;
837 muxer->task_runner_->PostTask([muxer, session_id, cb] {
838 muxer->QueryServiceState(session_id, std::move(cb));
839 });
840 }
841
842 // ----- End of TracingMuxerImpl::TracingSessionImpl
843
844 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
845
// Stores the muxer, the global session id and the backend type. These are the
// values that Abort() and AbortBlocking() later pass to
// TracingMuxerImpl::AbortStartupTracingSession().
TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
    TracingMuxerImpl* muxer,
    TracingSessionGlobalID session_id,
    BackendType backend_type)
    : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
851
// Can be destroyed from any thread.
// The destructor is defaulted: it contains no explicit teardown logic.
TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
    default;
855
Abort()856 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
857 auto* muxer = muxer_;
858 auto session_id = session_id_;
859 auto backend_type = backend_type_;
860 muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
861 muxer->AbortStartupTracingSession(session_id, backend_type);
862 });
863 }
864
865 // Must not be called from the SDK's internal thread.
AbortBlocking()866 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
867 auto* muxer = muxer_;
868 auto session_id = session_id_;
869 auto backend_type = backend_type_;
870 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
871 base::WaitableEvent event;
872 muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
873 muxer->AbortStartupTracingSession(session_id, backend_type);
874 event.Notify();
875 });
876 event.Wait();
877 }
878
879 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
880
// static
// Initially points at the object returned by TracingMuxerFake::Get(). The
// TracingMuxerImpl constructor overwrites it with the real muxer.
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
883
884 // This is called by perfetto::Tracing::Initialize().
885 // Can be called on any thread. Typically, but not necessarily, that will be
886 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)887 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
888 : TracingMuxer(args.platform ? args.platform
889 : Platform::GetDefaultPlatform()) {
890 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
891 instance_ = this;
892
893 // Create the thread where muxer, producers and service will live.
894 Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
895 task_runner_.reset(new NonReentrantTaskRunner(
896 this, platform_->CreateTaskRunner(std::move(tr_args))));
897
898 // Run the initializer on that thread.
899 task_runner_->PostTask([this, args] {
900 Initialize(args);
901 AddBackends(args);
902 });
903 }
904
Initialize(const TracingInitArgs & args)905 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
906 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
907
908 policy_ = args.tracing_policy;
909 supports_multiple_data_source_instances_ =
910 args.supports_multiple_data_source_instances;
911
912 // Fallback backend for producer creation for an unsupported backend type.
913 PERFETTO_CHECK(producer_backends_.empty());
914 AddProducerBackend(internal::TracingBackendFake::GetInstance(),
915 BackendType::kUnspecifiedBackend, args);
916 // Fallback backend for consumer creation for an unsupported backend type.
917 // This backend simply fails any attempt to start a tracing session.
918 PERFETTO_CHECK(consumer_backends_.empty());
919 AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
920 BackendType::kUnspecifiedBackend);
921 }
922
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)923 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
924 BackendType type) {
925 if (!backend) {
926 // We skip the log in release builds because the *_backend_fake.cc code
927 // has already an ELOG before returning a nullptr.
928 PERFETTO_DLOG("Consumer backend creation failed, type %d",
929 static_cast<int>(type));
930 return;
931 }
932 // Keep the backends sorted by type.
933 auto it =
934 std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
935 type, CompareBackendByType<RegisteredConsumerBackend>());
936 it = consumer_backends_.emplace(it);
937
938 RegisteredConsumerBackend& rb = *it;
939 rb.backend = backend;
940 rb.type = type;
941 }
942
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)943 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
944 BackendType type,
945 const TracingInitArgs& args) {
946 if (!backend) {
947 // We skip the log in release builds because the *_backend_fake.cc code
948 // has already an ELOG before returning a nullptr.
949 PERFETTO_DLOG("Producer backend creation failed, type %d",
950 static_cast<int>(type));
951 return;
952 }
953 TracingBackendId backend_id = producer_backends_.size();
954 // Keep the backends sorted by type.
955 auto it =
956 std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
957 type, CompareBackendByType<RegisteredProducerBackend>());
958 it = producer_backends_.emplace(it);
959
960 RegisteredProducerBackend& rb = *it;
961 rb.backend = backend;
962 rb.id = backend_id;
963 rb.type = type;
964 rb.producer.reset(new ProducerImpl(this, backend_id,
965 args.shmem_batch_commits_duration_ms,
966 args.shmem_direct_patching_enabled));
967 rb.producer_conn_args.producer = rb.producer.get();
968 rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
969 rb.producer_conn_args.task_runner = task_runner_.get();
970 rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
971 rb.producer_conn_args.shmem_page_size_hint_bytes =
972 args.shmem_page_size_hint_kb * 1024;
973 rb.producer_conn_args.create_socket_async = args.create_socket_async;
974 rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
975 }
976
977 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)978 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
979 for (RegisteredProducerBackend& b : producer_backends_) {
980 if (b.id == id) {
981 return &b;
982 }
983 }
984 return nullptr;
985 }
986
987 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)988 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
989 for (RegisteredProducerBackend& b : producer_backends_) {
990 if (b.type == type) {
991 return &b;
992 }
993 }
994 return nullptr;
995 }
996
997 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)998 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
999 for (RegisteredConsumerBackend& b : consumer_backends_) {
1000 if (b.type == type) {
1001 return &b;
1002 }
1003 }
1004 return nullptr;
1005 }
1006
AddBackends(const TracingInitArgs & args)1007 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1008 if (args.backends & kSystemBackend) {
1009 PERFETTO_CHECK(args.system_producer_backend_factory_);
1010 if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1011 AddProducerBackend(args.system_producer_backend_factory_(),
1012 kSystemBackend, args);
1013 }
1014 if (args.enable_system_consumer) {
1015 PERFETTO_CHECK(args.system_consumer_backend_factory_);
1016 if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1017 AddConsumerBackend(args.system_consumer_backend_factory_(),
1018 kSystemBackend);
1019 }
1020 }
1021 }
1022
1023 if (args.backends & kInProcessBackend) {
1024 TracingBackend* b = nullptr;
1025 if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1026 if (!b) {
1027 PERFETTO_CHECK(args.in_process_backend_factory_);
1028 b = args.in_process_backend_factory_();
1029 }
1030 AddProducerBackend(b, kInProcessBackend, args);
1031 }
1032 if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1033 if (!b) {
1034 PERFETTO_CHECK(args.in_process_backend_factory_);
1035 b = args.in_process_backend_factory_();
1036 }
1037 AddConsumerBackend(b, kInProcessBackend);
1038 }
1039 }
1040
1041 if (args.backends & kCustomBackend) {
1042 PERFETTO_CHECK(args.custom_backend);
1043 if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1044 AddProducerBackend(args.custom_backend, kCustomBackend, args);
1045 }
1046 if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1047 AddConsumerBackend(args.custom_backend, kCustomBackend);
1048 }
1049 }
1050
1051 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1052 PERFETTO_FATAL("Unsupported tracing backend type");
1053 }
1054 }
1055
1056 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,bool no_flush,DataSourceStaticState * static_state)1057 bool TracingMuxerImpl::RegisterDataSource(
1058 const DataSourceDescriptor& descriptor,
1059 DataSourceFactory factory,
1060 DataSourceParams params,
1061 bool no_flush,
1062 DataSourceStaticState* static_state) {
1063 // Ignore repeated registrations.
1064 if (static_state->index != kMaxDataSources)
1065 return true;
1066
1067 uint32_t new_index = next_data_source_index_++;
1068 if (new_index >= kMaxDataSources) {
1069 PERFETTO_DLOG(
1070 "RegisterDataSource failed: too many data sources already registered");
1071 return false;
1072 }
1073
1074 // Initialize the static state.
1075 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1076 "instances[] size mismatch");
1077 for (size_t i = 0; i < static_state->instances.size(); i++)
1078 new (&static_state->instances[i]) DataSourceState{};
1079
1080 static_state->index = new_index;
1081
1082 // Generate a semi-unique id for this data source.
1083 base::Hasher hash;
1084 hash.Update(reinterpret_cast<intptr_t>(static_state));
1085 hash.Update(base::GetWallTimeNs().count());
1086 static_state->id = hash.digest() ? hash.digest() : 1;
1087
1088 task_runner_->PostTask([this, descriptor, factory, static_state, params,
1089 no_flush] {
1090 data_sources_.emplace_back();
1091 RegisteredDataSource& rds = data_sources_.back();
1092 rds.descriptor = descriptor;
1093 rds.factory = factory;
1094 rds.supports_multiple_instances =
1095 supports_multiple_data_source_instances_ &&
1096 params.supports_multiple_instances;
1097 rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1098 rds.static_state = static_state;
1099 rds.no_flush = no_flush;
1100
1101 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1102 });
1103 return true;
1104 }
1105
1106 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1107 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1108 const DataSourceDescriptor& descriptor,
1109 const DataSourceStaticState* static_state) {
1110 task_runner_->PostTask([this, descriptor, static_state] {
1111 for (auto& rds : data_sources_) {
1112 if (rds.static_state == static_state) {
1113 PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1114 rds.descriptor = descriptor;
1115 rds.descriptor.set_id(static_state->id);
1116 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1117 return;
1118 }
1119 }
1120 });
1121 }
1122
1123 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1124 void TracingMuxerImpl::RegisterInterceptor(
1125 const InterceptorDescriptor& descriptor,
1126 InterceptorFactory factory,
1127 InterceptorBase::TLSFactory tls_factory,
1128 InterceptorBase::TracePacketCallback packet_callback) {
1129 task_runner_->PostTask([this, descriptor, factory, tls_factory,
1130 packet_callback] {
1131 // Ignore repeated registrations.
1132 for (const auto& interceptor : interceptors_) {
1133 if (interceptor.descriptor.name() == descriptor.name()) {
1134 PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1135 PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1136 return;
1137 }
1138 }
1139 // Only allow certain interceptors for now.
1140 if (descriptor.name() != "test_interceptor" &&
1141 descriptor.name() != "console" && descriptor.name() != "etwexport") {
1142 PERFETTO_ELOG(
1143 "Interceptors are experimental. If you want to use them, please "
1144 "get in touch with the project maintainers "
1145 "(https://perfetto.dev/docs/contributing/"
1146 "getting-started#community).");
1147 return;
1148 }
1149 interceptors_.emplace_back();
1150 RegisteredInterceptor& interceptor = interceptors_.back();
1151 interceptor.descriptor = descriptor;
1152 interceptor.factory = factory;
1153 interceptor.tls_factory = tls_factory;
1154 interceptor.packet_callback = packet_callback;
1155 });
1156 }
1157
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1158 void TracingMuxerImpl::ActivateTriggers(
1159 const std::vector<std::string>& triggers,
1160 uint32_t ttl_ms) {
1161 base::TimeMillis expire_time =
1162 base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1163 task_runner_->PostTask([this, triggers, expire_time] {
1164 for (RegisteredProducerBackend& backend : producer_backends_) {
1165 if (backend.producer->connected_) {
1166 backend.producer->service_->ActivateTriggers(triggers);
1167 } else {
1168 for (const std::string& trigger : triggers) {
1169 backend.producer->on_connect_triggers_.emplace_back(trigger,
1170 expire_time);
1171 }
1172 }
1173 }
1174 });
1175 }
1176
// Checks if there is any matching startup tracing data source instance for a
// new SetupDataSource call. If so, moves the data source to this tracing
// session (and its target buffer) and returns true, otherwise returns false.
//
// At most one instance is adopted: the search walks |data_sources| in order
// and, within each data source, the instance slots from index 0 upwards,
// stopping at the first match.
static bool MaybeAdoptStartupTracingInDataSource(
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const DataSourceConfig& cfg,
    const std::vector<RegisteredDataSource>& data_sources) {
  for (const auto& rds : data_sources) {
    DataSourceStaticState* static_state = rds.static_state;
    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      auto* internal_state = static_state->TryGet(i);

      // A slot matches when it holds a non-zero startup buffer reservation,
      // has no data source instance id yet (0), belongs to the same backend
      // and backend connection, has a config, and its data source reports that
      // it can adopt |cfg| in place of that config.
      // Note that these checks run before the slot's lock is acquired below.
      if (internal_state &&
          internal_state->startup_target_buffer_reservation.load(
              std::memory_order_relaxed) &&
          internal_state->data_source_instance_id == 0 &&
          internal_state->backend_id == backend_id &&
          internal_state->backend_connection_id == backend_connection_id &&
          internal_state->config &&
          internal_state->data_source->CanAdoptStartupSession(
              *internal_state->config, cfg)) {
        PERFETTO_DLOG("Setting up data source %" PRIu64
                      " %s by adopting it from a startup tracing session",
                      instance_id, cfg.name().c_str());

        std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
        // Set the associations. The actual takeover happens in
        // StartDataSource().
        internal_state->data_source_instance_id = instance_id;
        internal_state->buffer_id =
            static_cast<internal::BufferId>(cfg.target_buffer());
        // The startup config is replaced by a copy of |cfg|.
        internal_state->config.reset(new DataSourceConfig(cfg));

        // TODO(eseckler): Should the data source config provided by the
        // service be allowed to specify additional interceptors / additional
        // data source params?

        return true;
      }
    }
  }
  return false;
}
1222
1223 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1224 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1225 uint32_t backend_connection_id,
1226 DataSourceInstanceID instance_id,
1227 const DataSourceConfig& cfg) {
1228 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1229 cfg.name().c_str());
1230 PERFETTO_DCHECK_THREAD(thread_checker_);
1231
1232 // First check if there is any matching startup tracing data source instance.
1233 if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1234 instance_id, cfg, data_sources_)) {
1235 return;
1236 }
1237
1238 for (const auto& rds : data_sources_) {
1239 if (rds.descriptor.name() != cfg.name())
1240 continue;
1241 DataSourceStaticState& static_state = *rds.static_state;
1242
1243 // If this data source is already active for this exact config, don't start
1244 // another instance. This happens when we have several data sources with the
1245 // same name, in which case the service sends one SetupDataSource event for
1246 // each one. Since we can't map which event maps to which data source, we
1247 // ensure each event only starts one data source instance.
1248 // TODO(skyostil): Register a unique id with each data source to the service
1249 // to disambiguate.
1250 bool active_for_config = false;
1251 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1252 if (!static_state.TryGet(i))
1253 continue;
1254 auto* internal_state =
1255 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1256 if (internal_state->backend_id == backend_id &&
1257 internal_state->backend_connection_id == backend_connection_id &&
1258 internal_state->config && *internal_state->config == cfg) {
1259 active_for_config = true;
1260 break;
1261 }
1262 }
1263 if (active_for_config) {
1264 PERFETTO_DLOG(
1265 "Data source %s is already active with this config, skipping",
1266 cfg.name().c_str());
1267 continue;
1268 }
1269
1270 SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1271 cfg, /*startup_session_id=*/0);
1272 return;
1273 }
1274 }
1275
// Creates a new instance of the data source |rds| in the first free instance
// slot of its static state, publishes the slot in |valid_instances| and calls
// the data source's OnSetup().
//
// A non-zero |startup_session_id| additionally assigns the instance a startup
// target buffer reservation taken from the backend's producer.
//
// Returns a FindDataSourceRes describing the slot, or a default-constructed
// one if the data source does not support multiple instances and one is
// already active, if startup reservations are exhausted, or if all slots are
// in use.
TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
    const RegisteredDataSource& rds,
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const DataSourceConfig& cfg,
    TracingSessionGlobalID startup_session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  DataSourceStaticState& static_state = *rds.static_state;

  // If any bit is set in `static_state.valid_instances` then at least one
  // other instance of data source is running.
  if (!rds.supports_multiple_instances &&
      static_state.valid_instances.load(std::memory_order_acquire) != 0) {
    PERFETTO_ELOG(
        "Failed to setup data source because some another instance of this "
        "data source is already active");
    return FindDataSourceRes();
  }

  for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
    // Find a free slot.
    if (static_state.TryGet(i))
      continue;

    auto* internal_state =
        reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
    // The slot's lock is held while its fields are filled in below.
    std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
    static_assert(
        std::is_same<decltype(internal_state->data_source_instance_id),
                     DataSourceInstanceID>::value,
        "data_source_instance_id type mismatch");
    internal_state->muxer_id_for_testing = muxer_id_for_testing_;
    // The lookup result is dereferenced without a null check.
    RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);

    if (startup_session_id) {
      // Startup tracing: take the next reservation id from the producer's
      // 16-bit counter. Once the counter has reached its maximum no further
      // reservation is handed out and the data source is dropped; the slot
      // has not been published at this point.
      uint16_t& last_reservation =
          backend.producer->last_startup_target_buffer_reservation_;
      if (last_reservation == std::numeric_limits<uint16_t>::max()) {
        PERFETTO_ELOG(
            "Startup buffer reservations exhausted, dropping data source");
        return FindDataSourceRes();
      }
      internal_state->startup_target_buffer_reservation.store(
          ++last_reservation, std::memory_order_relaxed);
    } else {
      // Not a startup session: 0 stands for "no reservation".
      internal_state->startup_target_buffer_reservation.store(
          0, std::memory_order_relaxed);
    }

    internal_state->backend_id = backend_id;
    internal_state->backend_connection_id = backend_connection_id;
    internal_state->data_source_instance_id = instance_id;
    internal_state->buffer_id =
        static_cast<internal::BufferId>(cfg.target_buffer());
    internal_state->config.reset(new DataSourceConfig(cfg));
    internal_state->startup_session_id = startup_session_id;
    internal_state->data_source = rds.factory();
    internal_state->interceptor = nullptr;
    internal_state->interceptor_id = 0;

    if (cfg.has_interceptor_config()) {
      // Look up the configured interceptor by name among the registered ones.
      for (size_t j = 0; j < interceptors_.size(); j++) {
        if (cfg.interceptor_config().name() ==
            interceptors_[j].descriptor.name()) {
          PERFETTO_DLOG("Intercepting data source %" PRIu64
                        " \"%s\" into \"%s\"",
                        instance_id, cfg.name().c_str(),
                        cfg.interceptor_config().name().c_str());
          // The stored id is the index plus one, so that 0 keeps meaning "no
          // interceptor".
          internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
          internal_state->interceptor = interceptors_[j].factory();
          internal_state->interceptor->OnSetup({cfg});
          break;
        }
      }
      // An unknown interceptor is only logged: the data source is still set
      // up, without an interceptor.
      if (!internal_state->interceptor_id) {
        PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
                      cfg.interceptor_config().name().c_str());
      }
    }

    // This must be made at the end. See matching acquire-load in
    // DataSource::Trace().
    static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);

    DataSourceBase::SetupArgs setup_args;
    setup_args.config = &cfg;
    setup_args.backend_type = backend.type;
    setup_args.internal_instance_index = i;

    // OnSetup() runs with the slot's lock held only if the data source was
    // registered with |requires_callbacks_under_lock|.
    if (!rds.requires_callbacks_under_lock)
      lock.unlock();
    internal_state->data_source->OnSetup(setup_args);

    return FindDataSourceRes(&static_state, internal_state, i,
                             rds.requires_callbacks_under_lock);
  }
  PERFETTO_ELOG(
      "Maximum number of data source instances exhausted. "
      "Dropping data source %" PRIu64,
      instance_id);
  return FindDataSourceRes();
}
1379
// Called by the service of one of the backends.
//
// Starts the data source instance identified by |backend_id| and
// |instance_id|. If the instance still holds a startup target buffer
// reservation, it is not started again: its reservation is bound to the
// instance's target buffer and the startup session bookkeeping is updated.
// Otherwise the call is forwarded to StartDataSourceImpl().
void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
                                       DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to start");
    return;
  }

  // Check if the data source was already started for startup tracing.
  uint16_t startup_reservation =
      ds.internal_state->startup_target_buffer_reservation.load(
          std::memory_order_relaxed);
  if (startup_reservation) {
    // The lookup result is dereferenced without a null check.
    RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
    TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
    auto session_it = std::find_if(
        backend.startup_sessions.begin(), backend.startup_sessions.end(),
        [session_id](const RegisteredStartupSession& session) {
          return session.session_id == session_id;
        });
    // The session is expected to exist; this is only verified in builds where
    // PERFETTO_DCHECK is active.
    PERFETTO_DCHECK(session_it != backend.startup_sessions.end());

    if (session_it->is_aborting) {
      PERFETTO_DLOG("Data source %" PRIu64
                    " was already aborted for startup tracing, not starting it",
                    instance_id);
      return;
    }

    PERFETTO_DLOG(
        "Data source %" PRIu64
        " was already started for startup tracing, binding its target buffer",
        instance_id);

    backend.producer->service_->MaybeSharedMemoryArbiter()
        ->BindStartupTargetBuffer(startup_reservation,
                                  ds.internal_state->buffer_id);

    // The reservation ID can be used even after binding it, so there's no need
    // for any barriers here - we just need atomicity.
    ds.internal_state->startup_target_buffer_reservation.store(
        0, std::memory_order_relaxed);

    // TODO(eseckler): Should we reset incremental state at this point, or
    // notify the data source some other way?

    // The session should not have been fully bound yet (or aborted).
    PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);

    // Once the last data source of the session has been bound, the session's
    // |on_adopted| callback (if any) is posted and the session is removed.
    session_it->num_unbound_data_sources--;
    if (session_it->num_unbound_data_sources == 0) {
      if (session_it->on_adopted)
        task_runner_->PostTask(session_it->on_adopted);
      backend.startup_sessions.erase(session_it);
    }
    return;
  }

  StartDataSourceImpl(ds);
}
1444
StartDataSourceImpl(const FindDataSourceRes & ds)1445 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1446 PERFETTO_DCHECK_THREAD(thread_checker_);
1447
1448 DataSourceBase::StartArgs start_args{};
1449 start_args.internal_instance_index = ds.instance_idx;
1450
1451 std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1452 if (ds.internal_state->interceptor)
1453 ds.internal_state->interceptor->OnStart({});
1454 ds.internal_state->trace_lambda_enabled.store(true,
1455 std::memory_order_relaxed);
1456 PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1457
1458 if (!ds.requires_callbacks_under_lock)
1459 lock.unlock();
1460 ds.internal_state->data_source->OnStart(start_args);
1461 }
1462
1463 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1464 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1465 TracingBackendId backend_id,
1466 DataSourceInstanceID instance_id) {
1467 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1468 PERFETTO_DCHECK_THREAD(thread_checker_);
1469
1470 auto ds = FindDataSource(backend_id, instance_id);
1471 if (!ds) {
1472 PERFETTO_ELOG("Could not find data source to stop");
1473 return;
1474 }
1475
1476 StopDataSource_AsyncBeginImpl(ds);
1477 }
1478
// Begins stopping the data source instance described by |ds|: notifies its
// interceptor (if any) and calls the data source's OnStop(), passing a closure
// that completes the stop via StopDataSource_AsyncEnd(). If the closure is
// still set after OnStop() returns, it is run right away from here.
// Does nothing if a stop is already in progress for the instance.
void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
    const FindDataSourceRes& ds) {
  // Snapshot the identifying fields now: StopDataSource_AsyncEnd() compares
  // them against the slot's contents when the stop completes.
  TracingBackendId backend_id = ds.internal_state->backend_id;
  uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
  DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;

  StopArgsImpl stop_args{};
  stop_args.internal_instance_index = ds.instance_idx;
  stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
                                  instance_id, ds] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the StopArgs by the
    // embedder to handle stop asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask(
        [this, backend_id, backend_connection_id, instance_id, ds] {
          StopDataSource_AsyncEnd(backend_id, backend_connection_id,
                                  instance_id, ds);
        });
  };

  {
    std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);

    // Don't call OnStop again if the datasource is already stopping.
    if (ds.internal_state->async_stop_in_progress)
      return;
    ds.internal_state->async_stop_in_progress = true;

    if (ds.internal_state->interceptor)
      ds.internal_state->interceptor->OnStop({});

    // OnStop() runs with the lock held only if the data source requires
    // callbacks under lock.
    if (!ds.requires_callbacks_under_lock)
      lock.unlock();
    ds.internal_state->data_source->OnStop(stop_args);
  }

  // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
  // async closure here. In theory we could avoid the PostTask and call
  // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
  // divergencies between the deferred-stop vs non-deferred-stop code paths.
  if (stop_args.async_stop_closure)
    std::move(stop_args.async_stop_closure)();
}
1524
// Completes the stop of the data source instance described by |ds|.
//
// |backend_id|, |backend_connection_id| and |instance_id| are the values the
// instance had when the stop began; if the slot no longer matches them (or is
// no longer valid) the call only logs an error. Otherwise the slot is
// unpublished from |valid_instances|, its data source, interceptor and config
// are destroyed, any leftover startup buffer reservation is aborted, and the
// service is notified if the producer is still connected through the same
// connection.
void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
                                               uint32_t backend_connection_id,
                                               DataSourceInstanceID instance_id,
                                               const FindDataSourceRes& ds) {
  PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // Check that the data source instance is still active and was not modified
  // while it was being stopped.
  if (!ds.static_state->TryGet(ds.instance_idx) ||
      ds.internal_state->backend_id != backend_id ||
      ds.internal_state->backend_connection_id != backend_connection_id ||
      ds.internal_state->data_source_instance_id != instance_id) {
    PERFETTO_ELOG(
        "Async stop of data source %" PRIu64
        " failed. This might be due to calling the async_stop_closure twice.",
        instance_id);
    return;
  }

  // Clear this instance's bit in |valid_instances|.
  const uint32_t mask = ~(1 << ds.instance_idx);
  ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);

  // Take the mutex to prevent that the data source is in the middle of
  // a Trace() execution where it called GetDataSourceLocked() while we
  // destroy it.
  uint16_t startup_buffer_reservation;
  TracingSessionGlobalID startup_session_id;
  {
    std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
    ds.internal_state->trace_lambda_enabled.store(false,
                                                  std::memory_order_relaxed);
    ds.internal_state->data_source.reset();
    ds.internal_state->interceptor.reset();
    ds.internal_state->config.reset();
    ds.internal_state->async_stop_in_progress = false;
    // Read the startup bookkeeping while the lock is still held; it is used
    // further down, after the lock has been released.
    startup_buffer_reservation =
        ds.internal_state->startup_target_buffer_reservation.load(
            std::memory_order_relaxed);
    startup_session_id = ds.internal_state->startup_session_id;
  }

  // The other fields of internal_state are deliberately *not* cleared.
  // See races-related comments of DataSource::Trace().

  TracingMuxer::generation_++;

  // |producer_backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < producer_backends_.size());
  RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
  ProducerImpl* producer = backend.producer.get();
  if (!producer)
    return;

  // If the data source instance still has a startup buffer reservation, it was
  // only active for startup tracing and never started by the service. Discard
  // the startup buffer reservation.
  if (startup_buffer_reservation) {
    PERFETTO_DCHECK(startup_session_id);

    if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
      producer->service_->MaybeSharedMemoryArbiter()
          ->AbortStartupTracingForReservation(startup_buffer_reservation);
    }

    auto session_it = std::find_if(
        backend.startup_sessions.begin(), backend.startup_sessions.end(),
        [startup_session_id](const RegisteredStartupSession& session) {
          return session.session_id == startup_session_id;
        });

    // Session should not be removed until abortion of all data source instances
    // is complete.
    PERFETTO_DCHECK(session_it != backend.startup_sessions.end());

    // Once the last aborting data source of the session is done, the session's
    // |on_aborted| callback (if any) is posted and the session is removed.
    session_it->num_aborting_data_sources--;
    if (session_it->num_aborting_data_sources == 0) {
      if (session_it->on_aborted)
        task_runner_->PostTask(session_it->on_aborted);

      backend.startup_sessions.erase(session_it);
    }
  }

  // The service is only talked to if the producer is connected and its
  // connection id is still the one the data source instance was set up with.
  if (producer->connected_ &&
      backend.producer->connection_id_.load(std::memory_order_relaxed) ==
          backend_connection_id) {
    // Flush any commits that might have been batched by SharedMemoryArbiter.
    producer->service_->MaybeSharedMemoryArbiter()
        ->FlushPendingCommitDataRequests();
    // An instance id of 0 is not reported to the service.
    if (instance_id)
      producer->service_->NotifyDataSourceStopped(instance_id);
  }
  producer->SweepDeadServices();
}
1620
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1621 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1622 TracingBackendId backend_id,
1623 DataSourceInstanceID instance_id) {
1624 PERFETTO_DCHECK_THREAD(thread_checker_);
1625 PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1626 instance_id);
1627 auto ds = FindDataSource(backend_id, instance_id);
1628 if (!ds) {
1629 PERFETTO_ELOG("Could not find data source to clear incremental state for");
1630 return;
1631 }
1632
1633 DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1634 clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1635 {
1636 std::unique_lock<std::recursive_mutex> lock;
1637 if (ds.requires_callbacks_under_lock)
1638 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1639 ds.internal_state->data_source->WillClearIncrementalState(
1640 clear_incremental_state_args);
1641 }
1642
1643 // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1644 // the incremental state should be cleared.
1645 ds.static_state->incremental_state_generation.fetch_add(
1646 1, std::memory_order_relaxed);
1647 }
1648
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id,FlushFlags flush_flags)1649 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1650 TracingBackendId backend_id,
1651 DataSourceInstanceID instance_id,
1652 FlushRequestID flush_id,
1653 FlushFlags flush_flags) {
1654 PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1655 auto ds = FindDataSource(backend_id, instance_id);
1656 if (!ds) {
1657 PERFETTO_ELOG("Could not find data source to flush");
1658 return true;
1659 }
1660
1661 uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1662
1663 FlushArgsImpl flush_args;
1664 flush_args.flush_flags = flush_flags;
1665 flush_args.internal_instance_index = ds.instance_idx;
1666 flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1667 instance_id, ds, flush_id] {
1668 // TracingMuxerImpl is long lived, capturing |this| is okay.
1669 // The notification closure can be moved out of the StopArgs by the
1670 // embedder to handle stop asynchronously. The embedder might then
1671 // call the closure on a different thread than the current one, hence
1672 // this nested PostTask().
1673 task_runner_->PostTask(
1674 [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1675 FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1676 instance_id, ds, flush_id);
1677 });
1678 };
1679 {
1680 std::unique_lock<std::recursive_mutex> lock;
1681 if (ds.requires_callbacks_under_lock)
1682 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1683 ds.internal_state->data_source->OnFlush(flush_args);
1684 }
1685
1686 // |async_flush_closure| is moved out of |flush_args| if the producer
1687 // requested to handle the flush asynchronously.
1688 bool handled = static_cast<bool>(flush_args.async_flush_closure);
1689 return handled;
1690 }
1691
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1692 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1693 TracingBackendId backend_id,
1694 uint32_t backend_connection_id,
1695 DataSourceInstanceID instance_id,
1696 const FindDataSourceRes& ds,
1697 FlushRequestID flush_id) {
1698 PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1699 PERFETTO_DCHECK_THREAD(thread_checker_);
1700
1701 // Check that the data source instance is still active and was not modified
1702 // while it was being flushed.
1703 if (!ds.static_state->TryGet(ds.instance_idx) ||
1704 ds.internal_state->backend_id != backend_id ||
1705 ds.internal_state->backend_connection_id != backend_connection_id ||
1706 ds.internal_state->data_source_instance_id != instance_id) {
1707 PERFETTO_ELOG("Async flush of data source %" PRIu64
1708 " failed. This might be due to the data source being stopped "
1709 "in the meantime",
1710 instance_id);
1711 return;
1712 }
1713
1714 // |producer_backends_| is append-only, Backend instances are always valid.
1715 PERFETTO_CHECK(backend_id < producer_backends_.size());
1716 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1717
1718 ProducerImpl* producer = backend.producer.get();
1719 if (!producer)
1720 return;
1721
1722 // If the tracing service disconnects and reconnects while a data source is
1723 // handling a flush request, there's no point is sending the flush reply to
1724 // the newly reconnected producer.
1725 if (producer->connected_ &&
1726 backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1727 backend_connection_id) {
1728 producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1729 }
1730 }
1731
SyncProducersForTesting()1732 void TracingMuxerImpl::SyncProducersForTesting() {
1733 std::mutex mutex;
1734 std::condition_variable cv;
1735
1736 // IPC-based producers don't report connection errors explicitly for each
1737 // command, but instead with an asynchronous callback
1738 // (ProducerImpl::OnDisconnected). This means that the sync command below
1739 // may have completed but failed to reach the service because of a
1740 // disconnection, but we can't tell until the disconnection message comes
1741 // through. To guard against this, we run two whole rounds of sync round-trips
1742 // before returning; the first one will detect any disconnected producers and
1743 // the second one will ensure any reconnections have completed and all data
1744 // sources are registered in the service again.
1745 for (size_t i = 0; i < 2; i++) {
1746 size_t countdown = std::numeric_limits<size_t>::max();
1747 task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1748 {
1749 std::unique_lock<std::mutex> countdown_lock(mutex);
1750 countdown = producer_backends_.size();
1751 }
1752 for (auto& backend : producer_backends_) {
1753 auto* producer = backend.producer.get();
1754 producer->service_->Sync([&mutex, &cv, &countdown] {
1755 std::unique_lock<std::mutex> countdown_lock(mutex);
1756 countdown--;
1757 cv.notify_one();
1758 });
1759 }
1760 });
1761
1762 {
1763 std::unique_lock<std::mutex> countdown_lock(mutex);
1764 cv.wait(countdown_lock, [&countdown] { return !countdown; });
1765 }
1766 }
1767
1768 // Check that all producers are indeed connected.
1769 bool done = false;
1770 bool all_producers_connected = true;
1771 task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1772 for (auto& backend : producer_backends_)
1773 all_producers_connected &= backend.producer->connected_;
1774 std::unique_lock<std::mutex> lock(mutex);
1775 done = true;
1776 cv.notify_one();
1777 });
1778
1779 {
1780 std::unique_lock<std::mutex> lock(mutex);
1781 cv.wait(lock, [&done] { return done; });
1782 }
1783 PERFETTO_DCHECK(all_producers_connected);
1784 }
1785
DestroyStoppedTraceWritersForCurrentThread()1786 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1787 // Iterate across all possible data source types.
1788 auto cur_generation = generation_.load(std::memory_order_acquire);
1789 auto* root_tls = GetOrCreateTracingTLS();
1790
1791 auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1792 // |tls| has a vector of per-data-source-instance thread-local state.
1793 DataSourceStaticState* static_state = tls.static_state;
1794 if (!static_state)
1795 return; // Slot not used.
1796
1797 // Iterate across all possible instances for this data source.
1798 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1799 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1800 if (!ds_tls.trace_writer)
1801 continue;
1802
1803 DataSourceState* ds_state = static_state->TryGet(inst);
1804 if (ds_state &&
1805 ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1806 ds_state->backend_id == ds_tls.backend_id &&
1807 ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1808 ds_state->startup_target_buffer_reservation.load(
1809 std::memory_order_relaxed) ==
1810 ds_tls.startup_target_buffer_reservation &&
1811 ds_state->buffer_id == ds_tls.buffer_id &&
1812 ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1813 continue;
1814 }
1815
1816 // The DataSource instance has been destroyed or recycled.
1817 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
1818 }
1819 };
1820
1821 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1822 // |tls| has a vector of per-data-source-instance thread-local state.
1823 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1824 destroy_stopped_instances(tls);
1825 }
1826 destroy_stopped_instances(root_tls->track_event_tls);
1827 root_tls->generation = cur_generation;
1828 }
1829
1830 // Called both when a new data source is registered or when a new backend
1831 // connects. In both cases we want to be sure we reflected the data source
1832 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1833 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1834 PERFETTO_DCHECK_THREAD(thread_checker_);
1835 for (RegisteredDataSource& rds : data_sources_) {
1836 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1837 }
1838 }
1839
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1840 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1841 bool is_changed) {
1842 PERFETTO_DCHECK_THREAD(thread_checker_);
1843 for (RegisteredProducerBackend& backend : producer_backends_) {
1844 // We cannot call RegisterDataSource on the backend before it connects.
1845 if (!backend.producer->connected_)
1846 continue;
1847
1848 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1849 bool is_registered = backend.producer->registered_data_sources_.test(
1850 rds.static_state->index);
1851 if (is_registered && !is_changed)
1852 continue;
1853
1854 if (!rds.descriptor.no_flush()) {
1855 rds.descriptor.set_no_flush(rds.no_flush);
1856 }
1857 rds.descriptor.set_will_notify_on_start(true);
1858 if (!rds.descriptor.has_will_notify_on_stop()) {
1859 rds.descriptor.set_will_notify_on_stop(true);
1860 }
1861
1862 rds.descriptor.set_handles_incremental_state_clear(true);
1863 rds.descriptor.set_id(rds.static_state->id);
1864 if (is_registered) {
1865 backend.producer->service_->UpdateDataSource(rds.descriptor);
1866 } else {
1867 backend.producer->service_->RegisterDataSource(rds.descriptor);
1868 }
1869 backend.producer->registered_data_sources_.set(rds.static_state->index);
1870 }
1871 }
1872
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1873 void TracingMuxerImpl::SetupTracingSession(
1874 TracingSessionGlobalID session_id,
1875 const std::shared_ptr<TraceConfig>& trace_config,
1876 base::ScopedFile trace_fd) {
1877 PERFETTO_DCHECK_THREAD(thread_checker_);
1878 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1879
1880 auto* consumer = FindConsumer(session_id);
1881 if (!consumer)
1882 return;
1883
1884 consumer->trace_config_ = trace_config;
1885 if (trace_fd)
1886 consumer->trace_fd_ = std::move(trace_fd);
1887
1888 if (!consumer->connected_)
1889 return;
1890
1891 // Only used in the deferred start mode.
1892 if (trace_config->deferred_start()) {
1893 consumer->service_->EnableTracing(*trace_config,
1894 std::move(consumer->trace_fd_));
1895 }
1896 }
1897
StartTracingSession(TracingSessionGlobalID session_id)1898 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1899 PERFETTO_DCHECK_THREAD(thread_checker_);
1900
1901 auto* consumer = FindConsumer(session_id);
1902
1903 if (!consumer)
1904 return;
1905
1906 if (!consumer->trace_config_) {
1907 PERFETTO_ELOG("Must call Setup(config) first");
1908 return;
1909 }
1910
1911 if (!consumer->connected_) {
1912 consumer->start_pending_ = true;
1913 return;
1914 }
1915
1916 consumer->start_pending_ = false;
1917 if (consumer->trace_config_->deferred_start()) {
1918 consumer->service_->StartTracing();
1919 } else {
1920 consumer->service_->EnableTracing(*consumer->trace_config_,
1921 std::move(consumer->trace_fd_));
1922 }
1923
1924 // TODO implement support for the deferred-start + fast-triggering case.
1925 }
1926
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1927 void TracingMuxerImpl::ChangeTracingSessionConfig(
1928 TracingSessionGlobalID session_id,
1929 const TraceConfig& trace_config) {
1930 PERFETTO_DCHECK_THREAD(thread_checker_);
1931
1932 auto* consumer = FindConsumer(session_id);
1933
1934 if (!consumer)
1935 return;
1936
1937 if (!consumer->trace_config_) {
1938 // Changing the config is only supported for started sessions.
1939 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1940 return;
1941 }
1942
1943 consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1944 if (consumer->connected_)
1945 consumer->service_->ChangeTraceConfig(trace_config);
1946 }
1947
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1948 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1949 uint32_t timeout_ms,
1950 std::function<void(bool)> callback) {
1951 PERFETTO_DCHECK_THREAD(thread_checker_);
1952 auto* consumer = FindConsumer(session_id);
1953 if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1954 !consumer->trace_config_) {
1955 PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1956 std::move(callback)(false);
1957 return;
1958 }
1959
1960 // For now we don't want to expose the flush reason to the consumer-side SDK
1961 // users to avoid misuses until there is a strong need.
1962 consumer->service_->Flush(timeout_ms, std::move(callback),
1963 FlushFlags(FlushFlags::Initiator::kConsumerSdk,
1964 FlushFlags::Reason::kExplicit));
1965 }
1966
StopTracingSession(TracingSessionGlobalID session_id)1967 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1968 PERFETTO_DCHECK_THREAD(thread_checker_);
1969 auto* consumer = FindConsumer(session_id);
1970 if (!consumer)
1971 return;
1972
1973 if (consumer->start_pending_) {
1974 // If the session hasn't started yet, wait until it does before stopping.
1975 consumer->stop_pending_ = true;
1976 return;
1977 }
1978
1979 consumer->stop_pending_ = false;
1980 if (consumer->stopped_) {
1981 // If the session was already stopped (e.g., it failed to start), don't try
1982 // stopping again.
1983 consumer->NotifyStopComplete();
1984 } else if (!consumer->trace_config_) {
1985 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1986 return;
1987 } else {
1988 consumer->service_->DisableTracing();
1989 }
1990
1991 consumer->trace_config_.reset();
1992 }
1993
DestroyTracingSession(TracingSessionGlobalID session_id)1994 void TracingMuxerImpl::DestroyTracingSession(
1995 TracingSessionGlobalID session_id) {
1996 PERFETTO_DCHECK_THREAD(thread_checker_);
1997 for (RegisteredConsumerBackend& backend : consumer_backends_) {
1998 // We need to find the consumer (if any) and call Disconnect as we destroy
1999 // the tracing session. We can't call Disconnect() inside this for loop
2000 // because in the in-process case this will end up to a synchronous call to
2001 // OnConsumerDisconnect which will invalidate all the iterators to
2002 // |backend.consumers|.
2003 ConsumerImpl* consumer = nullptr;
2004 for (auto& con : backend.consumers) {
2005 if (con->session_id_ == session_id) {
2006 consumer = con.get();
2007 break;
2008 }
2009 }
2010 if (consumer) {
2011 // We broke out of the loop above on the assumption that each backend will
2012 // only have a single consumer per session. This DCHECK ensures that
2013 // this is the case.
2014 PERFETTO_DCHECK(
2015 std::count_if(backend.consumers.begin(), backend.consumers.end(),
2016 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2017 return con->session_id_ == session_id;
2018 }) == 1u);
2019 consumer->Disconnect();
2020 }
2021 }
2022 }
2023
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2024 void TracingMuxerImpl::ReadTracingSessionData(
2025 TracingSessionGlobalID session_id,
2026 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2027 PERFETTO_DCHECK_THREAD(thread_checker_);
2028 auto* consumer = FindConsumer(session_id);
2029 if (!consumer) {
2030 // TODO(skyostil): Signal an error to the user.
2031 TracingSession::ReadTraceCallbackArgs callback_arg{};
2032 callback(callback_arg);
2033 return;
2034 }
2035 PERFETTO_DCHECK(!consumer->read_trace_callback_);
2036 consumer->read_trace_callback_ = std::move(callback);
2037 consumer->service_->ReadBuffers();
2038 }
2039
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2040 void TracingMuxerImpl::GetTraceStats(
2041 TracingSessionGlobalID session_id,
2042 TracingSession::GetTraceStatsCallback callback) {
2043 PERFETTO_DCHECK_THREAD(thread_checker_);
2044 auto* consumer = FindConsumer(session_id);
2045 if (!consumer) {
2046 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2047 callback_arg.success = false;
2048 callback(std::move(callback_arg));
2049 return;
2050 }
2051 PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2052 consumer->get_trace_stats_callback_ = std::move(callback);
2053 if (!consumer->connected_) {
2054 consumer->get_trace_stats_pending_ = true;
2055 return;
2056 }
2057 consumer->get_trace_stats_pending_ = false;
2058 consumer->service_->GetTraceStats();
2059 }
2060
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2061 void TracingMuxerImpl::QueryServiceState(
2062 TracingSessionGlobalID session_id,
2063 TracingSession::QueryServiceStateCallback callback) {
2064 PERFETTO_DCHECK_THREAD(thread_checker_);
2065 auto* consumer = FindConsumer(session_id);
2066 if (!consumer) {
2067 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2068 callback_arg.success = false;
2069 callback(std::move(callback_arg));
2070 return;
2071 }
2072 PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2073 if (!consumer->connected_) {
2074 consumer->query_service_state_callback_ = std::move(callback);
2075 return;
2076 }
2077 auto callback_wrapper = [callback](bool success,
2078 protos::gen::TracingServiceState state) {
2079 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2080 callback_arg.success = success;
2081 callback_arg.service_state_data = state.SerializeAsArray();
2082 callback(std::move(callback_arg));
2083 };
2084 consumer->service_->QueryServiceState({}, std::move(callback_wrapper));
2085 }
2086
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2087 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2088 uint32_t batch_commits_duration_ms,
2089 BackendType backend_type) {
2090 for (RegisteredProducerBackend& backend : producer_backends_) {
2091 if (backend.producer && backend.producer->connected_ &&
2092 backend.type == backend_type) {
2093 backend.producer->service_->MaybeSharedMemoryArbiter()
2094 ->SetBatchCommitsDuration(batch_commits_duration_ms);
2095 }
2096 }
2097 }
2098
EnableDirectSMBPatchingForTesting(BackendType backend_type)2099 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2100 BackendType backend_type) {
2101 for (RegisteredProducerBackend& backend : producer_backends_) {
2102 if (backend.producer && backend.producer->connected_ &&
2103 backend.type == backend_type &&
2104 !backend.producer->service_->MaybeSharedMemoryArbiter()
2105 ->EnableDirectSMBPatching()) {
2106 return false;
2107 }
2108 }
2109 return true;
2110 }
2111
FindConsumer(TracingSessionGlobalID session_id)2112 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2113 TracingSessionGlobalID session_id) {
2114 PERFETTO_DCHECK_THREAD(thread_checker_);
2115 return FindConsumerAndBackend(session_id).first;
2116 }
2117
2118 std::pair<TracingMuxerImpl::ConsumerImpl*,
2119 TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2120 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2121 PERFETTO_DCHECK_THREAD(thread_checker_);
2122 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2123 for (auto& consumer : backend.consumers) {
2124 if (consumer->session_id_ == session_id) {
2125 return {consumer.get(), &backend};
2126 }
2127 }
2128 }
2129 return {nullptr, nullptr};
2130 }
2131
InitializeConsumer(TracingSessionGlobalID session_id)2132 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2133 PERFETTO_DCHECK_THREAD(thread_checker_);
2134
2135 auto res = FindConsumerAndBackend(session_id);
2136 if (!res.first || !res.second)
2137 return;
2138 TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2139 RegisteredConsumerBackend& backend = *res.second;
2140
2141 TracingBackend::ConnectConsumerArgs conn_args;
2142 conn_args.consumer = consumer;
2143 conn_args.task_runner = task_runner_.get();
2144 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2145 }
2146
OnConsumerDisconnected(ConsumerImpl * consumer)2147 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2148 PERFETTO_DCHECK_THREAD(thread_checker_);
2149 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2150 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2151 return con.get() == consumer;
2152 };
2153 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2154 backend.consumers.end(), pred),
2155 backend.consumers.end());
2156 }
2157 }
2158
SetMaxProducerReconnectionsForTesting(uint32_t count)2159 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2160 max_producer_reconnections_.store(count);
2161 }
2162
OnProducerDisconnected(ProducerImpl * producer)2163 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2164 PERFETTO_DCHECK_THREAD(thread_checker_);
2165 for (RegisteredProducerBackend& backend : producer_backends_) {
2166 if (backend.producer.get() != producer)
2167 continue;
2168
2169 // The tracing service is disconnected. It does not make sense to keep
2170 // tracing (we wouldn't be able to commit). On reconnection, the tracing
2171 // service will restart the data sources.
2172 for (const auto& rds : data_sources_) {
2173 DataSourceStaticState* static_state = rds.static_state;
2174 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2175 auto* internal_state = static_state->TryGet(i);
2176 if (internal_state && internal_state->backend_id == backend.id &&
2177 internal_state->backend_connection_id ==
2178 backend.producer->connection_id_.load(
2179 std::memory_order_relaxed)) {
2180 StopDataSource_AsyncBeginImpl(
2181 FindDataSourceRes(static_state, internal_state, i,
2182 rds.requires_callbacks_under_lock));
2183 }
2184 }
2185 }
2186
2187 // Try reconnecting the disconnected producer. If the connection succeeds,
2188 // all the data sources will be automatically re-registered.
2189 if (producer->connection_id_.load(std::memory_order_relaxed) >
2190 max_producer_reconnections_.load()) {
2191 // Avoid reconnecting a failing producer too many times. Instead we just
2192 // leak the producer instead of trying to avoid further complicating
2193 // cross-thread trace writer creation.
2194 PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2195 continue;
2196 }
2197
2198 backend.producer->Initialize(
2199 backend.backend->ConnectProducer(backend.producer_conn_args));
2200 // Don't use producer-provided SMBs for the next connection unless startup
2201 // tracing requires it again.
2202 backend.producer_conn_args.use_producer_provided_smb = false;
2203 }
2204 }
2205
SweepDeadBackends()2206 void TracingMuxerImpl::SweepDeadBackends() {
2207 PERFETTO_DCHECK_THREAD(thread_checker_);
2208 for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2209 auto next_it = it;
2210 next_it++;
2211 if (it->producer->SweepDeadServices())
2212 dead_backends_.erase(it);
2213 it = next_it;
2214 }
2215 }
2216
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2217 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2218 TracingBackendId backend_id,
2219 DataSourceInstanceID instance_id) {
2220 PERFETTO_DCHECK_THREAD(thread_checker_);
2221 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2222 for (const auto& rds : data_sources_) {
2223 DataSourceStaticState* static_state = rds.static_state;
2224 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2225 auto* internal_state = static_state->TryGet(i);
2226 if (internal_state && internal_state->backend_id == backend_id &&
2227 internal_state->backend_connection_id ==
2228 backend.producer->connection_id_.load(
2229 std::memory_order_relaxed) &&
2230 internal_state->data_source_instance_id == instance_id) {
2231 return FindDataSourceRes(static_state, internal_state, i,
2232 rds.requires_callbacks_under_lock);
2233 }
2234 }
2235 }
2236 return FindDataSourceRes();
2237 }
2238
2239 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2240 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2241 DataSourceStaticState* static_state,
2242 uint32_t data_source_instance_index,
2243 DataSourceState* data_source,
2244 BufferExhaustedPolicy buffer_exhausted_policy) {
2245 if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2246 // If the session is being intercepted, return a heap-backed trace writer
2247 // instead. This is safe because all the data given to the interceptor is
2248 // either thread-local (|instance_index|), statically allocated
2249 // (|static_state|) or constant after initialization (|interceptor|). Access
2250 // to the interceptor instance itself through |data_source| is protected by
2251 // a statically allocated lock (similarly to the data source instance).
2252 auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2253 return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2254 interceptor.tls_factory(static_state, data_source_instance_index),
2255 interceptor.packet_callback, static_state, data_source_instance_index));
2256 }
2257 ProducerImpl* producer =
2258 FindProducerBackendById(data_source->backend_id)->producer.get();
2259 // Atomically load the current service endpoint. We keep the pointer as a
2260 // shared pointer on the stack to guard against it from being concurrently
2261 // modified on the thread by ProducerImpl::Initialize() swapping in a
2262 // reconnected service on the muxer task runner thread.
2263 //
2264 // The endpoint may also be concurrently modified by SweepDeadServices()
2265 // clearing out old disconnected services. We guard against that by
2266 // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2267 // shutdown has started, the trace writer created below will be a null one
2268 // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2269 //
2270 // We use an atomic pointer instead of holding a lock because
2271 // CreateTraceWriter posts tasks under the hood.
2272 std::shared_ptr<ProducerEndpoint> service =
2273 std::atomic_load(&producer->service_);
2274
2275 // The service may have been disconnected and reconnected concurrently after
2276 // the data source was enabled, in which case we may not have an arbiter, or
2277 // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2278 // Instead, early-out now. A relaxed load is fine here because the atomic_load
2279 // above ensures that the |service| isn't newer.
2280 if (producer->connection_id_.load(std::memory_order_relaxed) !=
2281 data_source->backend_connection_id) {
2282 return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2283 }
2284
2285 // We just need a relaxed atomic read here: We can use the reservation ID even
2286 // after the buffer was bound, we just need to be sure to read it atomically.
2287 uint16_t startup_buffer_reservation =
2288 data_source->startup_target_buffer_reservation.load(
2289 std::memory_order_relaxed);
2290 if (startup_buffer_reservation) {
2291 return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2292 startup_buffer_reservation);
2293 }
2294 return service->CreateTraceWriter(data_source->buffer_id,
2295 buffer_exhausted_policy);
2296 }
2297
2298 // This is called via the public API Tracing::NewTrace().
2299 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2300 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2301 BackendType requested_backend_type,
2302 TracingConsumerBackend* (*system_backend_factory)()) {
2303 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2304
2305 // |backend_type| can only specify one backend, not an OR-ed mask.
2306 PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2307
2308 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2309 task_runner_->PostTask([this, requested_backend_type, session_id,
2310 system_backend_factory] {
2311 if (requested_backend_type == kSystemBackend && system_backend_factory &&
2312 !FindConsumerBackendByType(kSystemBackend)) {
2313 AddConsumerBackend(system_backend_factory(), kSystemBackend);
2314 }
2315 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2316 if (requested_backend_type && backend.type &&
2317 backend.type != requested_backend_type) {
2318 continue;
2319 }
2320
2321 // Create the consumer now, even if we have to ask the embedder below, so
2322 // that any other tasks executing after this one can find the consumer and
2323 // change its pending attributes.
2324 backend.consumers.emplace_back(
2325 new ConsumerImpl(this, backend.type, session_id));
2326
2327 // The last registered backend in |consumer_backends_| is the unsupported
2328 // backend without a valid type.
2329 if (!backend.type) {
2330 PERFETTO_ELOG(
2331 "No tracing backend ready for type=%d, consumer will disconnect",
2332 requested_backend_type);
2333 InitializeConsumer(session_id);
2334 return;
2335 }
2336
2337 // Check if the embedder wants to be asked for permission before
2338 // connecting the consumer.
2339 if (!policy_) {
2340 InitializeConsumer(session_id);
2341 return;
2342 }
2343
2344 BackendType type = backend.type;
2345 TracingPolicy::ShouldAllowConsumerSessionArgs args;
2346 args.backend_type = backend.type;
2347 args.result_callback = [this, type, session_id](bool allow) {
2348 task_runner_->PostTask([this, type, session_id, allow] {
2349 if (allow) {
2350 InitializeConsumer(session_id);
2351 return;
2352 }
2353
2354 PERFETTO_ELOG(
2355 "Consumer session for backend type type=%d forbidden, "
2356 "consumer will disconnect",
2357 type);
2358
2359 auto* consumer = FindConsumer(session_id);
2360 if (!consumer)
2361 return;
2362
2363 consumer->OnDisconnect();
2364 });
2365 };
2366 policy_->ShouldAllowConsumerSession(args);
2367 return;
2368 }
2369 PERFETTO_DFATAL("Not reached");
2370 });
2371
2372 return std::unique_ptr<TracingSession>(
2373 new TracingSessionImpl(this, session_id, requested_backend_type));
2374 }
2375
2376 // static
2377 // This is called via the public API Tracing::SetupStartupTracing().
2378 // Can be called from any thread.
2379 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2380 TracingMuxerImpl::CreateStartupTracingSession(
2381 const TraceConfig& config,
2382 Tracing::SetupStartupTracingOpts opts) {
2383 BackendType backend_type = opts.backend;
2384 // |backend_type| can only specify one backend, not an OR-ed mask.
2385 PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2386 // The in-process backend doesn't support startup tracing.
2387 PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2388
2389 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2390
2391 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2392 task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2393 for (RegisteredProducerBackend& backend : producer_backends_) {
2394 if (backend_type && backend.type && backend.type != backend_type) {
2395 continue;
2396 }
2397
2398 TracingBackendId backend_id = backend.id;
2399
2400 // The last registered backend in |producer_backends_| is the unsupported
2401 // backend without a valid type.
2402 if (!backend.type) {
2403 PERFETTO_ELOG(
2404 "No tracing backend initialized for type=%d, startup tracing "
2405 "failed",
2406 backend_type);
2407 if (opts.on_setup)
2408 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2409 0 /* num_data_sources_started */});
2410 return;
2411 }
2412
2413 if (!backend.producer->service_ ||
2414 !backend.producer->service_->shared_memory()) {
2415 // If we unsuccessfully attempted to use a producer-provided SMB in the
2416 // past, don't try again.
2417 if (backend.producer->producer_provided_smb_failed_) {
2418 PERFETTO_ELOG(
2419 "Backend %zu doesn't seem to support producer-provided "
2420 "SMBs, startup tracing failed",
2421 backend_id);
2422 if (opts.on_setup)
2423 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2424 0 /* num_data_sources_started */});
2425 return;
2426 }
2427
2428 PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2429 backend_id);
2430 backend.producer_conn_args.use_producer_provided_smb = true;
2431 backend.producer->service_->Disconnect(); // Causes a reconnect.
2432 PERFETTO_DCHECK(backend.producer->service_ &&
2433 backend.producer->service_->MaybeSharedMemoryArbiter());
2434 }
2435
2436 RegisteredStartupSession session;
2437 session.session_id = session_id;
2438 session.on_aborted = opts.on_aborted;
2439 session.on_adopted = opts.on_adopted;
2440
2441 for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2442 // Find all matching data sources and start one instance of each.
2443 for (const auto& rds : data_sources_) {
2444 if (rds.descriptor.name() != ds_cfg.config().name())
2445 continue;
2446
2447 PERFETTO_DLOG(
2448 "Setting up data source %s for startup tracing with target "
2449 "buffer reservation %" PRIi32,
2450 rds.descriptor.name().c_str(),
2451 backend.producer->last_startup_target_buffer_reservation_ + 1u);
2452 auto ds = SetupDataSourceImpl(
2453 rds, backend_id,
2454 backend.producer->connection_id_.load(std::memory_order_relaxed),
2455 /*instance_id=*/0, ds_cfg.config(),
2456 /*startup_session_id=*/session_id);
2457 if (ds) {
2458 StartDataSourceImpl(ds);
2459 session.num_unbound_data_sources++;
2460 }
2461 }
2462 }
2463
2464 int num_ds = session.num_unbound_data_sources;
2465 auto on_setup = opts.on_setup;
2466 if (on_setup) {
2467 backend.producer->OnStartupTracingSetup();
2468 task_runner_->PostTask([on_setup, num_ds] {
2469 on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2470 });
2471 }
2472
2473 if (num_ds > 0) {
2474 backend.startup_sessions.push_back(std::move(session));
2475
2476 if (opts.timeout_ms > 0) {
2477 task_runner_->PostDelayedTask(
2478 [this, session_id, backend_type] {
2479 AbortStartupTracingSession(session_id, backend_type);
2480 },
2481 opts.timeout_ms);
2482 }
2483 }
2484 return;
2485 }
2486 PERFETTO_DFATAL("Invalid startup tracing session backend");
2487 });
2488
2489 return std::unique_ptr<StartupTracingSession>(
2490 new StartupTracingSessionImpl(this, session_id, backend_type));
2491 }
2492
2493 // Must not be called from the SDK's internal thread.
2494 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2495 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2496 const TraceConfig& config,
2497 Tracing::SetupStartupTracingOpts opts) {
2498 auto previous_on_setup = std::move(opts.on_setup);
2499 PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2500 base::WaitableEvent event;
2501 // It is safe to capture by reference because once on_setup is called only
2502 // once before this method returns.
2503 opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2504 if (previous_on_setup) {
2505 previous_on_setup(std::move(args));
2506 }
2507 event.Notify();
2508 };
2509 auto session = CreateStartupTracingSession(config, std::move(opts));
2510 event.Wait();
2511 return session;
2512 }
2513
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2514 void TracingMuxerImpl::AbortStartupTracingSession(
2515 TracingSessionGlobalID session_id,
2516 BackendType backend_type) {
2517 PERFETTO_DCHECK_THREAD(thread_checker_);
2518
2519 for (RegisteredProducerBackend& backend : producer_backends_) {
2520 if (backend_type != backend.type)
2521 continue;
2522
2523 auto session_it = std::find_if(
2524 backend.startup_sessions.begin(), backend.startup_sessions.end(),
2525 [session_id](const RegisteredStartupSession& session) {
2526 return session.session_id == session_id;
2527 });
2528
2529 // The startup session may have already been aborted or fully adopted.
2530 if (session_it == backend.startup_sessions.end())
2531 return;
2532 if (session_it->is_aborting)
2533 return;
2534
2535 session_it->is_aborting = true;
2536
2537 // Iterate all data sources and abort them if they weren't adopted yet.
2538 for (const auto& rds : data_sources_) {
2539 DataSourceStaticState* static_state = rds.static_state;
2540 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2541 auto* internal_state = static_state->TryGet(i);
2542 if (internal_state &&
2543 internal_state->startup_target_buffer_reservation.load(
2544 std::memory_order_relaxed) &&
2545 internal_state->data_source_instance_id == 0 &&
2546 internal_state->startup_session_id == session_id) {
2547 PERFETTO_DLOG(
2548 "Aborting startup tracing for data source %s (target buffer "
2549 "reservation %" PRIu16 ")",
2550 rds.descriptor.name().c_str(),
2551 internal_state->startup_target_buffer_reservation.load(
2552 std::memory_order_relaxed));
2553
2554 // Abort the instance asynchronously by stopping it. From this point
2555 // onwards, the service will not be able to adopt it via
2556 // StartDataSource().
2557 session_it->num_aborting_data_sources++;
2558 StopDataSource_AsyncBeginImpl(
2559 FindDataSourceRes(static_state, internal_state, i,
2560 rds.requires_callbacks_under_lock));
2561 }
2562 }
2563 }
2564
2565 // If we did everything right, we should have aborted all still-unbound data
2566 // source instances.
2567 PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2568 session_it->num_aborting_data_sources);
2569
2570 if (session_it->num_aborting_data_sources == 0) {
2571 if (session_it->on_aborted)
2572 task_runner_->PostTask(session_it->on_aborted);
2573
2574 backend.startup_sessions.erase(session_it);
2575 }
2576 return;
2577 }
2578 // We might reach here in tests because when we start a trace, we post the
2579 // Task(AbortStartupTrace, delay=timeout). When we do
2580 // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2581 // kill those delayed tasks because TaskRunner doesn't have support for
2582 // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2583 // to wait for the completion of all the scheduled tasks (apart from
2584 // deleting the TaskRunner) and we want to avoid doing that because we need
2585 // a long running TaskRunner in muxer.
2586 PERFETTO_DLOG("Invalid startup tracing session backend");
2587 }
2588
InitializeInstance(const TracingInitArgs & args)2589 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2590 if (instance_ != TracingMuxerFake::Get()) {
2591 // The tracing muxer was already initialized. We might need to initialize
2592 // additional backends that were not configured earlier.
2593 auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2594 muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2595 return;
2596 }
2597 // If we previously had a TracingMuxerImpl instance which was reset,
2598 // reinitialize and reuse it instead of trying to create a new one. See
2599 // ResetForTesting().
2600 if (g_prev_instance) {
2601 auto* muxer = g_prev_instance;
2602 g_prev_instance = nullptr;
2603 instance_ = muxer;
2604 muxer->task_runner_->PostTask([muxer, args] {
2605 muxer->Initialize(args);
2606 muxer->AddBackends(args);
2607 });
2608 } else {
2609 new TracingMuxerImpl(args);
2610 }
2611 }
2612
2613 // static
ResetForTesting()2614 void TracingMuxerImpl::ResetForTesting() {
2615 // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2616 // various objects make that a non-starter. In particular:
2617 //
2618 // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2619 // a reference back to ProducerImpl::service_.
2620 //
2621 // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2622 //
2623 // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2624 // turn depends on TracingMuxerImpl itself.
2625 //
2626 // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2627 // threads have dropped their TraceWriters. Since we can't really ask the
2628 // caller to guarantee this, we'll instead reset enough of the muxer's state
2629 // so that it can be reinitialized later and ensure all necessary objects from
2630 // the old state remain alive until all references have gone away.
2631 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2632
2633 base::WaitableEvent reset_done;
2634 auto do_reset = [muxer, &reset_done] {
2635 muxer->DestroyStoppedTraceWritersForCurrentThread();
2636 // Unregister all data sources so they don't interfere with any future
2637 // tracing sessions.
2638 for (RegisteredDataSource& rds : muxer->data_sources_) {
2639 for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2640 if (!backend.producer->service_ || !backend.producer->connected_)
2641 continue;
2642 backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2643 }
2644 }
2645 for (auto& backend : muxer->consumer_backends_) {
2646 // Check that no consumer session is currently active on any backend.
2647 for (auto& consumer : backend.consumers)
2648 PERFETTO_CHECK(!consumer->service_);
2649 }
2650 for (auto& backend : muxer->producer_backends_) {
2651 backend.producer->muxer_ = nullptr;
2652 backend.producer->DisposeConnection();
2653 muxer->dead_backends_.push_back(std::move(backend));
2654 }
2655 muxer->consumer_backends_.clear();
2656 muxer->producer_backends_.clear();
2657 muxer->interceptors_.clear();
2658
2659 for (auto& ds : muxer->data_sources_) {
2660 ds.static_state->ResetForTesting();
2661 }
2662
2663 muxer->data_sources_.clear();
2664 muxer->next_data_source_index_ = 0;
2665
2666 // Free all backends without active trace writers or other inbound
2667 // references. Note that even if all the backends get swept, the muxer still
2668 // needs to stay around since |task_runner_| is assumed to be long-lived.
2669 muxer->SweepDeadBackends();
2670
2671 // Make sure we eventually discard any per-thread trace writers from the
2672 // previous instance.
2673 muxer->muxer_id_for_testing_++;
2674
2675 g_prev_instance = muxer;
2676 instance_ = TracingMuxerFake::Get();
2677
2678 // Call the user provided cleanups on the muxer thread.
2679 for (auto& cb : muxer->reset_callbacks_) {
2680 cb();
2681 }
2682
2683 reset_done.Notify();
2684 };
2685
2686 // Some tests run the muxer and the test on the same thread. In these cases,
2687 // we can reset synchronously.
2688 if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2689 do_reset();
2690 } else {
2691 muxer->DestroyStoppedTraceWritersForCurrentThread();
2692 muxer->task_runner_->PostTask(std::move(do_reset));
2693 reset_done.Wait();
2694 // Call the user provided cleanups also on this thread.
2695 for (auto& cb : muxer->reset_callbacks_) {
2696 cb();
2697 }
2698 }
2699 muxer->reset_callbacks_.clear();
2700 }
2701
2702 // static
Shutdown()2703 void TracingMuxerImpl::Shutdown() {
2704 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2705
2706 // Shutting down on the muxer thread would lead to a deadlock.
2707 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2708 muxer->DestroyStoppedTraceWritersForCurrentThread();
2709
2710 std::unique_ptr<base::TaskRunner> owned_task_runner(
2711 muxer->task_runner_.get());
2712 base::WaitableEvent shutdown_done;
2713 owned_task_runner->PostTask([muxer, &shutdown_done] {
2714 // Check that no consumer session is currently active on any backend.
2715 // Producers will be automatically disconnected as a part of deleting the
2716 // muxer below.
2717 for (auto& backend : muxer->consumer_backends_) {
2718 for (auto& consumer : backend.consumers) {
2719 PERFETTO_CHECK(!consumer->service_);
2720 }
2721 }
2722 // Make sure no trace writers are lingering around on the muxer thread. Note
2723 // that we can't do this for any arbitrary thread in the process; it is the
2724 // caller's responsibility to clean them up before shutting down Perfetto.
2725 muxer->DestroyStoppedTraceWritersForCurrentThread();
2726 // The task runner must be deleted outside the muxer thread. This is done by
2727 // `owned_task_runner` above.
2728 muxer->task_runner_.release();
2729 auto* platform = muxer->platform_;
2730 delete muxer;
2731 instance_ = TracingMuxerFake::Get();
2732 platform->Shutdown();
2733 shutdown_done.Notify();
2734 });
2735 shutdown_done.Wait();
2736 }
2737
AppendResetForTestingCallback(std::function<void ()> cb)2738 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2739 reset_callbacks_.push_back(std::move(cb));
2740 }
2741
2742 TracingMuxer::~TracingMuxer() = default;
2743
2744 static_assert(std::is_same<internal::BufferId, BufferID>::value,
2745 "public's BufferId and tracing/core's BufferID diverged");
2746
2747 } // namespace internal
2748 } // namespace perfetto
2749