1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49
50 #include "protos/perfetto/config/chrome/chrome_config.gen.h"
51 #include "protos/perfetto/config/interceptor_config.gen.h"
52
53 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
54 #include <io.h> // For dup()
55 #else
56 #include <unistd.h> // For dup()
57 #endif
58
59 namespace perfetto {
60 namespace internal {
61
62 namespace {
63
64 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
65
66 // A task runner which prevents calls to DataSource::Trace() while an operation
67 // is in progress. Used to guard against unexpected re-entrancy where the
68 // user-provided task runner implementation tries to enter a trace point under
69 // the hood.
70 class NonReentrantTaskRunner : public base::TaskRunner {
71 public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)72 NonReentrantTaskRunner(TracingMuxer* muxer,
73 std::unique_ptr<base::TaskRunner> task_runner)
74 : muxer_(muxer), task_runner_(std::move(task_runner)) {}
75
76 // base::TaskRunner implementation.
PostTask(std::function<void ()> task)77 void PostTask(std::function<void()> task) override {
78 CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
79 }
80
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)81 void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
82 CallWithGuard(
83 [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
84 }
85
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)86 void AddFileDescriptorWatch(base::PlatformHandle fd,
87 std::function<void()> callback) override {
88 CallWithGuard(
89 [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
90 }
91
RemoveFileDescriptorWatch(base::PlatformHandle fd)92 void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
93 CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
94 }
95
RunsTasksOnCurrentThread() const96 bool RunsTasksOnCurrentThread() const override {
97 bool result;
98 CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
99 return result;
100 }
101
102 private:
103 template <typename T>
CallWithGuard(T lambda) const104 void CallWithGuard(T lambda) const {
105 auto* root_tls = muxer_->GetOrCreateTracingTLS();
106 if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
107 lambda();
108 return;
109 }
110 ScopedReentrancyAnnotator scoped_annotator(*root_tls);
111 lambda();
112 }
113
114 TracingMuxer* const muxer_;
115 std::unique_ptr<base::TaskRunner> task_runner_;
116 };
117
118 class StopArgsImpl : public DataSourceBase::StopArgs {
119 public:
HandleStopAsynchronously() const120 std::function<void()> HandleStopAsynchronously() const override {
121 auto closure = std::move(async_stop_closure);
122 async_stop_closure = std::function<void()>();
123 return closure;
124 }
125
126 mutable std::function<void()> async_stop_closure;
127 };
128
129 class FlushArgsImpl : public DataSourceBase::FlushArgs {
130 public:
HandleFlushAsynchronously() const131 std::function<void()> HandleFlushAsynchronously() const override {
132 auto closure = std::move(async_flush_closure);
133 async_flush_closure = std::function<void()>();
134 return closure;
135 }
136
137 mutable std::function<void()> async_flush_closure;
138 };
139
ComputeConfigHash(const DataSourceConfig & config)140 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
141 base::Hasher hasher;
142 std::string config_bytes = config.SerializeAsString();
143 hasher.Update(config_bytes.data(), config_bytes.size());
144 return hasher.digest();
145 }
146
147 // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
148 static TracingMuxerImpl* g_prev_instance{};
149
ComputeStartupConfigHash(DataSourceConfig config)150 uint64_t ComputeStartupConfigHash(DataSourceConfig config) {
151 // Clear target buffer and tracing-service provided fields for comparison of
152 // configs for startup tracing, since these fields are not available when
153 // setting up data sources for startup tracing.
154 config.set_target_buffer(0);
155 config.set_tracing_session_id(0);
156 config.set_session_initiator(DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
157 config.set_trace_duration_ms(0);
158 config.set_stop_timeout_ms(0);
159 config.set_enable_extra_guardrails(false);
160 // Clear some fields inside Chrome config:
161 // * client_priority, because Chrome always sets the priority to
162 // USER_INITIATED when setting up startup tracing.
163 // * convert_to_legacy_json, because Telemetry initiates tracing with proto
164 // format, but in some cases adopts the tracing session later via devtools
165 // which expect json format.
166 // TODO(khokhlov): Don't clear client_priority when Chrome correctly sets it
167 // for startup tracing (and propagates it to all child processes).
168 if (config.has_chrome_config()) {
169 config.mutable_chrome_config()->set_client_priority(
170 perfetto::protos::gen::ChromeConfig::UNKNOWN);
171 config.mutable_chrome_config()->set_convert_to_legacy_json(false);
172 }
173 base::Hasher hasher;
174 std::string config_bytes = config.SerializeAsString();
175 hasher.Update(config_bytes.data(), config_bytes.size());
176 return hasher.digest();
177 }
178
179 template <typename RegisteredBackend>
180 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anon3be2c9c50111::CompareBackendByType181 static int BackendTypePriority(BackendType type) {
182 switch (type) {
183 case kSystemBackend:
184 return 0;
185 case kInProcessBackend:
186 return 1;
187 case kCustomBackend:
188 return 2;
189 // The UnspecifiedBackend has the highest priority so that
190 // TracingBackendFake is the last one on the backend lists.
191 case kUnspecifiedBackend:
192 break;
193 }
194 return 3;
195 }
operator ()perfetto::internal::__anon3be2c9c50111::CompareBackendByType196 bool operator()(BackendType type, const RegisteredBackend& b) {
197 return BackendTypePriority(type) < BackendTypePriority(b.type);
198 }
199 };
200
201 } // namespace
202
203 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms)204 TracingMuxerImpl::ProducerImpl::ProducerImpl(
205 TracingMuxerImpl* muxer,
206 TracingBackendId backend_id,
207 uint32_t shmem_batch_commits_duration_ms)
208 : muxer_(muxer),
209 backend_id_(backend_id),
210 shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
211
~ProducerImpl()212 TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
213 muxer_ = nullptr;
214 }
215
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)216 void TracingMuxerImpl::ProducerImpl::Initialize(
217 std::unique_ptr<ProducerEndpoint> endpoint) {
218 PERFETTO_DCHECK_THREAD(thread_checker_);
219 PERFETTO_DCHECK(!connected_);
220 connection_id_.fetch_add(1, std::memory_order_relaxed);
221 is_producer_provided_smb_ = endpoint->shared_memory();
222 last_startup_target_buffer_reservation_ = 0;
223
224 // Adopt the endpoint into a shared pointer so that we can safely share it
225 // across threads that create trace writers. The custom deleter function
226 // ensures that the endpoint is always destroyed on the muxer's thread. (Note
227 // that |task_runner| is assumed to outlive tracing sessions on all threads.)
228 auto* task_runner = muxer_->task_runner_.get();
229 auto deleter = [task_runner](ProducerEndpoint* e) {
230 if (task_runner->RunsTasksOnCurrentThread()) {
231 delete e;
232 return;
233 }
234 task_runner->PostTask([e] { delete e; });
235 };
236 std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
237 // This atomic store is needed because another thread might be concurrently
238 // creating a trace writer using the previous (disconnected) |service_|. See
239 // CreateTraceWriter().
240 std::atomic_store(&service_, std::move(service));
241 // Don't try to use the service here since it may not have connected yet. See
242 // OnConnect().
243 }
244
OnConnect()245 void TracingMuxerImpl::ProducerImpl::OnConnect() {
246 PERFETTO_DLOG("Producer connected");
247 PERFETTO_DCHECK_THREAD(thread_checker_);
248 PERFETTO_DCHECK(!connected_);
249 if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
250 PERFETTO_ELOG(
251 "The service likely doesn't support producer-provided SMBs. Preventing "
252 "future attempts to use producer-provided SMB again with this "
253 "backend.");
254 producer_provided_smb_failed_ = true;
255 // Will call OnDisconnect() and cause a reconnect without producer-provided
256 // SMB.
257 service_->Disconnect();
258 return;
259 }
260 connected_ = true;
261 muxer_->UpdateDataSourcesOnAllBackends();
262 SendOnConnectTriggers();
263 }
264
OnDisconnect()265 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
266 PERFETTO_DCHECK_THREAD(thread_checker_);
267 // If we're being destroyed, bail out.
268 if (!muxer_)
269 return;
270 connected_ = false;
271 // Active data sources for this producer will be stopped by
272 // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
273 // will have a different connection id (even before it has finished
274 // connecting).
275 registered_data_sources_.reset();
276 DisposeConnection();
277
278 // Try reconnecting the producer.
279 muxer_->OnProducerDisconnected(this);
280 }
281
DisposeConnection()282 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
283 // Keep the old service around as a dead connection in case it has active
284 // trace writers. If any tracing sessions were created, we can't clear
285 // |service_| here because other threads may be concurrently creating new
286 // trace writers. Any reconnection attempt will atomically swap the new
287 // service in place of the old one.
288 if (did_setup_tracing_ || did_setup_startup_tracing_) {
289 dead_services_.push_back(service_);
290 } else {
291 service_.reset();
292 }
293 }
294
OnTracingSetup()295 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
296 PERFETTO_DCHECK_THREAD(thread_checker_);
297 did_setup_tracing_ = true;
298 service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
299 shmem_batch_commits_duration_ms_);
300 }
301
OnStartupTracingSetup()302 void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
303 PERFETTO_DCHECK_THREAD(thread_checker_);
304 did_setup_startup_tracing_ = true;
305 }
306
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)307 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
308 DataSourceInstanceID id,
309 const DataSourceConfig& cfg) {
310 PERFETTO_DCHECK_THREAD(thread_checker_);
311 if (!muxer_)
312 return;
313 muxer_->SetupDataSource(
314 backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
315 }
316
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)317 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
318 const DataSourceConfig&) {
319 PERFETTO_DCHECK_THREAD(thread_checker_);
320 if (!muxer_)
321 return;
322 muxer_->StartDataSource(backend_id_, id);
323 service_->NotifyDataSourceStarted(id);
324 }
325
StopDataSource(DataSourceInstanceID id)326 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
327 PERFETTO_DCHECK_THREAD(thread_checker_);
328 if (!muxer_)
329 return;
330 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
331 }
332
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count)333 void TracingMuxerImpl::ProducerImpl::Flush(
334 FlushRequestID flush_id,
335 const DataSourceInstanceID* instances,
336 size_t instance_count) {
337 PERFETTO_DCHECK_THREAD(thread_checker_);
338 bool all_handled = true;
339 if (muxer_) {
340 for (size_t i = 0; i < instance_count; i++) {
341 DataSourceInstanceID ds_id = instances[i];
342 bool handled =
343 muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id, flush_id);
344 if (!handled) {
345 pending_flushes_[flush_id].insert(ds_id);
346 all_handled = false;
347 }
348 }
349 }
350
351 if (all_handled) {
352 service_->NotifyFlushComplete(flush_id);
353 }
354 }
355
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)356 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
357 const DataSourceInstanceID* instances,
358 size_t instance_count) {
359 PERFETTO_DCHECK_THREAD(thread_checker_);
360 if (!muxer_)
361 return;
362 for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
363 muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
364 }
365 }
366
SweepDeadServices()367 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
368 PERFETTO_DCHECK_THREAD(thread_checker_);
369 auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
370 auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
371 return !arbiter || arbiter->TryShutdown();
372 };
373 for (auto it = dead_services_.begin(); it != dead_services_.end();) {
374 auto next_it = it;
375 next_it++;
376 if (is_unused(*it)) {
377 dead_services_.erase(it);
378 }
379 it = next_it;
380 }
381 return dead_services_.empty();
382 }
383
SendOnConnectTriggers()384 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
385 PERFETTO_DCHECK_THREAD(thread_checker_);
386 base::TimeMillis now = base::GetWallTimeMs();
387 std::vector<std::string> triggers;
388 while (!on_connect_triggers_.empty()) {
389 // Skip if we passed TTL.
390 if (on_connect_triggers_.front().second > now) {
391 triggers.push_back(std::move(on_connect_triggers_.front().first));
392 }
393 on_connect_triggers_.pop_front();
394 }
395 if (!triggers.empty()) {
396 service_->ActivateTriggers(triggers);
397 }
398 }
399
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)400 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
401 DataSourceInstanceID ds_id,
402 FlushRequestID flush_id) {
403 if (!connected_) {
404 return;
405 }
406
407 {
408 auto it = pending_flushes_.find(flush_id);
409 if (it == pending_flushes_.end()) {
410 return;
411 }
412 std::set<DataSourceInstanceID>& ds_ids = it->second;
413 ds_ids.erase(ds_id);
414 }
415
416 std::optional<DataSourceInstanceID> biggest_flush_id;
417 for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
418 if (it->second.empty()) {
419 biggest_flush_id = it->first;
420 it = pending_flushes_.erase(it);
421 } else {
422 break;
423 }
424 }
425
426 if (biggest_flush_id) {
427 service_->NotifyFlushComplete(*biggest_flush_id);
428 }
429 }
430
431 // ----- End of TracingMuxerImpl::ProducerImpl methods.
432
433 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingSessionGlobalID session_id)434 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
435 BackendType backend_type,
436 TracingSessionGlobalID session_id)
437 : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
438
~ConsumerImpl()439 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
440 muxer_ = nullptr;
441 }
442
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)443 void TracingMuxerImpl::ConsumerImpl::Initialize(
444 std::unique_ptr<ConsumerEndpoint> endpoint) {
445 PERFETTO_DCHECK_THREAD(thread_checker_);
446 service_ = std::move(endpoint);
447 // Don't try to use the service here since it may not have connected yet. See
448 // OnConnect().
449 }
450
OnConnect()451 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
452 PERFETTO_DCHECK_THREAD(thread_checker_);
453 PERFETTO_DCHECK(!connected_);
454 connected_ = true;
455
456 // Observe data source instance events so we get notified when tracing starts.
457 service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
458 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
459
460 // If the API client configured and started tracing before we connected,
461 // tell the backend about it now.
462 if (trace_config_)
463 muxer_->SetupTracingSession(session_id_, trace_config_);
464 if (start_pending_)
465 muxer_->StartTracingSession(session_id_);
466 if (get_trace_stats_pending_) {
467 auto callback = std::move(get_trace_stats_callback_);
468 get_trace_stats_callback_ = nullptr;
469 muxer_->GetTraceStats(session_id_, std::move(callback));
470 }
471 if (query_service_state_callback_) {
472 auto callback = std::move(query_service_state_callback_);
473 query_service_state_callback_ = nullptr;
474 muxer_->QueryServiceState(session_id_, std::move(callback));
475 }
476 if (stop_pending_)
477 muxer_->StopTracingSession(session_id_);
478 }
479
OnDisconnect()480 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
481 PERFETTO_DCHECK_THREAD(thread_checker_);
482 // If we're being destroyed, bail out.
483 if (!muxer_)
484 return;
485 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
486 if (!connected_ && backend_type_ == kSystemBackend) {
487 PERFETTO_ELOG(
488 "Unable to connect to the system tracing service as a consumer. On "
489 "Android, use the \"perfetto\" command line tool instead to start "
490 "system-wide tracing sessions");
491 }
492 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
493
494 // Notify the client about disconnection.
495 NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
496
497 // Make sure the client doesn't hang in a blocking start/stop because of the
498 // disconnection.
499 NotifyStartComplete();
500 NotifyStopComplete();
501
502 // It shouldn't be necessary to call StopTracingSession. If we get this call
503 // it means that the service did shutdown before us, so there is no point
504 // trying it to ask it to stop the session. We should just remember to cleanup
505 // the consumer vector.
506 connected_ = false;
507
508 // Notify the muxer that it is safe to destroy |this|. This is needed because
509 // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
510 // access until OnDisconnect() is called.
511 muxer_->OnConsumerDisconnected(this);
512 }
513
Disconnect()514 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
515 // This is weird and deserves a comment.
516 //
517 // When we called the ConnectConsumer method on the service it returns
518 // us a ConsumerEndpoint which we stored in |service_|, however this
519 // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
520 // |this|. Part of the API contract to TracingService::ConnectConsumer is that
521 // the ConsumerImpl pointer has to be valid until the
522 // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
523 // ConsumerEndpoint |service_|. Eventually this will call
524 // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
525 // call the destructor of |this|.
526 service_.reset();
527 }
528
OnTracingDisabled(const std::string & error)529 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
530 const std::string& error) {
531 PERFETTO_DCHECK_THREAD(thread_checker_);
532 PERFETTO_DCHECK(!stopped_);
533 stopped_ = true;
534
535 if (!error.empty())
536 NotifyError(TracingError{TracingError::kTracingFailed, error});
537
538 // If we're still waiting for the start event, fire it now. This may happen if
539 // there are no active data sources in the session.
540 NotifyStartComplete();
541 NotifyStopComplete();
542 }
543
NotifyStartComplete()544 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
545 PERFETTO_DCHECK_THREAD(thread_checker_);
546 if (start_complete_callback_) {
547 muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
548 start_complete_callback_ = nullptr;
549 }
550 if (blocking_start_complete_callback_) {
551 muxer_->task_runner_->PostTask(
552 std::move(blocking_start_complete_callback_));
553 blocking_start_complete_callback_ = nullptr;
554 }
555 }
556
NotifyError(const TracingError & error)557 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
558 PERFETTO_DCHECK_THREAD(thread_checker_);
559 if (error_callback_) {
560 muxer_->task_runner_->PostTask(
561 std::bind(std::move(error_callback_), error));
562 }
563 }
564
NotifyStopComplete()565 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
566 PERFETTO_DCHECK_THREAD(thread_checker_);
567 if (stop_complete_callback_) {
568 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
569 stop_complete_callback_ = nullptr;
570 }
571 if (blocking_stop_complete_callback_) {
572 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
573 blocking_stop_complete_callback_ = nullptr;
574 }
575 }
576
OnTraceData(std::vector<TracePacket> packets,bool has_more)577 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
578 std::vector<TracePacket> packets,
579 bool has_more) {
580 PERFETTO_DCHECK_THREAD(thread_checker_);
581 if (!read_trace_callback_)
582 return;
583
584 size_t capacity = 0;
585 for (const auto& packet : packets) {
586 // 16 is an over-estimation of the proto preamble size
587 capacity += packet.size() + 16;
588 }
589
590 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
591 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
592 buf->reserve(capacity);
593 for (auto& packet : packets) {
594 char* start;
595 size_t size;
596 std::tie(start, size) = packet.GetProtoPreamble();
597 buf->insert(buf->end(), start, start + size);
598 for (auto& slice : packet.slices()) {
599 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
600 buf->insert(buf->end(), slice_data, slice_data + slice.size);
601 }
602 }
603
604 auto callback = read_trace_callback_;
605 muxer_->task_runner_->PostTask([callback, buf, has_more] {
606 TracingSession::ReadTraceCallbackArgs callback_arg{};
607 callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
608 callback_arg.size = buf->size();
609 callback_arg.has_more = has_more;
610 callback(callback_arg);
611 });
612
613 if (!has_more)
614 read_trace_callback_ = nullptr;
615 }
616
OnObservableEvents(const ObservableEvents & events)617 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
618 const ObservableEvents& events) {
619 if (events.instance_state_changes_size()) {
620 for (const auto& state_change : events.instance_state_changes()) {
621 DataSourceHandle handle{state_change.producer_name(),
622 state_change.data_source_name()};
623 data_source_states_[handle] =
624 state_change.state() ==
625 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
626 }
627 }
628
629 if (events.instance_state_changes_size() ||
630 events.all_data_sources_started()) {
631 // Data sources are first reported as being stopped before starting, so once
632 // all the data sources we know about have started we can declare tracing
633 // begun. In the case where there are no matching data sources for the
634 // session, the service will report the all_data_sources_started() event
635 // without adding any instances (only since Android S / Perfetto v10.0).
636 if (start_complete_callback_ || blocking_start_complete_callback_) {
637 bool all_data_sources_started = std::all_of(
638 data_source_states_.cbegin(), data_source_states_.cend(),
639 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
640 if (all_data_sources_started)
641 NotifyStartComplete();
642 }
643 }
644 }
645
OnSessionCloned(const OnSessionClonedArgs &)646 void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
647 const OnSessionClonedArgs&) {
648 // CloneSession is not exposed in the SDK. This should never happen.
649 PERFETTO_DCHECK(false);
650 }
651
OnTraceStats(bool success,const TraceStats & trace_stats)652 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
653 bool success,
654 const TraceStats& trace_stats) {
655 if (!get_trace_stats_callback_)
656 return;
657 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
658 callback_arg.success = success;
659 callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
660 muxer_->task_runner_->PostTask(
661 std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
662 get_trace_stats_callback_ = nullptr;
663 }
664
665 // The callbacks below are not used.
OnDetach(bool)666 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)667 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
668 // ----- End of TracingMuxerImpl::ConsumerImpl
669
670 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
671
672 // TracingSessionImpl is the RAII object returned to API clients when they
673 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
674 // tracing.
675
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)676 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
677 TracingMuxerImpl* muxer,
678 TracingSessionGlobalID session_id,
679 BackendType backend_type)
680 : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
681
682 // Can be destroyed from any thread.
~TracingSessionImpl()683 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
684 auto* muxer = muxer_;
685 auto session_id = session_id_;
686 muxer->task_runner_->PostTask(
687 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
688 }
689
690 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)691 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
692 int fd) {
693 auto* muxer = muxer_;
694 auto session_id = session_id_;
695 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
696 if (fd >= 0) {
697 base::ignore_result(backend_type_); // For -Wunused in the amalgamation.
698 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
699 if (backend_type_ != kInProcessBackend) {
700 PERFETTO_FATAL(
701 "Passing a file descriptor to TracingSession::Setup() is only "
702 "supported with the kInProcessBackend on Windows. Use "
703 "TracingSession::ReadTrace() instead");
704 }
705 #endif
706 trace_config->set_write_into_file(true);
707 fd = dup(fd);
708 }
709 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
710 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
711 });
712 }
713
714 // Can be called from any thread.
Start()715 void TracingMuxerImpl::TracingSessionImpl::Start() {
716 auto* muxer = muxer_;
717 auto session_id = session_id_;
718 muxer->task_runner_->PostTask(
719 [muxer, session_id] { muxer->StartTracingSession(session_id); });
720 }
721
722 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)723 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
724 const TraceConfig& cfg) {
725 auto* muxer = muxer_;
726 auto session_id = session_id_;
727 muxer->task_runner_->PostTask([muxer, session_id, cfg] {
728 muxer->ChangeTracingSessionConfig(session_id, cfg);
729 });
730 }
731
732 // Can be called from any thread except the service thread.
StartBlocking()733 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
734 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
735 auto* muxer = muxer_;
736 auto session_id = session_id_;
737 base::WaitableEvent tracing_started;
738 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
739 auto* consumer = muxer->FindConsumer(session_id);
740 if (!consumer) {
741 // TODO(skyostil): Signal an error to the user.
742 tracing_started.Notify();
743 return;
744 }
745 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
746 consumer->blocking_start_complete_callback_ = [&] {
747 tracing_started.Notify();
748 };
749 muxer->StartTracingSession(session_id);
750 });
751 tracing_started.Wait();
752 }
753
754 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)755 void TracingMuxerImpl::TracingSessionImpl::Flush(
756 std::function<void(bool)> user_callback,
757 uint32_t timeout_ms) {
758 auto* muxer = muxer_;
759 auto session_id = session_id_;
760 muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
761 auto* consumer = muxer->FindConsumer(session_id);
762 if (!consumer) {
763 std::move(user_callback)(false);
764 return;
765 }
766 muxer->FlushTracingSession(session_id, timeout_ms,
767 std::move(user_callback));
768 });
769 }
770
771 // Can be called from any thread.
Stop()772 void TracingMuxerImpl::TracingSessionImpl::Stop() {
773 auto* muxer = muxer_;
774 auto session_id = session_id_;
775 muxer->task_runner_->PostTask(
776 [muxer, session_id] { muxer->StopTracingSession(session_id); });
777 }
778
779 // Can be called from any thread except the service thread.
StopBlocking()780 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
781 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
782 auto* muxer = muxer_;
783 auto session_id = session_id_;
784 base::WaitableEvent tracing_stopped;
785 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
786 auto* consumer = muxer->FindConsumer(session_id);
787 if (!consumer) {
788 // TODO(skyostil): Signal an error to the user.
789 tracing_stopped.Notify();
790 return;
791 }
792 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
793 consumer->blocking_stop_complete_callback_ = [&] {
794 tracing_stopped.Notify();
795 };
796 muxer->StopTracingSession(session_id);
797 });
798 tracing_stopped.Wait();
799 }
800
801 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)802 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
803 auto* muxer = muxer_;
804 auto session_id = session_id_;
805 muxer->task_runner_->PostTask([muxer, session_id, cb] {
806 muxer->ReadTracingSessionData(session_id, std::move(cb));
807 });
808 }
809
810 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)811 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
812 std::function<void()> cb) {
813 auto* muxer = muxer_;
814 auto session_id = session_id_;
815 muxer->task_runner_->PostTask([muxer, session_id, cb] {
816 auto* consumer = muxer->FindConsumer(session_id);
817 if (!consumer)
818 return;
819 consumer->start_complete_callback_ = cb;
820 });
821 }
822
823 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)824 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
825 std::function<void(TracingError)> cb) {
826 auto* muxer = muxer_;
827 auto session_id = session_id_;
828 muxer->task_runner_->PostTask([muxer, session_id, cb] {
829 auto* consumer = muxer->FindConsumer(session_id);
830 if (!consumer) {
831 // Notify the client about concurrent disconnection of the session.
832 if (cb)
833 cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
834 return;
835 }
836 consumer->error_callback_ = cb;
837 });
838 }
839
840 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)841 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
842 std::function<void()> cb) {
843 auto* muxer = muxer_;
844 auto session_id = session_id_;
845 muxer->task_runner_->PostTask([muxer, session_id, cb] {
846 auto* consumer = muxer->FindConsumer(session_id);
847 if (!consumer)
848 return;
849 consumer->stop_complete_callback_ = cb;
850 });
851 }
852
853 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)854 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
855 GetTraceStatsCallback cb) {
856 auto* muxer = muxer_;
857 auto session_id = session_id_;
858 muxer->task_runner_->PostTask([muxer, session_id, cb] {
859 muxer->GetTraceStats(session_id, std::move(cb));
860 });
861 }
862
863 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)864 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
865 QueryServiceStateCallback cb) {
866 auto* muxer = muxer_;
867 auto session_id = session_id_;
868 muxer->task_runner_->PostTask([muxer, session_id, cb] {
869 muxer->QueryServiceState(session_id, std::move(cb));
870 });
871 }
872
873 // ----- End of TracingMuxerImpl::TracingSessionImpl
874
875 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
876
StartupTracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)877 TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
878 TracingMuxerImpl* muxer,
879 TracingSessionGlobalID session_id,
880 BackendType backend_type)
881 : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
882
// Can be destroyed from any thread.
// Defaulted: no explicit teardown is performed here. In particular, this
// destructor does not itself call Abort() or AbortBlocking().
TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
    default;
