/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
18 #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
19 
#include <stddef.h>
#include <stdint.h>

#include <array>
#include <atomic>
#include <bitset>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/ext/base/scoped_file.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/consumer.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/forward_decls.h"
#include "perfetto/tracing/core/trace_config.h"
#include "perfetto/tracing/internal/basic_types.h"
#include "perfetto/tracing/internal/tracing_muxer.h"
#include "perfetto/tracing/tracing.h"

#include "protos/perfetto/common/interceptor_descriptor.gen.h"
44 
45 namespace perfetto {
46 
47 class ConsumerEndpoint;
48 class DataSourceBase;
49 class ProducerEndpoint;
50 class TraceWriterBase;
51 class TracingBackend;
52 class TracingSession;
53 struct TracingInitArgs;
54 
55 namespace base {
56 class TaskRunner;
57 }
58 
59 namespace internal {
60 
61 struct DataSourceStaticState;
62 
63 // This class acts as a bridge between the public API and the TracingBackend(s).
64 // It exposes a simplified view of the world to the API methods handling all the
65 // bookkeeping to map data source instances and trace writers to the various
66 // backends. It deals with N data sources, M backends (1 backend == 1 tracing
67 // service == 1 producer connection) and T concurrent tracing sessions.
68 //
69 // Handing data source registration and start/stop flows [producer side]:
70 // ----------------------------------------------------------------------
71 // 1. The API client subclasses perfetto::DataSource and calls
72 //    DataSource::Register<MyDataSource>(). In turn this calls into the
73 //    TracingMuxer.
74 // 2. The tracing muxer iterates through all the backends (1 backend == 1
75 //    service == 1 producer connection) and registers the data source on each
76 //    backend.
77 // 3. When any (services behind a) backend starts tracing and requests to start
78 //    that specific data source, the TracingMuxerImpl constructs a new instance
79 //    of MyDataSource and calls the OnStart() method.
80 //
81 // Controlling trace and retrieving trace data [consumer side]:
82 // ------------------------------------------------------------
83 // 1. The API client calls Tracing::NewTrace(), returns a RAII TracingSession
84 //    object.
85 // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer
86 //    subclasses the TracingSession object (TracingSessionImpl) and returns it.
87 // 3. The tracing muxer identifies the backend (according to the args passed to
88 //    NewTrace), creates a new Consumer and connects to it.
89 // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the
90 //    TracingMuxer forwards them to the consumer associated to the
91 //    TracingSession. Likewise for callbacks coming from the consumer-side of
92 //    the service.
93 class TracingMuxerImpl : public TracingMuxer {
94  public:
95   // This is different than TracingSessionID because it's global across all
96   // backends. TracingSessionID is global only within the scope of one service.
97   using TracingSessionGlobalID = uint64_t;
98 
99   static void InitializeInstance(const TracingInitArgs&);
100   static void ResetForTesting();
101 
102   // TracingMuxer implementation.
103   bool RegisterDataSource(const DataSourceDescriptor&,
104                           DataSourceFactory,
105                           DataSourceStaticState*) override;
106   void UpdateDataSourceDescriptor(const DataSourceDescriptor&,
107                                   const DataSourceStaticState*) override;
108   std::unique_ptr<TraceWriterBase> CreateTraceWriter(
109       DataSourceStaticState*,
110       uint32_t data_source_instance_index,
111       DataSourceState*,
112       BufferExhaustedPolicy buffer_exhausted_policy) override;
113   void DestroyStoppedTraceWritersForCurrentThread() override;
114   void RegisterInterceptor(const InterceptorDescriptor&,
115                            InterceptorFactory,
116                            InterceptorBase::TLSFactory,
117                            InterceptorBase::TracePacketCallback) override;
118 
119   std::unique_ptr<TracingSession> CreateTracingSession(BackendType);
120 
121   // Producer-side bookkeeping methods.
122   void UpdateDataSourcesOnAllBackends();
123   void SetupDataSource(TracingBackendId,
124                        uint32_t backend_connection_id,
125                        DataSourceInstanceID,
126                        const DataSourceConfig&);
127   void StartDataSource(TracingBackendId, DataSourceInstanceID);
128   void StopDataSource_AsyncBegin(TracingBackendId, DataSourceInstanceID);
129   void StopDataSource_AsyncEnd(TracingBackendId, DataSourceInstanceID);
130   void ClearDataSourceIncrementalState(TracingBackendId, DataSourceInstanceID);
131   void SyncProducersForTesting();
132 
133   // Consumer-side bookkeeping methods.
134   void SetupTracingSession(TracingSessionGlobalID,
135                            const std::shared_ptr<TraceConfig>&,
136                            base::ScopedFile trace_fd = base::ScopedFile());
137   void StartTracingSession(TracingSessionGlobalID);
138   void ChangeTracingSessionConfig(TracingSessionGlobalID, const TraceConfig&);
139   void StopTracingSession(TracingSessionGlobalID);
140   void DestroyTracingSession(TracingSessionGlobalID);
141   void FlushTracingSession(TracingSessionGlobalID,
142                            uint32_t,
143                            std::function<void(bool)>);
144   void ReadTracingSessionData(
145       TracingSessionGlobalID,
146       std::function<void(TracingSession::ReadTraceCallbackArgs)>);
147   void GetTraceStats(TracingSessionGlobalID,
148                      TracingSession::GetTraceStatsCallback);
149   void QueryServiceState(TracingSessionGlobalID,
150                          TracingSession::QueryServiceStateCallback);
151 
152   // Sets the batching period to |batch_commits_duration_ms| on the backends
153   // with type |backend_type|.
154   void SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,
155                                          BackendType backend_type);
156 
157   // Enables direct SMB patching on the backends with type |backend_type| (see
158   // SharedMemoryArbiter::EnableDirectSMBPatching). Returns true if the
159   // operation succeeded for all backends with type |backend_type|, false
160   // otherwise.
161   bool EnableDirectSMBPatchingForTesting(BackendType backend_type);
162 
163   void SetMaxProducerReconnectionsForTesting(uint32_t count);
164 
165  private:
166   // For each TracingBackend we create and register one ProducerImpl instance.
167   // This talks to the producer-side of the service, gets start/stop requests
168   // from it and routes them to the registered data sources.
169   // One ProducerImpl == one backend == one tracing service.
170   // This class is needed to disambiguate callbacks coming from different
171   // services. TracingMuxerImpl can't directly implement the Producer interface
172   // because the Producer virtual methods don't allow to identify the service.
173   class ProducerImpl : public Producer {
174    public:
175     ProducerImpl(TracingMuxerImpl*,
176                  TracingBackendId,
177                  uint32_t shmem_batch_commits_duration_ms);
178     ~ProducerImpl() override;
179 
180     void Initialize(std::unique_ptr<ProducerEndpoint> endpoint);
181     void RegisterDataSource(const DataSourceDescriptor&,
182                             DataSourceFactory,
183                             DataSourceStaticState*);
184     void DisposeConnection();
185 
186     // perfetto::Producer implementation.
187     void OnConnect() override;
188     void OnDisconnect() override;
189     void OnTracingSetup() override;
190     void SetupDataSource(DataSourceInstanceID,
191                          const DataSourceConfig&) override;
192     void StartDataSource(DataSourceInstanceID,
193                          const DataSourceConfig&) override;
194     void StopDataSource(DataSourceInstanceID) override;
195     void Flush(FlushRequestID, const DataSourceInstanceID*, size_t) override;
196     void ClearIncrementalState(const DataSourceInstanceID*, size_t) override;
197 
198     bool SweepDeadServices();
199 
200     PERFETTO_THREAD_CHECKER(thread_checker_)
201     TracingMuxerImpl* muxer_;
202     TracingBackendId const backend_id_;
203     bool connected_ = false;
204     bool did_setup_tracing_ = false;
205     uint32_t connection_id_ = 0;
206 
207     const uint32_t shmem_batch_commits_duration_ms_ = 0;
208 
209     // Set of data sources that have been actually registered on this producer.
210     // This can be a subset of the global |data_sources_|, because data sources
211     // can register before the producer is fully connected.
212     std::bitset<kMaxDataSources> registered_data_sources_{};
213 
214     // A collection of disconnected service endpoints. Since trace writers on
215     // arbitrary threads might continue writing data to disconnected services,
216     // we keep the old services around and periodically try to clean up ones
217     // that no longer have any writers (see SweepDeadServices).
218     std::list<std::shared_ptr<ProducerEndpoint>> dead_services_;
219 
220     // The currently active service endpoint is maintained as an atomic shared
221     // pointer so it won't get deleted from underneath threads that are creating
222     // trace writers. At any given time one endpoint can be shared (and thus
223     // kept alive) by the |service_| pointer, an entry in |dead_services_| and
224     // as a pointer on the stack in CreateTraceWriter() (on an arbitrary
225     // thread). The endpoint is never shared outside ProducerImpl itself.
226     //
227     // WARNING: Any *write* access to this variable or any *read* access from a
228     // non-muxer thread must be done through std::atomic_{load,store} to avoid
229     // data races.
230     std::shared_ptr<ProducerEndpoint> service_;  // Keep last.
231   };
232 
233   // For each TracingSession created by the API client (Tracing::NewTrace() we
234   // create and register one ConsumerImpl instance.
235   // This talks to the consumer-side of the service, gets end-of-trace and
236   // on-trace-data callbacks and routes them to the API client callbacks.
237   // This class is needed to disambiguate callbacks coming from different
238   // tracing sessions.
239   class ConsumerImpl : public Consumer {
240    public:
241     ConsumerImpl(TracingMuxerImpl*,
242                  BackendType,
243                  TracingBackendId,
244                  TracingSessionGlobalID);
245     ~ConsumerImpl() override;
246 
247     void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint);
248 
249     // perfetto::Consumer implementation.
250     void OnConnect() override;
251     void OnDisconnect() override;
252     void OnTracingDisabled(const std::string& error) override;
253     void OnTraceData(std::vector<TracePacket>, bool has_more) override;
254     void OnDetach(bool success) override;
255     void OnAttach(bool success, const TraceConfig&) override;
256     void OnTraceStats(bool success, const TraceStats&) override;
257     void OnObservableEvents(const ObservableEvents&) override;
258 
259     void NotifyStartComplete();
260     void NotifyError(const TracingError&);
261     void NotifyStopComplete();
262 
263     // Will eventually inform the |muxer_| when it is safe to remove |this|.
264     void Disconnect();
265 
266     TracingMuxerImpl* muxer_;
267     BackendType const backend_type_;
268     TracingBackendId const backend_id_;
269     TracingSessionGlobalID const session_id_;
270     bool connected_ = false;
271 
272     // This is to handle the case where the Setup call from the API client
273     // arrives before the consumer has connected. In this case we keep around
274     // the config and check if we have it after connection.
275     bool start_pending_ = false;
276 
277     // Similarly if the session is stopped before the consumer was connected, we
278     // need to wait until the session has started before stopping it.
279     bool stop_pending_ = false;
280 
281     // Similarly we need to buffer a call to get trace statistics if the
282     // consumer wasn't connected yet.
283     bool get_trace_stats_pending_ = false;
284 
285     // Whether this session was already stopped. This will happen in response to
286     // Stop{,Blocking}, but also if the service stops the session for us
287     // automatically (e.g., when there are no data sources).
288     bool stopped_ = false;
289 
290     // shared_ptr because it's posted across threads. This is to avoid copying
291     // it more than once.
292     std::shared_ptr<TraceConfig> trace_config_;
293     base::ScopedFile trace_fd_;
294 
295     // If the API client passes a callback to start, we should invoke this when
296     // NotifyStartComplete() is invoked.
297     std::function<void()> start_complete_callback_;
298 
299     // An internal callback used to implement StartBlocking().
300     std::function<void()> blocking_start_complete_callback_;
301 
302     // If the API client passes a callback to get notification about the
303     // errors, we should invoke this when NotifyError() is invoked.
304     std::function<void(TracingError)> error_callback_;
305 
306     // If the API client passes a callback to stop, we should invoke this when
307     // OnTracingDisabled() is invoked.
308     std::function<void()> stop_complete_callback_;
309 
310     // An internal callback used to implement StopBlocking().
311     std::function<void()> blocking_stop_complete_callback_;
312 
313     // Callback passed to ReadTrace().
314     std::function<void(TracingSession::ReadTraceCallbackArgs)>
315         read_trace_callback_;
316 
317     // Callback passed to GetTraceStats().
318     TracingSession::GetTraceStatsCallback get_trace_stats_callback_;
319 
320     // Callback for a pending call to QueryServiceState().
321     TracingSession::QueryServiceStateCallback query_service_state_callback_;
322 
323     // The states of all data sources in this tracing session. |true| means the
324     // data source has started tracing.
325     using DataSourceHandle = std::pair<std::string, std::string>;
326     std::map<DataSourceHandle, bool> data_source_states_;
327 
328     std::unique_ptr<ConsumerEndpoint> service_;  // Keep before last.
329     PERFETTO_THREAD_CHECKER(thread_checker_)     // Keep last.
330   };
331 
332   // This object is returned to API clients when they call
333   // Tracing::CreateTracingSession().
334   class TracingSessionImpl : public TracingSession {
335    public:
336     TracingSessionImpl(TracingMuxerImpl*, TracingSessionGlobalID, BackendType);
337     ~TracingSessionImpl() override;
338     void Setup(const TraceConfig&, int fd) override;
339     void Start() override;
340     void StartBlocking() override;
341     void SetOnStartCallback(std::function<void()>) override;
342     void SetOnErrorCallback(std::function<void(TracingError)>) override;
343     void Stop() override;
344     void StopBlocking() override;
345     void Flush(std::function<void(bool)>, uint32_t timeout_ms) override;
346     void ReadTrace(ReadTraceCallback) override;
347     void SetOnStopCallback(std::function<void()>) override;
348     void GetTraceStats(GetTraceStatsCallback) override;
349     void QueryServiceState(QueryServiceStateCallback) override;
350     void ChangeTraceConfig(const TraceConfig&) override;
351 
352    private:
353     TracingMuxerImpl* const muxer_;
354     TracingSessionGlobalID const session_id_;
355     BackendType const backend_type_;
356   };
357 
358   struct RegisteredDataSource {
359     DataSourceDescriptor descriptor;
360     DataSourceFactory factory{};
361     DataSourceStaticState* static_state = nullptr;
362   };
363 
364   struct RegisteredInterceptor {
365     protos::gen::InterceptorDescriptor descriptor;
366     InterceptorFactory factory{};
367     InterceptorBase::TLSFactory tls_factory{};
368     InterceptorBase::TracePacketCallback packet_callback{};
369   };
370 
371   struct RegisteredBackend {
372     // Backends are supposed to have static lifetime.
373     TracingBackend* backend = nullptr;
374     TracingBackendId id = 0;
375     BackendType type{};
376 
377     TracingBackend::ConnectProducerArgs producer_conn_args;
378     std::unique_ptr<ProducerImpl> producer;
379 
380     // The calling code can request more than one concurrently active tracing
381     // session for the same backend. We need to create one consumer per session.
382     std::vector<std::unique_ptr<ConsumerImpl>> consumers;
383   };
384 
385   void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
386                                      bool is_changed);
387   explicit TracingMuxerImpl(const TracingInitArgs&);
388   void Initialize(const TracingInitArgs& args);
389   ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id);
390   void InitializeConsumer(TracingSessionGlobalID session_id);
391   void OnConsumerDisconnected(ConsumerImpl* consumer);
392   void OnProducerDisconnected(ProducerImpl* producer);
393   void SweepDeadBackends();
394 
395   struct FindDataSourceRes {
396     FindDataSourceRes() = default;
FindDataSourceResFindDataSourceRes397     FindDataSourceRes(DataSourceStaticState* a, DataSourceState* b, uint32_t c)
398         : static_state(a), internal_state(b), instance_idx(c) {}
399     explicit operator bool() const { return !!internal_state; }
400 
401     DataSourceStaticState* static_state = nullptr;
402     DataSourceState* internal_state = nullptr;
403     uint32_t instance_idx = 0;
404   };
405   FindDataSourceRes FindDataSource(TracingBackendId, DataSourceInstanceID);
406 
407   // WARNING: If you add new state here, be sure to update ResetForTesting.
408   std::unique_ptr<base::TaskRunner> task_runner_;
409   std::vector<RegisteredDataSource> data_sources_;
410   std::vector<RegisteredBackend> backends_;
411   std::vector<RegisteredInterceptor> interceptors_;
412   TracingPolicy* policy_ = nullptr;
413 
414   std::atomic<TracingSessionGlobalID> next_tracing_session_id_{};
415   std::atomic<uint32_t> next_data_source_index_{};
416   uint32_t muxer_id_for_testing_{};
417 
418   // Maximum number of times we will try to reconnect producer backend.
419   // Should only be modified for testing purposes.
420   std::atomic<uint32_t> max_producer_reconnections_{100u};
421 
422   // After ResetForTesting() is called, holds tracing backends which needs to be
423   // kept alive until all inbound references have gone away. See
424   // SweepDeadBackends().
425   std::list<RegisteredBackend> dead_backends_;
426 
427   PERFETTO_THREAD_CHECKER(thread_checker_)
428 };
429 
430 }  // namespace internal
431 }  // namespace perfetto
432 
433 #endif  // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_
434