1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <vector>
23
24 #include "perfetto/base/build_config.h"
25 #include "perfetto/base/logging.h"
26 #include "perfetto/base/task_runner.h"
27 #include "perfetto/ext/base/hash.h"
28 #include "perfetto/ext/base/thread_checker.h"
29 #include "perfetto/ext/base/waitable_event.h"
30 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
31 #include "perfetto/ext/tracing/core/trace_packet.h"
32 #include "perfetto/ext/tracing/core/trace_stats.h"
33 #include "perfetto/ext/tracing/core/trace_writer.h"
34 #include "perfetto/ext/tracing/core/tracing_service.h"
35 #include "perfetto/tracing/buffer_exhausted_policy.h"
36 #include "perfetto/tracing/core/data_source_config.h"
37 #include "perfetto/tracing/core/tracing_service_state.h"
38 #include "perfetto/tracing/data_source.h"
39 #include "perfetto/tracing/internal/data_source_internal.h"
40 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
41 #include "perfetto/tracing/internal/tracing_backend_fake.h"
42 #include "perfetto/tracing/trace_writer_base.h"
43 #include "perfetto/tracing/tracing.h"
44 #include "perfetto/tracing/tracing_backend.h"
45
46 #include "protos/perfetto/config/interceptor_config.gen.h"
47
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49
50 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
51 #include <io.h> // For dup()
52 #else
53 #include <unistd.h> // For dup()
54 #endif
55
56 namespace perfetto {
57 namespace internal {
58
59 namespace {
60
61 // A task runner which prevents calls to DataSource::Trace() while an operation
62 // is in progress. Used to guard against unexpected re-entrancy where the
63 // user-provided task runner implementation tries to enter a trace point under
64 // the hood.
65 class NonReentrantTaskRunner : public base::TaskRunner {
66 public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)67 NonReentrantTaskRunner(TracingMuxer* muxer,
68 std::unique_ptr<base::TaskRunner> task_runner)
69 : muxer_(muxer), task_runner_(std::move(task_runner)) {}
70
71 // base::TaskRunner implementation.
PostTask(std::function<void ()> task)72 void PostTask(std::function<void()> task) override {
73 CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
74 }
75
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)76 void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
77 CallWithGuard(
78 [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
79 }
80
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)81 void AddFileDescriptorWatch(base::PlatformHandle fd,
82 std::function<void()> callback) override {
83 CallWithGuard(
84 [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
85 }
86
RemoveFileDescriptorWatch(base::PlatformHandle fd)87 void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
88 CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
89 }
90
RunsTasksOnCurrentThread() const91 bool RunsTasksOnCurrentThread() const override {
92 bool result;
93 CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
94 return result;
95 }
96
97 private:
98 template <typename T>
CallWithGuard(T lambda) const99 void CallWithGuard(T lambda) const {
100 auto* root_tls = muxer_->GetOrCreateTracingTLS();
101 if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
102 lambda();
103 return;
104 }
105 ScopedReentrancyAnnotator scoped_annotator(*root_tls);
106 lambda();
107 }
108
109 TracingMuxer* const muxer_;
110 std::unique_ptr<base::TaskRunner> task_runner_;
111 };
112
113 class StopArgsImpl : public DataSourceBase::StopArgs {
114 public:
HandleStopAsynchronously() const115 std::function<void()> HandleStopAsynchronously() const override {
116 auto closure = std::move(async_stop_closure);
117 async_stop_closure = std::function<void()>();
118 return closure;
119 }
120
121 mutable std::function<void()> async_stop_closure;
122 };
123
ComputeConfigHash(const DataSourceConfig & config)124 uint64_t ComputeConfigHash(const DataSourceConfig& config) {
125 base::Hash hasher;
126 std::string config_bytes = config.SerializeAsString();
127 hasher.Update(config_bytes.data(), config_bytes.size());
128 return hasher.digest();
129 }
130
131 } // namespace
132
133 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms)134 TracingMuxerImpl::ProducerImpl::ProducerImpl(
135 TracingMuxerImpl* muxer,
136 TracingBackendId backend_id,
137 uint32_t shmem_batch_commits_duration_ms)
138 : muxer_(muxer),
139 backend_id_(backend_id),
140 shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms) {}
141
142 TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
143
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)144 void TracingMuxerImpl::ProducerImpl::Initialize(
145 std::unique_ptr<ProducerEndpoint> endpoint) {
146 PERFETTO_DCHECK_THREAD(thread_checker_);
147 PERFETTO_DCHECK(!connected_);
148 connection_id_++;
149
150 // Adopt the endpoint into a shared pointer so that we can safely share it
151 // across threads that create trace writers. The custom deleter function
152 // ensures that the endpoint is always destroyed on the muxer's thread. (Note
153 // that |task_runner| is assumed to outlive tracing sessions on all threads.)
154 auto* task_runner = muxer_->task_runner_.get();
155 auto deleter = [task_runner](ProducerEndpoint* e) {
156 task_runner->PostTask([e] { delete e; });
157 };
158 std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
159 // This atomic store is needed because another thread might be concurrently
160 // creating a trace writer using the previous (disconnected) |service_|. See
161 // CreateTraceWriter().
162 std::atomic_store(&service_, std::move(service));
163 // Don't try to use the service here since it may not have connected yet. See
164 // OnConnect().
165 }
166
OnConnect()167 void TracingMuxerImpl::ProducerImpl::OnConnect() {
168 PERFETTO_DLOG("Producer connected");
169 PERFETTO_DCHECK_THREAD(thread_checker_);
170 PERFETTO_DCHECK(!connected_);
171 connected_ = true;
172 muxer_->UpdateDataSourcesOnAllBackends();
173 }
174
OnDisconnect()175 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
176 PERFETTO_DCHECK_THREAD(thread_checker_);
177 connected_ = false;
178 // Active data sources for this producer will be stopped by
179 // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
180 // will have a different connection id (even before it has finished
181 // connecting).
182 registered_data_sources_.reset();
183 // Keep the old service around as a dead connection in case it has active
184 // trace writers. We can't clear |service_| here because other threads may be
185 // concurrently creating new trace writers. The reconnection below will
186 // atomically swap the new service in place of the old one.
187 dead_services_.push_back(service_);
188 // Try reconnecting the producer.
189 muxer_->OnProducerDisconnected(this);
190 }
191
OnTracingSetup()192 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
193 PERFETTO_DCHECK_THREAD(thread_checker_);
194 service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
195 shmem_batch_commits_duration_ms_);
196 }
197
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)198 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
199 DataSourceInstanceID id,
200 const DataSourceConfig& cfg) {
201 PERFETTO_DCHECK_THREAD(thread_checker_);
202 muxer_->SetupDataSource(backend_id_, connection_id_, id, cfg);
203 }
204
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)205 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
206 const DataSourceConfig&) {
207 PERFETTO_DCHECK_THREAD(thread_checker_);
208 muxer_->StartDataSource(backend_id_, id);
209 service_->NotifyDataSourceStarted(id);
210 }
211
StopDataSource(DataSourceInstanceID id)212 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
213 PERFETTO_DCHECK_THREAD(thread_checker_);
214 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
215 }
216
Flush(FlushRequestID flush_id,const DataSourceInstanceID *,size_t)217 void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
218 const DataSourceInstanceID*,
219 size_t) {
220 // Flush is not plumbed for now, we just ack straight away.
221 PERFETTO_DCHECK_THREAD(thread_checker_);
222 service_->NotifyFlushComplete(flush_id);
223 }
224
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)225 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
226 const DataSourceInstanceID* instances,
227 size_t instance_count) {
228 PERFETTO_DCHECK_THREAD(thread_checker_);
229 for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
230 muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
231 }
232 }
233
SweepDeadServices()234 void TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
235 PERFETTO_DCHECK_THREAD(thread_checker_);
236 auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
237 auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
238 return !arbiter || arbiter->TryShutdown();
239 };
240 for (auto it = dead_services_.begin(); it != dead_services_.end();) {
241 auto next_it = it;
242 next_it++;
243 if (is_unused(*it)) {
244 dead_services_.erase(it);
245 }
246 it = next_it;
247 }
248 }
249
250 // ----- End of TracingMuxerImpl::ProducerImpl methods.
251
252 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingBackendId backend_id,TracingSessionGlobalID session_id)253 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
254 BackendType backend_type,
255 TracingBackendId backend_id,
256 TracingSessionGlobalID session_id)
257 : muxer_(muxer),
258 backend_type_(backend_type),
259 backend_id_(backend_id),
260 session_id_(session_id) {}
261
262 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
263
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)264 void TracingMuxerImpl::ConsumerImpl::Initialize(
265 std::unique_ptr<ConsumerEndpoint> endpoint) {
266 PERFETTO_DCHECK_THREAD(thread_checker_);
267 service_ = std::move(endpoint);
268 // Don't try to use the service here since it may not have connected yet. See
269 // OnConnect().
270 }
271
OnConnect()272 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
273 PERFETTO_DCHECK_THREAD(thread_checker_);
274 PERFETTO_DCHECK(!connected_);
275 connected_ = true;
276
277 // Observe data source instance events so we get notified when tracing starts.
278 service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
279 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
280
281 // If the API client configured and started tracing before we connected,
282 // tell the backend about it now.
283 if (trace_config_)
284 muxer_->SetupTracingSession(session_id_, trace_config_);
285 if (start_pending_)
286 muxer_->StartTracingSession(session_id_);
287 if (get_trace_stats_pending_) {
288 auto callback = std::move(get_trace_stats_callback_);
289 get_trace_stats_callback_ = nullptr;
290 muxer_->GetTraceStats(session_id_, std::move(callback));
291 }
292 if (query_service_state_callback_) {
293 auto callback = std::move(query_service_state_callback_);
294 query_service_state_callback_ = nullptr;
295 muxer_->QueryServiceState(session_id_, std::move(callback));
296 }
297 if (stop_pending_)
298 muxer_->StopTracingSession(session_id_);
299 }
300
OnDisconnect()301 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
302 PERFETTO_DCHECK_THREAD(thread_checker_);
303 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
304 if (!connected_ && backend_type_ == kSystemBackend) {
305 PERFETTO_ELOG(
306 "Unable to connect to the system tracing service as a consumer. On "
307 "Android, use the \"perfetto\" command line tool instead to start "
308 "system-wide tracing sessions");
309 }
310 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
311
312 // Notify the client about disconnection.
313 NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
314
315 // Make sure the client doesn't hang in a blocking start/stop because of the
316 // disconnection.
317 NotifyStartComplete();
318 NotifyStopComplete();
319
320 // It shouldn't be necessary to call StopTracingSession. If we get this call
321 // it means that the service did shutdown before us, so there is no point
322 // trying it to ask it to stop the session. We should just remember to cleanup
323 // the consumer vector.
324 connected_ = false;
325
326 // Notify the muxer that it is safe to destroy |this|. This is needed because
327 // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
328 // access until OnDisconnect() is called.
329 muxer_->OnConsumerDisconnected(this);
330 }
331
Disconnect()332 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
333 // This is weird and deserves a comment.
334 //
335 // When we called the ConnectConsumer method on the service it returns
336 // us a ConsumerEndpoint which we stored in |service_|, however this
337 // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
338 // |this|. Part of the API contract to TracingService::ConnectConsumer is that
339 // the ConsumerImpl pointer has to be valid until the
340 // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
341 // ConsumerEndpoint |service_|. Eventually this will call
342 // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
343 // call the destructor of |this|.
344 service_.reset();
345 }
346
OnTracingDisabled(const std::string & error)347 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
348 const std::string& error) {
349 PERFETTO_DCHECK_THREAD(thread_checker_);
350 PERFETTO_DCHECK(!stopped_);
351 stopped_ = true;
352
353 if (!error.empty())
354 NotifyError(TracingError{TracingError::kTracingFailed, error});
355
356 // If we're still waiting for the start event, fire it now. This may happen if
357 // there are no active data sources in the session.
358 NotifyStartComplete();
359 NotifyStopComplete();
360 }
361
NotifyStartComplete()362 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
363 PERFETTO_DCHECK_THREAD(thread_checker_);
364 if (start_complete_callback_) {
365 muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
366 start_complete_callback_ = nullptr;
367 }
368 if (blocking_start_complete_callback_) {
369 muxer_->task_runner_->PostTask(
370 std::move(blocking_start_complete_callback_));
371 blocking_start_complete_callback_ = nullptr;
372 }
373 }
374
NotifyError(const TracingError & error)375 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
376 PERFETTO_DCHECK_THREAD(thread_checker_);
377 if (error_callback_) {
378 muxer_->task_runner_->PostTask(
379 std::bind(std::move(error_callback_), error));
380 }
381 }
382
NotifyStopComplete()383 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
384 PERFETTO_DCHECK_THREAD(thread_checker_);
385 if (stop_complete_callback_) {
386 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
387 stop_complete_callback_ = nullptr;
388 }
389 if (blocking_stop_complete_callback_) {
390 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
391 blocking_stop_complete_callback_ = nullptr;
392 }
393 }
394
OnTraceData(std::vector<TracePacket> packets,bool has_more)395 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
396 std::vector<TracePacket> packets,
397 bool has_more) {
398 PERFETTO_DCHECK_THREAD(thread_checker_);
399 if (!read_trace_callback_)
400 return;
401
402 size_t capacity = 0;
403 for (const auto& packet : packets) {
404 // 16 is an over-estimation of the proto preamble size
405 capacity += packet.size() + 16;
406 }
407
408 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
409 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
410 buf->reserve(capacity);
411 for (auto& packet : packets) {
412 char* start;
413 size_t size;
414 std::tie(start, size) = packet.GetProtoPreamble();
415 buf->insert(buf->end(), start, start + size);
416 for (auto& slice : packet.slices()) {
417 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
418 buf->insert(buf->end(), slice_data, slice_data + slice.size);
419 }
420 }
421
422 auto callback = read_trace_callback_;
423 muxer_->task_runner_->PostTask([callback, buf, has_more] {
424 TracingSession::ReadTraceCallbackArgs callback_arg{};
425 callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
426 callback_arg.size = buf->size();
427 callback_arg.has_more = has_more;
428 callback(callback_arg);
429 });
430
431 if (!has_more)
432 read_trace_callback_ = nullptr;
433 }
434
OnObservableEvents(const ObservableEvents & events)435 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
436 const ObservableEvents& events) {
437 if (events.instance_state_changes_size()) {
438 for (const auto& state_change : events.instance_state_changes()) {
439 DataSourceHandle handle{state_change.producer_name(),
440 state_change.data_source_name()};
441 data_source_states_[handle] =
442 state_change.state() ==
443 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
444 }
445 }
446
447 if (events.instance_state_changes_size() ||
448 events.all_data_sources_started()) {
449 // Data sources are first reported as being stopped before starting, so once
450 // all the data sources we know about have started we can declare tracing
451 // begun. In the case where there are no matching data sources for the
452 // session, the service will report the all_data_sources_started() event
453 // without adding any instances (only since Android S / Perfetto v10.0).
454 if (start_complete_callback_ || blocking_start_complete_callback_) {
455 bool all_data_sources_started = std::all_of(
456 data_source_states_.cbegin(), data_source_states_.cend(),
457 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
458 if (all_data_sources_started)
459 NotifyStartComplete();
460 }
461 }
462 }
463
OnTraceStats(bool success,const TraceStats & trace_stats)464 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
465 bool success,
466 const TraceStats& trace_stats) {
467 if (!get_trace_stats_callback_)
468 return;
469 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
470 callback_arg.success = success;
471 callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
472 muxer_->task_runner_->PostTask(
473 std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
474 get_trace_stats_callback_ = nullptr;
475 }
476
477 // The callbacks below are not used.
OnDetach(bool)478 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)479 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
480 // ----- End of TracingMuxerImpl::ConsumerImpl
481
482 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
483
484 // TracingSessionImpl is the RAII object returned to API clients when they
485 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
486 // tracing.
487
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)488 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
489 TracingMuxerImpl* muxer,
490 TracingSessionGlobalID session_id,
491 BackendType backend_type)
492 : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
493
494 // Can be destroyed from any thread.
~TracingSessionImpl()495 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
496 auto* muxer = muxer_;
497 auto session_id = session_id_;
498 muxer->task_runner_->PostTask(
499 [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
500 }
501
502 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)503 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
504 int fd) {
505 auto* muxer = muxer_;
506 auto session_id = session_id_;
507 std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
508 if (fd >= 0) {
509 base::ignore_result(backend_type_); // For -Wunused in the amalgamation.
510 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
511 if (backend_type_ != kInProcessBackend) {
512 PERFETTO_FATAL(
513 "Passing a file descriptor to TracingSession::Setup() is only "
514 "supported with the kInProcessBackend on Windows. Use "
515 "TracingSession::ReadTrace() instead");
516 }
517 #endif
518 trace_config->set_write_into_file(true);
519 fd = dup(fd);
520 }
521 muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
522 muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
523 });
524 }
525
526 // Can be called from any thread.
Start()527 void TracingMuxerImpl::TracingSessionImpl::Start() {
528 auto* muxer = muxer_;
529 auto session_id = session_id_;
530 muxer->task_runner_->PostTask(
531 [muxer, session_id] { muxer->StartTracingSession(session_id); });
532 }
533
534 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)535 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
536 const TraceConfig& cfg) {
537 auto* muxer = muxer_;
538 auto session_id = session_id_;
539 muxer->task_runner_->PostTask([muxer, session_id, cfg] {
540 muxer->ChangeTracingSessionConfig(session_id, cfg);
541 });
542 }
543
544 // Can be called from any thread except the service thread.
StartBlocking()545 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
546 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
547 auto* muxer = muxer_;
548 auto session_id = session_id_;
549 base::WaitableEvent tracing_started;
550 muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
551 auto* consumer = muxer->FindConsumer(session_id);
552 if (!consumer) {
553 // TODO(skyostil): Signal an error to the user.
554 tracing_started.Notify();
555 return;
556 }
557 PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
558 consumer->blocking_start_complete_callback_ = [&] {
559 tracing_started.Notify();
560 };
561 muxer->StartTracingSession(session_id);
562 });
563 tracing_started.Wait();
564 }
565
566 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)567 void TracingMuxerImpl::TracingSessionImpl::Flush(
568 std::function<void(bool)> user_callback,
569 uint32_t timeout_ms) {
570 auto* muxer = muxer_;
571 auto session_id = session_id_;
572 muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
573 auto* consumer = muxer->FindConsumer(session_id);
574 if (!consumer) {
575 std::move(user_callback)(false);
576 return;
577 }
578 muxer->FlushTracingSession(session_id, timeout_ms,
579 std::move(user_callback));
580 });
581 }
582
583 // Can be called from any thread.
Stop()584 void TracingMuxerImpl::TracingSessionImpl::Stop() {
585 auto* muxer = muxer_;
586 auto session_id = session_id_;
587 muxer->task_runner_->PostTask(
588 [muxer, session_id] { muxer->StopTracingSession(session_id); });
589 }
590
591 // Can be called from any thread except the service thread.
StopBlocking()592 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
593 PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
594 auto* muxer = muxer_;
595 auto session_id = session_id_;
596 base::WaitableEvent tracing_stopped;
597 muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
598 auto* consumer = muxer->FindConsumer(session_id);
599 if (!consumer) {
600 // TODO(skyostil): Signal an error to the user.
601 tracing_stopped.Notify();
602 return;
603 }
604 PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
605 consumer->blocking_stop_complete_callback_ = [&] {
606 tracing_stopped.Notify();
607 };
608 muxer->StopTracingSession(session_id);
609 });
610 tracing_stopped.Wait();
611 }
612
613 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)614 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
615 auto* muxer = muxer_;
616 auto session_id = session_id_;
617 muxer->task_runner_->PostTask([muxer, session_id, cb] {
618 muxer->ReadTracingSessionData(session_id, std::move(cb));
619 });
620 }
621
622 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)623 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
624 std::function<void()> cb) {
625 auto* muxer = muxer_;
626 auto session_id = session_id_;
627 muxer->task_runner_->PostTask([muxer, session_id, cb] {
628 auto* consumer = muxer->FindConsumer(session_id);
629 if (!consumer)
630 return;
631 consumer->start_complete_callback_ = cb;
632 });
633 }
634
635 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)636 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
637 std::function<void(TracingError)> cb) {
638 auto* muxer = muxer_;
639 auto session_id = session_id_;
640 muxer->task_runner_->PostTask([muxer, session_id, cb] {
641 auto* consumer = muxer->FindConsumer(session_id);
642 if (!consumer) {
643 // Notify the client about concurrent disconnection of the session.
644 if (cb)
645 cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
646 return;
647 }
648 consumer->error_callback_ = cb;
649 });
650 }
651
652 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)653 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
654 std::function<void()> cb) {
655 auto* muxer = muxer_;
656 auto session_id = session_id_;
657 muxer->task_runner_->PostTask([muxer, session_id, cb] {
658 auto* consumer = muxer->FindConsumer(session_id);
659 if (!consumer)
660 return;
661 consumer->stop_complete_callback_ = cb;
662 });
663 }
664
665 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)666 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
667 GetTraceStatsCallback cb) {
668 auto* muxer = muxer_;
669 auto session_id = session_id_;
670 muxer->task_runner_->PostTask([muxer, session_id, cb] {
671 muxer->GetTraceStats(session_id, std::move(cb));
672 });
673 }
674
675 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)676 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
677 QueryServiceStateCallback cb) {
678 auto* muxer = muxer_;
679 auto session_id = session_id_;
680 muxer->task_runner_->PostTask([muxer, session_id, cb] {
681 muxer->QueryServiceState(session_id, std::move(cb));
682 });
683 }
684
685 // ----- End of TracingMuxerImpl::TracingSessionImpl
686
687 // static
688 TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
689
690 // This is called by perfetto::Tracing::Initialize().
691 // Can be called on any thread. Typically, but not necessarily, that will be
692 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)693 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
694 : TracingMuxer(args.platform ? args.platform
695 : Platform::GetDefaultPlatform()) {
696 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
697 instance_ = this;
698
699 // Create the thread where muxer, producers and service will live.
700 task_runner_.reset(
701 new NonReentrantTaskRunner(this, platform_->CreateTaskRunner({})));
702
703 // Run the initializer on that thread.
704 task_runner_->PostTask([this, args] { Initialize(args); });
705 }
706
Initialize(const TracingInitArgs & args)707 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
708 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
709
710 policy_ = args.tracing_policy;
711
712 auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
713 if (!backend) {
714 // We skip the log in release builds because the *_backend_fake.cc code
715 // has already an ELOG before returning a nullptr.
716 PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
717 return;
718 }
719 TracingBackendId backend_id = backends_.size();
720 backends_.emplace_back();
721 RegisteredBackend& rb = backends_.back();
722 rb.backend = backend;
723 rb.id = backend_id;
724 rb.type = type;
725 rb.producer.reset(new ProducerImpl(this, backend_id,
726 args.shmem_batch_commits_duration_ms));
727 rb.producer_conn_args.producer = rb.producer.get();
728 rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
729 rb.producer_conn_args.task_runner = task_runner_.get();
730 rb.producer_conn_args.shmem_size_hint_bytes =
731 args.shmem_size_hint_kb * 1024;
732 rb.producer_conn_args.shmem_page_size_hint_bytes =
733 args.shmem_page_size_hint_kb * 1024;
734 rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
735 };
736
737 if (args.backends & kSystemBackend) {
738 PERFETTO_CHECK(args.system_backend_factory_);
739 add_backend(args.system_backend_factory_(), kSystemBackend);
740 }
741
742 if (args.backends & kInProcessBackend) {
743 PERFETTO_CHECK(args.in_process_backend_factory_);
744 add_backend(args.in_process_backend_factory_(), kInProcessBackend);
745 }
746
747 if (args.backends & kCustomBackend) {
748 PERFETTO_CHECK(args.custom_backend);
749 add_backend(args.custom_backend, kCustomBackend);
750 }
751
752 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
753 PERFETTO_FATAL("Unsupported tracing backend type");
754 }
755
756 // Fallback backend for consumer creation for an unsupported backend type.
757 // This backend simply fails any attempt to start a tracing session.
758 // NOTE: This backend instance has to be added last.
759 add_backend(internal::TracingBackendFake::GetInstance(),
760 BackendType::kUnspecifiedBackend);
761 }
762
763 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceStaticState * static_state)764 bool TracingMuxerImpl::RegisterDataSource(
765 const DataSourceDescriptor& descriptor,
766 DataSourceFactory factory,
767 DataSourceStaticState* static_state) {
768 // Ignore repeated registrations.
769 if (static_state->index != kMaxDataSources)
770 return true;
771
772 static std::atomic<uint32_t> last_id{};
773 uint32_t new_index = last_id++;
774 if (new_index >= kMaxDataSources) {
775 PERFETTO_DLOG(
776 "RegisterDataSource failed: too many data sources already registered");
777 return false;
778 }
779
780 // Initialize the static state.
781 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
782 "instances[] size mismatch");
783 for (size_t i = 0; i < static_state->instances.size(); i++)
784 new (&static_state->instances[i]) DataSourceState{};
785
786 static_state->index = new_index;
787
788 task_runner_->PostTask([this, descriptor, factory, static_state] {
789 data_sources_.emplace_back();
790 RegisteredDataSource& rds = data_sources_.back();
791 rds.descriptor = descriptor;
792 rds.factory = factory;
793 rds.static_state = static_state;
794 UpdateDataSourcesOnAllBackends();
795 });
796 return true;
797 }
798
799 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)800 void TracingMuxerImpl::RegisterInterceptor(
801 const InterceptorDescriptor& descriptor,
802 InterceptorFactory factory,
803 InterceptorBase::TLSFactory tls_factory,
804 InterceptorBase::TracePacketCallback packet_callback) {
805 task_runner_->PostTask(
806 [this, descriptor, factory, tls_factory, packet_callback] {
807 // Ignore repeated registrations.
808 for (const auto& interceptor : interceptors_) {
809 if (interceptor.descriptor.name() == descriptor.name()) {
810 PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
811 PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
812 return;
813 }
814 }
815 // Only allow certain interceptors for now.
816 if (descriptor.name() != "test_interceptor" &&
817 descriptor.name() != "console") {
818 PERFETTO_ELOG(
819 "Interceptors are experimental. If you want to use them, please "
820 "get in touch with the project maintainers "
821 "(https://perfetto.dev/docs/contributing/"
822 "getting-started#community).");
823 return;
824 }
825 interceptors_.emplace_back();
826 RegisteredInterceptor& interceptor = interceptors_.back();
827 interceptor.descriptor = descriptor;
828 interceptor.factory = factory;
829 interceptor.tls_factory = tls_factory;
830 interceptor.packet_callback = packet_callback;
831 });
832 }
833
834 // Called by the service of one of the backends.
// Creates one data source instance in response to a setup request.
//
// Walks the registered data sources whose descriptor name equals |cfg.name()|.
// Candidates that already have a live instance with the same |backend_id| and
// config hash are skipped. For the first remaining candidate, the first free
// instance slot is filled in (backend ids, |instance_id|, target buffer,
// config hash, a fresh object from the factory and, if configured, an
// interceptor), published via |valid_instances| and then notified through
// OnSetup(). The function returns right after that single instance is set up.
// If the candidate has no free slot, an error is logged and the search stops.
void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
                                       uint32_t backend_connection_id,
                                       DataSourceInstanceID instance_id,
                                       const DataSourceConfig& cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
                cfg.name().c_str());
  // Used below to recognise a setup request that was already served.
  uint64_t config_hash = ComputeConfigHash(cfg);

  for (const auto& rds : data_sources_) {
    if (rds.descriptor.name() != cfg.name())
      continue;
    DataSourceStaticState& static_state = *rds.static_state;

    // If this data source is already active for this exact config, don't start
    // another instance. This happens when we have several data sources with the
    // same name, in which case the service sends one SetupDataSource event for
    // each one. Since we can't map which event maps to which data source, we
    // ensure each event only starts one data source instance.
    // TODO(skyostil): Register a unique id with each data source to the service
    // to disambiguate.
    bool active_for_config = false;
    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      // Only slots currently reported as valid by TryGet() are compared.
      if (!static_state.TryGet(i))
        continue;
      auto* internal_state =
          reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
      if (internal_state->backend_id == backend_id &&
          internal_state->config_hash == config_hash) {
        active_for_config = true;
        break;
      }
    }
    if (active_for_config) {
      PERFETTO_DLOG(
          "Data source %s is already active with this config, skipping",
          cfg.name().c_str());
      continue;
    }

    for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
      // Find a free slot.
      if (static_state.TryGet(i))
        continue;

      auto* internal_state =
          reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
      // The slot's lock is held for the rest of this iteration, including the
      // interceptor and data source OnSetup() calls below.
      std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
      static_assert(
          std::is_same<decltype(internal_state->data_source_instance_id),
                       DataSourceInstanceID>::value,
          "data_source_instance_id type mismatch");
      internal_state->backend_id = backend_id;
      internal_state->backend_connection_id = backend_connection_id;
      internal_state->data_source_instance_id = instance_id;
      internal_state->buffer_id =
          static_cast<internal::BufferId>(cfg.target_buffer());
      internal_state->config_hash = config_hash;
      internal_state->data_source = rds.factory();
      // Drop whatever interceptor a previous occupant of this slot left behind.
      internal_state->interceptor = nullptr;
      internal_state->interceptor_id = 0;

      if (cfg.has_interceptor_config()) {
        for (size_t j = 0; j < interceptors_.size(); j++) {
          if (cfg.interceptor_config().name() ==
              interceptors_[j].descriptor.name()) {
            PERFETTO_DLOG("Intercepting data source %" PRIu64
                          " \"%s\" into \"%s\"",
                          instance_id, cfg.name().c_str(),
                          cfg.interceptor_config().name().c_str());
            // |interceptor_id| is the 1-based position in |interceptors_|;
            // 0 is kept to mean "not intercepted".
            internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
            internal_state->interceptor = interceptors_[j].factory();
            internal_state->interceptor->OnSetup({cfg});
            break;
          }
        }
        if (!internal_state->interceptor_id) {
          // Setup continues without an interceptor in this case.
          PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
                        cfg.interceptor_config().name().c_str());
        }
      }

      // This must be made at the end. See matching acquire-load in
      // DataSource::Trace().
      static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);

      DataSourceBase::SetupArgs setup_args;
      setup_args.config = &cfg;
      setup_args.internal_instance_index = i;
      internal_state->data_source->OnSetup(setup_args);
      // One instance per setup request: stop at the first slot that was filled.
      return;
    }
    // Reached only when every slot of this data source is occupied.
    PERFETTO_ELOG(
        "Maximum number of data source instances exhausted. "
        "Dropping data source %" PRIu64,
        instance_id);
    break;
  }
}
934
935 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)936 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
937 DataSourceInstanceID instance_id) {
938 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
939 PERFETTO_DCHECK_THREAD(thread_checker_);
940
941 auto ds = FindDataSource(backend_id, instance_id);
942 if (!ds) {
943 PERFETTO_ELOG("Could not find data source to start");
944 return;
945 }
946
947 DataSourceBase::StartArgs start_args{};
948 start_args.internal_instance_index = ds.instance_idx;
949
950 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
951 if (ds.internal_state->interceptor)
952 ds.internal_state->interceptor->OnStart({});
953 ds.internal_state->trace_lambda_enabled = true;
954 ds.internal_state->data_source->OnStart(start_args);
955 }
956
957 // Called by the service of one of the backends.
// First half of stopping a data source instance.
//
// Looks up the instance for (|backend_id|, |instance_id|), calls OnStop() on
// its interceptor (if any) and on the data source itself, handing the latter a
// StopArgs that carries a completion closure. The closure posts
// StopDataSource_AsyncEnd() onto the muxer task runner. If OnStop() leaves the
// closure in place, it is run here right away.
void TracingMuxerImpl::StopDataSource_AsyncBegin(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG("Could not find data source to stop");
    return;
  }

  StopArgsImpl stop_args{};
  stop_args.internal_instance_index = ds.instance_idx;
  stop_args.async_stop_closure = [this, backend_id, instance_id] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the StopArgs by the
    // embedder to handle stop asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask([this, backend_id, instance_id] {
      StopDataSource_AsyncEnd(backend_id, instance_id);
    });
  };

  {
    // The instance lock is held only around the two OnStop() callbacks.
    std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
    if (ds.internal_state->interceptor)
      ds.internal_state->interceptor->OnStop({});
    ds.internal_state->data_source->OnStop(stop_args);
  }

  // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
  // async closure here. In theory we could avoid the PostTask and call
  // straight into StopDataSource_AsyncEnd(). We keep that to reduce
  // divergencies between the deferred-stop vs non-deferred-stop code paths.
  if (stop_args.async_stop_closure)
    std::move(stop_args.async_stop_closure)();
}
997
// Second half of stopping a data source instance; reached through the closure
// created in StopDataSource_AsyncBegin().
//
// Clears the instance's bit in |valid_instances|, destroys the data source
// object under the instance lock, bumps the muxer generation and, if the
// owning producer is still connected, flushes pending commits and tells the
// service that the data source has stopped.
void TracingMuxerImpl::StopDataSource_AsyncEnd(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto ds = FindDataSource(backend_id, instance_id);
  if (!ds) {
    PERFETTO_ELOG(
        "Async stop of data source %" PRIu64
        " failed. This might be due to calling the async_stop_closure twice.",
        instance_id);
    return;
  }

  // Unpublish the slot first, so that it is no longer reported as valid.
  const uint32_t mask = ~(1 << ds.instance_idx);
  ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);

  // Take the mutex to prevent that the data source is in the middle of
  // a Trace() execution where it called GetDataSourceLocked() while we
  // destroy it.
  {
    std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
    ds.internal_state->trace_lambda_enabled = false;
    ds.internal_state->data_source.reset();
  }

  // The other fields of internal_state are deliberately *not* cleared.
  // See races-related comments of DataSource::Trace().

  // A generation change is what DestroyStoppedTraceWritersForCurrentThread()
  // records in the per-thread state after sweeping stale trace writers.
  TracingMuxer::generation_++;

  // |backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < backends_.size());
  ProducerImpl* producer = backends_[backend_id].producer.get();
  if (!producer)
    return;
  if (producer->connected_) {
    // Flush any commits that might have been batched by SharedMemoryArbiter.
    producer->service_->MaybeSharedMemoryArbiter()
        ->FlushPendingCommitDataRequests();
    producer->service_->NotifyDataSourceStopped(instance_id);
  }
  producer->SweepDeadServices();
}
1043
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1044 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1045 TracingBackendId backend_id,
1046 DataSourceInstanceID instance_id) {
1047 PERFETTO_DCHECK_THREAD(thread_checker_);
1048 PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1049 instance_id);
1050 auto ds = FindDataSource(backend_id, instance_id);
1051 if (!ds) {
1052 PERFETTO_ELOG("Could not find data source to clear incremental state for");
1053 return;
1054 }
1055 // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1056 // the incremental state should be cleared.
1057 ds.static_state->incremental_state_generation.fetch_add(
1058 1, std::memory_order_relaxed);
1059 }
1060
// Test-only helper that blocks the calling thread until every backend's
// producer has completed two Sync() round-trips, then verifies (debug builds
// only) that all producers report being connected.
//
// The work itself is posted to the muxer task runner; this thread merely waits
// on |cv| for the results.
void TracingMuxerImpl::SyncProducersForTesting() {
  // Shared by the waiting thread and by all the tasks/callbacks posted below.
  std::mutex mutex;
  std::condition_variable cv;

  // IPC-based producers don't report connection errors explicitly for each
  // command, but instead with an asynchronous callback
  // (ProducerImpl::OnDisconnected). This means that the sync command below
  // may have completed but failed to reach the service because of a
  // disconnection, but we can't tell until the disconnection message comes
  // through. To guard against this, we run two whole rounds of sync round-trips
  // before returning; the first one will detect any disconnected producers and
  // the second one will ensure any reconnections have completed and all data
  // sources are registered in the service again.
  for (size_t i = 0; i < 2; i++) {
    // Starts at a non-zero sentinel so that the wait below cannot be satisfied
    // before the posted task has stored the real number of backends.
    size_t countdown = std::numeric_limits<size_t>::max();
    task_runner_->PostTask([this, &mutex, &cv, &countdown] {
      {
        std::unique_lock<std::mutex> countdown_lock(mutex);
        countdown = backends_.size();
      }
      for (auto& backend : backends_) {
        auto* producer = backend.producer.get();
        // Each completed Sync() decrements the counter and wakes the waiter.
        producer->service_->Sync([&mutex, &cv, &countdown] {
          std::unique_lock<std::mutex> countdown_lock(mutex);
          countdown--;
          cv.notify_one();
        });
      }
    });

    {
      // Blocks until all Sync() callbacks of this round have run.
      std::unique_lock<std::mutex> countdown_lock(mutex);
      cv.wait(countdown_lock, [&countdown] { return !countdown; });
    }
  }

  // Check that all producers are indeed connected.
  bool done = false;
  bool all_producers_connected = true;
  task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
    for (auto& backend : backends_)
      all_producers_connected &= backend.producer->connected_;
    std::unique_lock<std::mutex> lock(mutex);
    done = true;
    cv.notify_one();
  });

  {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [&done] { return done; });
  }
  PERFETTO_DCHECK(all_producers_connected);
}
1114
// Resets the calling thread's per-instance thread-local state (and with it the
// trace writer) for every data source instance that no longer matches the live
// instance in the same slot. Afterwards records the generation value that was
// read on entry into the thread's root TLS.
void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
  // Iterate across all possible data source types.
  // The generation is sampled before the sweep, and that sampled value is what
  // gets stored in the TLS at the end.
  auto cur_generation = generation_.load(std::memory_order_acquire);
  auto* root_tls = GetOrCreateTracingTLS();

  // Sweeps the instances of a single data source type.
  auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
    // |tls| has a vector of per-data-source-instance thread-local state.
    DataSourceStaticState* static_state = tls.static_state;
    if (!static_state)
      return;  // Slot not used.

    // Iterate across all possible instances for this data source.
    for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
      DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
      // No trace writer on this thread for this slot: nothing to destroy.
      if (!ds_tls.trace_writer)
        continue;

      // Keep the writer only if the slot is still valid and still describes
      // the same backend, connection, buffer and instance id the writer was
      // associated with.
      DataSourceState* ds_state = static_state->TryGet(inst);
      if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
          ds_state->backend_connection_id == ds_tls.backend_connection_id &&
          ds_state->buffer_id == ds_tls.buffer_id &&
          ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
        continue;
      }

      // The DataSource instance has been destroyed or recycled.
      ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
    }
  };

  for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
    // |tls| has a vector of per-data-source-instance thread-local state.
    DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
    destroy_stopped_instances(tls);
  }
  // The track event TLS is stored separately from |data_sources_tls|.
  destroy_stopped_instances(root_tls->track_event_tls);
  root_tls->generation = cur_generation;
}
1153
1154 // Called both when a new data source is registered or when a new backend
1155 // connects. In both cases we want to be sure we reflected the data source
1156 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1157 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1158 PERFETTO_DCHECK_THREAD(thread_checker_);
1159 for (RegisteredDataSource& rds : data_sources_) {
1160 for (RegisteredBackend& backend : backends_) {
1161 // We cannot call RegisterDataSource on the backend before it connects.
1162 if (!backend.producer->connected_)
1163 continue;
1164
1165 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1166 if (backend.producer->registered_data_sources_.test(
1167 rds.static_state->index))
1168 continue;
1169
1170 rds.descriptor.set_will_notify_on_start(true);
1171 rds.descriptor.set_will_notify_on_stop(true);
1172 rds.descriptor.set_handles_incremental_state_clear(true);
1173 backend.producer->service_->RegisterDataSource(rds.descriptor);
1174 backend.producer->registered_data_sources_.set(rds.static_state->index);
1175 }
1176 }
1177 }
1178
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1179 void TracingMuxerImpl::SetupTracingSession(
1180 TracingSessionGlobalID session_id,
1181 const std::shared_ptr<TraceConfig>& trace_config,
1182 base::ScopedFile trace_fd) {
1183 PERFETTO_DCHECK_THREAD(thread_checker_);
1184 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1185
1186 auto* consumer = FindConsumer(session_id);
1187 if (!consumer)
1188 return;
1189
1190 consumer->trace_config_ = trace_config;
1191 if (trace_fd)
1192 consumer->trace_fd_ = std::move(trace_fd);
1193
1194 if (!consumer->connected_)
1195 return;
1196
1197 // Only used in the deferred start mode.
1198 if (trace_config->deferred_start()) {
1199 consumer->service_->EnableTracing(*trace_config,
1200 std::move(consumer->trace_fd_));
1201 }
1202 }
1203
StartTracingSession(TracingSessionGlobalID session_id)1204 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1205 PERFETTO_DCHECK_THREAD(thread_checker_);
1206
1207 auto* consumer = FindConsumer(session_id);
1208
1209 if (!consumer)
1210 return;
1211
1212 if (!consumer->trace_config_) {
1213 PERFETTO_ELOG("Must call Setup(config) first");
1214 return;
1215 }
1216
1217 if (!consumer->connected_) {
1218 consumer->start_pending_ = true;
1219 return;
1220 }
1221
1222 consumer->start_pending_ = false;
1223 if (consumer->trace_config_->deferred_start()) {
1224 consumer->service_->StartTracing();
1225 } else {
1226 consumer->service_->EnableTracing(*consumer->trace_config_,
1227 std::move(consumer->trace_fd_));
1228 }
1229
1230 // TODO implement support for the deferred-start + fast-triggering case.
1231 }
1232
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1233 void TracingMuxerImpl::ChangeTracingSessionConfig(
1234 TracingSessionGlobalID session_id,
1235 const TraceConfig& trace_config) {
1236 PERFETTO_DCHECK_THREAD(thread_checker_);
1237
1238 auto* consumer = FindConsumer(session_id);
1239
1240 if (!consumer)
1241 return;
1242
1243 if (!consumer->trace_config_) {
1244 // Changing the config is only supported for started sessions.
1245 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1246 return;
1247 }
1248
1249 consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1250 if (consumer->connected_)
1251 consumer->service_->ChangeTraceConfig(trace_config);
1252 }
1253
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1254 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
1255 uint32_t timeout_ms,
1256 std::function<void(bool)> callback) {
1257 PERFETTO_DCHECK_THREAD(thread_checker_);
1258 auto* consumer = FindConsumer(session_id);
1259 if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
1260 !consumer->trace_config_) {
1261 PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
1262 std::move(callback)(false);
1263 return;
1264 }
1265
1266 consumer->service_->Flush(timeout_ms, std::move(callback));
1267 }
1268
StopTracingSession(TracingSessionGlobalID session_id)1269 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
1270 PERFETTO_DCHECK_THREAD(thread_checker_);
1271 auto* consumer = FindConsumer(session_id);
1272 if (!consumer)
1273 return;
1274
1275 if (consumer->start_pending_) {
1276 // If the session hasn't started yet, wait until it does before stopping.
1277 consumer->stop_pending_ = true;
1278 return;
1279 }
1280
1281 consumer->stop_pending_ = false;
1282 if (consumer->stopped_) {
1283 // If the session was already stopped (e.g., it failed to start), don't try
1284 // stopping again.
1285 consumer->NotifyStopComplete();
1286 } else if (!consumer->trace_config_) {
1287 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1288 return;
1289 } else {
1290 consumer->service_->DisableTracing();
1291 }
1292
1293 consumer->trace_config_.reset();
1294 }
1295
DestroyTracingSession(TracingSessionGlobalID session_id)1296 void TracingMuxerImpl::DestroyTracingSession(
1297 TracingSessionGlobalID session_id) {
1298 PERFETTO_DCHECK_THREAD(thread_checker_);
1299 for (RegisteredBackend& backend : backends_) {
1300 // We need to find the consumer (if any) and call Disconnect as we destroy
1301 // the tracing session. We can't call Disconnect() inside this for loop
1302 // because in the in-process case this will end up to a synchronous call to
1303 // OnConsumerDisconnect which will invalidate all the iterators to
1304 // |backend.consumers|.
1305 ConsumerImpl* consumer = nullptr;
1306 for (auto& con : backend.consumers) {
1307 if (con->session_id_ == session_id) {
1308 consumer = con.get();
1309 break;
1310 }
1311 }
1312 if (consumer) {
1313 // We broke out of the loop above on the assumption that each backend will
1314 // only have a single consumer per session. This DCHECK ensures that
1315 // this is the case.
1316 PERFETTO_DCHECK(
1317 std::count_if(backend.consumers.begin(), backend.consumers.end(),
1318 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
1319 return con->session_id_ == session_id;
1320 }) == 1u);
1321 consumer->Disconnect();
1322 }
1323 }
1324 }
1325
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)1326 void TracingMuxerImpl::ReadTracingSessionData(
1327 TracingSessionGlobalID session_id,
1328 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
1329 PERFETTO_DCHECK_THREAD(thread_checker_);
1330 auto* consumer = FindConsumer(session_id);
1331 if (!consumer) {
1332 // TODO(skyostil): Signal an error to the user.
1333 TracingSession::ReadTraceCallbackArgs callback_arg{};
1334 callback(callback_arg);
1335 return;
1336 }
1337 PERFETTO_DCHECK(!consumer->read_trace_callback_);
1338 consumer->read_trace_callback_ = std::move(callback);
1339 consumer->service_->ReadBuffers();
1340 }
1341
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)1342 void TracingMuxerImpl::GetTraceStats(
1343 TracingSessionGlobalID session_id,
1344 TracingSession::GetTraceStatsCallback callback) {
1345 PERFETTO_DCHECK_THREAD(thread_checker_);
1346 auto* consumer = FindConsumer(session_id);
1347 if (!consumer) {
1348 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
1349 callback_arg.success = false;
1350 callback(std::move(callback_arg));
1351 return;
1352 }
1353 PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
1354 consumer->get_trace_stats_callback_ = std::move(callback);
1355 if (!consumer->connected_) {
1356 consumer->get_trace_stats_pending_ = true;
1357 return;
1358 }
1359 consumer->get_trace_stats_pending_ = false;
1360 consumer->service_->GetTraceStats();
1361 }
1362
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)1363 void TracingMuxerImpl::QueryServiceState(
1364 TracingSessionGlobalID session_id,
1365 TracingSession::QueryServiceStateCallback callback) {
1366 PERFETTO_DCHECK_THREAD(thread_checker_);
1367 auto* consumer = FindConsumer(session_id);
1368 if (!consumer) {
1369 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1370 callback_arg.success = false;
1371 callback(std::move(callback_arg));
1372 return;
1373 }
1374 PERFETTO_DCHECK(!consumer->query_service_state_callback_);
1375 if (!consumer->connected_) {
1376 consumer->query_service_state_callback_ = std::move(callback);
1377 return;
1378 }
1379 auto callback_wrapper = [callback](bool success,
1380 protos::gen::TracingServiceState state) {
1381 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
1382 callback_arg.success = success;
1383 callback_arg.service_state_data = state.SerializeAsArray();
1384 callback(std::move(callback_arg));
1385 };
1386 consumer->service_->QueryServiceState(std::move(callback_wrapper));
1387 }
1388
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)1389 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
1390 uint32_t batch_commits_duration_ms,
1391 BackendType backend_type) {
1392 for (RegisteredBackend& backend : backends_) {
1393 if (backend.producer && backend.producer->connected_ &&
1394 backend.type == backend_type) {
1395 backend.producer->service_->MaybeSharedMemoryArbiter()
1396 ->SetBatchCommitsDuration(batch_commits_duration_ms);
1397 }
1398 }
1399 }
1400
EnableDirectSMBPatchingForTesting(BackendType backend_type)1401 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
1402 BackendType backend_type) {
1403 for (RegisteredBackend& backend : backends_) {
1404 if (backend.producer && backend.producer->connected_ &&
1405 backend.type == backend_type &&
1406 !backend.producer->service_->MaybeSharedMemoryArbiter()
1407 ->EnableDirectSMBPatching()) {
1408 return false;
1409 }
1410 }
1411 return true;
1412 }
1413
FindConsumer(TracingSessionGlobalID session_id)1414 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
1415 TracingSessionGlobalID session_id) {
1416 PERFETTO_DCHECK_THREAD(thread_checker_);
1417 for (RegisteredBackend& backend : backends_) {
1418 for (auto& consumer : backend.consumers) {
1419 if (consumer->session_id_ == session_id) {
1420 return consumer.get();
1421 }
1422 }
1423 }
1424 return nullptr;
1425 }
1426
InitializeConsumer(TracingSessionGlobalID session_id)1427 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
1428 PERFETTO_DCHECK_THREAD(thread_checker_);
1429
1430 auto* consumer = FindConsumer(session_id);
1431 if (!consumer)
1432 return;
1433
1434 TracingBackendId backend_id = consumer->backend_id_;
1435 // |backends_| is append-only, Backend instances are always valid.
1436 PERFETTO_CHECK(backend_id < backends_.size());
1437 RegisteredBackend& backend = backends_[backend_id];
1438
1439 TracingBackend::ConnectConsumerArgs conn_args;
1440 conn_args.consumer = consumer;
1441 conn_args.task_runner = task_runner_.get();
1442 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
1443 }
1444
OnConsumerDisconnected(ConsumerImpl * consumer)1445 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
1446 PERFETTO_DCHECK_THREAD(thread_checker_);
1447 for (RegisteredBackend& backend : backends_) {
1448 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
1449 return con.get() == consumer;
1450 };
1451 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
1452 backend.consumers.end(), pred),
1453 backend.consumers.end());
1454 }
1455 }
1456
SetMaxProducerReconnectionsForTesting(uint32_t count)1457 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
1458 max_producer_reconnections_.store(count);
1459 }
1460
OnProducerDisconnected(ProducerImpl * producer)1461 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
1462 PERFETTO_DCHECK_THREAD(thread_checker_);
1463 for (RegisteredBackend& backend : backends_) {
1464 if (backend.producer.get() != producer)
1465 continue;
1466 // Try reconnecting the disconnected producer. If the connection succeeds,
1467 // all the data sources will be automatically re-registered.
1468 if (producer->connection_id_ > max_producer_reconnections_.load()) {
1469 // Avoid reconnecting a failing producer too many times. Instead we just
1470 // leak the producer instead of trying to avoid further complicating
1471 // cross-thread trace writer creation.
1472 PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
1473 continue;
1474 }
1475 backend.producer->Initialize(
1476 backend.backend->ConnectProducer(backend.producer_conn_args));
1477 }
1478
1479 // Increment the generation counter to atomically ensure that:
1480 // 1. Old trace writers from the severed connection eventually get cleaned up
1481 // by DestroyStoppedTraceWritersForCurrentThread().
1482 // 2. No new trace writers can be created for the SharedMemoryArbiter from the
1483 // old connection.
1484 TracingMuxer::generation_++;
1485 }
1486
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1487 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
1488 TracingBackendId backend_id,
1489 DataSourceInstanceID instance_id) {
1490 PERFETTO_DCHECK_THREAD(thread_checker_);
1491 for (const auto& rds : data_sources_) {
1492 DataSourceStaticState* static_state = rds.static_state;
1493 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1494 auto* internal_state = static_state->TryGet(i);
1495 if (internal_state && internal_state->backend_id == backend_id &&
1496 internal_state->data_source_instance_id == instance_id) {
1497 return FindDataSourceRes(static_state, internal_state, i);
1498 }
1499 }
1500 }
1501 return FindDataSourceRes();
1502 }
1503
1504 // Can be called from any thread.
// Creates the trace writer used to emit packets for one data source instance.
//
// If the instance has an interceptor attached (non-zero |interceptor_id|), an
// InterceptorTraceWriter bound to that interceptor's TLS factory and packet
// callback is returned. Otherwise the writer is created by the service
// endpoint of the producer that owns |data_source|, targeting the instance's
// buffer with the given |buffer_exhausted_policy|.
std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
    DataSourceStaticState* static_state,
    uint32_t data_source_instance_index,
    DataSourceState* data_source,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
    // If the session is being intercepted, return a heap-backed trace writer
    // instead. This is safe because all the data given to the interceptor is
    // either thread-local (|instance_index|), statically allocated
    // (|static_state|) or constant after initialization (|interceptor|). Access
    // to the interceptor instance itself through |data_source| is protected by
    // a statically allocated lock (similarly to the data source instance).
    // |interceptor_id| is stored 1-based, hence the -1 to index the vector.
    auto& interceptor = interceptors_[data_source->interceptor_id - 1];
    return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
        interceptor.tls_factory(static_state, data_source_instance_index),
        interceptor.packet_callback, static_state, data_source_instance_index));
  }
  ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
  // Atomically load the current service endpoint. We keep the pointer as a
  // shared pointer on the stack to guard against it from being concurrently
  // modified on the thread by ProducerImpl::Initialize() swapping in a
  // reconnected service on the muxer task runner thread.
  //
  // The endpoint may also be concurrently modified by SweepDeadServices()
  // clearing out old disconnected services. We guard against that by
  // SharedMemoryArbiter keeping track of any outstanding trace writers. After
  // shutdown has started, the trace writer created below will be a null one
  // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
  //
  // We use an atomic pointer instead of holding a lock because
  // CreateTraceWriter posts tasks under the hood.
  std::shared_ptr<ProducerEndpoint> service =
      std::atomic_load(&producer->service_);
  return service->CreateTraceWriter(data_source->buffer_id,
                                    buffer_exhausted_policy);
}
1541
1542 // This is called via the public API Tracing::NewTrace().
1543 // Can be called from any thread.
// Allocates a new session id and returns a TracingSessionImpl for it
// immediately. The consumer behind the session is created asynchronously on
// the muxer task runner, on the first backend that is compatible with
// |requested_backend_type|; it is then connected either right away or once
// the embedder-provided policy (if any) allows it.
std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
    BackendType requested_backend_type) {
  TracingSessionGlobalID session_id = ++next_tracing_session_id_;

  // |backend_type| can only specify one backend, not an OR-ed mask.
  // (x & (x - 1)) == 0 holds only when at most one bit of x is set.
  PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);

  // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
  task_runner_->PostTask([this, requested_backend_type, session_id] {
    for (RegisteredBackend& backend : backends_) {
      // Skip a backend only when both types are specified and they differ; a
      // backend without a type (the fallback one) is therefore never skipped.
      if (requested_backend_type && backend.type &&
          backend.type != requested_backend_type) {
        continue;
      }

      TracingBackendId backend_id = backend.id;

      // Create the consumer now, even if we have to ask the embedder below, so
      // that any other tasks executing after this one can find the consumer and
      // change its pending attributes.
      backend.consumers.emplace_back(
          new ConsumerImpl(this, backend.type, backend.id, session_id));

      // The last registered backend in |backends_| is the unsupported backend
      // without a valid type.
      if (!backend.type) {
        PERFETTO_ELOG(
            "No tracing backend ready for type=%d, consumer will disconnect",
            requested_backend_type);
        InitializeConsumer(session_id);
        return;
      }

      // Check if the embedder wants to be asked for permission before
      // connecting the consumer.
      if (!policy_) {
        InitializeConsumer(session_id);
        return;
      }

      TracingPolicy::ShouldAllowConsumerSessionArgs args;
      args.backend_type = backend.type;
      // The verdict is bounced through PostTask(), so the work below always
      // runs as a task on the muxer task runner.
      args.result_callback = [this, backend_id, session_id](bool allow) {
        task_runner_->PostTask([this, backend_id, session_id, allow] {
          if (allow) {
            InitializeConsumer(session_id);
            return;
          }

          PERFETTO_ELOG(
              "Consumer session for backend type type=%d forbidden, "
              "consumer will disconnect",
              backends_[backend_id].type);

          // The consumer may be gone by the time the verdict arrives.
          auto* consumer = FindConsumer(session_id);
          if (!consumer)
            return;

          consumer->OnDisconnect();
        });
      };
      policy_->ShouldAllowConsumerSession(args);
      return;
    }
    // Every path through the loop body either continues or returns, so this
    // is hit only if no backend was acceptable.
    PERFETTO_DFATAL("Not reached");
  });

  return std::unique_ptr<TracingSession>(
      new TracingSessionImpl(this, session_id, requested_backend_type));
}
1614
InitializeInstance(const TracingInitArgs & args)1615 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
1616 if (instance_ != TracingMuxerFake::Get())
1617 PERFETTO_FATAL("Tracing already initialized");
1618 new TracingMuxerImpl(args);
1619 }
1620
// Out-of-line definition of the base class destructor.
TracingMuxer::~TracingMuxer() = default;

// SetupDataSource() casts the config's target buffer to internal::BufferId;
// fail the build if that alias ever stops being the same type as BufferID.
static_assert(std::is_same<internal::BufferId, BufferID>::value,
              "public's BufferId and tracing/core's BufferID diverged");
1625
1626 } // namespace internal
1627 } // namespace perfetto
1628