/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
19 
#include <stddef.h>
#include <stdint.h>

#include <array>
#include <atomic>
#include <bitset>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/base/time.h"
#include "perfetto/ext/base/scoped_file.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/consumer.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/tracing/backend_type.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/forward_decls.h"
#include "perfetto/tracing/core/trace_config.h"
#include "perfetto/tracing/internal/basic_types.h"
#include "perfetto/tracing/internal/tracing_muxer.h"
#include "perfetto/tracing/tracing.h"

#include "protos/perfetto/common/interceptor_descriptor.gen.h"
49 
50 namespace perfetto {
51 
52 class ConsumerEndpoint;
53 class DataSourceBase;
54 class ProducerEndpoint;
55 class TraceWriterBase;
56 class TracingBackend;
57 class TracingSession;
58 struct TracingInitArgs;
59 
60 namespace base {
61 class TaskRunner;
62 }
63 
64 namespace shlib {
65 void ResetForTesting();
66 }
67 
68 namespace test {
69 class TracingMuxerImplInternalsForTest;
70 }
71 
72 namespace internal {
73 
74 struct DataSourceStaticState;
75 
76 // This class acts as a bridge between the public API and the TracingBackend(s).
77 // It exposes a simplified view of the world to the API methods handling all the
78 // bookkeeping to map data source instances and trace writers to the various
79 // backends. It deals with N data sources, M backends (1 backend == 1 tracing
80 // service == 1 producer connection) and T concurrent tracing sessions.
81 //
82 // Handing data source registration and start/stop flows [producer side]:
83 // ----------------------------------------------------------------------
84 // 1. The API client subclasses perfetto::DataSource and calls
85 //    DataSource::Register<MyDataSource>(). In turn this calls into the
86 //    TracingMuxer.
87 // 2. The tracing muxer iterates through all the backends (1 backend == 1
88 //    service == 1 producer connection) and registers the data source on each
89 //    backend.
90 // 3. When any (services behind a) backend starts tracing and requests to start
91 //    that specific data source, the TracingMuxerImpl constructs a new instance
92 //    of MyDataSource and calls the OnStart() method.
93 //
94 // Controlling trace and retrieving trace data [consumer side]:
95 // ------------------------------------------------------------
96 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession
97 //    object.
98 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer
99 //    subclasses the TracingSession object (TracingSessionImpl) and returns it.
100 // 3. The tracing muxer identifies the backend (according to the args passed to
101 //    NewTrace), creates a new Consumer and connects to it.
102 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the
103 //    TracingMuxer forwards them to the consumer associated to the
104 //    TracingSession. Likewise for callbacks coming from the consumer-side of
105 //    the service.
106 class TracingMuxerImpl : public TracingMuxer {
107  public:
108   // This is different than TracingSessionID because it's global across all
109   // backends. TracingSessionID is global only within the scope of one service.
110   using TracingSessionGlobalID = uint64_t;
111 
112   struct RegisteredDataSource {
113     DataSourceDescriptor descriptor;
114     DataSourceFactory factory{};
115     bool supports_multiple_instances = false;
116     bool requires_callbacks_under_lock = false;
117     DataSourceStaticState* static_state = nullptr;
118   };
119 
120   static void InitializeInstance(const TracingInitArgs&);
121   static void ResetForTesting();
122   static void Shutdown();
123 
124   // TracingMuxer implementation.
125   bool RegisterDataSource(const DataSourceDescriptor&,
126                           DataSourceFactory,
127                           DataSourceParams,
128                           DataSourceStaticState*) override;
129   void UpdateDataSourceDescriptor(const DataSourceDescriptor&,
130                                   const DataSourceStaticState*) override;
131   std::unique_ptr<TraceWriterBase> CreateTraceWriter(
132       DataSourceStaticState*,
133       uint32_t data_source_instance_index,
134       DataSourceState*,
135       BufferExhaustedPolicy buffer_exhausted_policy) override;
136   void DestroyStoppedTraceWritersForCurrentThread() override;
137   void RegisterInterceptor(const InterceptorDescriptor&,
138                            InterceptorFactory,
139                            InterceptorBase::TLSFactory,
140                            InterceptorBase::TracePacketCallback) override;
141 
142   void ActivateTriggers(const std::vector<std::string>&, uint32_t) override;
143 
144   std::unique_ptr<TracingSession> CreateTracingSession(
145       BackendType,
146       TracingConsumerBackend* (*system_backend_factory)());
147   std::unique_ptr<StartupTracingSession> CreateStartupTracingSession(
148       const TraceConfig& config,
149       Tracing::SetupStartupTracingOpts);
150   std::unique_ptr<StartupTracingSession> CreateStartupTracingSessionBlocking(
151       const TraceConfig& config,
152       Tracing::SetupStartupTracingOpts);
153 
154   // Producer-side bookkeeping methods.
155   void UpdateDataSourcesOnAllBackends();
156   void SetupDataSource(TracingBackendId,
157                        uint32_t backend_connection_id,
158                        DataSourceInstanceID,
159                        const DataSourceConfig&);
160   void StartDataSource(TracingBackendId, DataSourceInstanceID);
161   void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID);
162   void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID);
163   void SyncProducersForTesting();
164 
165   // Consumer-side bookkeeping methods.
166   void SetupTracingSession(TracingSessionGlobalID,
167                            const std::shared_ptr<TraceConfig>&,
168                            base::ScopedFile trace_fd = base::ScopedFile());
169   void StartTracingSession(TracingSessionGlobalID);
170   void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&);
171   void StopTracingSession(TracingSessionGlobalID);
172   void DestroyTracingSession(TracingSessionGlobalID);
173   void FlushTracingSession(TracingSessionGlobalID,
174                            uint32_t,
175                            std::function<void(bool)>);
176   void ReadTracingSessionData(
177       TracingSessionGlobalID,
178       std::function<void(TracingSession::ReadTraceCallbackArgs)>);
179   void GetTraceStats(TracingSessionGlobalID,
180                      TracingSession::GetTraceStatsCallback);
181   void QueryServiceState(TracingSessionGlobalID,
182                          TracingSession::QueryServiceStateCallback);
183 
184   // Sets the batching period to |batch_commits_duration_ms| on the backends
185   // with type |backend_type|.
186   void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,
187                                          BackendType backend_type);
188 
189   // Enables direct SMB patching on the backends with type |backend_type| (see
190   // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the
191   // operation succeeded for all backends with type |backend_type|, false
192   // otherwise.
193   bool EnableDirectSMBPatchingForTesting(BackendType backend_type);
194 
195   void SetMaxProducerReconnectionsForTesting(uint32_t count);
196 
197  private:
198   friend class test::TracingMuxerImplInternalsForTest;
199   friend void shlib::ResetForTesting();
200 
201   // For each TracingBackend we create and register one ProducerImpl instance.
202   // This talks to the producer-side of the service, gets start/stop requests
203   // from it and routes them to the registered data sources.
204   // One ProducerImpl == one backend == one tracing service.
205   // This class is needed to disambiguate callbacks coming from different
206   // services. TracingMuxerImpl can't directly implement the Producer interface
207   // because the Producer virtual methods don't allow to identify the service.
208   class ProducerImpl : public Producer {
209    public:
210     ProducerImpl(TracingMuxerImpl*,
211                  TracingBackendId,
212                  uint32_t shmem_batch_commits_duration_ms);
213     ~ProducerImpl() override;
214 
215     void Initialize(std::unique_ptr<ProducerEndpoint> endpoint);
216     void RegisterDataSource(const DataSourceDescriptor&,
217                             DataSourceFactory,
218                             DataSourceStaticState*);
219     void DisposeConnection();
220 
221     // perfetto::Producer implementation.
222     void OnConnect() override;
223     void OnDisconnect() override;
224     void OnTracingSetup() override;
225     void OnStartupTracingSetup() override;
226     void SetupDataSource(DataSourceInstanceID,
227                          const DataSourceConfig&) override;
228     void StartDataSource(DataSourceInstanceID,
229                          const DataSourceConfig&) override;
230     void StopDataSource(DataSourceInstanceID) override;
231     void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override;
232     void ClearIncrementalState(const DataSourceInstanceID*, size_t) override;
233 
234     bool SweepDeadServices();
235     void SendOnConnectTriggers();
236     void NotifyFlushForDataSourceDone(DataSourceInstanceID, FlushRequestID);
237 
238     PERFETTO_THREAD_CHECKER(thread_checker_)
239     TracingMuxerImpl* muxer_;
240     TracingBackendId const backend_id_;
241     bool connected_ = false;
242     bool did_setup_tracing_ = false;
243     bool did_setup_startup_tracing_ = false;
244     std::atomic<uint32_t> connection_id_{0};
245     uint16_t last_startup_target_buffer_reservation_ = 0;
246     bool is_producer_provided_smb_ = false;
247     bool producer_provided_smb_failed_ = false;
248 
249     const uint32_t shmem_batch_commits_duration_ms_ = 0;
250 
251     // Set of data sources that have been actually registered on this producer.
252     // This can be a subset of the global |data_sources_|, because data sources
253     // can register before the producer is fully connected.
254     std::bitset<kMaxDataSources> registered_data_sources_{};
255 
256     // A collection of disconnected service endpoints. Since trace writers on
257     // arbitrary threads might continue writing data to disconnected services,
258     // we keep the old services around and periodically try to clean up ones
259     // that no longer have any writers (see SweepDeadServices).
260     std::list<std::shared_ptr<ProducerEndpoint>> dead_services_;
261 
262     // Triggers that should be sent when the service connects (trigger_name,
263     // expiration).
264     std::list<std::pair<std::string, base::TimeMillis>> on_connect_triggers_;
265 
266     std::map<FlushRequestID, std::set<DataSourceInstanceID>> pending_flushes_;
267 
268     // The currently active service endpoint is maintained as an atomic shared
269     // pointer so it won't get deleted from underneath threads that are creating
270     // trace writers. At any given time one endpoint can be shared (and thus
271     // kept alive) by the |service_| pointer, an entry in |dead_services_| and
272     // as a pointer on the stack in CreateTraceWriter() (on an arbitrary
273     // thread). The endpoint is never shared outside ProducerImpl itself.
274     //
275     // WARNING: Any *write* access to this variable or any *read* access from a
276     // non-muxer thread must be done through std::atomic_{load,store} to avoid
277     // data races.
278     std::shared_ptr<ProducerEndpoint> service_;  // Keep last.
279   };
280 
281   // For each TracingSession created by the API client (Tracing::NewTrace() we
282   // create and register one ConsumerImpl instance.
283   // This talks to the consumer-side of the service, gets end-of-trace and
284   // on-trace-data callbacks and routes them to the API client callbacks.
285   // This class is needed to disambiguate callbacks coming from different
286   // tracing sessions.
287   class ConsumerImpl : public Consumer {
288    public:
289     ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID);
290     ~ConsumerImpl() override;
291 
292     void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint);
293 
294     // perfetto::Consumer implementation.
295     void OnConnect() override;
296     void OnDisconnect() override;
297     void OnTracingDisabled(const std::string& error) override;
298     void OnTraceData(std::vector<TracePacket>, bool has_more) override;
299     void OnDetach(bool success) override;
300     void OnAttach(bool success, const TraceConfig&) override;
301     void OnTraceStats(bool success, const TraceStats&) override;
302     void OnObservableEvents(const ObservableEvents&) override;
303     void OnSessionCloned(const OnSessionClonedArgs&) override;
304 
305     void NotifyStartComplete();
306     void NotifyError(const TracingError&);
307     void NotifyStopComplete();
308 
309     // Will eventually inform the |muxer_| when it is safe to remove |this|.
310     void Disconnect();
311 
312     TracingMuxerImpl* muxer_;
313     BackendType const backend_type_;
314     TracingSessionGlobalID const session_id_;
315     bool connected_ = false;
316 
317     // This is to handle the case where the Setup call from the API client
318     // arrives before the consumer has connected. In this case we keep around
319     // the config and check if we have it after connection.
320     bool start_pending_ = false;
321 
322     // Similarly if the session is stopped before the consumer was connected, we
323     // need to wait until the session has started before stopping it.
324     bool stop_pending_ = false;
325 
326     // Similarly we need to buffer a call to get trace statistics if the
327     // consumer wasn't connected yet.
328     bool get_trace_stats_pending_ = false;
329 
330     // Whether this session was already stopped. This will happen in response to
331     // Stop{,Blocking}, but also if the service stops the session for us
332     // automatically (e.g., when there are no data sources).
333     bool stopped_ = false;
334 
335     // shared_ptr because it's posted across threads. This is to avoid copying
336     // it more than once.
337     std::shared_ptr<TraceConfig> trace_config_;
338     base::ScopedFile trace_fd_;
339 
340     // If the API client passes a callback to start, we should invoke this when
341     // NotifyStartComplete() is invoked.
342     std::function<void()> start_complete_callback_;
343 
344     // An internal callback used to implement StartBlocking().
345     std::function<void()> blocking_start_complete_callback_;
346 
347     // If the API client passes a callback to get notification about the
348     // errors, we should invoke this when NotifyError() is invoked.
349     std::function<void(TracingError)> error_callback_;
350 
351     // If the API client passes a callback to stop, we should invoke this when
352     // OnTracingDisabled() is invoked.
353     std::function<void()> stop_complete_callback_;
354 
355     // An internal callback used to implement StopBlocking().
356     std::function<void()> blocking_stop_complete_callback_;
357 
358     // Callback passed to ReadTrace().
359     std::function<void(TracingSession::ReadTraceCallbackArgs)>
360         read_trace_callback_;
361 
362     // Callback passed to GetTraceStats().
363     TracingSession::GetTraceStatsCallback get_trace_stats_callback_;
364 
365     // Callback for a pending call to QueryServiceState().
366     TracingSession::QueryServiceStateCallback query_service_state_callback_;
367 
368     // The states of all data sources in this tracing session. |true| means the
369     // data source has started tracing.
370     using DataSourceHandle = std::pair<std::string, std::string>;
371     std::map<DataSourceHandle, bool> data_source_states_;
372 
373     std::unique_ptr<ConsumerEndpoint> service_;  // Keep before last.
374     PERFETTO_THREAD_CHECKER(thread_checker_)     // Keep last.
375   };
376 
377   // This object is returned to API clients when they call
378   // Tracing::CreateTracingSession().
379   class TracingSessionImpl : public TracingSession {
380    public:
381     TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType);
382     ~TracingSessionImpl() override;
383     void Setup(const TraceConfig&, int fd) override;
384     void Start() override;
385     void StartBlocking() override;
386     void SetOnStartCallback(std::function<void()>) override;
387     void SetOnErrorCallback(std::function<void(TracingError)>) override;
388     void Stop() override;
389     void StopBlocking() override;
390     void Flush(std::function<void(bool)>, uint32_t timeout_ms) override;
391     void ReadTrace(ReadTraceCallback) override;
392     void SetOnStopCallback(std::function<void()>) override;
393     void GetTraceStats(GetTraceStatsCallback) override;
394     void QueryServiceState(QueryServiceStateCallback) override;
395     void ChangeTraceConfig(const TraceConfig&) override;
396 
397    private:
398     TracingMuxerImpl* const muxer_;
399     TracingSessionGlobalID const session_id_;
400     BackendType const backend_type_;
401   };
402 
403   // This object is returned to API clients when they call
404   // Tracing::SetupStartupTracing().
405   class StartupTracingSessionImpl : public StartupTracingSession {
406    public:
407     StartupTracingSessionImpl(TracingMuxerImpl*,
408                               TracingSessionGlobalID,
409                               BackendType);
410     ~StartupTracingSessionImpl() override;
411     void Abort() override;
412     void AbortBlocking() override;
413 
414    private:
415     TracingMuxerImpl* const muxer_;
416     TracingSessionGlobalID const session_id_;
417     BackendType backend_type_;
418   };
419 
420   struct RegisteredInterceptor {
421     protos::gen::InterceptorDescriptor descriptor;
422     InterceptorFactory factory{};
423     InterceptorBase::TLSFactory tls_factory{};
424     InterceptorBase::TracePacketCallback packet_callback{};
425   };
426 
427   struct RegisteredStartupSession {
428     TracingSessionID session_id = 0;
429     int num_unbound_data_sources = 0;
430 
431     bool is_aborting = false;
432     int num_aborting_data_sources = 0;
433 
434     std::function<void()> on_aborted;
435     std::function<void()> on_adopted;
436   };
437 
438   struct RegisteredProducerBackend {
439     // Backends are supposed to have static lifetime.
440     TracingProducerBackend* backend = nullptr;
441     TracingBackendId id = 0;
442     BackendType type{};
443 
444     TracingBackend::ConnectProducerArgs producer_conn_args;
445     std::unique_ptr<ProducerImpl> producer;
446 
447     std::vector<RegisteredStartupSession> startup_sessions;
448   };
449 
450   struct RegisteredConsumerBackend {
451     // Backends are supposed to have static lifetime.
452     TracingConsumerBackend* backend = nullptr;
453     BackendType type{};
454     // The calling code can request more than one concurrently active tracing
455     // session for the same backend. We need to create one consumer per session.
456     std::vector<std::unique_ptr<ConsumerImpl>> consumers;
457   };
458 
459   void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
460                                      bool is_changed);
461   explicit TracingMuxerImpl(const TracingInitArgs&);
462   void Initialize(const TracingInitArgs& args);
463   void AddBackends(const TracingInitArgs& args);
464   void AddConsumerBackend(TracingConsumerBackend* backend, BackendType type);
465   void AddProducerBackend(TracingProducerBackend* backend,
466                           BackendType type,
467                           const TracingInitArgs& args);
468   ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id);
469   std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend(
470       TracingSessionGlobalID session_id);
471   RegisteredProducerBackend* FindProducerBackendById(TracingBackendId id);
472   RegisteredProducerBackend* FindProducerBackendByType(BackendType type);
473   RegisteredConsumerBackend* FindConsumerBackendByType(BackendType type);
474   void InitializeConsumer(TracingSessionGlobalID session_id);
475   void OnConsumerDisconnected(ConsumerImpl* consumer);
476   void OnProducerDisconnected(ProducerImpl* producer);
477   // Test only method.
478   void SweepDeadBackends();
479 
480   struct FindDataSourceRes {
481     FindDataSourceRes() = default;
FindDataSourceResFindDataSourceRes482     FindDataSourceRes(DataSourceStaticState* a,
483                       DataSourceState* b,
484                       uint32_t c,
485                       bool d)
486         : static_state(a),
487           internal_state(b),
488           instance_idx(c),
489           requires_callbacks_under_lock(d) {}
490     explicit operator bool() const { return !!internal_state; }
491 
492     DataSourceStaticState* static_state = nullptr;
493     DataSourceState* internal_state = nullptr;
494     uint32_t instance_idx = 0;
495     bool requires_callbacks_under_lock = false;
496   };
497   FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID);
498 
499   FindDataSourceRes SetupDataSourceImpl(
500       const RegisteredDataSource&,
501       TracingBackendId,
502       uint32_t backend_connection_id,
503       DataSourceInstanceID,
504       const DataSourceConfig&,
505       uint64_t config_hash,
506       uint64_t startup_config_hash,
507       TracingSessionGlobalID startup_session_id);
508   void StartDataSourceImpl(const FindDataSourceRes&);
509   void StopDataSource_AsyncBeginImpl(const FindDataSourceRes&);
510   void StopDataSource_AsyncEnd(TracingBackendId,
511                                uint32_t backend_connection_id,
512                                DataSourceInstanceID,
513                                const FindDataSourceRes&);
514   bool FlushDataSource_AsyncBegin(TracingBackendId,
515                                   DataSourceInstanceID,
516                                   FlushRequestID);
517   void FlushDataSource_AsyncEnd(TracingBackendId,
518                                 uint32_t backend_connection_id,
519                                 DataSourceInstanceID,
520                                 const FindDataSourceRes&,
521                                 FlushRequestID);
522   void AbortStartupTracingSession(TracingSessionGlobalID, BackendType);
523   // When ResetForTesting() is executed, `cb` will be called on the calling
524   // thread and on the muxer thread.
525   void AppendResetForTestingCallback(std::function<void()> cb);
526 
527   // WARNING: If you add new state here, be sure to update ResetForTesting.
528   std::unique_ptr<base::TaskRunner> task_runner_;
529   std::vector<RegisteredDataSource> data_sources_;
530   // These lists can only have one backend per BackendType. The elements are
531   // sorted by BackendType priority (see BackendTypePriority). They always
532   // contain a fake low-priority kUnspecifiedBackend at the end.
533   std::list<RegisteredProducerBackend> producer_backends_;
534   std::list<RegisteredConsumerBackend> consumer_backends_;
535   std::vector<RegisteredInterceptor> interceptors_;
536   TracingPolicy* policy_ = nullptr;
537 
538   // Learn more at TracingInitArgs::supports_multiple_data_source_instances
539   bool supports_multiple_data_source_instances_ = true;
540 
541   std::atomic<TracingSessionGlobalID> next_tracing_session_id_{};
542   std::atomic<uint32_t> next_data_source_index_{};
543   uint32_t muxer_id_for_testing_{};
544 
545   // Maximum number of times we will try to reconnect producer backend.
546   // Should only be modified for testing purposes.
547   std::atomic<uint32_t> max_producer_reconnections_{100u};
548 
549   // Test only member.
550   // After ResetForTesting() is called, holds tracing backends which needs to be
551   // kept alive until all inbound references have gone away. See
552   // SweepDeadBackends().
553   std::list<RegisteredProducerBackend> dead_backends_;
554 
555   // Test only member.
556   // Executes these cleanup functions on the calling thread and on the muxer
557   // thread when ResetForTesting() is called.
558   std::list<std::function<void()>> reset_callbacks_;
559 
560   PERFETTO_THREAD_CHECKER(thread_checker_)
561 };
562 
563 }  // namespace internal
564 }  // namespace perfetto
565 
566 #endif  // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
567