886
Abort()887 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
888 auto* muxer = muxer_;
889 auto session_id = session_id_;
890 auto backend_type = backend_type_;
891 muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
892 muxer->AbortStartupTracingSession(session_id, backend_type);
893 });
894 }
895
896 // Must not be called from the SDK's internal thread.
AbortBlocking()897 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
898 auto* muxer = muxer_;
899 auto session_id = session_id_;
900 auto backend_type = backend_type_;
901 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
902 base::WaitableEvent event;
903 muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
904 muxer->AbortStartupTracingSession(session_id, backend_type);
905 event.Notify();
906 });
907 event.Wait();
908 }
909
910 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
911
// static
// Initial value of the global muxer pointer: the object returned by
// TracingMuxerFake::Get(). The TracingMuxerImpl constructor overwrites it with
// the real muxer.
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
914
915 // This is called by perfetto::Tracing::Initialize().
916 // Can be called on any thread. Typically, but not necessarily, that will be
917 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)918 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
919 : TracingMuxer(args.platform ? args.platform
920 : Platform::GetDefaultPlatform()) {
921 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
922 instance_ = this;
923
924 // Create the thread where muxer, producers and service will live.
925 Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
926 task_runner_.reset(new NonReentrantTaskRunner(
927 this, platform_->CreateTaskRunner(std::move(tr_args))));
928
929 // Run the initializer on that thread.
930 task_runner_->PostTask([this, args] {
931 Initialize(args);
932 AddBackends(args);
933 });
934 }
935
Initialize(const TracingInitArgs & args)936 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
937 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
938
939 policy_ = args.tracing_policy;
940 supports_multiple_data_source_instances_ =
941 args.supports_multiple_data_source_instances;
942
943 // Fallback backend for producer creation for an unsupported backend type.
944 PERFETTO_CHECK(producer_backends_.empty());
945 AddProducerBackend(internal::TracingBackendFake::GetInstance(),
946 BackendType::kUnspecifiedBackend, args);
947 // Fallback backend for consumer creation for an unsupported backend type.
948 // This backend simply fails any attempt to start a tracing session.
949 PERFETTO_CHECK(consumer_backends_.empty());
950 AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
951 BackendType::kUnspecifiedBackend);
952 }
953
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)954 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
955 BackendType type) {
956 if (!backend) {
957 // We skip the log in release builds because the *_backend_fake.cc code
958 // has already an ELOG before returning a nullptr.
959 PERFETTO_DLOG("Consumer backend creation failed, type %d",
960 static_cast<int>(type));
961 return;
962 }
963 // Keep the backends sorted by type.
964 auto it =
965 std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
966 type, CompareBackendByType<RegisteredConsumerBackend>());
967 it = consumer_backends_.emplace(it);
968
969 RegisteredConsumerBackend& rb = *it;
970 rb.backend = backend;
971 rb.type = type;
972 }
973
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)974 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
975 BackendType type,
976 const TracingInitArgs& args) {
977 if (!backend) {
978 // We skip the log in release builds because the *_backend_fake.cc code
979 // has already an ELOG before returning a nullptr.
980 PERFETTO_DLOG("Producer backend creation failed, type %d",
981 static_cast<int>(type));
982 return;
983 }
984 TracingBackendId backend_id = producer_backends_.size();
985 // Keep the backends sorted by type.
986 auto it =
987 std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
988 type, CompareBackendByType<RegisteredProducerBackend>());
989 it = producer_backends_.emplace(it);
990
991 RegisteredProducerBackend& rb = *it;
992 rb.backend = backend;
993 rb.id = backend_id;
994 rb.type = type;
995 rb.producer.reset(
996 new ProducerImpl(this, backend_id, args.shmem_batch_commits_duration_ms));
997 rb.producer_conn_args.producer = rb.producer.get();
998 rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
999 rb.producer_conn_args.task_runner = task_runner_.get();
1000 rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
1001 rb.producer_conn_args.shmem_page_size_hint_bytes =
1002 args.shmem_page_size_hint_kb * 1024;
1003 rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
1004 }
1005
1006 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)1007 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
1008 for (RegisteredProducerBackend& b : producer_backends_) {
1009 if (b.id == id) {
1010 return &b;
1011 }
1012 }
1013 return nullptr;
1014 }
1015
1016 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)1017 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
1018 for (RegisteredProducerBackend& b : producer_backends_) {
1019 if (b.type == type) {
1020 return &b;
1021 }
1022 }
1023 return nullptr;
1024 }
1025
1026 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)1027 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
1028 for (RegisteredConsumerBackend& b : consumer_backends_) {
1029 if (b.type == type) {
1030 return &b;
1031 }
1032 }
1033 return nullptr;
1034 }
1035
AddBackends(const TracingInitArgs & args)1036 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1037 if (args.backends & kSystemBackend) {
1038 PERFETTO_CHECK(args.system_producer_backend_factory_);
1039 if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1040 AddProducerBackend(args.system_producer_backend_factory_(),
1041 kSystemBackend, args);
1042 }
1043 if (args.enable_system_consumer) {
1044 PERFETTO_CHECK(args.system_consumer_backend_factory_);
1045 if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1046 AddConsumerBackend(args.system_consumer_backend_factory_(),
1047 kSystemBackend);
1048 }
1049 }
1050 }
1051
1052 if (args.backends & kInProcessBackend) {
1053 TracingBackend* b = nullptr;
1054 if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1055 if (!b) {
1056 PERFETTO_CHECK(args.in_process_backend_factory_);
1057 b = args.in_process_backend_factory_();
1058 }
1059 AddProducerBackend(b, kInProcessBackend, args);
1060 }
1061 if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1062 if (!b) {
1063 PERFETTO_CHECK(args.in_process_backend_factory_);
1064 b = args.in_process_backend_factory_();
1065 }
1066 AddConsumerBackend(b, kInProcessBackend);
1067 }
1068 }
1069
1070 if (args.backends & kCustomBackend) {
1071 PERFETTO_CHECK(args.custom_backend);
1072 if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1073 AddProducerBackend(args.custom_backend, kCustomBackend, args);
1074 }
1075 if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1076 AddConsumerBackend(args.custom_backend, kCustomBackend);
1077 }
1078 }
1079
1080 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1081 PERFETTO_FATAL("Unsupported tracing backend type");
1082 }
1083 }
1084
1085 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,DataSourceStaticState * static_state)1086 bool TracingMuxerImpl::RegisterDataSource(
1087 const DataSourceDescriptor& descriptor,
1088 DataSourceFactory factory,
1089 DataSourceParams params,
1090 DataSourceStaticState* static_state) {
1091 // Ignore repeated registrations.
1092 if (static_state->index != kMaxDataSources)
1093 return true;
1094
1095 uint32_t new_index = next_data_source_index_++;
1096 if (new_index >= kMaxDataSources) {
1097 PERFETTO_DLOG(
1098 "RegisterDataSource failed: too many data sources already registered");
1099 return false;
1100 }
1101
1102 // Initialize the static state.
1103 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1104 "instances[] size mismatch");
1105 for (size_t i = 0; i < static_state->instances.size(); i++)
1106 new (&static_state->instances[i]) DataSourceState{};
1107
1108 static_state->index = new_index;
1109
1110 // Generate a semi-unique id for this data source.
1111 base::Hasher hash;
1112 hash.Update(reinterpret_cast<intptr_t>(static_state));
1113 hash.Update(base::GetWallTimeNs().count());
1114 static_state->id = hash.digest() ? hash.digest() : 1;
1115
1116 task_runner_->PostTask([this, descriptor, factory, static_state, params] {
1117 data_sources_.emplace_back();
1118 RegisteredDataSource& rds = data_sources_.back();
1119 rds.descriptor = descriptor;
1120 rds.factory = factory;
1121 rds.supports_multiple_instances =
1122 supports_multiple_data_source_instances_ &&
1123 params.supports_multiple_instances;
1124 rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1125 rds.static_state = static_state;
1126
1127 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1128 });
1129 return true;
1130 }
1131
1132 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1133 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1134 const DataSourceDescriptor& descriptor,
1135 const DataSourceStaticState* static_state) {
1136 task_runner_->PostTask([this, descriptor, static_state] {
1137 for (auto& rds : data_sources_) {
1138 if (rds.static_state == static_state) {
1139 PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1140 rds.descriptor = descriptor;
1141 rds.descriptor.set_id(static_state->id);
1142 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1143 return;
1144 }
1145 }
1146 });
1147 }
1148
1149 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1150 void TracingMuxerImpl::RegisterInterceptor(
1151 const InterceptorDescriptor& descriptor,
1152 InterceptorFactory factory,
1153 InterceptorBase::TLSFactory tls_factory,
1154 InterceptorBase::TracePacketCallback packet_callback) {
1155 task_runner_->PostTask(
1156 [this, descriptor, factory, tls_factory, packet_callback] {
1157 // Ignore repeated registrations.
1158 for (const auto& interceptor : interceptors_) {
1159 if (interceptor.descriptor.name() == descriptor.name()) {
1160 PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1161 PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1162 return;
1163 }
1164 }
1165 // Only allow certain interceptors for now.
1166 if (descriptor.name() != "test_interceptor" &&
1167 descriptor.name() != "console" &&
1168 descriptor.name() != "etwexport") {
1169 PERFETTO_ELOG(
1170 "Interceptors are experimental. If you want to use them, please "
1171 "get in touch with the project maintainers "
1172 "(https://perfetto.dev/docs/contributing/"
1173 "getting-started#community).");
1174 return;
1175 }
1176 interceptors_.emplace_back();
1177 RegisteredInterceptor& interceptor = interceptors_.back();
1178 interceptor.descriptor = descriptor;
1179 interceptor.factory = factory;
1180 interceptor.tls_factory = tls_factory;
1181 interceptor.packet_callback = packet_callback;
1182 });
1183 }
1184
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1185 void TracingMuxerImpl::ActivateTriggers(
1186 const std::vector<std::string>& triggers,
1187 uint32_t ttl_ms) {
1188 base::TimeMillis expire_time =
1189 base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1190 task_runner_->PostTask([this, triggers, expire_time] {
1191 for (RegisteredProducerBackend& backend : producer_backends_) {
1192 if (backend.producer->connected_) {
1193 backend.producer->service_->ActivateTriggers(triggers);
1194 } else {
1195 for (const std::string& trigger : triggers) {
1196 backend.producer->on_connect_triggers_.emplace_back(trigger,
1197 expire_time);
1198 }
1199 }
1200 }
1201 });
1202 }
1203
1204 // Checks if there is any matching startup tracing data source instance for a
1205 // new SetupDataSource call. If so, moves the data source to this tracing
1206 // session (and its target buffer) and returns true, otherwise returns false.
MaybeAdoptStartupTracingInDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,const std::vector<RegisteredDataSource> & data_sources)1207 static bool MaybeAdoptStartupTracingInDataSource(
1208 TracingBackendId backend_id,
1209 uint32_t backend_connection_id,
1210 DataSourceInstanceID instance_id,
1211 const DataSourceConfig& cfg,
1212 const std::vector<RegisteredDataSource>& data_sources) {
1213 uint64_t startup_config_hash = ComputeStartupConfigHash(cfg);
1214
1215 for (const auto& rds : data_sources) {
1216 DataSourceStaticState* static_state = rds.static_state;
1217 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1218 auto* internal_state = static_state->TryGet(i);
1219
1220 // TODO(eseckler): Instead of comparing config_hashes here, should we ask
1221 // the data source instance for a compat check of the config?
1222 if (internal_state &&
1223 internal_state->startup_target_buffer_reservation.load(
1224 std::memory_order_relaxed) &&
1225 internal_state->data_source_instance_id == 0 &&
1226 internal_state->backend_id == backend_id &&
1227 internal_state->backend_connection_id == backend_connection_id &&
1228 internal_state->startup_config_hash == startup_config_hash) {
1229 PERFETTO_DLOG("Setting up data source %" PRIu64
1230 " %s by adopting it from a startup tracing session",
1231 instance_id, cfg.name().c_str());
1232
1233 std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
1234 // Set the associations. The actual takeover happens in
1235 // StartDataSource().
1236 internal_state->data_source_instance_id = instance_id;
1237 internal_state->buffer_id =
1238 static_cast<internal::BufferId>(cfg.target_buffer());
1239 internal_state->config_hash = ComputeConfigHash(cfg);
1240
1241 // TODO(eseckler): Should the data souce config provided by the service
1242 // be allowed to specify additional interceptors / additional data
1243 // source params?
1244
1245 return true;
1246 }
1247 }
1248 }
1249 return false;
1250 }
1251
1252 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1253 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1254 uint32_t backend_connection_id,
1255 DataSourceInstanceID instance_id,
1256 const DataSourceConfig& cfg) {
1257 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1258 cfg.name().c_str());
1259 PERFETTO_DCHECK_THREAD(thread_checker_);
1260
1261 // First check if there is any matching startup tracing data source instance.
1262 if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1263 instance_id, cfg, data_sources_)) {
1264 return;
1265 }
1266 uint64_t config_hash = ComputeConfigHash(cfg);
1267
1268 for (const auto& rds : data_sources_) {
1269 if (rds.descriptor.name() != cfg.name())
1270 continue;
1271 DataSourceStaticState& static_state = *rds.static_state;
1272
1273 // If this data source is already active for this exact config, don't start
1274 // another instance. This happens when we have several data sources with the
1275 // same name, in which case the service sends one SetupDataSource event for
1276 // each one. Since we can't map which event maps to which data source, we
1277 // ensure each event only starts one data source instance.
1278 // TODO(skyostil): Register a unique id with each data source to the service
1279 // to disambiguate.
1280 bool active_for_config = false;
1281 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1282 if (!static_state.TryGet(i))
1283 continue;
1284 auto* internal_state =
1285 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1286 if (internal_state->backend_id == backend_id &&
1287 internal_state->config_hash == config_hash) {
1288 active_for_config = true;
1289 break;
1290 }
1291 }
1292 if (active_for_config) {
1293 PERFETTO_DLOG(
1294 "Data source %s is already active with this config, skipping",
1295 cfg.name().c_str());
1296 continue;
1297 }
1298
1299 SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1300 cfg, config_hash, /*startup_config_hash=*/0,
1301 /*startup_session_id=*/0);
1302 return;
1303 }
1304 }
1305
SetupDataSourceImpl(const RegisteredDataSource & rds,TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,uint64_t config_hash,uint64_t startup_config_hash,TracingSessionGlobalID startup_session_id)1306 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
1307 const RegisteredDataSource& rds,
1308 TracingBackendId backend_id,
1309 uint32_t backend_connection_id,
1310 DataSourceInstanceID instance_id,
1311 const DataSourceConfig& cfg,
1312 uint64_t config_hash,
1313 uint64_t startup_config_hash,
1314 TracingSessionGlobalID startup_session_id) {
1315 PERFETTO_DCHECK_THREAD(thread_checker_);
1316 DataSourceStaticState& static_state = *rds.static_state;
1317
1318 // If any bit is set in `static_state.valid_instances` then at least one
1319 // other instance of data source is running.
1320 if (!rds.supports_multiple_instances &&
1321 static_state.valid_instances.load(std::memory_order_acquire) != 0) {
1322 PERFETTO_ELOG(
1323 "Failed to setup data source because some another instance of this "
1324 "data source is already active");
1325 return FindDataSourceRes();
1326 }
1327
1328 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1329 // Find a free slot.
1330 if (static_state.TryGet(i))
1331 continue;
1332
1333 auto* internal_state =
1334 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1335 std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
1336 static_assert(
1337 std::is_same<decltype(internal_state->data_source_instance_id),
1338 DataSourceInstanceID>::value,
1339 "data_source_instance_id type mismatch");
1340 internal_state->muxer_id_for_testing = muxer_id_for_testing_;
1341
1342 if (startup_session_id) {
1343 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1344 uint16_t& last_reservation =
1345 backend.producer->last_startup_target_buffer_reservation_;
1346 if (last_reservation == std::numeric_limits<uint16_t>::max()) {
1347 PERFETTO_ELOG(
1348 "Startup buffer reservations exhausted, dropping data source");
1349 return FindDataSourceRes();
1350 }
1351 internal_state->startup_target_buffer_reservation.store(
1352 ++last_reservation, std::memory_order_relaxed);
1353 } else {
1354 internal_state->startup_target_buffer_reservation.store(
1355 0, std::memory_order_relaxed);
1356 }
1357
1358 internal_state->backend_id = backend_id;
1359 internal_state->backend_connection_id = backend_connection_id;
1360 internal_state->data_source_instance_id = instance_id;
1361 internal_state->buffer_id =
1362 static_cast<internal::BufferId>(cfg.target_buffer());
1363 internal_state->config_hash = config_hash;
1364 internal_state->startup_session_id = startup_session_id;
1365 internal_state->startup_config_hash = startup_config_hash;
1366 internal_state->data_source = rds.factory();
1367 internal_state->interceptor = nullptr;
1368 internal_state->interceptor_id = 0;
1369
1370 if (cfg.has_interceptor_config()) {
1371 for (size_t j = 0; j < interceptors_.size(); j++) {
1372 if (cfg.interceptor_config().name() ==
1373 interceptors_[j].descriptor.name()) {
1374 PERFETTO_DLOG("Intercepting data source %" PRIu64
1375 " \"%s\" into \"%s\"",
1376 instance_id, cfg.name().c_str(),
1377 cfg.interceptor_config().name().c_str());
1378 internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
1379 internal_state->interceptor = interceptors_[j].factory();
1380 internal_state->interceptor->OnSetup({cfg});
1381 break;
1382 }
1383 }
1384 if (!internal_state->interceptor_id) {
1385 PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
1386 cfg.interceptor_config().name().c_str());
1387 }
1388 }
1389
1390 // This must be made at the end. See matching acquire-load in
1391 // DataSource::Trace().
1392 static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
1393
1394 DataSourceBase::SetupArgs setup_args;
1395 setup_args.config = &cfg;
1396 setup_args.internal_instance_index = i;
1397
1398 if (!rds.requires_callbacks_under_lock)
1399 lock.unlock();
1400 internal_state->data_source->OnSetup(setup_args);
1401
1402 return FindDataSourceRes(&static_state, internal_state, i,
1403 rds.requires_callbacks_under_lock);
1404 }
1405 PERFETTO_ELOG(
1406 "Maximum number of data source instances exhausted. "
1407 "Dropping data source %" PRIu64,
1408 instance_id);
1409 return FindDataSourceRes();
1410 }
1411
1412 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1413 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1414 DataSourceInstanceID instance_id) {
1415 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1416 PERFETTO_DCHECK_THREAD(thread_checker_);
1417
1418 auto ds = FindDataSource(backend_id, instance_id);
1419 if (!ds) {
1420 PERFETTO_ELOG("Could not find data source to start");
1421 return;
1422 }
1423
1424 // Check if the data source was already started for startup tracing.
1425 uint16_t startup_reservation =
1426 ds.internal_state->startup_target_buffer_reservation.load(
1427 std::memory_order_relaxed);
1428 if (startup_reservation) {
1429 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1430 TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
1431 auto session_it = std::find_if(
1432 backend.startup_sessions.begin(), backend.startup_sessions.end(),
1433 [session_id](const RegisteredStartupSession& session) {
1434 return session.session_id == session_id;
1435 });
1436 PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1437
1438 if (session_it->is_aborting) {
1439 PERFETTO_DLOG("Data source %" PRIu64
1440 " was already aborted for startup tracing, not starting it",
1441 instance_id);
1442 return;
1443 }
1444
1445 PERFETTO_DLOG(
1446 "Data source %" PRIu64
1447 " was already started for startup tracing, binding its target buffer",
1448 instance_id);
1449
1450 backend.producer->service_->MaybeSharedMemoryArbiter()
1451 ->BindStartupTargetBuffer(startup_reservation,
1452 ds.internal_state->buffer_id);
1453
1454 // The reservation ID can be used even after binding it, so there's no need
1455 // for any barriers here - we just need atomicity.
1456 ds.internal_state->startup_target_buffer_reservation.store(
1457 0, std::memory_order_relaxed);
1458
1459 // TODO(eseckler): Should we reset incremental state at this point, or
1460 // notify the data source some other way?
1461
1462 // The session should not have been fully bound yet (or aborted).
1463 PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
1464
1465 session_it->num_unbound_data_sources--;
1466 if (session_it->num_unbound_data_sources == 0) {
1467 if (session_it->on_adopted)
1468 task_runner_->PostTask(session_it->on_adopted);
1469 backend.startup_sessions.erase(session_it);
1470 }
1471 return;
1472 }
1473
1474 StartDataSourceImpl(ds);
1475 }
1476
StartDataSourceImpl(const FindDataSourceRes & ds)1477 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1478 PERFETTO_DCHECK_THREAD(thread_checker_);
1479
1480 DataSourceBase::StartArgs start_args{};
1481 start_args.internal_instance_index = ds.instance_idx;
1482
1483 std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1484 if (ds.internal_state->interceptor)
1485 ds.internal_state->interceptor->OnStart({});
1486 ds.internal_state->trace_lambda_enabled.store(true,
1487 std::memory_order_relaxed);
1488 PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1489
1490 if (!ds.requires_callbacks_under_lock)
1491 lock.unlock();
1492 ds.internal_state->data_source->OnStart(start_args);
1493 }
1494
1495 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1496 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1497 TracingBackendId backend_id,
1498 DataSourceInstanceID instance_id) {
1499 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1500 PERFETTO_DCHECK_THREAD(thread_checker_);
1501
1502 auto ds = FindDataSource(backend_id, instance_id);
1503 if (!ds) {
1504 PERFETTO_ELOG("Could not find data source to stop");
1505 return;
1506 }
1507
1508 StopDataSource_AsyncBeginImpl(ds);
1509 }
1510
StopDataSource_AsyncBeginImpl(const FindDataSourceRes & ds)1511 void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
1512 const FindDataSourceRes& ds) {
1513 TracingBackendId backend_id = ds.internal_state->backend_id;
1514 uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1515 DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;
1516
1517 StopArgsImpl stop_args{};
1518 stop_args.internal_instance_index = ds.instance_idx;
1519 stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
1520 instance_id, ds] {
1521 // TracingMuxerImpl is long lived, capturing |this| is okay.
1522 // The notification closure can be moved out of the StopArgs by the
1523 // embedder to handle stop asynchronously. The embedder might then
1524 // call the closure on a different thread than the current one, hence
1525 // this nested PostTask().
1526 task_runner_->PostTask(
1527 [this, backend_id, backend_connection_id, instance_id, ds] {
1528 StopDataSource_AsyncEnd(backend_id, backend_connection_id,
1529 instance_id, ds);
1530 });
1531 };
1532
1533 {
1534 std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1535 if (ds.internal_state->interceptor)
1536 ds.internal_state->interceptor->OnStop({});
1537
1538 if (!ds.requires_callbacks_under_lock)
1539 lock.unlock();
1540 ds.internal_state->data_source->OnStop(stop_args);
1541 }
1542
1543 // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1544 // async closure here. In theory we could avoid the PostTask and call
1545 // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1546 // divergencies between the deferred-stop vs non-deferred-stop code paths.
1547 if (stop_args.async_stop_closure)
1548 std::move(stop_args.async_stop_closure)();
1549 }
1550
StopDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds)1551 void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
1552 uint32_t backend_connection_id,
1553 DataSourceInstanceID instance_id,
1554 const FindDataSourceRes& ds) {
1555 PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1556 PERFETTO_DCHECK_THREAD(thread_checker_);
1557
1558 // Check that the data source instance is still active and was not modified
1559 // while it was being stopped.
1560 if (!ds.static_state->TryGet(ds.instance_idx) ||
1561 ds.internal_state->backend_id != backend_id ||
1562 ds.internal_state->backend_connection_id != backend_connection_id ||
1563 ds.internal_state->data_source_instance_id != instance_id) {
1564 PERFETTO_ELOG(
1565 "Async stop of data source %" PRIu64
1566 " failed. This might be due to calling the async_stop_closure twice.",
1567 instance_id);
1568 return;
1569 }
1570
1571 const uint32_t mask = ~(1 << ds.instance_idx);
1572 ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1573
1574 // Take the mutex to prevent that the data source is in the middle of
1575 // a Trace() execution where it called GetDataSourceLocked() while we
1576 // destroy it.
1577 uint16_t startup_buffer_reservation;
1578 TracingSessionGlobalID startup_session_id;
1579 {
1580 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1581 ds.internal_state->trace_lambda_enabled.store(false,
1582 std::memory_order_relaxed);
1583 ds.internal_state->data_source.reset();
1584 ds.internal_state->interceptor.reset();
1585 startup_buffer_reservation =
1586 ds.internal_state->startup_target_buffer_reservation.load(
1587 std::memory_order_relaxed);
1588 startup_session_id = ds.internal_state->startup_session_id;
1589 }
1590
1591 // The other fields of internal_state are deliberately *not* cleared.
1592 // See races-related comments of DataSource::Trace().
1593
1594 TracingMuxer::generation_++;
1595
1596 // |producer_backends_| is append-only, Backend instances are always valid.
1597 PERFETTO_CHECK(backend_id < producer_backends_.size());
1598 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1599 ProducerImpl* producer = backend.producer.get();
1600 if (!producer)
1601 return;
1602
1603 // If the data source instance still has a startup buffer reservation, it was
1604 // only active for startup tracing and never started by the service. Discard
1605 // the startup buffer reservation.
1606 if (startup_buffer_reservation) {
1607 PERFETTO_DCHECK(startup_session_id);
1608
1609 if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
1610 producer->service_->MaybeSharedMemoryArbiter()
1611 ->AbortStartupTracingForReservation(startup_buffer_reservation);
1612 }
1613
1614 auto session_it = std::find_if(
1615 backend.startup_sessions.begin(), backend.startup_sessions.end(),
1616 [startup_session_id](const RegisteredStartupSession& session) {
1617 return session.session_id == startup_session_id;
1618 });
1619
1620 // Session should not be removed until abortion of all data source instances
1621 // is complete.
1622 PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1623
1624 session_it->num_aborting_data_sources--;
1625 if (session_it->num_aborting_data_sources == 0) {
1626 if (session_it->on_aborted)
1627 task_runner_->PostTask(session_it->on_aborted);
1628
1629 backend.startup_sessions.erase(session_it);
1630 }
1631 }
1632
1633 if (producer->connected_) {
1634 // Flush any commits that might have been batched by SharedMemoryArbiter.
1635 producer->service_->MaybeSharedMemoryArbiter()
1636 ->FlushPendingCommitDataRequests();
1637 if (instance_id)
1638 producer->service_->NotifyDataSourceStopped(instance_id);
1639 }
1640 producer->SweepDeadServices();
1641 }
1642
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1643 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1644 TracingBackendId backend_id,
1645 DataSourceInstanceID instance_id) {
1646 PERFETTO_DCHECK_THREAD(thread_checker_);
1647 PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1648 instance_id);
1649 auto ds = FindDataSource(backend_id, instance_id);
1650 if (!ds) {
1651 PERFETTO_ELOG("Could not find data source to clear incremental state for");
1652 return;
1653 }
1654
1655 DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1656 clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1657 {
1658 std::unique_lock<std::recursive_mutex> lock;
1659 if (ds.requires_callbacks_under_lock)
1660 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1661 ds.internal_state->data_source->WillClearIncrementalState(
1662 clear_incremental_state_args);
1663 }
1664
1665 // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1666 // the incremental state should be cleared.
1667 ds.static_state->incremental_state_generation.fetch_add(
1668 1, std::memory_order_relaxed);
1669 }
1670
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id)1671 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1672 TracingBackendId backend_id,
1673 DataSourceInstanceID instance_id,
1674 FlushRequestID flush_id) {
1675 PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1676 auto ds = FindDataSource(backend_id, instance_id);
1677 if (!ds) {
1678 PERFETTO_ELOG("Could not find data source to flush");
1679 return true;
1680 }
1681
1682 uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1683
1684 FlushArgsImpl flush_args;
1685 flush_args.internal_instance_index = ds.instance_idx;
1686 flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1687 instance_id, ds, flush_id] {
1688 // TracingMuxerImpl is long lived, capturing |this| is okay.
1689 // The notification closure can be moved out of the StopArgs by the
1690 // embedder to handle stop asynchronously. The embedder might then
1691 // call the closure on a different thread than the current one, hence
1692 // this nested PostTask().
1693 task_runner_->PostTask(
1694 [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1695 FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1696 instance_id, ds, flush_id);
1697 });
1698 };
1699 {
1700 std::unique_lock<std::recursive_mutex> lock;
1701 if (ds.requires_callbacks_under_lock)
1702 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1703 ds.internal_state->data_source->OnFlush(flush_args);
1704 }
1705
1706 // |async_flush_closure| is moved out of |flush_args| if the producer
1707 // requested to handle the flush asynchronously.
1708 bool handled = static_cast<bool>(flush_args.async_flush_closure);
1709 return handled;
1710 }
1711
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1712 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1713 TracingBackendId backend_id,
1714 uint32_t backend_connection_id,
1715 DataSourceInstanceID instance_id,
1716 const FindDataSourceRes& ds,
1717 FlushRequestID flush_id) {
1718 PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1719 PERFETTO_DCHECK_THREAD(thread_checker_);
1720
1721 // Check that the data source instance is still active and was not modified
1722 // while it was being flushed.
1723 if (!ds.static_state->TryGet(ds.instance_idx) ||
1724 ds.internal_state->backend_id != backend_id ||
1725 ds.internal_state->backend_connection_id != backend_connection_id ||
1726 ds.internal_state->data_source_instance_id != instance_id) {
1727 PERFETTO_ELOG("Async flush of data source %" PRIu64
1728 " failed. This might be due to the data source being stopped "
1729 "in the meantime",
1730 instance_id);
1731 return;
1732 }
1733
1734 // |producer_backends_| is append-only, Backend instances are always valid.
1735 PERFETTO_CHECK(backend_id < producer_backends_.size());
1736 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1737
1738 ProducerImpl* producer = backend.producer.get();
1739 if (!producer)
1740 return;
1741
1742 if (producer->connected_) {
1743 producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1744 }
1745 }
1746
SyncProducersForTesting()1747 void TracingMuxerImpl::SyncProducersForTesting() {
1748 std::mutex mutex;
1749 std::condition_variable cv;
1750
1751 // IPC-based producers don't report connection errors explicitly for each
1752 // command, but instead with an asynchronous callback
1753 // (ProducerImpl::OnDisconnected). This means that the sync command below
1754 // may have completed but failed to reach the service because of a
1755 // disconnection, but we can't tell until the disconnection message comes
1756 // through. To guard against this, we run two whole rounds of sync round-trips
1757 // before returning; the first one will detect any disconnected producers and
1758 // the second one will ensure any reconnections have completed and all data
1759 // sources are registered in the service again.
1760 for (size_t i = 0; i < 2; i++) {
1761 size_t countdown = std::numeric_limits<size_t>::max();
1762 task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1763 {
1764 std::unique_lock<std::mutex> countdown_lock(mutex);
1765 countdown = producer_backends_.size();
1766 }
1767 for (auto& backend : producer_backends_) {
1768 auto* producer = backend.producer.get();
1769 producer->service_->Sync([&mutex, &cv, &countdown] {
1770 std::unique_lock<std::mutex> countdown_lock(mutex);
1771 countdown--;
1772 cv.notify_one();
1773 });
1774 }
1775 });
1776
1777 {
1778 std::unique_lock<std::mutex> countdown_lock(mutex);
1779 cv.wait(countdown_lock, [&countdown] { return !countdown; });
1780 }
1781 }
1782
1783 // Check that all producers are indeed connected.
1784 bool done = false;
1785 bool all_producers_connected = true;
1786 task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1787 for (auto& backend : producer_backends_)
1788 all_producers_connected &= backend.producer->connected_;
1789 std::unique_lock<std::mutex> lock(mutex);
1790 done = true;
1791 cv.notify_one();
1792 });
1793
1794 {
1795 std::unique_lock<std::mutex> lock(mutex);
1796 cv.wait(lock, [&done] { return done; });
1797 }
1798 PERFETTO_DCHECK(all_producers_connected);
1799 }
1800
DestroyStoppedTraceWritersForCurrentThread()1801 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1802 // Iterate across all possible data source types.
1803 auto cur_generation = generation_.load(std::memory_order_acquire);
1804 auto* root_tls = GetOrCreateTracingTLS();
1805
1806 auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1807 // |tls| has a vector of per-data-source-instance thread-local state.
1808 DataSourceStaticState* static_state = tls.static_state;
1809 if (!static_state)
1810 return; // Slot not used.
1811
1812 // Iterate across all possible instances for this data source.
1813 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1814 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1815 if (!ds_tls.trace_writer)
1816 continue;
1817
1818 DataSourceState* ds_state = static_state->TryGet(inst);
1819 if (ds_state &&
1820 ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1821 ds_state->backend_id == ds_tls.backend_id &&
1822 ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1823 ds_state->startup_target_buffer_reservation.load(
1824 std::memory_order_relaxed) ==
1825 ds_tls.startup_target_buffer_reservation &&
1826 ds_state->buffer_id == ds_tls.buffer_id &&
1827 ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1828 continue;
1829 }
1830
1831 // The DataSource instance has been destroyed or recycled.
1832 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
1833 }
1834 };
1835
1836 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1837 // |tls| has a vector of per-data-source-instance thread-local state.
1838 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1839 destroy_stopped_instances(tls);
1840 }
1841 destroy_stopped_instances(root_tls->track_event_tls);
1842 root_tls->generation = cur_generation;
1843 }
1844
1845 // Called both when a new data source is registered or when a new backend
1846 // connects. In both cases we want to be sure we reflected the data source
1847 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1848 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1849 PERFETTO_DCHECK_THREAD(thread_checker_);
1850 for (RegisteredDataSource& rds : data_sources_) {
1851 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1852 }
1853 }
1854
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1855 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1856 bool is_changed) {
1857 PERFETTO_DCHECK_THREAD(thread_checker_);
1858 for (RegisteredProducerBackend& backend : producer_backends_) {
1859 // We cannot call RegisterDataSource on the backend before it connects.
1860 if (!backend.producer->connected_)
1861 continue;
1862
1863 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1864 bool is_registered = backend.producer->registered_data_sources_.test(
1865 rds.static_state->index);
1866 if (is_registered && !is_changed)
1867 continue;
1868
1869 rds.descriptor.set_will_notify_on_start(true);
1870 rds.descriptor.set_will_notify_on_stop(true);
1871 rds.descriptor.set_handles_incremental_state_clear(true);
1872 rds.descriptor.set_id(rds.static_state->id);
1873 if (is_registered) {
1874 backend.producer->service_->UpdateDataSource(rds.descriptor);
1875 } else {
1876 backend.producer->service_->RegisterDataSource(rds.descriptor);
1877 }
1878 backend.producer->registered_data_sources_.set(rds.static_state->index);
1879 }
1880 }
1881
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1882 void TracingMuxerImpl::SetupTracingSession(
1883 TracingSessionGlobalID session_id,
1884 const std::shared_ptr<TraceConfig>& trace_config,
1885 base::ScopedFile trace_fd) {
1886 PERFETTO_DCHECK_THREAD(thread_checker_);
1887 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1888
1889 auto* consumer = FindConsumer(session_id);
1890 if (!consumer)
1891 return;
1892
1893 consumer->trace_config_ = trace_config;
1894 if (trace_fd)
1895 consumer->trace_fd_ = std::move(trace_fd);
1896
1897 if (!consumer->connected_)
1898 return;
1899
1900 // Only used in the deferred start mode.
1901 if (trace_config->deferred_start()) {
1902 consumer->service_->EnableTracing(*trace_config,
1903 std::move(consumer->trace_fd_));
1904 }
1905 }
1906
StartTracingSession(TracingSessionGlobalID session_id)1907 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1908 PERFETTO_DCHECK_THREAD(thread_checker_);
1909
1910 auto* consumer = FindConsumer(session_id);
1911
1912 if (!consumer)
1913 return;
1914
1915 if (!consumer->trace_config_) {
1916 PERFETTO_ELOG("Must call Setup(config) first");
1917 return;
1918 }
1919
1920 if (!consumer->connected_) {
1921 consumer->start_pending_ = true;
1922 return;
1923 }
1924
1925 consumer->start_pending_ = false;
1926 if (consumer->trace_config_->deferred_start()) {
1927 consumer->service_->StartTracing();
1928 } else {
1929 consumer->service_->EnableTracing(*consumer->trace_config_,
1930 std::move(consumer->trace_fd_));
1931 }
1932
1933 // TODO implement support for the deferred-start + fast-triggering case.
1934 }
1935
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1936 void TracingMuxerImpl::ChangeTracingSessionConfig(
1937 TracingSessionGlobalID session_id,
1938 const TraceConfig& trace_config) {
1939 PERFETTO_DCHECK_THREAD(thread_checker_);
1940
1941 auto* consumer = FindConsumer(session_id);
1942
1943 if (!consumer)
1944 return;
1945
1946 if (!consumer->trace_config_) {
1947 // Changing the config is only supported for started sessions.
1948 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1949 return;
1950 }
1951
1952 consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1953 if (consumer->connected_)
1954 consumer->service_->ChangeTraceConfig(trace_config);
1955 }
1956
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1957 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1958 uint32_t timeout_ms,
1959 std::function<void(bool)> callback) {
1960 PERFETTO_DCHECK_THREAD(thread_checker_);
1961 auto* consumer = FindConsumer(session_id);
1962 if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1963 !consumer->trace_config_) {
1964 PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1965 std::move(callback)(false);
1966 return;
1967 }
1968
1969 consumer->service_->Flush(timeout_ms, std::move(callback));
1970 }
1971
StopTracingSession(TracingSessionGlobalID session_id)1972 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1973 PERFETTO_DCHECK_THREAD(thread_checker_);
1974 auto* consumer = FindConsumer(session_id);
1975 if (!consumer)
1976 return;
1977
1978 if (consumer->start_pending_) {
1979 // If the session hasn't started yet, wait until it does before stopping.
1980 consumer->stop_pending_ = true;
1981 return;
1982 }
1983
1984 consumer->stop_pending_ = false;
1985 if (consumer->stopped_) {
1986 // If the session was already stopped (e.g., it failed to start), don't try
1987 // stopping again.
1988 consumer->NotifyStopComplete();
1989 } else if (!consumer->trace_config_) {
1990 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1991 return;
1992 } else {
1993 consumer->service_->DisableTracing();
1994 }
1995
1996 consumer->trace_config_.reset();
1997 }
1998
DestroyTracingSession(TracingSessionGlobalID session_id)1999 void TracingMuxerImpl::DestroyTracingSession(
2000 TracingSessionGlobalID session_id) {
2001 PERFETTO_DCHECK_THREAD(thread_checker_);
2002 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2003 // We need to find the consumer (if any) and call Disconnect as we destroy
2004 // the tracing session. We can't call Disconnect() inside this for loop
2005 // because in the in-process case this will end up to a synchronous call to
2006 // OnConsumerDisconnect which will invalidate all the iterators to
2007 // |backend.consumers|.
2008 ConsumerImpl* consumer = nullptr;
2009 for (auto& con : backend.consumers) {
2010 if (con->session_id_ == session_id) {
2011 consumer = con.get();
2012 break;
2013 }
2014 }
2015 if (consumer) {
2016 // We broke out of the loop above on the assumption that each backend will
2017 // only have a single consumer per session. This DCHECK ensures that
2018 // this is the case.
2019 PERFETTO_DCHECK(
2020 std::count_if(backend.consumers.begin(), backend.consumers.end(),
2021 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2022 return con->session_id_ == session_id;
2023 }) == 1u);
2024 consumer->Disconnect();
2025 }
2026 }
2027 }
2028
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2029 void TracingMuxerImpl::ReadTracingSessionData(
2030 TracingSessionGlobalID session_id,
2031 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2032 PERFETTO_DCHECK_THREAD(thread_checker_);
2033 auto* consumer = FindConsumer(session_id);
2034 if (!consumer) {
2035 // TODO(skyostil): Signal an error to the user.
2036 TracingSession::ReadTraceCallbackArgs callback_arg{};
2037 callback(callback_arg);
2038 return;
2039 }
2040 PERFETTO_DCHECK(!consumer->read_trace_callback_);
2041 consumer->read_trace_callback_ = std::move(callback);
2042 consumer->service_->ReadBuffers();
2043 }
2044
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2045 void TracingMuxerImpl::GetTraceStats(
2046 TracingSessionGlobalID session_id,
2047 TracingSession::GetTraceStatsCallback callback) {
2048 PERFETTO_DCHECK_THREAD(thread_checker_);
2049 auto* consumer = FindConsumer(session_id);
2050 if (!consumer) {
2051 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2052 callback_arg.success = false;
2053 callback(std::move(callback_arg));
2054 return;
2055 }
2056 PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2057 consumer->get_trace_stats_callback_ = std::move(callback);
2058 if (!consumer->connected_) {
2059 consumer->get_trace_stats_pending_ = true;
2060 return;
2061 }
2062 consumer->get_trace_stats_pending_ = false;
2063 consumer->service_->GetTraceStats();
2064 }
2065
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2066 void TracingMuxerImpl::QueryServiceState(
2067 TracingSessionGlobalID session_id,
2068 TracingSession::QueryServiceStateCallback callback) {
2069 PERFETTO_DCHECK_THREAD(thread_checker_);
2070 auto* consumer = FindConsumer(session_id);
2071 if (!consumer) {
2072 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2073 callback_arg.success = false;
2074 callback(std::move(callback_arg));
2075 return;
2076 }
2077 PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2078 if (!consumer->connected_) {
2079 consumer->query_service_state_callback_ = std::move(callback);
2080 return;
2081 }
2082 auto callback_wrapper = [callback](bool success,
2083 protos::gen::TracingServiceState state) {
2084 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2085 callback_arg.success = success;
2086 callback_arg.service_state_data = state.SerializeAsArray();
2087 callback(std::move(callback_arg));
2088 };
2089 consumer->service_->QueryServiceState(std::move(callback_wrapper));
2090 }
2091
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2092 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2093 uint32_t batch_commits_duration_ms,
2094 BackendType backend_type) {
2095 for (RegisteredProducerBackend& backend : producer_backends_) {
2096 if (backend.producer && backend.producer->connected_ &&
2097 backend.type == backend_type) {
2098 backend.producer->service_->MaybeSharedMemoryArbiter()
2099 ->SetBatchCommitsDuration(batch_commits_duration_ms);
2100 }
2101 }
2102 }
2103
EnableDirectSMBPatchingForTesting(BackendType backend_type)2104 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2105 BackendType backend_type) {
2106 for (RegisteredProducerBackend& backend : producer_backends_) {
2107 if (backend.producer && backend.producer->connected_ &&
2108 backend.type == backend_type &&
2109 !backend.producer->service_->MaybeSharedMemoryArbiter()
2110 ->EnableDirectSMBPatching()) {
2111 return false;
2112 }
2113 }
2114 return true;
2115 }
2116
FindConsumer(TracingSessionGlobalID session_id)2117 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2118 TracingSessionGlobalID session_id) {
2119 PERFETTO_DCHECK_THREAD(thread_checker_);
2120 return FindConsumerAndBackend(session_id).first;
2121 }
2122
2123 std::pair<TracingMuxerImpl::ConsumerImpl*,
2124 TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2125 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2126 PERFETTO_DCHECK_THREAD(thread_checker_);
2127 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2128 for (auto& consumer : backend.consumers) {
2129 if (consumer->session_id_ == session_id) {
2130 return {consumer.get(), &backend};
2131 }
2132 }
2133 }
2134 return {nullptr, nullptr};
2135 }
2136
InitializeConsumer(TracingSessionGlobalID session_id)2137 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2138 PERFETTO_DCHECK_THREAD(thread_checker_);
2139
2140 auto res = FindConsumerAndBackend(session_id);
2141 if (!res.first || !res.second)
2142 return;
2143 TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2144 RegisteredConsumerBackend& backend = *res.second;
2145
2146 TracingBackend::ConnectConsumerArgs conn_args;
2147 conn_args.consumer = consumer;
2148 conn_args.task_runner = task_runner_.get();
2149 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2150 }
2151
OnConsumerDisconnected(ConsumerImpl * consumer)2152 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2153 PERFETTO_DCHECK_THREAD(thread_checker_);
2154 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2155 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2156 return con.get() == consumer;
2157 };
2158 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2159 backend.consumers.end(), pred),
2160 backend.consumers.end());
2161 }
2162 }
2163
SetMaxProducerReconnectionsForTesting(uint32_t count)2164 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2165 max_producer_reconnections_.store(count);
2166 }
2167
OnProducerDisconnected(ProducerImpl * producer)2168 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2169 PERFETTO_DCHECK_THREAD(thread_checker_);
2170 for (RegisteredProducerBackend& backend : producer_backends_) {
2171 if (backend.producer.get() != producer)
2172 continue;
2173
2174 // The tracing service is disconnected. It does not make sense to keep
2175 // tracing (we wouldn't be able to commit). On reconnection, the tracing
2176 // service will restart the data sources.
2177 for (const auto& rds : data_sources_) {
2178 DataSourceStaticState* static_state = rds.static_state;
2179 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2180 auto* internal_state = static_state->TryGet(i);
2181 if (internal_state && internal_state->backend_id == backend.id &&
2182 internal_state->backend_connection_id ==
2183 backend.producer->connection_id_.load(
2184 std::memory_order_relaxed)) {
2185 StopDataSource_AsyncBeginImpl(
2186 FindDataSourceRes(static_state, internal_state, i,
2187 rds.requires_callbacks_under_lock));
2188 }
2189 }
2190 }
2191
2192 // Try reconnecting the disconnected producer. If the connection succeeds,
2193 // all the data sources will be automatically re-registered.
2194 if (producer->connection_id_.load(std::memory_order_relaxed) >
2195 max_producer_reconnections_.load()) {
2196 // Avoid reconnecting a failing producer too many times. Instead we just
2197 // leak the producer instead of trying to avoid further complicating
2198 // cross-thread trace writer creation.
2199 PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2200 continue;
2201 }
2202
2203 backend.producer->Initialize(
2204 backend.backend->ConnectProducer(backend.producer_conn_args));
2205 // Don't use producer-provided SMBs for the next connection unless startup
2206 // tracing requires it again.
2207 backend.producer_conn_args.use_producer_provided_smb = false;
2208 }
2209 }
2210
SweepDeadBackends()2211 void TracingMuxerImpl::SweepDeadBackends() {
2212 PERFETTO_DCHECK_THREAD(thread_checker_);
2213 for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2214 auto next_it = it;
2215 next_it++;
2216 if (it->producer->SweepDeadServices())
2217 dead_backends_.erase(it);
2218 it = next_it;
2219 }
2220 }
2221
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2222 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2223 TracingBackendId backend_id,
2224 DataSourceInstanceID instance_id) {
2225 PERFETTO_DCHECK_THREAD(thread_checker_);
2226 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2227 for (const auto& rds : data_sources_) {
2228 DataSourceStaticState* static_state = rds.static_state;
2229 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2230 auto* internal_state = static_state->TryGet(i);
2231 if (internal_state && internal_state->backend_id == backend_id &&
2232 internal_state->backend_connection_id ==
2233 backend.producer->connection_id_.load(
2234 std::memory_order_relaxed) &&
2235 internal_state->data_source_instance_id == instance_id) {
2236 return FindDataSourceRes(static_state, internal_state, i,
2237 rds.requires_callbacks_under_lock);
2238 }
2239 }
2240 }
2241 return FindDataSourceRes();
2242 }
2243
2244 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2245 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2246 DataSourceStaticState* static_state,
2247 uint32_t data_source_instance_index,
2248 DataSourceState* data_source,
2249 BufferExhaustedPolicy buffer_exhausted_policy) {
2250 if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2251 // If the session is being intercepted, return a heap-backed trace writer
2252 // instead. This is safe because all the data given to the interceptor is
2253 // either thread-local (|instance_index|), statically allocated
2254 // (|static_state|) or constant after initialization (|interceptor|). Access
2255 // to the interceptor instance itself through |data_source| is protected by
2256 // a statically allocated lock (similarly to the data source instance).
2257 auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2258 return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2259 interceptor.tls_factory(static_state, data_source_instance_index),
2260 interceptor.packet_callback, static_state, data_source_instance_index));
2261 }
2262 ProducerImpl* producer =
2263 FindProducerBackendById(data_source->backend_id)->producer.get();
2264 // Atomically load the current service endpoint. We keep the pointer as a
2265 // shared pointer on the stack to guard against it from being concurrently
2266 // modified on the thread by ProducerImpl::Initialize() swapping in a
2267 // reconnected service on the muxer task runner thread.
2268 //
2269 // The endpoint may also be concurrently modified by SweepDeadServices()
2270 // clearing out old disconnected services. We guard against that by
2271 // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2272 // shutdown has started, the trace writer created below will be a null one
2273 // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2274 //
2275 // We use an atomic pointer instead of holding a lock because
2276 // CreateTraceWriter posts tasks under the hood.
2277 std::shared_ptr<ProducerEndpoint> service =
2278 std::atomic_load(&producer->service_);
2279
2280 // The service may have been disconnected and reconnected concurrently after
2281 // the data source was enabled, in which case we may not have an arbiter, or
2282 // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2283 // Instead, early-out now. A relaxed load is fine here because the atomic_load
2284 // above ensures that the |service| isn't newer.
2285 if (producer->connection_id_.load(std::memory_order_relaxed) !=
2286 data_source->backend_connection_id) {
2287 return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2288 }
2289
2290 // We just need a relaxed atomic read here: We can use the reservation ID even
2291 // after the buffer was bound, we just need to be sure to read it atomically.
2292 uint16_t startup_buffer_reservation =
2293 data_source->startup_target_buffer_reservation.load(
2294 std::memory_order_relaxed);
2295 if (startup_buffer_reservation) {
2296 return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2297 startup_buffer_reservation);
2298 }
2299 return service->CreateTraceWriter(data_source->buffer_id,
2300 buffer_exhausted_policy);
2301 }
2302
2303 // This is called via the public API Tracing::NewTrace().
2304 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2305 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2306 BackendType requested_backend_type,
2307 TracingConsumerBackend* (*system_backend_factory)()) {
2308 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2309
2310 // |backend_type| can only specify one backend, not an OR-ed mask.
2311 PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2312
2313 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2314 task_runner_->PostTask([this, requested_backend_type, session_id,
2315 system_backend_factory] {
2316 if (requested_backend_type == kSystemBackend && system_backend_factory &&
2317 !FindConsumerBackendByType(kSystemBackend)) {
2318 AddConsumerBackend(system_backend_factory(), kSystemBackend);
2319 }
2320 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2321 if (requested_backend_type && backend.type &&
2322 backend.type != requested_backend_type) {
2323 continue;
2324 }
2325
2326 // Create the consumer now, even if we have to ask the embedder below, so
2327 // that any other tasks executing after this one can find the consumer and
2328 // change its pending attributes.
2329 backend.consumers.emplace_back(
2330 new ConsumerImpl(this, backend.type, session_id));
2331
2332 // The last registered backend in |consumer_backends_| is the unsupported
2333 // backend without a valid type.
2334 if (!backend.type) {
2335 PERFETTO_ELOG(
2336 "No tracing backend ready for type=%d, consumer will disconnect",
2337 requested_backend_type);
2338 InitializeConsumer(session_id);
2339 return;
2340 }
2341
2342 // Check if the embedder wants to be asked for permission before
2343 // connecting the consumer.
2344 if (!policy_) {
2345 InitializeConsumer(session_id);
2346 return;
2347 }
2348
2349 BackendType type = backend.type;
2350 TracingPolicy::ShouldAllowConsumerSessionArgs args;
2351 args.backend_type = backend.type;
2352 args.result_callback = [this, type, session_id](bool allow) {
2353 task_runner_->PostTask([this, type, session_id, allow] {
2354 if (allow) {
2355 InitializeConsumer(session_id);
2356 return;
2357 }
2358
2359 PERFETTO_ELOG(
2360 "Consumer session for backend type type=%d forbidden, "
2361 "consumer will disconnect",
2362 type);
2363
2364 auto* consumer = FindConsumer(session_id);
2365 if (!consumer)
2366 return;
2367
2368 consumer->OnDisconnect();
2369 });
2370 };
2371 policy_->ShouldAllowConsumerSession(args);
2372 return;
2373 }
2374 PERFETTO_DFATAL("Not reached");
2375 });
2376
2377 return std::unique_ptr<TracingSession>(
2378 new TracingSessionImpl(this, session_id, requested_backend_type));
2379 }
2380
2381 // static
2382 // This is called via the public API Tracing::SetupStartupTracing().
2383 // Can be called from any thread.
2384 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2385 TracingMuxerImpl::CreateStartupTracingSession(
2386 const TraceConfig& config,
2387 Tracing::SetupStartupTracingOpts opts) {
2388 BackendType backend_type = opts.backend;
2389 // |backend_type| can only specify one backend, not an OR-ed mask.
2390 PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2391 // The in-process backend doesn't support startup tracing.
2392 PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2393
2394 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2395
2396 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2397 task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2398 for (RegisteredProducerBackend& backend : producer_backends_) {
2399 if (backend_type && backend.type && backend.type != backend_type) {
2400 continue;
2401 }
2402
2403 TracingBackendId backend_id = backend.id;
2404
2405 // The last registered backend in |producer_backends_| is the unsupported
2406 // backend without a valid type.
2407 if (!backend.type) {
2408 PERFETTO_ELOG(
2409 "No tracing backend initialized for type=%d, startup tracing "
2410 "failed",
2411 backend_type);
2412 if (opts.on_setup)
2413 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2414 0 /* num_data_sources_started */});
2415 return;
2416 }
2417
2418 if (!backend.producer->service_ ||
2419 !backend.producer->service_->shared_memory()) {
2420 // If we unsuccessfully attempted to use a producer-provided SMB in the
2421 // past, don't try again.
2422 if (backend.producer->producer_provided_smb_failed_) {
2423 PERFETTO_ELOG(
2424 "Backend %zu doesn't seem to support producer-provided "
2425 "SMBs, startup tracing failed",
2426 backend_id);
2427 if (opts.on_setup)
2428 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2429 0 /* num_data_sources_started */});
2430 return;
2431 }
2432
2433 PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2434 backend_id);
2435 backend.producer_conn_args.use_producer_provided_smb = true;
2436 backend.producer->service_->Disconnect(); // Causes a reconnect.
2437 PERFETTO_DCHECK(backend.producer->service_ &&
2438 backend.producer->service_->MaybeSharedMemoryArbiter());
2439 }
2440
2441 RegisteredStartupSession session;
2442 session.session_id = session_id;
2443 session.on_aborted = opts.on_aborted;
2444 session.on_adopted = opts.on_adopted;
2445
2446 for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2447 // Find all matching data sources and start one instance of each.
2448 for (const auto& rds : data_sources_) {
2449 if (rds.descriptor.name() != ds_cfg.config().name())
2450 continue;
2451
2452 PERFETTO_DLOG(
2453 "Setting up data source %s for startup tracing with target "
2454 "buffer reservation %" PRIi32,
2455 rds.descriptor.name().c_str(),
2456 backend.producer->last_startup_target_buffer_reservation_ + 1u);
2457 auto ds = SetupDataSourceImpl(
2458 rds, backend_id,
2459 backend.producer->connection_id_.load(std::memory_order_relaxed),
2460 /*instance_id=*/0, ds_cfg.config(),
2461 ComputeConfigHash(ds_cfg.config()),
2462 ComputeStartupConfigHash(ds_cfg.config()),
2463 /*startup_session_id=*/session_id);
2464 if (ds) {
2465 StartDataSourceImpl(ds);
2466 session.num_unbound_data_sources++;
2467 }
2468 }
2469 }
2470
2471 int num_ds = session.num_unbound_data_sources;
2472 auto on_setup = opts.on_setup;
2473 if (on_setup) {
2474 backend.producer->OnStartupTracingSetup();
2475 task_runner_->PostTask([on_setup, num_ds] {
2476 on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2477 });
2478 }
2479
2480 if (num_ds > 0) {
2481 backend.startup_sessions.push_back(std::move(session));
2482
2483 if (opts.timeout_ms > 0) {
2484 task_runner_->PostDelayedTask(
2485 [this, session_id, backend_type] {
2486 AbortStartupTracingSession(session_id, backend_type);
2487 },
2488 opts.timeout_ms);
2489 }
2490 }
2491 return;
2492 }
2493 PERFETTO_DFATAL("Invalid startup tracing session backend");
2494 });
2495
2496 return std::unique_ptr<StartupTracingSession>(
2497 new StartupTracingSessionImpl(this, session_id, backend_type));
2498 }
2499
2500 // Must not be called from the SDK's internal thread.
2501 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2502 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2503 const TraceConfig& config,
2504 Tracing::SetupStartupTracingOpts opts) {
2505 auto previous_on_setup = std::move(opts.on_setup);
2506 PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2507 base::WaitableEvent event;
2508 // It is safe to capture by reference because once on_setup is called only
2509 // once before this method returns.
2510 opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2511 if (previous_on_setup) {
2512 previous_on_setup(std::move(args));
2513 }
2514 event.Notify();
2515 };
2516 auto session = CreateStartupTracingSession(config, std::move(opts));
2517 event.Wait();
2518 return session;
2519 }
2520
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2521 void TracingMuxerImpl::AbortStartupTracingSession(
2522 TracingSessionGlobalID session_id,
2523 BackendType backend_type) {
2524 PERFETTO_DCHECK_THREAD(thread_checker_);
2525
2526 for (RegisteredProducerBackend& backend : producer_backends_) {
2527 if (backend_type != backend.type)
2528 continue;
2529
2530 auto session_it = std::find_if(
2531 backend.startup_sessions.begin(), backend.startup_sessions.end(),
2532 [session_id](const RegisteredStartupSession& session) {
2533 return session.session_id == session_id;
2534 });
2535
2536 // The startup session may have already been aborted or fully adopted.
2537 if (session_it == backend.startup_sessions.end())
2538 return;
2539 if (session_it->is_aborting)
2540 return;
2541
2542 session_it->is_aborting = true;
2543
2544 // Iterate all data sources and abort them if they weren't adopted yet.
2545 for (const auto& rds : data_sources_) {
2546 DataSourceStaticState* static_state = rds.static_state;
2547 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2548 auto* internal_state = static_state->TryGet(i);
2549 if (internal_state &&
2550 internal_state->startup_target_buffer_reservation.load(
2551 std::memory_order_relaxed) &&
2552 internal_state->data_source_instance_id == 0 &&
2553 internal_state->startup_session_id == session_id) {
2554 PERFETTO_DLOG(
2555 "Aborting startup tracing for data source %s (target buffer "
2556 "reservation %" PRIu16 ")",
2557 rds.descriptor.name().c_str(),
2558 internal_state->startup_target_buffer_reservation.load(
2559 std::memory_order_relaxed));
2560
2561 // Abort the instance asynchronously by stopping it. From this point
2562 // onwards, the service will not be able to adopt it via
2563 // StartDataSource().
2564 session_it->num_aborting_data_sources++;
2565 StopDataSource_AsyncBeginImpl(
2566 FindDataSourceRes(static_state, internal_state, i,
2567 rds.requires_callbacks_under_lock));
2568 }
2569 }
2570 }
2571
2572 // If we did everything right, we should have aborted all still-unbound data
2573 // source instances.
2574 PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2575 session_it->num_aborting_data_sources);
2576
2577 if (session_it->num_aborting_data_sources == 0) {
2578 if (session_it->on_aborted)
2579 task_runner_->PostTask(session_it->on_aborted);
2580
2581 backend.startup_sessions.erase(session_it);
2582 }
2583 return;
2584 }
2585 // We might reach here in tests because when we start a trace, we post the
2586 // Task(AbortStartupTrace, delay=timeout). When we do
2587 // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2588 // kill those delayed tasks because TaskRunner doesn't have support for
2589 // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2590 // to wait for the completion of all the scheduled tasks (apart from
2591 // deleting the TaskRunner) and we want to avoid doing that because we need
2592 // a long running TaskRunner in muxer.
2593 PERFETTO_DLOG("Invalid startup tracing session backend");
2594 }
2595
InitializeInstance(const TracingInitArgs & args)2596 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2597 if (instance_ != TracingMuxerFake::Get()) {
2598 // The tracing muxer was already initialized. We might need to initialize
2599 // additional backends that were not configured earlier.
2600 auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2601 muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2602 return;
2603 }
2604 // If we previously had a TracingMuxerImpl instance which was reset,
2605 // reinitialize and reuse it instead of trying to create a new one. See
2606 // ResetForTesting().
2607 if (g_prev_instance) {
2608 auto* muxer = g_prev_instance;
2609 g_prev_instance = nullptr;
2610 instance_ = muxer;
2611 muxer->task_runner_->PostTask([muxer, args] {
2612 muxer->Initialize(args);
2613 muxer->AddBackends(args);
2614 });
2615 } else {
2616 new TracingMuxerImpl(args);
2617 }
2618 }
2619
2620 // static
ResetForTesting()2621 void TracingMuxerImpl::ResetForTesting() {
2622 // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2623 // various objects make that a non-starter. In particular:
2624 //
2625 // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2626 // a reference back to ProducerImpl::service_.
2627 //
2628 // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2629 //
2630 // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2631 // turn depends on TracingMuxerImpl itself.
2632 //
2633 // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2634 // threads have dropped their TraceWriters. Since we can't really ask the
2635 // caller to guarantee this, we'll instead reset enough of the muxer's state
2636 // so that it can be reinitialized later and ensure all necessary objects from
2637 // the old state remain alive until all references have gone away.
2638 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2639
2640 base::WaitableEvent reset_done;
2641 auto do_reset = [muxer, &reset_done] {
2642 muxer->DestroyStoppedTraceWritersForCurrentThread();
2643 // Unregister all data sources so they don't interfere with any future
2644 // tracing sessions.
2645 for (RegisteredDataSource& rds : muxer->data_sources_) {
2646 for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2647 if (!backend.producer->service_ || !backend.producer->connected_)
2648 continue;
2649 backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2650 }
2651 }
2652 for (auto& backend : muxer->consumer_backends_) {
2653 // Check that no consumer session is currently active on any backend.
2654 for (auto& consumer : backend.consumers)
2655 PERFETTO_CHECK(!consumer->service_);
2656 }
2657 for (auto& backend : muxer->producer_backends_) {
2658 backend.producer->muxer_ = nullptr;
2659 backend.producer->DisposeConnection();
2660 muxer->dead_backends_.push_back(std::move(backend));
2661 }
2662 muxer->consumer_backends_.clear();
2663 muxer->producer_backends_.clear();
2664 muxer->interceptors_.clear();
2665
2666 for (auto& ds : muxer->data_sources_) {
2667 ds.static_state->ResetForTesting();
2668 }
2669
2670 muxer->data_sources_.clear();
2671 muxer->next_data_source_index_ = 0;
2672
2673 // Free all backends without active trace writers or other inbound
2674 // references. Note that even if all the backends get swept, the muxer still
2675 // needs to stay around since |task_runner_| is assumed to be long-lived.
2676 muxer->SweepDeadBackends();
2677
2678 // Make sure we eventually discard any per-thread trace writers from the
2679 // previous instance.
2680 muxer->muxer_id_for_testing_++;
2681
2682 g_prev_instance = muxer;
2683 instance_ = TracingMuxerFake::Get();
2684
2685 // Call the user provided cleanups on the muxer thread.
2686 for (auto& cb : muxer->reset_callbacks_) {
2687 cb();
2688 }
2689
2690 reset_done.Notify();
2691 };
2692
2693 // Some tests run the muxer and the test on the same thread. In these cases,
2694 // we can reset synchronously.
2695 if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2696 do_reset();
2697 } else {
2698 muxer->DestroyStoppedTraceWritersForCurrentThread();
2699 muxer->task_runner_->PostTask(std::move(do_reset));
2700 reset_done.Wait();
2701 // Call the user provided cleanups also on this thread.
2702 for (auto& cb : muxer->reset_callbacks_) {
2703 cb();
2704 }
2705 }
2706 muxer->reset_callbacks_.clear();
2707 }
2708
2709 // static
Shutdown()2710 void TracingMuxerImpl::Shutdown() {
2711 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2712
2713 // Shutting down on the muxer thread would lead to a deadlock.
2714 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2715 muxer->DestroyStoppedTraceWritersForCurrentThread();
2716
2717 std::unique_ptr<base::TaskRunner> owned_task_runner(
2718 muxer->task_runner_.get());
2719 base::WaitableEvent shutdown_done;
2720 owned_task_runner->PostTask([muxer, &shutdown_done] {
2721 // Check that no consumer session is currently active on any backend.
2722 // Producers will be automatically disconnected as a part of deleting the
2723 // muxer below.
2724 for (auto& backend : muxer->consumer_backends_) {
2725 for (auto& consumer : backend.consumers) {
2726 PERFETTO_CHECK(!consumer->service_);
2727 }
2728 }
2729 // Make sure no trace writers are lingering around on the muxer thread. Note
2730 // that we can't do this for any arbitrary thread in the process; it is the
2731 // caller's responsibility to clean them up before shutting down Perfetto.
2732 muxer->DestroyStoppedTraceWritersForCurrentThread();
2733 // The task runner must be deleted outside the muxer thread. This is done by
2734 // `owned_task_runner` above.
2735 muxer->task_runner_.release();
2736 auto* platform = muxer->platform_;
2737 delete muxer;
2738 instance_ = TracingMuxerFake::Get();
2739 platform->Shutdown();
2740 shutdown_done.Notify();
2741 });
2742 shutdown_done.Wait();
2743 }
2744
AppendResetForTestingCallback(std::function<void ()> cb)2745 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2746 reset_callbacks_.push_back(std::move(cb));
2747 }
2748
// Out-of-line definition of the defaulted TracingMuxer destructor.
TracingMuxer::~TracingMuxer() = default;

// Compile-time guard: internal::BufferId and BufferID must remain the very
// same type.
static_assert(std::is_same<internal::BufferId, BufferID>::value,
              "public's BufferId and tracing/core's BufferID diverged");
2753
2754 } // namespace internal
2755 } // namespace perfetto
2756