1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 19 20 #include <stddef.h> 21 #include <stdint.h> 22 23 #include <array> 24 #include <atomic> 25 #include <bitset> 26 #include <functional> 27 #include <list> 28 #include <map> 29 #include <memory> 30 #include <set> 31 #include <utility> 32 #include <vector> 33 34 #include "perfetto/base/time.h" 35 #include "perfetto/ext/base/scoped_file.h" 36 #include "perfetto/ext/base/thread_checker.h" 37 #include "perfetto/ext/tracing/core/basic_types.h" 38 #include "perfetto/ext/tracing/core/consumer.h" 39 #include "perfetto/ext/tracing/core/producer.h" 40 #include "perfetto/tracing/backend_type.h" 41 #include "perfetto/tracing/core/data_source_descriptor.h" 42 #include "perfetto/tracing/core/forward_decls.h" 43 #include "perfetto/tracing/core/trace_config.h" 44 #include "perfetto/tracing/internal/basic_types.h" 45 #include "perfetto/tracing/internal/tracing_muxer.h" 46 #include "perfetto/tracing/tracing.h" 47 48 #include "protos/perfetto/common/interceptor_descriptor.gen.h" 49 50 namespace perfetto { 51 52 class ConsumerEndpoint; 53 class DataSourceBase; 54 class ProducerEndpoint; 55 class TraceWriterBase; 56 class TracingBackend; 57 class TracingSession; 58 struct TracingInitArgs; 59 60 namespace base { 61 class TaskRunner; 62 } 63 64 namespace shlib { 65 void ResetForTesting(); 66 } 67 68 namespace test { 69 class TracingMuxerImplInternalsForTest; 70 } 71 72 namespace internal { 73 74 struct DataSourceStaticState; 75 76 // This class acts as a bridge between the public API and the TracingBackend(s). 77 // It exposes a simplified view of the world to the API methods handling all the 78 // bookkeeping to map data source instances and trace writers to the various 79 // backends. It deals with N data sources, M backends (1 backend == 1 tracing 80 // service == 1 producer connection) and T concurrent tracing sessions. 81 // 82 // Handing data source registration and start/stop flows [producer side]: 83 // ---------------------------------------------------------------------- 84 // 1. The API client subclasses perfetto::DataSource and calls 85 // DataSource::Register<MyDataSource>(). In turn this calls into the 86 // TracingMuxer. 87 // 2. The tracing muxer iterates through all the backends (1 backend == 1 88 // service == 1 producer connection) and registers the data source on each 89 // backend. 90 // 3. When any (services behind a) backend starts tracing and requests to start 91 // that specific data source, the TracingMuxerImpl constructs a new instance 92 // of MyDataSource and calls the OnStart() method. 93 // 94 // Controlling trace and retrieving trace data [consumer side]: 95 // ------------------------------------------------------------ 96 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession 97 // object. 98 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer 99 // subclasses the TracingSession object (TracingSessionImpl) and returns it. 100 // 3. The tracing muxer identifies the backend (according to the args passed to 101 // NewTrace), creates a new Consumer and connects to it. 102 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the 103 // TracingMuxer forwards them to the consumer associated to the 104 // TracingSession. Likewise for callbacks coming from the consumer-side of 105 // the service. 106 class TracingMuxerImpl : public TracingMuxer { 107 public: 108 // This is different than TracingSessionID because it's global across all 109 // backends. TracingSessionID is global only within the scope of one service. 110 using TracingSessionGlobalID = uint64_t; 111 112 struct RegisteredDataSource { 113 DataSourceDescriptor descriptor; 114 DataSourceFactory factory{}; 115 bool supports_multiple_instances = false; 116 bool requires_callbacks_under_lock = false; 117 bool no_flush = false; 118 DataSourceStaticState* static_state = nullptr; 119 }; 120 121 static void InitializeInstance(const TracingInitArgs&); 122 static void ResetForTesting(); 123 static void Shutdown(); 124 125 // TracingMuxer implementation. 126 bool RegisterDataSource(const DataSourceDescriptor&, 127 DataSourceFactory, 128 DataSourceParams, 129 bool no_flush, 130 DataSourceStaticState*) override; 131 void UpdateDataSourceDescriptor(const DataSourceDescriptor&, 132 const DataSourceStaticState*) override; 133 std::unique_ptr<TraceWriterBase> CreateTraceWriter( 134 DataSourceStaticState*, 135 uint32_t data_source_instance_index, 136 DataSourceState*, 137 BufferExhaustedPolicy buffer_exhausted_policy) override; 138 void DestroyStoppedTraceWritersForCurrentThread() override; 139 void RegisterInterceptor(const InterceptorDescriptor&, 140 InterceptorFactory, 141 InterceptorBase::TLSFactory, 142 InterceptorBase::TracePacketCallback) override; 143 144 void ActivateTriggers(const std::vector<std::string>&, uint32_t) override; 145 146 std::unique_ptr<TracingSession> CreateTracingSession( 147 BackendType, 148 TracingConsumerBackend* (*system_backend_factory)()); 149 std::unique_ptr<StartupTracingSession> CreateStartupTracingSession( 150 const TraceConfig& config, 151 Tracing::SetupStartupTracingOpts); 152 std::unique_ptr<StartupTracingSession> CreateStartupTracingSessionBlocking( 153 const TraceConfig& config, 154 Tracing::SetupStartupTracingOpts); 155 156 // Producer-side bookkeeping methods. 157 void UpdateDataSourcesOnAllBackends(); 158 void SetupDataSource(TracingBackendId, 159 uint32_t backend_connection_id, 160 DataSourceInstanceID, 161 const DataSourceConfig&); 162 void StartDataSource(TracingBackendId, DataSourceInstanceID); 163 void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID); 164 void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID); 165 void SyncProducersForTesting(); 166 167 // Consumer-side bookkeeping methods. 168 void SetupTracingSession(TracingSessionGlobalID, 169 const std::shared_ptr<TraceConfig>&, 170 base::ScopedFile trace_fd = base::ScopedFile()); 171 void StartTracingSession(TracingSessionGlobalID); 172 void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&); 173 void StopTracingSession(TracingSessionGlobalID); 174 void DestroyTracingSession(TracingSessionGlobalID); 175 void FlushTracingSession(TracingSessionGlobalID, 176 uint32_t, 177 std::function<void(bool)>); 178 void ReadTracingSessionData( 179 TracingSessionGlobalID, 180 std::function<void(TracingSession::ReadTraceCallbackArgs)>); 181 void GetTraceStats(TracingSessionGlobalID, 182 TracingSession::GetTraceStatsCallback); 183 void QueryServiceState(TracingSessionGlobalID, 184 TracingSession::QueryServiceStateCallback); 185 186 // Sets the batching period to |batch_commits_duration_ms| on the backends 187 // with type |backend_type|. 188 void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms, 189 BackendType backend_type); 190 191 // Enables direct SMB patching on the backends with type |backend_type| (see 192 // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the 193 // operation succeeded for all backends with type |backend_type|, false 194 // otherwise. 195 bool EnableDirectSMBPatchingForTesting(BackendType backend_type); 196 197 void SetMaxProducerReconnectionsForTesting(uint32_t count); 198 199 private: 200 friend class test::TracingMuxerImplInternalsForTest; 201 friend void shlib::ResetForTesting(); 202 203 // For each TracingBackend we create and register one ProducerImpl instance. 204 // This talks to the producer-side of the service, gets start/stop requests 205 // from it and routes them to the registered data sources. 206 // One ProducerImpl == one backend == one tracing service. 207 // This class is needed to disambiguate callbacks coming from different 208 // services. TracingMuxerImpl can't directly implement the Producer interface 209 // because the Producer virtual methods don't allow to identify the service. 210 class ProducerImpl : public Producer { 211 public: 212 ProducerImpl(TracingMuxerImpl*, 213 TracingBackendId, 214 uint32_t shmem_batch_commits_duration_ms, 215 bool shmem_direct_patching_enabled); 216 ~ProducerImpl() override; 217 218 void Initialize(std::unique_ptr<ProducerEndpoint> endpoint); 219 void RegisterDataSource(const DataSourceDescriptor&, 220 DataSourceFactory, 221 DataSourceStaticState*); 222 void DisposeConnection(); 223 224 // perfetto::Producer implementation. 225 void OnConnect() override; 226 void OnDisconnect() override; 227 void OnTracingSetup() override; 228 void OnStartupTracingSetup() override; 229 void SetupDataSource(DataSourceInstanceID, 230 const DataSourceConfig&) override; 231 void StartDataSource(DataSourceInstanceID, 232 const DataSourceConfig&) override; 233 void StopDataSource(DataSourceInstanceID) override; 234 void Flush(FlushRequestID, 235 const DataSourceInstanceID*, 236 size_t, 237 FlushFlags) override; 238 void ClearIncrementalState(const DataSourceInstanceID*, size_t) override; 239 240 bool SweepDeadServices(); 241 void SendOnConnectTriggers(); 242 void NotifyFlushForDataSourceDone(DataSourceInstanceID, FlushRequestID); 243 244 PERFETTO_THREAD_CHECKER(thread_checker_) 245 TracingMuxerImpl* muxer_; 246 TracingBackendId const backend_id_; 247 bool connected_ = false; 248 bool did_setup_tracing_ = false; 249 bool did_setup_startup_tracing_ = false; 250 std::atomic<uint32_t> connection_id_{0}; 251 uint16_t last_startup_target_buffer_reservation_ = 0; 252 bool is_producer_provided_smb_ = false; 253 bool producer_provided_smb_failed_ = false; 254 255 const uint32_t shmem_batch_commits_duration_ms_ = 0; 256 const bool shmem_direct_patching_enabled_ = false; 257 258 // Set of data sources that have been actually registered on this producer. 259 // This can be a subset of the global |data_sources_|, because data sources 260 // can register before the producer is fully connected. 261 std::bitset<kMaxDataSources> registered_data_sources_{}; 262 263 // A collection of disconnected service endpoints. Since trace writers on 264 // arbitrary threads might continue writing data to disconnected services, 265 // we keep the old services around and periodically try to clean up ones 266 // that no longer have any writers (see SweepDeadServices). 267 std::list<std::shared_ptr<ProducerEndpoint>> dead_services_; 268 269 // Triggers that should be sent when the service connects (trigger_name, 270 // expiration). 271 std::list<std::pair<std::string, base::TimeMillis>> on_connect_triggers_; 272 273 std::map<FlushRequestID, std::set<DataSourceInstanceID>> pending_flushes_; 274 275 // The currently active service endpoint is maintained as an atomic shared 276 // pointer so it won't get deleted from underneath threads that are creating 277 // trace writers. At any given time one endpoint can be shared (and thus 278 // kept alive) by the |service_| pointer, an entry in |dead_services_| and 279 // as a pointer on the stack in CreateTraceWriter() (on an arbitrary 280 // thread). The endpoint is never shared outside ProducerImpl itself. 281 // 282 // WARNING: Any *write* access to this variable or any *read* access from a 283 // non-muxer thread must be done through std::atomic_{load,store} to avoid 284 // data races. 285 std::shared_ptr<ProducerEndpoint> service_; // Keep last. 286 }; 287 288 // For each TracingSession created by the API client (Tracing::NewTrace() we 289 // create and register one ConsumerImpl instance. 290 // This talks to the consumer-side of the service, gets end-of-trace and 291 // on-trace-data callbacks and routes them to the API client callbacks. 292 // This class is needed to disambiguate callbacks coming from different 293 // tracing sessions. 294 class ConsumerImpl : public Consumer { 295 public: 296 ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID); 297 ~ConsumerImpl() override; 298 299 void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint); 300 301 // perfetto::Consumer implementation. 302 void OnConnect() override; 303 void OnDisconnect() override; 304 void OnTracingDisabled(const std::string& error) override; 305 void OnTraceData(std::vector<TracePacket>, bool has_more) override; 306 void OnDetach(bool success) override; 307 void OnAttach(bool success, const TraceConfig&) override; 308 void OnTraceStats(bool success, const TraceStats&) override; 309 void OnObservableEvents(const ObservableEvents&) override; 310 void OnSessionCloned(const OnSessionClonedArgs&) override; 311 312 void NotifyStartComplete(); 313 void NotifyError(const TracingError&); 314 void NotifyStopComplete(); 315 316 // Will eventually inform the |muxer_| when it is safe to remove |this|. 317 void Disconnect(); 318 319 TracingMuxerImpl* muxer_; 320 BackendType const backend_type_; 321 TracingSessionGlobalID const session_id_; 322 bool connected_ = false; 323 324 // This is to handle the case where the Setup call from the API client 325 // arrives before the consumer has connected. In this case we keep around 326 // the config and check if we have it after connection. 327 bool start_pending_ = false; 328 329 // Similarly if the session is stopped before the consumer was connected, we 330 // need to wait until the session has started before stopping it. 331 bool stop_pending_ = false; 332 333 // Similarly we need to buffer a call to get trace statistics if the 334 // consumer wasn't connected yet. 335 bool get_trace_stats_pending_ = false; 336 337 // Whether this session was already stopped. This will happen in response to 338 // Stop{,Blocking}, but also if the service stops the session for us 339 // automatically (e.g., when there are no data sources). 340 bool stopped_ = false; 341 342 // shared_ptr because it's posted across threads. This is to avoid copying 343 // it more than once. 344 std::shared_ptr<TraceConfig> trace_config_; 345 base::ScopedFile trace_fd_; 346 347 // If the API client passes a callback to start, we should invoke this when 348 // NotifyStartComplete() is invoked. 349 std::function<void()> start_complete_callback_; 350 351 // An internal callback used to implement StartBlocking(). 352 std::function<void()> blocking_start_complete_callback_; 353 354 // If the API client passes a callback to get notification about the 355 // errors, we should invoke this when NotifyError() is invoked. 356 std::function<void(TracingError)> error_callback_; 357 358 // If the API client passes a callback to stop, we should invoke this when 359 // OnTracingDisabled() is invoked. 360 std::function<void()> stop_complete_callback_; 361 362 // An internal callback used to implement StopBlocking(). 363 std::function<void()> blocking_stop_complete_callback_; 364 365 // Callback passed to ReadTrace(). 366 std::function<void(TracingSession::ReadTraceCallbackArgs)> 367 read_trace_callback_; 368 369 // Callback passed to GetTraceStats(). 370 TracingSession::GetTraceStatsCallback get_trace_stats_callback_; 371 372 // Callback for a pending call to QueryServiceState(). 373 TracingSession::QueryServiceStateCallback query_service_state_callback_; 374 375 // The states of all data sources in this tracing session. |true| means the 376 // data source has started tracing. 377 using DataSourceHandle = std::pair<std::string, std::string>; 378 std::map<DataSourceHandle, bool> data_source_states_; 379 380 std::unique_ptr<ConsumerEndpoint> service_; // Keep before last. 381 PERFETTO_THREAD_CHECKER(thread_checker_) // Keep last. 382 }; 383 384 // This object is returned to API clients when they call 385 // Tracing::CreateTracingSession(). 386 class TracingSessionImpl : public TracingSession { 387 public: 388 TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType); 389 ~TracingSessionImpl() override; 390 void Setup(const TraceConfig&, int fd) override; 391 void Start() override; 392 void StartBlocking() override; 393 void SetOnStartCallback(std::function<void()>) override; 394 void SetOnErrorCallback(std::function<void(TracingError)>) override; 395 void Stop() override; 396 void StopBlocking() override; 397 void Flush(std::function<void(bool)>, uint32_t timeout_ms) override; 398 void ReadTrace(ReadTraceCallback) override; 399 void SetOnStopCallback(std::function<void()>) override; 400 void GetTraceStats(GetTraceStatsCallback) override; 401 void QueryServiceState(QueryServiceStateCallback) override; 402 void ChangeTraceConfig(const TraceConfig&) override; 403 404 private: 405 TracingMuxerImpl* const muxer_; 406 TracingSessionGlobalID const session_id_; 407 BackendType const backend_type_; 408 }; 409 410 // This object is returned to API clients when they call 411 // Tracing::SetupStartupTracing(). 412 class StartupTracingSessionImpl : public StartupTracingSession { 413 public: 414 StartupTracingSessionImpl(TracingMuxerImpl*, 415 TracingSessionGlobalID, 416 BackendType); 417 ~StartupTracingSessionImpl() override; 418 void Abort() override; 419 void AbortBlocking() override; 420 421 private: 422 TracingMuxerImpl* const muxer_; 423 TracingSessionGlobalID const session_id_; 424 BackendType backend_type_; 425 }; 426 427 struct RegisteredInterceptor { 428 protos::gen::InterceptorDescriptor descriptor; 429 InterceptorFactory factory{}; 430 InterceptorBase::TLSFactory tls_factory{}; 431 InterceptorBase::TracePacketCallback packet_callback{}; 432 }; 433 434 struct RegisteredStartupSession { 435 TracingSessionID session_id = 0; 436 int num_unbound_data_sources = 0; 437 438 bool is_aborting = false; 439 int num_aborting_data_sources = 0; 440 441 std::function<void()> on_aborted; 442 std::function<void()> on_adopted; 443 }; 444 445 struct RegisteredProducerBackend { 446 // Backends are supposed to have static lifetime. 447 TracingProducerBackend* backend = nullptr; 448 TracingBackendId id = 0; 449 BackendType type{}; 450 451 TracingBackend::ConnectProducerArgs producer_conn_args; 452 std::unique_ptr<ProducerImpl> producer; 453 454 std::vector<RegisteredStartupSession> startup_sessions; 455 }; 456 457 struct RegisteredConsumerBackend { 458 // Backends are supposed to have static lifetime. 459 TracingConsumerBackend* backend = nullptr; 460 BackendType type{}; 461 // The calling code can request more than one concurrently active tracing 462 // session for the same backend. We need to create one consumer per session. 463 std::vector<std::unique_ptr<ConsumerImpl>> consumers; 464 }; 465 466 void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds, 467 bool is_changed); 468 explicit TracingMuxerImpl(const TracingInitArgs&); 469 void Initialize(const TracingInitArgs& args); 470 void AddBackends(const TracingInitArgs& args); 471 void AddConsumerBackend(TracingConsumerBackend* backend, BackendType type); 472 void AddProducerBackend(TracingProducerBackend* backend, 473 BackendType type, 474 const TracingInitArgs& args); 475 ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id); 476 std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend( 477 TracingSessionGlobalID session_id); 478 RegisteredProducerBackend* FindProducerBackendById(TracingBackendId id); 479 RegisteredProducerBackend* FindProducerBackendByType(BackendType type); 480 RegisteredConsumerBackend* FindConsumerBackendByType(BackendType type); 481 void InitializeConsumer(TracingSessionGlobalID session_id); 482 void OnConsumerDisconnected(ConsumerImpl* consumer); 483 void OnProducerDisconnected(ProducerImpl* producer); 484 // Test only method. 485 void SweepDeadBackends(); 486 487 struct FindDataSourceRes { 488 FindDataSourceRes() = default; FindDataSourceResFindDataSourceRes489 FindDataSourceRes(DataSourceStaticState* a, 490 DataSourceState* b, 491 uint32_t c, 492 bool d) 493 : static_state(a), 494 internal_state(b), 495 instance_idx(c), 496 requires_callbacks_under_lock(d) {} 497 explicit operator bool() const { return !!internal_state; } 498 499 DataSourceStaticState* static_state = nullptr; 500 DataSourceState* internal_state = nullptr; 501 uint32_t instance_idx = 0; 502 bool requires_callbacks_under_lock = false; 503 }; 504 FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID); 505 506 FindDataSourceRes SetupDataSourceImpl( 507 const RegisteredDataSource&, 508 TracingBackendId, 509 uint32_t backend_connection_id, 510 DataSourceInstanceID, 511 const DataSourceConfig&, 512 TracingSessionGlobalID startup_session_id); 513 void StartDataSourceImpl(const FindDataSourceRes&); 514 void StopDataSource_AsyncBeginImpl(const FindDataSourceRes&); 515 void StopDataSource_AsyncEnd(TracingBackendId, 516 uint32_t backend_connection_id, 517 DataSourceInstanceID, 518 const FindDataSourceRes&); 519 bool FlushDataSource_AsyncBegin(TracingBackendId, 520 DataSourceInstanceID, 521 FlushRequestID, 522 FlushFlags); 523 void FlushDataSource_AsyncEnd(TracingBackendId, 524 uint32_t backend_connection_id, 525 DataSourceInstanceID, 526 const FindDataSourceRes&, 527 FlushRequestID); 528 void AbortStartupTracingSession(TracingSessionGlobalID, BackendType); 529 // When ResetForTesting() is executed, `cb` will be called on the calling 530 // thread and on the muxer thread. 531 void AppendResetForTestingCallback(std::function<void()> cb); 532 533 // WARNING: If you add new state here, be sure to update ResetForTesting. 534 std::unique_ptr<base::TaskRunner> task_runner_; 535 std::vector<RegisteredDataSource> data_sources_; 536 // These lists can only have one backend per BackendType. The elements are 537 // sorted by BackendType priority (see BackendTypePriority). They always 538 // contain a fake low-priority kUnspecifiedBackend at the end. 539 std::list<RegisteredProducerBackend> producer_backends_; 540 std::list<RegisteredConsumerBackend> consumer_backends_; 541 std::vector<RegisteredInterceptor> interceptors_; 542 TracingPolicy* policy_ = nullptr; 543 544 // Learn more at TracingInitArgs::supports_multiple_data_source_instances 545 bool supports_multiple_data_source_instances_ = true; 546 547 std::atomic<TracingSessionGlobalID> next_tracing_session_id_{}; 548 std::atomic<uint32_t> next_data_source_index_{}; 549 uint32_t muxer_id_for_testing_{}; 550 551 // Maximum number of times we will try to reconnect producer backend. 552 // Should only be modified for testing purposes. 553 std::atomic<uint32_t> max_producer_reconnections_{100u}; 554 555 // Test only member. 556 // After ResetForTesting() is called, holds tracing backends which needs to be 557 // kept alive until all inbound references have gone away. See 558 // SweepDeadBackends(). 559 std::list<RegisteredProducerBackend> dead_backends_; 560 561 // Test only member. 562 // Executes these cleanup functions on the calling thread and on the muxer 563 // thread when ResetForTesting() is called. 564 std::list<std::function<void()>> reset_callbacks_; 565 566 PERFETTO_THREAD_CHECKER(thread_checker_) 567 }; 568 569 } // namespace internal 570 } // namespace perfetto 571 572 #endif // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 573