1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 19 20 #include <stddef.h> 21 #include <stdint.h> 22 23 #include <array> 24 #include <atomic> 25 #include <bitset> 26 #include <list> 27 #include <map> 28 #include <memory> 29 #include <vector> 30 31 #include "perfetto/ext/base/scoped_file.h" 32 #include "perfetto/ext/base/thread_checker.h" 33 #include "perfetto/ext/tracing/core/basic_types.h" 34 #include "perfetto/ext/tracing/core/consumer.h" 35 #include "perfetto/ext/tracing/core/producer.h" 36 #include "perfetto/tracing/core/data_source_descriptor.h" 37 #include "perfetto/tracing/core/forward_decls.h" 38 #include "perfetto/tracing/core/trace_config.h" 39 #include "perfetto/tracing/internal/basic_types.h" 40 #include "perfetto/tracing/internal/tracing_muxer.h" 41 #include "perfetto/tracing/tracing.h" 42 43 #include "protos/perfetto/common/interceptor_descriptor.gen.h" 44 45 namespace perfetto { 46 47 class ConsumerEndpoint; 48 class DataSourceBase; 49 class ProducerEndpoint; 50 class TraceWriterBase; 51 class TracingBackend; 52 class TracingSession; 53 struct TracingInitArgs; 54 55 namespace base { 56 class TaskRunner; 57 } 58 59 namespace internal { 60 61 struct DataSourceStaticState; 62 63 // This class acts as a bridge between the public API and the TracingBackend(s). 64 // It exposes a simplified view of the world to the API methods handling all the 65 // bookkeeping to map data source instances and trace writers to the various 66 // backends. It deals with N data sources, M backends (1 backend == 1 tracing 67 // service == 1 producer connection) and T concurrent tracing sessions. 68 // 69 // Handing data source registration and start/stop flows [producer side]: 70 // ---------------------------------------------------------------------- 71 // 1. The API client subclasses perfetto::DataSource and calls 72 // DataSource::Register<MyDataSource>(). In turn this calls into the 73 // TracingMuxer. 74 // 2. The tracing muxer iterates through all the backends (1 backend == 1 75 // service == 1 producer connection) and registers the data source on each 76 // backend. 77 // 3. When any (services behind a) backend starts tracing and requests to start 78 // that specific data source, the TracingMuxerImpl constructs a new instance 79 // of MyDataSource and calls the OnStart() method. 80 // 81 // Controlling trace and retrieving trace data [consumer side]: 82 // ------------------------------------------------------------ 83 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession 84 // object. 85 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer 86 // subclasses the TracingSession object (TracingSessionImpl) and returns it. 87 // 3. The tracing muxer identifies the backend (according to the args passed to 88 // NewTrace), creates a new Consumer and connects to it. 89 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the 90 // TracingMuxer forwards them to the consumer associated to the 91 // TracingSession. Likewise for callbacks coming from the consumer-side of 92 // the service. 93 class TracingMuxerImpl : public TracingMuxer { 94 public: 95 // This is different than TracingSessionID because it's global across all 96 // backends. TracingSessionID is global only within the scope of one service. 97 using TracingSessionGlobalID = uint64_t; 98 99 static void InitializeInstance(const TracingInitArgs&); 100 static void ResetForTesting(); 101 102 // TracingMuxer implementation. 103 bool RegisterDataSource(const DataSourceDescriptor&, 104 DataSourceFactory, 105 DataSourceStaticState*) override; 106 void UpdateDataSourceDescriptor(const DataSourceDescriptor&, 107 const DataSourceStaticState*) override; 108 std::unique_ptr<TraceWriterBase> CreateTraceWriter( 109 DataSourceStaticState*, 110 uint32_t data_source_instance_index, 111 DataSourceState*, 112 BufferExhaustedPolicy buffer_exhausted_policy) override; 113 void DestroyStoppedTraceWritersForCurrentThread() override; 114 void RegisterInterceptor(const InterceptorDescriptor&, 115 InterceptorFactory, 116 InterceptorBase::TLSFactory, 117 InterceptorBase::TracePacketCallback) override; 118 119 std::unique_ptr<TracingSession> CreateTracingSession(BackendType); 120 121 // Producer-side bookkeeping methods. 122 void UpdateDataSourcesOnAllBackends(); 123 void SetupDataSource(TracingBackendId, 124 uint32_t backend_connection_id, 125 DataSourceInstanceID, 126 const DataSourceConfig&); 127 void StartDataSource(TracingBackendId, DataSourceInstanceID); 128 void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID); 129 void StopDataSource_AsyncEnd(TracingBackendId, DataSourceInstanceID); 130 void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID); 131 void SyncProducersForTesting(); 132 133 // Consumer-side bookkeeping methods. 134 void SetupTracingSession(TracingSessionGlobalID, 135 const std::shared_ptr<TraceConfig>&, 136 base::ScopedFile trace_fd = base::ScopedFile()); 137 void StartTracingSession(TracingSessionGlobalID); 138 void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&); 139 void StopTracingSession(TracingSessionGlobalID); 140 void DestroyTracingSession(TracingSessionGlobalID); 141 void FlushTracingSession(TracingSessionGlobalID, 142 uint32_t, 143 std::function<void(bool)>); 144 void ReadTracingSessionData( 145 TracingSessionGlobalID, 146 std::function<void(TracingSession::ReadTraceCallbackArgs)>); 147 void GetTraceStats(TracingSessionGlobalID, 148 TracingSession::GetTraceStatsCallback); 149 void QueryServiceState(TracingSessionGlobalID, 150 TracingSession::QueryServiceStateCallback); 151 152 // Sets the batching period to |batch_commits_duration_ms| on the backends 153 // with type |backend_type|. 154 void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms, 155 BackendType backend_type); 156 157 // Enables direct SMB patching on the backends with type |backend_type| (see 158 // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the 159 // operation succeeded for all backends with type |backend_type|, false 160 // otherwise. 161 bool EnableDirectSMBPatchingForTesting(BackendType backend_type); 162 163 void SetMaxProducerReconnectionsForTesting(uint32_t count); 164 165 private: 166 // For each TracingBackend we create and register one ProducerImpl instance. 167 // This talks to the producer-side of the service, gets start/stop requests 168 // from it and routes them to the registered data sources. 169 // One ProducerImpl == one backend == one tracing service. 170 // This class is needed to disambiguate callbacks coming from different 171 // services. TracingMuxerImpl can't directly implement the Producer interface 172 // because the Producer virtual methods don't allow to identify the service. 173 class ProducerImpl : public Producer { 174 public: 175 ProducerImpl(TracingMuxerImpl*, 176 TracingBackendId, 177 uint32_t shmem_batch_commits_duration_ms); 178 ~ProducerImpl() override; 179 180 void Initialize(std::unique_ptr<ProducerEndpoint> endpoint); 181 void RegisterDataSource(const DataSourceDescriptor&, 182 DataSourceFactory, 183 DataSourceStaticState*); 184 void DisposeConnection(); 185 186 // perfetto::Producer implementation. 187 void OnConnect() override; 188 void OnDisconnect() override; 189 void OnTracingSetup() override; 190 void SetupDataSource(DataSourceInstanceID, 191 const DataSourceConfig&) override; 192 void StartDataSource(DataSourceInstanceID, 193 const DataSourceConfig&) override; 194 void StopDataSource(DataSourceInstanceID) override; 195 void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override; 196 void ClearIncrementalState(const DataSourceInstanceID*, size_t) override; 197 198 bool SweepDeadServices(); 199 200 PERFETTO_THREAD_CHECKER(thread_checker_) 201 TracingMuxerImpl* muxer_; 202 TracingBackendId const backend_id_; 203 bool connected_ = false; 204 bool did_setup_tracing_ = false; 205 uint32_t connection_id_ = 0; 206 207 const uint32_t shmem_batch_commits_duration_ms_ = 0; 208 209 // Set of data sources that have been actually registered on this producer. 210 // This can be a subset of the global |data_sources_|, because data sources 211 // can register before the producer is fully connected. 212 std::bitset<kMaxDataSources> registered_data_sources_{}; 213 214 // A collection of disconnected service endpoints. Since trace writers on 215 // arbitrary threads might continue writing data to disconnected services, 216 // we keep the old services around and periodically try to clean up ones 217 // that no longer have any writers (see SweepDeadServices). 218 std::list<std::shared_ptr<ProducerEndpoint>> dead_services_; 219 220 // The currently active service endpoint is maintained as an atomic shared 221 // pointer so it won't get deleted from underneath threads that are creating 222 // trace writers. At any given time one endpoint can be shared (and thus 223 // kept alive) by the |service_| pointer, an entry in |dead_services_| and 224 // as a pointer on the stack in CreateTraceWriter() (on an arbitrary 225 // thread). The endpoint is never shared outside ProducerImpl itself. 226 // 227 // WARNING: Any *write* access to this variable or any *read* access from a 228 // non-muxer thread must be done through std::atomic_{load,store} to avoid 229 // data races. 230 std::shared_ptr<ProducerEndpoint> service_; // Keep last. 231 }; 232 233 // For each TracingSession created by the API client (Tracing::NewTrace() we 234 // create and register one ConsumerImpl instance. 235 // This talks to the consumer-side of the service, gets end-of-trace and 236 // on-trace-data callbacks and routes them to the API client callbacks. 237 // This class is needed to disambiguate callbacks coming from different 238 // tracing sessions. 239 class ConsumerImpl : public Consumer { 240 public: 241 ConsumerImpl(TracingMuxerImpl*, 242 BackendType, 243 TracingBackendId, 244 TracingSessionGlobalID); 245 ~ConsumerImpl() override; 246 247 void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint); 248 249 // perfetto::Consumer implementation. 250 void OnConnect() override; 251 void OnDisconnect() override; 252 void OnTracingDisabled(const std::string& error) override; 253 void OnTraceData(std::vector<TracePacket>, bool has_more) override; 254 void OnDetach(bool success) override; 255 void OnAttach(bool success, const TraceConfig&) override; 256 void OnTraceStats(bool success, const TraceStats&) override; 257 void OnObservableEvents(const ObservableEvents&) override; 258 259 void NotifyStartComplete(); 260 void NotifyError(const TracingError&); 261 void NotifyStopComplete(); 262 263 // Will eventually inform the |muxer_| when it is safe to remove |this|. 264 void Disconnect(); 265 266 TracingMuxerImpl* muxer_; 267 BackendType const backend_type_; 268 TracingBackendId const backend_id_; 269 TracingSessionGlobalID const session_id_; 270 bool connected_ = false; 271 272 // This is to handle the case where the Setup call from the API client 273 // arrives before the consumer has connected. In this case we keep around 274 // the config and check if we have it after connection. 275 bool start_pending_ = false; 276 277 // Similarly if the session is stopped before the consumer was connected, we 278 // need to wait until the session has started before stopping it. 279 bool stop_pending_ = false; 280 281 // Similarly we need to buffer a call to get trace statistics if the 282 // consumer wasn't connected yet. 283 bool get_trace_stats_pending_ = false; 284 285 // Whether this session was already stopped. This will happen in response to 286 // Stop{,Blocking}, but also if the service stops the session for us 287 // automatically (e.g., when there are no data sources). 288 bool stopped_ = false; 289 290 // shared_ptr because it's posted across threads. This is to avoid copying 291 // it more than once. 292 std::shared_ptr<TraceConfig> trace_config_; 293 base::ScopedFile trace_fd_; 294 295 // If the API client passes a callback to start, we should invoke this when 296 // NotifyStartComplete() is invoked. 297 std::function<void()> start_complete_callback_; 298 299 // An internal callback used to implement StartBlocking(). 300 std::function<void()> blocking_start_complete_callback_; 301 302 // If the API client passes a callback to get notification about the 303 // errors, we should invoke this when NotifyError() is invoked. 304 std::function<void(TracingError)> error_callback_; 305 306 // If the API client passes a callback to stop, we should invoke this when 307 // OnTracingDisabled() is invoked. 308 std::function<void()> stop_complete_callback_; 309 310 // An internal callback used to implement StopBlocking(). 311 std::function<void()> blocking_stop_complete_callback_; 312 313 // Callback passed to ReadTrace(). 314 std::function<void(TracingSession::ReadTraceCallbackArgs)> 315 read_trace_callback_; 316 317 // Callback passed to GetTraceStats(). 318 TracingSession::GetTraceStatsCallback get_trace_stats_callback_; 319 320 // Callback for a pending call to QueryServiceState(). 321 TracingSession::QueryServiceStateCallback query_service_state_callback_; 322 323 // The states of all data sources in this tracing session. |true| means the 324 // data source has started tracing. 325 using DataSourceHandle = std::pair<std::string, std::string>; 326 std::map<DataSourceHandle, bool> data_source_states_; 327 328 std::unique_ptr<ConsumerEndpoint> service_; // Keep before last. 329 PERFETTO_THREAD_CHECKER(thread_checker_) // Keep last. 330 }; 331 332 // This object is returned to API clients when they call 333 // Tracing::CreateTracingSession(). 334 class TracingSessionImpl : public TracingSession { 335 public: 336 TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType); 337 ~TracingSessionImpl() override; 338 void Setup(const TraceConfig&, int fd) override; 339 void Start() override; 340 void StartBlocking() override; 341 void SetOnStartCallback(std::function<void()>) override; 342 void SetOnErrorCallback(std::function<void(TracingError)>) override; 343 void Stop() override; 344 void StopBlocking() override; 345 void Flush(std::function<void(bool)>, uint32_t timeout_ms) override; 346 void ReadTrace(ReadTraceCallback) override; 347 void SetOnStopCallback(std::function<void()>) override; 348 void GetTraceStats(GetTraceStatsCallback) override; 349 void QueryServiceState(QueryServiceStateCallback) override; 350 void ChangeTraceConfig(const TraceConfig&) override; 351 352 private: 353 TracingMuxerImpl* const muxer_; 354 TracingSessionGlobalID const session_id_; 355 BackendType const backend_type_; 356 }; 357 358 struct RegisteredDataSource { 359 DataSourceDescriptor descriptor; 360 DataSourceFactory factory{}; 361 DataSourceStaticState* static_state = nullptr; 362 }; 363 364 struct RegisteredInterceptor { 365 protos::gen::InterceptorDescriptor descriptor; 366 InterceptorFactory factory{}; 367 InterceptorBase::TLSFactory tls_factory{}; 368 InterceptorBase::TracePacketCallback packet_callback{}; 369 }; 370 371 struct RegisteredBackend { 372 // Backends are supposed to have static lifetime. 373 TracingBackend* backend = nullptr; 374 TracingBackendId id = 0; 375 BackendType type{}; 376 377 TracingBackend::ConnectProducerArgs producer_conn_args; 378 std::unique_ptr<ProducerImpl> producer; 379 380 // The calling code can request more than one concurrently active tracing 381 // session for the same backend. We need to create one consumer per session. 382 std::vector<std::unique_ptr<ConsumerImpl>> consumers; 383 }; 384 385 void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds, 386 bool is_changed); 387 explicit TracingMuxerImpl(const TracingInitArgs&); 388 void Initialize(const TracingInitArgs& args); 389 ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id); 390 void InitializeConsumer(TracingSessionGlobalID session_id); 391 void OnConsumerDisconnected(ConsumerImpl* consumer); 392 void OnProducerDisconnected(ProducerImpl* producer); 393 void SweepDeadBackends(); 394 395 struct FindDataSourceRes { 396 FindDataSourceRes() = default; FindDataSourceResFindDataSourceRes397 FindDataSourceRes(DataSourceStaticState* a, DataSourceState* b, uint32_t c) 398 : static_state(a), internal_state(b), instance_idx(c) {} 399 explicit operator bool() const { return !!internal_state; } 400 401 DataSourceStaticState* static_state = nullptr; 402 DataSourceState* internal_state = nullptr; 403 uint32_t instance_idx = 0; 404 }; 405 FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID); 406 407 // WARNING: If you add new state here, be sure to update ResetForTesting. 408 std::unique_ptr<base::TaskRunner> task_runner_; 409 std::vector<RegisteredDataSource> data_sources_; 410 std::vector<RegisteredBackend> backends_; 411 std::vector<RegisteredInterceptor> interceptors_; 412 TracingPolicy* policy_ = nullptr; 413 414 std::atomic<TracingSessionGlobalID> next_tracing_session_id_{}; 415 std::atomic<uint32_t> next_data_source_index_{}; 416 uint32_t muxer_id_for_testing_{}; 417 418 // Maximum number of times we will try to reconnect producer backend. 419 // Should only be modified for testing purposes. 420 std::atomic<uint32_t> max_producer_reconnections_{100u}; 421 422 // After ResetForTesting() is called, holds tracing backends which needs to be 423 // kept alive until all inbound references have gone away. See 424 // SweepDeadBackends(). 425 std::list<RegisteredBackend> dead_backends_; 426 427 PERFETTO_THREAD_CHECKER(thread_checker_) 428 }; 429 430 } // namespace internal 431 } // namespace perfetto 432 433 #endif // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ 434