/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
19 
20 #include <stddef.h>
21 #include <stdint.h>
22 
23 #include <array>
24 #include <atomic>
25 #include <bitset>
26 #include <functional>
27 #include <list>
28 #include <map>
29 #include <memory>
30 #include <set>
31 #include <utility>
32 #include <vector>
33 
34 #include "perfetto/base/time.h"
35 #include "perfetto/ext/base/scoped_file.h"
36 #include "perfetto/ext/base/thread_checker.h"
37 #include "perfetto/ext/tracing/core/basic_types.h"
38 #include "perfetto/ext/tracing/core/consumer.h"
39 #include "perfetto/ext/tracing/core/producer.h"
40 #include "perfetto/tracing/backend_type.h"
41 #include "perfetto/tracing/core/data_source_descriptor.h"
42 #include "perfetto/tracing/core/forward_decls.h"
43 #include "perfetto/tracing/core/trace_config.h"
44 #include "perfetto/tracing/internal/basic_types.h"
45 #include "perfetto/tracing/internal/tracing_muxer.h"
46 #include "perfetto/tracing/tracing.h"
47 
48 #include "protos/perfetto/common/interceptor_descriptor.gen.h"
49 
50 namespace perfetto {
51 
52 class ConsumerEndpoint;
53 class DataSourceBase;
54 class ProducerEndpoint;
55 class TraceWriterBase;
56 class TracingBackend;
57 class TracingSession;
58 struct TracingInitArgs;
59 
60 namespace base {
61 class TaskRunner;
62 }
63 
64 namespace shlib {
65 void ResetForTesting();
66 }
67 
68 namespace test {
69 class TracingMuxerImplInternalsForTest;
70 }
71 
72 namespace internal {
73 
74 struct DataSourceStaticState;
75 
76 // This class acts as a bridge between the public API and the TracingBackend(s).
77 // It exposes a simplified view of the world to the API methods handling all the
78 // bookkeeping to map data source instances and trace writers to the various
79 // backends. It deals with N data sources, M backends (1 backend == 1 tracing
80 // service == 1 producer connection) and T concurrent tracing sessions.
81 //
82 // Handing data source registration and start/stop flows [producer side]:
83 // ----------------------------------------------------------------------
84 // 1. The API client subclasses perfetto::DataSource and calls
85 //    DataSource::Register<MyDataSource>(). In turn this calls into the
86 //    TracingMuxer.
87 // 2. The tracing muxer iterates through all the backends (1 backend == 1
88 //    service == 1 producer connection) and registers the data source on each
89 //    backend.
90 // 3. When any (services behind a) backend starts tracing and requests to start
91 //    that specific data source, the TracingMuxerImpl constructs a new instance
92 //    of MyDataSource and calls the OnStart() method.
93 //
94 // Controlling trace and retrieving trace data [consumer side]:
95 // ------------------------------------------------------------
96 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession
97 //    object.
98 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer
99 //    subclasses the TracingSession object (TracingSessionImpl) and returns it.
100 // 3. The tracing muxer identifies the backend (according to the args passed to
101 //    NewTrace), creates a new Consumer and connects to it.
102 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the
103 //    TracingMuxer forwards them to the consumer associated to the
104 //    TracingSession. Likewise for callbacks coming from the consumer-side of
105 //    the service.
106 class TracingMuxerImpl : public TracingMuxer {
107  public:
108   // This is different than TracingSessionID because it's global across all
109   // backends. TracingSessionID is global only within the scope of one service.
110   using TracingSessionGlobalID = uint64_t;
111 
112   struct RegisteredDataSource {
113     DataSourceDescriptor descriptor;
114     DataSourceFactory factory{};
115     bool supports_multiple_instances = false;
116     bool requires_callbacks_under_lock = false;
117     bool no_flush = false;
118     DataSourceStaticState* static_state = nullptr;
119   };
120 
121   static void InitializeInstance(const TracingInitArgs&);
122   static void ResetForTesting();
123   static void Shutdown();
124 
125   // TracingMuxer implementation.
126   bool RegisterDataSource(const DataSourceDescriptor&,
127                           DataSourceFactory,
128                           DataSourceParams,
129                           bool no_flush,
130                           DataSourceStaticState*) override;
131   void UpdateDataSourceDescriptor(const DataSourceDescriptor&,
132                                   const DataSourceStaticState*) override;
133   std::unique_ptr<TraceWriterBase> CreateTraceWriter(
134       DataSourceStaticState*,
135       uint32_t data_source_instance_index,
136       DataSourceState*,
137       BufferExhaustedPolicy buffer_exhausted_policy) override;
138   void DestroyStoppedTraceWritersForCurrentThread() override;
139   void RegisterInterceptor(const InterceptorDescriptor&,
140                            InterceptorFactory,
141                            InterceptorBase::TLSFactory,
142                            InterceptorBase::TracePacketCallback) override;
143 
144   void ActivateTriggers(const std::vector<std::string>&, uint32_t) override;
145 
146   std::unique_ptr<TracingSession> CreateTracingSession(
147       BackendType,
148       TracingConsumerBackend* (*system_backend_factory)());
149   std::unique_ptr<StartupTracingSession> CreateStartupTracingSession(
150       const TraceConfig& config,
151       Tracing::SetupStartupTracingOpts);
152   std::unique_ptr<StartupTracingSession> CreateStartupTracingSessionBlocking(
153       const TraceConfig& config,
154       Tracing::SetupStartupTracingOpts);
155 
156   // Producer-side bookkeeping methods.
157   void UpdateDataSourcesOnAllBackends();
158   void SetupDataSource(TracingBackendId,
159                        uint32_t backend_connection_id,
160                        DataSourceInstanceID,
161                        const DataSourceConfig&);
162   void StartDataSource(TracingBackendId, DataSourceInstanceID);
163   void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID);
164   void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID);
165   void SyncProducersForTesting();
166 
167   // Consumer-side bookkeeping methods.
168   void SetupTracingSession(TracingSessionGlobalID,
169                            const std::shared_ptr<TraceConfig>&,
170                            base::ScopedFile trace_fd = base::ScopedFile());
171   void StartTracingSession(TracingSessionGlobalID);
172   void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&);
173   void StopTracingSession(TracingSessionGlobalID);
174   void DestroyTracingSession(TracingSessionGlobalID);
175   void FlushTracingSession(TracingSessionGlobalID,
176                            uint32_t,
177                            std::function<void(bool)>);
178   void ReadTracingSessionData(
179       TracingSessionGlobalID,
180       std::function<void(TracingSession::ReadTraceCallbackArgs)>);
181   void GetTraceStats(TracingSessionGlobalID,
182                      TracingSession::GetTraceStatsCallback);
183   void QueryServiceState(TracingSessionGlobalID,
184                          TracingSession::QueryServiceStateCallback);
185 
186   // Sets the batching period to |batch_commits_duration_ms| on the backends
187   // with type |backend_type|.
188   void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,
189                                          BackendType backend_type);
190 
191   // Enables direct SMB patching on the backends with type |backend_type| (see
192   // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the
193   // operation succeeded for all backends with type |backend_type|, false
194   // otherwise.
195   bool EnableDirectSMBPatchingForTesting(BackendType backend_type);
196 
197   void SetMaxProducerReconnectionsForTesting(uint32_t count);
198 
199  private:
200   friend class test::TracingMuxerImplInternalsForTest;
201   friend void shlib::ResetForTesting();
202 
203   // For each TracingBackend we create and register one ProducerImpl instance.
204   // This talks to the producer-side of the service, gets start/stop requests
205   // from it and routes them to the registered data sources.
206   // One ProducerImpl == one backend == one tracing service.
207   // This class is needed to disambiguate callbacks coming from different
208   // services. TracingMuxerImpl can't directly implement the Producer interface
209   // because the Producer virtual methods don't allow to identify the service.
210   class ProducerImpl : public Producer {
211    public:
212     ProducerImpl(TracingMuxerImpl*,
213                  TracingBackendId,
214                  uint32_t shmem_batch_commits_duration_ms,
215                  bool shmem_direct_patching_enabled);
216     ~ProducerImpl() override;
217 
218     void Initialize(std::unique_ptr<ProducerEndpoint> endpoint);
219     void RegisterDataSource(const DataSourceDescriptor&,
220                             DataSourceFactory,
221                             DataSourceStaticState*);
222     void DisposeConnection();
223 
224     // perfetto::Producer implementation.
225     void OnConnect() override;
226     void OnDisconnect() override;
227     void OnTracingSetup() override;
228     void OnStartupTracingSetup() override;
229     void SetupDataSource(DataSourceInstanceID,
230                          const DataSourceConfig&) override;
231     void StartDataSource(DataSourceInstanceID,
232                          const DataSourceConfig&) override;
233     void StopDataSource(DataSourceInstanceID) override;
234     void Flush(FlushRequestID,
235                const DataSourceInstanceID*,
236                size_t,
237                FlushFlags) override;
238     void ClearIncrementalState(const DataSourceInstanceID*, size_t) override;
239 
240     bool SweepDeadServices();
241     void SendOnConnectTriggers();
242     void NotifyFlushForDataSourceDone(DataSourceInstanceID, FlushRequestID);
243 
244     PERFETTO_THREAD_CHECKER(thread_checker_)
245     TracingMuxerImpl* muxer_;
246     TracingBackendId const backend_id_;
247     bool connected_ = false;
248     bool did_setup_tracing_ = false;
249     bool did_setup_startup_tracing_ = false;
250     std::atomic<uint32_t> connection_id_{0};
251     uint16_t last_startup_target_buffer_reservation_ = 0;
252     bool is_producer_provided_smb_ = false;
253     bool producer_provided_smb_failed_ = false;
254 
255     const uint32_t shmem_batch_commits_duration_ms_ = 0;
256     const bool shmem_direct_patching_enabled_ = false;
257 
258     // Set of data sources that have been actually registered on this producer.
259     // This can be a subset of the global |data_sources_|, because data sources
260     // can register before the producer is fully connected.
261     std::bitset<kMaxDataSources> registered_data_sources_{};
262 
263     // A collection of disconnected service endpoints. Since trace writers on
264     // arbitrary threads might continue writing data to disconnected services,
265     // we keep the old services around and periodically try to clean up ones
266     // that no longer have any writers (see SweepDeadServices).
267     std::list<std::shared_ptr<ProducerEndpoint>> dead_services_;
268 
269     // Triggers that should be sent when the service connects (trigger_name,
270     // expiration).
271     std::list<std::pair<std::string, base::TimeMillis>> on_connect_triggers_;
272 
273     std::map<FlushRequestID, std::set<DataSourceInstanceID>> pending_flushes_;
274 
275     // The currently active service endpoint is maintained as an atomic shared
276     // pointer so it won't get deleted from underneath threads that are creating
277     // trace writers. At any given time one endpoint can be shared (and thus
278     // kept alive) by the |service_| pointer, an entry in |dead_services_| and
279     // as a pointer on the stack in CreateTraceWriter() (on an arbitrary
280     // thread). The endpoint is never shared outside ProducerImpl itself.
281     //
282     // WARNING: Any *write* access to this variable or any *read* access from a
283     // non-muxer thread must be done through std::atomic_{load,store} to avoid
284     // data races.
285     std::shared_ptr<ProducerEndpoint> service_;  // Keep last.
286   };
287 
288   // For each TracingSession created by the API client (Tracing::NewTrace() we
289   // create and register one ConsumerImpl instance.
290   // This talks to the consumer-side of the service, gets end-of-trace and
291   // on-trace-data callbacks and routes them to the API client callbacks.
292   // This class is needed to disambiguate callbacks coming from different
293   // tracing sessions.
294   class ConsumerImpl : public Consumer {
295    public:
296     ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID);
297     ~ConsumerImpl() override;
298 
299     void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint);
300 
301     // perfetto::Consumer implementation.
302     void OnConnect() override;
303     void OnDisconnect() override;
304     void OnTracingDisabled(const std::string& error) override;
305     void OnTraceData(std::vector<TracePacket>, bool has_more) override;
306     void OnDetach(bool success) override;
307     void OnAttach(bool success, const TraceConfig&) override;
308     void OnTraceStats(bool success, const TraceStats&) override;
309     void OnObservableEvents(const ObservableEvents&) override;
310     void OnSessionCloned(const OnSessionClonedArgs&) override;
311 
312     void NotifyStartComplete();
313     void NotifyError(const TracingError&);
314     void NotifyStopComplete();
315 
316     // Will eventually inform the |muxer_| when it is safe to remove |this|.
317     void Disconnect();
318 
319     TracingMuxerImpl* muxer_;
320     BackendType const backend_type_;
321     TracingSessionGlobalID const session_id_;
322     bool connected_ = false;
323 
324     // This is to handle the case where the Setup call from the API client
325     // arrives before the consumer has connected. In this case we keep around
326     // the config and check if we have it after connection.
327     bool start_pending_ = false;
328 
329     // Similarly if the session is stopped before the consumer was connected, we
330     // need to wait until the session has started before stopping it.
331     bool stop_pending_ = false;
332 
333     // Similarly we need to buffer a call to get trace statistics if the
334     // consumer wasn't connected yet.
335     bool get_trace_stats_pending_ = false;
336 
337     // Whether this session was already stopped. This will happen in response to
338     // Stop{,Blocking}, but also if the service stops the session for us
339     // automatically (e.g., when there are no data sources).
340     bool stopped_ = false;
341 
342     // shared_ptr because it's posted across threads. This is to avoid copying
343     // it more than once.
344     std::shared_ptr<TraceConfig> trace_config_;
345     base::ScopedFile trace_fd_;
346 
347     // If the API client passes a callback to start, we should invoke this when
348     // NotifyStartComplete() is invoked.
349     std::function<void()> start_complete_callback_;
350 
351     // An internal callback used to implement StartBlocking().
352     std::function<void()> blocking_start_complete_callback_;
353 
354     // If the API client passes a callback to get notification about the
355     // errors, we should invoke this when NotifyError() is invoked.
356     std::function<void(TracingError)> error_callback_;
357 
358     // If the API client passes a callback to stop, we should invoke this when
359     // OnTracingDisabled() is invoked.
360     std::function<void()> stop_complete_callback_;
361 
362     // An internal callback used to implement StopBlocking().
363     std::function<void()> blocking_stop_complete_callback_;
364 
365     // Callback passed to ReadTrace().
366     std::function<void(TracingSession::ReadTraceCallbackArgs)>
367         read_trace_callback_;
368 
369     // Callback passed to GetTraceStats().
370     TracingSession::GetTraceStatsCallback get_trace_stats_callback_;
371 
372     // Callback for a pending call to QueryServiceState().
373     TracingSession::QueryServiceStateCallback query_service_state_callback_;
374 
375     // The states of all data sources in this tracing session. |true| means the
376     // data source has started tracing.
377     using DataSourceHandle = std::pair<std::string, std::string>;
378     std::map<DataSourceHandle, bool> data_source_states_;
379 
380     std::unique_ptr<ConsumerEndpoint> service_;  // Keep before last.
381     PERFETTO_THREAD_CHECKER(thread_checker_)     // Keep last.
382   };
383 
384   // This object is returned to API clients when they call
385   // Tracing::CreateTracingSession().
386   class TracingSessionImpl : public TracingSession {
387    public:
388     TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType);
389     ~TracingSessionImpl() override;
390     void Setup(const TraceConfig&, int fd) override;
391     void Start() override;
392     void StartBlocking() override;
393     void SetOnStartCallback(std::function<void()>) override;
394     void SetOnErrorCallback(std::function<void(TracingError)>) override;
395     void Stop() override;
396     void StopBlocking() override;
397     void Flush(std::function<void(bool)>, uint32_t timeout_ms) override;
398     void ReadTrace(ReadTraceCallback) override;
399     void SetOnStopCallback(std::function<void()>) override;
400     void GetTraceStats(GetTraceStatsCallback) override;
401     void QueryServiceState(QueryServiceStateCallback) override;
402     void ChangeTraceConfig(const TraceConfig&) override;
403 
404    private:
405     TracingMuxerImpl* const muxer_;
406     TracingSessionGlobalID const session_id_;
407     BackendType const backend_type_;
408   };
409 
410   // This object is returned to API clients when they call
411   // Tracing::SetupStartupTracing().
412   class StartupTracingSessionImpl : public StartupTracingSession {
413    public:
414     StartupTracingSessionImpl(TracingMuxerImpl*,
415                               TracingSessionGlobalID,
416                               BackendType);
417     ~StartupTracingSessionImpl() override;
418     void Abort() override;
419     void AbortBlocking() override;
420 
421    private:
422     TracingMuxerImpl* const muxer_;
423     TracingSessionGlobalID const session_id_;
424     BackendType backend_type_;
425   };
426 
427   struct RegisteredInterceptor {
428     protos::gen::InterceptorDescriptor descriptor;
429     InterceptorFactory factory{};
430     InterceptorBase::TLSFactory tls_factory{};
431     InterceptorBase::TracePacketCallback packet_callback{};
432   };
433 
434   struct RegisteredStartupSession {
435     TracingSessionID session_id = 0;
436     int num_unbound_data_sources = 0;
437 
438     bool is_aborting = false;
439     int num_aborting_data_sources = 0;
440 
441     std::function<void()> on_aborted;
442     std::function<void()> on_adopted;
443   };
444 
445   struct RegisteredProducerBackend {
446     // Backends are supposed to have static lifetime.
447     TracingProducerBackend* backend = nullptr;
448     TracingBackendId id = 0;
449     BackendType type{};
450 
451     TracingBackend::ConnectProducerArgs producer_conn_args;
452     std::unique_ptr<ProducerImpl> producer;
453 
454     std::vector<RegisteredStartupSession> startup_sessions;
455   };
456 
457   struct RegisteredConsumerBackend {
458     // Backends are supposed to have static lifetime.
459     TracingConsumerBackend* backend = nullptr;
460     BackendType type{};
461     // The calling code can request more than one concurrently active tracing
462     // session for the same backend. We need to create one consumer per session.
463     std::vector<std::unique_ptr<ConsumerImpl>> consumers;
464   };
465 
466   void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
467                                      bool is_changed);
468   explicit TracingMuxerImpl(const TracingInitArgs&);
469   void Initialize(const TracingInitArgs& args);
470   void AddBackends(const TracingInitArgs& args);
471   void AddConsumerBackend(TracingConsumerBackend* backend, BackendType type);
472   void AddProducerBackend(TracingProducerBackend* backend,
473                           BackendType type,
474                           const TracingInitArgs& args);
475   ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id);
476   std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend(
477       TracingSessionGlobalID session_id);
478   RegisteredProducerBackend* FindProducerBackendById(TracingBackendId id);
479   RegisteredProducerBackend* FindProducerBackendByType(BackendType type);
480   RegisteredConsumerBackend* FindConsumerBackendByType(BackendType type);
481   void InitializeConsumer(TracingSessionGlobalID session_id);
482   void OnConsumerDisconnected(ConsumerImpl* consumer);
483   void OnProducerDisconnected(ProducerImpl* producer);
484   // Test only method.
485   void SweepDeadBackends();
486 
487   struct FindDataSourceRes {
488     FindDataSourceRes() = default;
FindDataSourceResFindDataSourceRes489     FindDataSourceRes(DataSourceStaticState* a,
490                       DataSourceState* b,
491                       uint32_t c,
492                       bool d)
493         : static_state(a),
494           internal_state(b),
495           instance_idx(c),
496           requires_callbacks_under_lock(d) {}
497     explicit operator bool() const { return !!internal_state; }
498 
499     DataSourceStaticState* static_state = nullptr;
500     DataSourceState* internal_state = nullptr;
501     uint32_t instance_idx = 0;
502     bool requires_callbacks_under_lock = false;
503   };
504   FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID);
505 
506   FindDataSourceRes SetupDataSourceImpl(
507       const RegisteredDataSource&,
508       TracingBackendId,
509       uint32_t backend_connection_id,
510       DataSourceInstanceID,
511       const DataSourceConfig&,
512       TracingSessionGlobalID startup_session_id);
513   void StartDataSourceImpl(const FindDataSourceRes&);
514   void StopDataSource_AsyncBeginImpl(const FindDataSourceRes&);
515   void StopDataSource_AsyncEnd(TracingBackendId,
516                                uint32_t backend_connection_id,
517                                DataSourceInstanceID,
518                                const FindDataSourceRes&);
519   bool FlushDataSource_AsyncBegin(TracingBackendId,
520                                   DataSourceInstanceID,
521                                   FlushRequestID,
522                                   FlushFlags);
523   void FlushDataSource_AsyncEnd(TracingBackendId,
524                                 uint32_t backend_connection_id,
525                                 DataSourceInstanceID,
526                                 const FindDataSourceRes&,
527                                 FlushRequestID);
528   void AbortStartupTracingSession(TracingSessionGlobalID, BackendType);
529   // When ResetForTesting() is executed, `cb` will be called on the calling
530   // thread and on the muxer thread.
531   void AppendResetForTestingCallback(std::function<void()> cb);
532 
533   // WARNING: If you add new state here, be sure to update ResetForTesting.
534   std::unique_ptr<base::TaskRunner> task_runner_;
535   std::vector<RegisteredDataSource> data_sources_;
536   // These lists can only have one backend per BackendType. The elements are
537   // sorted by BackendType priority (see BackendTypePriority). They always
538   // contain a fake low-priority kUnspecifiedBackend at the end.
539   std::list<RegisteredProducerBackend> producer_backends_;
540   std::list<RegisteredConsumerBackend> consumer_backends_;
541   std::vector<RegisteredInterceptor> interceptors_;
542   TracingPolicy* policy_ = nullptr;
543 
544   // Learn more at TracingInitArgs::supports_multiple_data_source_instances
545   bool supports_multiple_data_source_instances_ = true;
546 
547   std::atomic<TracingSessionGlobalID> next_tracing_session_id_{};
548   std::atomic<uint32_t> next_data_source_index_{};
549   uint32_t muxer_id_for_testing_{};
550 
551   // Maximum number of times we will try to reconnect producer backend.
552   // Should only be modified for testing purposes.
553   std::atomic<uint32_t> max_producer_reconnections_{100u};
554 
555   // Test only member.
556   // After ResetForTesting() is called, holds tracing backends which needs to be
557   // kept alive until all inbound references have gone away. See
558   // SweepDeadBackends().
559   std::list<RegisteredProducerBackend> dead_backends_;
560 
561   // Test only member.
562   // Executes these cleanup functions on the calling thread and on the muxer
563   // thread when ResetForTesting() is called.
564   std::list<std::function<void()>> reset_callbacks_;
565 
566   PERFETTO_THREAD_CHECKER(thread_checker_)
567 };
568 
569 }  // namespace internal
570 }  // namespace perfetto
571 
572 #endif  // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
573