1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 19 20 #include <stddef.h> 21 #include <stdint.h> 22 23 #include <array> 24 #include <atomic> 25 #include <bitset> 26 #include <functional> 27 #include <list> 28 #include <map> 29 #include <memory> 30 #include <set> 31 #include <utility> 32 #include <vector> 33 34 #include "perfetto/base/time.h" 35 #include "perfetto/ext/base/scoped_file.h" 36 #include "perfetto/ext/base/thread_checker.h" 37 #include "perfetto/ext/tracing/core/basic_types.h" 38 #include "perfetto/ext/tracing/core/consumer.h" 39 #include "perfetto/ext/tracing/core/producer.h" 40 #include "perfetto/tracing/backend_type.h" 41 #include "perfetto/tracing/core/data_source_descriptor.h" 42 #include "perfetto/tracing/core/forward_decls.h" 43 #include "perfetto/tracing/core/trace_config.h" 44 #include "perfetto/tracing/internal/basic_types.h" 45 #include "perfetto/tracing/internal/tracing_muxer.h" 46 #include "perfetto/tracing/tracing.h" 47 48 #include "protos/perfetto/common/interceptor_descriptor.gen.h" 49 50 namespace perfetto { 51 52 class ConsumerEndpoint; 53 class DataSourceBase; 54 class ProducerEndpoint; 55 class TraceWriterBase; 56 class TracingBackend; 57 class TracingSession; 58 struct TracingInitArgs; 59 60 namespace base { 61 class TaskRunner; 62 } 63 64 namespace shlib { 65 void ResetForTesting(); 66 } 67 68 namespace test { 69 class TracingMuxerImplInternalsForTest; 70 } 71 72 namespace internal { 73 74 struct DataSourceStaticState; 75 76 // This class acts as a bridge between the public API and the TracingBackend(s). 77 // It exposes a simplified view of the world to the API methods handling all the 78 // bookkeeping to map data source instances and trace writers to the various 79 // backends. It deals with N data sources, M backends (1 backend == 1 tracing 80 // service == 1 producer connection) and T concurrent tracing sessions. 81 // 82 // Handing data source registration and start/stop flows [producer side]: 83 // ---------------------------------------------------------------------- 84 // 1. The API client subclasses perfetto::DataSource and calls 85 // DataSource::Register<MyDataSource>(). In turn this calls into the 86 // TracingMuxer. 87 // 2. The tracing muxer iterates through all the backends (1 backend == 1 88 // service == 1 producer connection) and registers the data source on each 89 // backend. 90 // 3. When any (services behind a) backend starts tracing and requests to start 91 // that specific data source, the TracingMuxerImpl constructs a new instance 92 // of MyDataSource and calls the OnStart() method. 93 // 94 // Controlling trace and retrieving trace data [consumer side]: 95 // ------------------------------------------------------------ 96 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession 97 // object. 98 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer 99 // subclasses the TracingSession object (TracingSessionImpl) and returns it. 100 // 3. The tracing muxer identifies the backend (according to the args passed to 101 // NewTrace), creates a new Consumer and connects to it. 102 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the 103 // TracingMuxer forwards them to the consumer associated to the 104 // TracingSession. Likewise for callbacks coming from the consumer-side of 105 // the service. 106 class TracingMuxerImpl : public TracingMuxer { 107 public: 108 // This is different than TracingSessionID because it's global across all 109 // backends. TracingSessionID is global only within the scope of one service. 110 using TracingSessionGlobalID = uint64_t; 111 112 struct RegisteredDataSource { 113 DataSourceDescriptor descriptor; 114 DataSourceFactory factory{}; 115 bool supports_multiple_instances = false; 116 bool requires_callbacks_under_lock = false; 117 DataSourceStaticState* static_state = nullptr; 118 }; 119 120 static void InitializeInstance(const TracingInitArgs&); 121 static void ResetForTesting(); 122 static void Shutdown(); 123 124 // TracingMuxer implementation. 125 bool RegisterDataSource(const DataSourceDescriptor&, 126 DataSourceFactory, 127 DataSourceParams, 128 DataSourceStaticState*) override; 129 void UpdateDataSourceDescriptor(const DataSourceDescriptor&, 130 const DataSourceStaticState*) override; 131 std::unique_ptr<TraceWriterBase> CreateTraceWriter( 132 DataSourceStaticState*, 133 uint32_t data_source_instance_index, 134 DataSourceState*, 135 BufferExhaustedPolicy buffer_exhausted_policy) override; 136 void DestroyStoppedTraceWritersForCurrentThread() override; 137 void RegisterInterceptor(const InterceptorDescriptor&, 138 InterceptorFactory, 139 InterceptorBase::TLSFactory, 140 InterceptorBase::TracePacketCallback) override; 141 142 void ActivateTriggers(const std::vector<std::string>&, uint32_t) override; 143 144 std::unique_ptr<TracingSession> CreateTracingSession( 145 BackendType, 146 TracingConsumerBackend* (*system_backend_factory)()); 147 std::unique_ptr<StartupTracingSession> CreateStartupTracingSession( 148 const TraceConfig& config, 149 Tracing::SetupStartupTracingOpts); 150 std::unique_ptr<StartupTracingSession> CreateStartupTracingSessionBlocking( 151 const TraceConfig& config, 152 Tracing::SetupStartupTracingOpts); 153 154 // Producer-side bookkeeping methods. 155 void UpdateDataSourcesOnAllBackends(); 156 void SetupDataSource(TracingBackendId, 157 uint32_t backend_connection_id, 158 DataSourceInstanceID, 159 const DataSourceConfig&); 160 void StartDataSource(TracingBackendId, DataSourceInstanceID); 161 void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID); 162 void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID); 163 void SyncProducersForTesting(); 164 165 // Consumer-side bookkeeping methods. 166 void SetupTracingSession(TracingSessionGlobalID, 167 const std::shared_ptr<TraceConfig>&, 168 base::ScopedFile trace_fd = base::ScopedFile()); 169 void StartTracingSession(TracingSessionGlobalID); 170 void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&); 171 void StopTracingSession(TracingSessionGlobalID); 172 void DestroyTracingSession(TracingSessionGlobalID); 173 void FlushTracingSession(TracingSessionGlobalID, 174 uint32_t, 175 std::function<void(bool)>); 176 void ReadTracingSessionData( 177 TracingSessionGlobalID, 178 std::function<void(TracingSession::ReadTraceCallbackArgs)>); 179 void GetTraceStats(TracingSessionGlobalID, 180 TracingSession::GetTraceStatsCallback); 181 void QueryServiceState(TracingSessionGlobalID, 182 TracingSession::QueryServiceStateCallback); 183 184 // Sets the batching period to |batch_commits_duration_ms| on the backends 185 // with type |backend_type|. 186 void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms, 187 BackendType backend_type); 188 189 // Enables direct SMB patching on the backends with type |backend_type| (see 190 // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the 191 // operation succeeded for all backends with type |backend_type|, false 192 // otherwise. 193 bool EnableDirectSMBPatchingForTesting(BackendType backend_type); 194 195 void SetMaxProducerReconnectionsForTesting(uint32_t count); 196 197 private: 198 friend class test::TracingMuxerImplInternalsForTest; 199 friend void shlib::ResetForTesting(); 200 201 // For each TracingBackend we create and register one ProducerImpl instance. 202 // This talks to the producer-side of the service, gets start/stop requests 203 // from it and routes them to the registered data sources. 204 // One ProducerImpl == one backend == one tracing service. 205 // This class is needed to disambiguate callbacks coming from different 206 // services. TracingMuxerImpl can't directly implement the Producer interface 207 // because the Producer virtual methods don't allow to identify the service. 208 class ProducerImpl : public Producer { 209 public: 210 ProducerImpl(TracingMuxerImpl*, 211 TracingBackendId, 212 uint32_t shmem_batch_commits_duration_ms); 213 ~ProducerImpl() override; 214 215 void Initialize(std::unique_ptr<ProducerEndpoint> endpoint); 216 void RegisterDataSource(const DataSourceDescriptor&, 217 DataSourceFactory, 218 DataSourceStaticState*); 219 void DisposeConnection(); 220 221 // perfetto::Producer implementation. 222 void OnConnect() override; 223 void OnDisconnect() override; 224 void OnTracingSetup() override; 225 void OnStartupTracingSetup() override; 226 void SetupDataSource(DataSourceInstanceID, 227 const DataSourceConfig&) override; 228 void StartDataSource(DataSourceInstanceID, 229 const DataSourceConfig&) override; 230 void StopDataSource(DataSourceInstanceID) override; 231 void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override; 232 void ClearIncrementalState(const DataSourceInstanceID*, size_t) override; 233 234 bool SweepDeadServices(); 235 void SendOnConnectTriggers(); 236 void NotifyFlushForDataSourceDone(DataSourceInstanceID, FlushRequestID); 237 238 PERFETTO_THREAD_CHECKER(thread_checker_) 239 TracingMuxerImpl* muxer_; 240 TracingBackendId const backend_id_; 241 bool connected_ = false; 242 bool did_setup_tracing_ = false; 243 bool did_setup_startup_tracing_ = false; 244 std::atomic<uint32_t> connection_id_{0}; 245 uint16_t last_startup_target_buffer_reservation_ = 0; 246 bool is_producer_provided_smb_ = false; 247 bool producer_provided_smb_failed_ = false; 248 249 const uint32_t shmem_batch_commits_duration_ms_ = 0; 250 251 // Set of data sources that have been actually registered on this producer. 252 // This can be a subset of the global |data_sources_|, because data sources 253 // can register before the producer is fully connected. 254 std::bitset<kMaxDataSources> registered_data_sources_{}; 255 256 // A collection of disconnected service endpoints. Since trace writers on 257 // arbitrary threads might continue writing data to disconnected services, 258 // we keep the old services around and periodically try to clean up ones 259 // that no longer have any writers (see SweepDeadServices). 260 std::list<std::shared_ptr<ProducerEndpoint>> dead_services_; 261 262 // Triggers that should be sent when the service connects (trigger_name, 263 // expiration). 264 std::list<std::pair<std::string, base::TimeMillis>> on_connect_triggers_; 265 266 std::map<FlushRequestID, std::set<DataSourceInstanceID>> pending_flushes_; 267 268 // The currently active service endpoint is maintained as an atomic shared 269 // pointer so it won't get deleted from underneath threads that are creating 270 // trace writers. At any given time one endpoint can be shared (and thus 271 // kept alive) by the |service_| pointer, an entry in |dead_services_| and 272 // as a pointer on the stack in CreateTraceWriter() (on an arbitrary 273 // thread). The endpoint is never shared outside ProducerImpl itself. 274 // 275 // WARNING: Any *write* access to this variable or any *read* access from a 276 // non-muxer thread must be done through std::atomic_{load,store} to avoid 277 // data races. 278 std::shared_ptr<ProducerEndpoint> service_; // Keep last. 279 }; 280 281 // For each TracingSession created by the API client (Tracing::NewTrace() we 282 // create and register one ConsumerImpl instance. 283 // This talks to the consumer-side of the service, gets end-of-trace and 284 // on-trace-data callbacks and routes them to the API client callbacks. 285 // This class is needed to disambiguate callbacks coming from different 286 // tracing sessions. 287 class ConsumerImpl : public Consumer { 288 public: 289 ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID); 290 ~ConsumerImpl() override; 291 292 void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint); 293 294 // perfetto::Consumer implementation. 295 void OnConnect() override; 296 void OnDisconnect() override; 297 void OnTracingDisabled(const std::string& error) override; 298 void OnTraceData(std::vector<TracePacket>, bool has_more) override; 299 void OnDetach(bool success) override; 300 void OnAttach(bool success, const TraceConfig&) override; 301 void OnTraceStats(bool success, const TraceStats&) override; 302 void OnObservableEvents(const ObservableEvents&) override; 303 void OnSessionCloned(const OnSessionClonedArgs&) override; 304 305 void NotifyStartComplete(); 306 void NotifyError(const TracingError&); 307 void NotifyStopComplete(); 308 309 // Will eventually inform the |muxer_| when it is safe to remove |this|. 310 void Disconnect(); 311 312 TracingMuxerImpl* muxer_; 313 BackendType const backend_type_; 314 TracingSessionGlobalID const session_id_; 315 bool connected_ = false; 316 317 // This is to handle the case where the Setup call from the API client 318 // arrives before the consumer has connected. In this case we keep around 319 // the config and check if we have it after connection. 320 bool start_pending_ = false; 321 322 // Similarly if the session is stopped before the consumer was connected, we 323 // need to wait until the session has started before stopping it. 324 bool stop_pending_ = false; 325 326 // Similarly we need to buffer a call to get trace statistics if the 327 // consumer wasn't connected yet. 328 bool get_trace_stats_pending_ = false; 329 330 // Whether this session was already stopped. This will happen in response to 331 // Stop{,Blocking}, but also if the service stops the session for us 332 // automatically (e.g., when there are no data sources). 333 bool stopped_ = false; 334 335 // shared_ptr because it's posted across threads. This is to avoid copying 336 // it more than once. 337 std::shared_ptr<TraceConfig> trace_config_; 338 base::ScopedFile trace_fd_; 339 340 // If the API client passes a callback to start, we should invoke this when 341 // NotifyStartComplete() is invoked. 342 std::function<void()> start_complete_callback_; 343 344 // An internal callback used to implement StartBlocking(). 345 std::function<void()> blocking_start_complete_callback_; 346 347 // If the API client passes a callback to get notification about the 348 // errors, we should invoke this when NotifyError() is invoked. 349 std::function<void(TracingError)> error_callback_; 350 351 // If the API client passes a callback to stop, we should invoke this when 352 // OnTracingDisabled() is invoked. 353 std::function<void()> stop_complete_callback_; 354 355 // An internal callback used to implement StopBlocking(). 356 std::function<void()> blocking_stop_complete_callback_; 357 358 // Callback passed to ReadTrace(). 359 std::function<void(TracingSession::ReadTraceCallbackArgs)> 360 read_trace_callback_; 361 362 // Callback passed to GetTraceStats(). 363 TracingSession::GetTraceStatsCallback get_trace_stats_callback_; 364 365 // Callback for a pending call to QueryServiceState(). 366 TracingSession::QueryServiceStateCallback query_service_state_callback_; 367 368 // The states of all data sources in this tracing session. |true| means the 369 // data source has started tracing. 370 using DataSourceHandle = std::pair<std::string, std::string>; 371 std::map<DataSourceHandle, bool> data_source_states_; 372 373 std::unique_ptr<ConsumerEndpoint> service_; // Keep before last. 374 PERFETTO_THREAD_CHECKER(thread_checker_) // Keep last. 375 }; 376 377 // This object is returned to API clients when they call 378 // Tracing::CreateTracingSession(). 379 class TracingSessionImpl : public TracingSession { 380 public: 381 TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType); 382 ~TracingSessionImpl() override; 383 void Setup(const TraceConfig&, int fd) override; 384 void Start() override; 385 void StartBlocking() override; 386 void SetOnStartCallback(std::function<void()>) override; 387 void SetOnErrorCallback(std::function<void(TracingError)>) override; 388 void Stop() override; 389 void StopBlocking() override; 390 void Flush(std::function<void(bool)>, uint32_t timeout_ms) override; 391 void ReadTrace(ReadTraceCallback) override; 392 void SetOnStopCallback(std::function<void()>) override; 393 void GetTraceStats(GetTraceStatsCallback) override; 394 void QueryServiceState(QueryServiceStateCallback) override; 395 void ChangeTraceConfig(const TraceConfig&) override; 396 397 private: 398 TracingMuxerImpl* const muxer_; 399 TracingSessionGlobalID const session_id_; 400 BackendType const backend_type_; 401 }; 402 403 // This object is returned to API clients when they call 404 // Tracing::SetupStartupTracing(). 405 class StartupTracingSessionImpl : public StartupTracingSession { 406 public: 407 StartupTracingSessionImpl(TracingMuxerImpl*, 408 TracingSessionGlobalID, 409 BackendType); 410 ~StartupTracingSessionImpl() override; 411 void Abort() override; 412 void AbortBlocking() override; 413 414 private: 415 TracingMuxerImpl* const muxer_; 416 TracingSessionGlobalID const session_id_; 417 BackendType backend_type_; 418 }; 419 420 struct RegisteredInterceptor { 421 protos::gen::InterceptorDescriptor descriptor; 422 InterceptorFactory factory{}; 423 InterceptorBase::TLSFactory tls_factory{}; 424 InterceptorBase::TracePacketCallback packet_callback{}; 425 }; 426 427 struct RegisteredStartupSession { 428 TracingSessionID session_id = 0; 429 int num_unbound_data_sources = 0; 430 431 bool is_aborting = false; 432 int num_aborting_data_sources = 0; 433 434 std::function<void()> on_aborted; 435 std::function<void()> on_adopted; 436 }; 437 438 struct RegisteredProducerBackend { 439 // Backends are supposed to have static lifetime. 440 TracingProducerBackend* backend = nullptr; 441 TracingBackendId id = 0; 442 BackendType type{}; 443 444 TracingBackend::ConnectProducerArgs producer_conn_args; 445 std::unique_ptr<ProducerImpl> producer; 446 447 std::vector<RegisteredStartupSession> startup_sessions; 448 }; 449 450 struct RegisteredConsumerBackend { 451 // Backends are supposed to have static lifetime. 452 TracingConsumerBackend* backend = nullptr; 453 BackendType type{}; 454 // The calling code can request more than one concurrently active tracing 455 // session for the same backend. We need to create one consumer per session. 456 std::vector<std::unique_ptr<ConsumerImpl>> consumers; 457 }; 458 459 void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds, 460 bool is_changed); 461 explicit TracingMuxerImpl(const TracingInitArgs&); 462 void Initialize(const TracingInitArgs& args); 463 void AddBackends(const TracingInitArgs& args); 464 void AddConsumerBackend(TracingConsumerBackend* backend, BackendType type); 465 void AddProducerBackend(TracingProducerBackend* backend, 466 BackendType type, 467 const TracingInitArgs& args); 468 ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id); 469 std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend( 470 TracingSessionGlobalID session_id); 471 RegisteredProducerBackend* FindProducerBackendById(TracingBackendId id); 472 RegisteredProducerBackend* FindProducerBackendByType(BackendType type); 473 RegisteredConsumerBackend* FindConsumerBackendByType(BackendType type); 474 void InitializeConsumer(TracingSessionGlobalID session_id); 475 void OnConsumerDisconnected(ConsumerImpl* consumer); 476 void OnProducerDisconnected(ProducerImpl* producer); 477 // Test only method. 478 void SweepDeadBackends(); 479 480 struct FindDataSourceRes { 481 FindDataSourceRes() = default; FindDataSourceResFindDataSourceRes482 FindDataSourceRes(DataSourceStaticState* a, 483 DataSourceState* b, 484 uint32_t c, 485 bool d) 486 : static_state(a), 487 internal_state(b), 488 instance_idx(c), 489 requires_callbacks_under_lock(d) {} 490 explicit operator bool() const { return !!internal_state; } 491 492 DataSourceStaticState* static_state = nullptr; 493 DataSourceState* internal_state = nullptr; 494 uint32_t instance_idx = 0; 495 bool requires_callbacks_under_lock = false; 496 }; 497 FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID); 498 499 FindDataSourceRes SetupDataSourceImpl( 500 const RegisteredDataSource&, 501 TracingBackendId, 502 uint32_t backend_connection_id, 503 DataSourceInstanceID, 504 const DataSourceConfig&, 505 uint64_t config_hash, 506 uint64_t startup_config_hash, 507 TracingSessionGlobalID startup_session_id); 508 void StartDataSourceImpl(const FindDataSourceRes&); 509 void StopDataSource_AsyncBeginImpl(const FindDataSourceRes&); 510 void StopDataSource_AsyncEnd(TracingBackendId, 511 uint32_t backend_connection_id, 512 DataSourceInstanceID, 513 const FindDataSourceRes&); 514 bool FlushDataSource_AsyncBegin(TracingBackendId, 515 DataSourceInstanceID, 516 FlushRequestID); 517 void FlushDataSource_AsyncEnd(TracingBackendId, 518 uint32_t backend_connection_id, 519 DataSourceInstanceID, 520 const FindDataSourceRes&, 521 FlushRequestID); 522 void AbortStartupTracingSession(TracingSessionGlobalID, BackendType); 523 // When ResetForTesting() is executed, `cb` will be called on the calling 524 // thread and on the muxer thread. 525 void AppendResetForTestingCallback(std::function<void()> cb); 526 527 // WARNING: If you add new state here, be sure to update ResetForTesting. 528 std::unique_ptr<base::TaskRunner> task_runner_; 529 std::vector<RegisteredDataSource> data_sources_; 530 // These lists can only have one backend per BackendType. The elements are 531 // sorted by BackendType priority (see BackendTypePriority). They always 532 // contain a fake low-priority kUnspecifiedBackend at the end. 533 std::list<RegisteredProducerBackend> producer_backends_; 534 std::list<RegisteredConsumerBackend> consumer_backends_; 535 std::vector<RegisteredInterceptor> interceptors_; 536 TracingPolicy* policy_ = nullptr; 537 538 // Learn more at TracingInitArgs::supports_multiple_data_source_instances 539 bool supports_multiple_data_source_instances_ = true; 540 541 std::atomic<TracingSessionGlobalID> next_tracing_session_id_{}; 542 std::atomic<uint32_t> next_data_source_index_{}; 543 uint32_t muxer_id_for_testing_{}; 544 545 // Maximum number of times we will try to reconnect producer backend. 546 // Should only be modified for testing purposes. 547 std::atomic<uint32_t> max_producer_reconnections_{100u}; 548 549 // Test only member. 550 // After ResetForTesting() is called, holds tracing backends which needs to be 551 // kept alive until all inbound references have gone away. See 552 // SweepDeadBackends(). 553 std::list<RegisteredProducerBackend> dead_backends_; 554 555 // Test only member. 556 // Executes these cleanup functions on the calling thread and on the muxer 557 // thread when ResetForTesting() is called. 558 std::list<std::function<void()>> reset_callbacks_; 559 560 PERFETTO_THREAD_CHECKER(thread_checker_) 561 }; 562 563 } // namespace internal 564 } // namespace perfetto 565 566 #endif // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 567