• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
18 #define SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
19 
20 #include <algorithm>
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <mutex>
25 #include <optional>
26 #include <random>
27 #include <set>
28 #include <utility>
29 #include <vector>
30 
31 #include "perfetto/base/logging.h"
32 #include "perfetto/base/status.h"
33 #include "perfetto/base/time.h"
34 #include "perfetto/ext/base/circular_queue.h"
35 #include "perfetto/ext/base/periodic_task.h"
36 #include "perfetto/ext/base/uuid.h"
37 #include "perfetto/ext/base/weak_ptr.h"
38 #include "perfetto/ext/tracing/core/basic_types.h"
39 #include "perfetto/ext/tracing/core/commit_data_request.h"
40 #include "perfetto/ext/tracing/core/observable_events.h"
41 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
42 #include "perfetto/ext/tracing/core/trace_stats.h"
43 #include "perfetto/ext/tracing/core/tracing_service.h"
44 #include "perfetto/tracing/core/data_source_config.h"
45 #include "perfetto/tracing/core/data_source_descriptor.h"
46 #include "perfetto/tracing/core/forward_decls.h"
47 #include "perfetto/tracing/core/trace_config.h"
48 #include "src/android_stats/perfetto_atoms.h"
49 #include "src/tracing/core/id_allocator.h"
50 
// Forward declaration only.
namespace protozero {
class MessageFilter;
}  // namespace protozero
54 
55 namespace perfetto {
56 
namespace base {
class TaskRunner;
}  // namespace base

namespace protos {
namespace gen {
// Opaque enum declaration with a fixed underlying type: usable by value
// without the enumerator list being visible here.
enum TraceStats_FinalFlushOutcome : int;
}  // namespace gen
}  // namespace protos

// Forward declarations.
class Consumer;
class Producer;
class SharedMemory;
class SharedMemoryArbiterImpl;
class TraceBuffer;
class TracePacket;
73 
74 // The tracing service business logic.
75 class TracingServiceImpl : public TracingService {
76  private:
77   struct DataSourceInstance;
78 
79  public:
// 32 MiB.
static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
// Fallback used by TracingSession::data_source_stop_timeout_ms() when the
// trace config leaves the stop timeout at 0.
static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
// 16-byte marker sequence.
// NOTE(review): presumably the payload written by EmitSyncMarker() - confirm
// in the implementation file.
static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
                                          0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
                                          0x6d, 0x57, 0xa0, 0x79};
static constexpr size_t kMaxTracePacketSliceSize =
    128 * 1024 - 512;  // This is ipc::kIPCBufferSize - 512, see assertion in
                       // tracing_integration_test.cc and b/195065199

// This is a rough threshold to determine how many bytes to read from the
// buffers on each iteration when writing into a file. Since filtering and
// compression allocate memory, this effectively limits the amount of memory
// allocated.
static constexpr size_t kWriteIntoFileChunkSize = 1024 * 1024ul;
94 
95   // The implementation behind the service endpoint exposed to each producer.
96   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
97    public:
98     ProducerEndpointImpl(ProducerID,
99                          uid_t uid,
100                          pid_t pid,
101                          TracingServiceImpl*,
102                          base::TaskRunner*,
103                          Producer*,
104                          const std::string& producer_name,
105                          const std::string& sdk_version,
106                          bool in_process,
107                          bool smb_scraping_enabled);
108     ~ProducerEndpointImpl() override;
109 
110     // TracingService::ProducerEndpoint implementation.
111     void Disconnect() override;
112     void RegisterDataSource(const DataSourceDescriptor&) override;
113     void UpdateDataSource(const DataSourceDescriptor&) override;
114     void UnregisterDataSource(const std::string& name) override;
115     void RegisterTraceWriter(uint32_t writer_id,
116                              uint32_t target_buffer) override;
117     void UnregisterTraceWriter(uint32_t writer_id) override;
118     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
119     void SetupSharedMemory(std::unique_ptr<SharedMemory>,
120                            size_t page_size_bytes,
121                            bool provided_by_producer);
122     std::unique_ptr<TraceWriter> CreateTraceWriter(
123         BufferID,
124         BufferExhaustedPolicy) override;
125     SharedMemoryArbiter* MaybeSharedMemoryArbiter() override;
126     bool IsShmemProvidedByProducer() const override;
127     void NotifyFlushComplete(FlushRequestID) override;
128     void NotifyDataSourceStarted(DataSourceInstanceID) override;
129     void NotifyDataSourceStopped(DataSourceInstanceID) override;
130     SharedMemory* shared_memory() const override;
131     size_t shared_buffer_page_size_kb() const override;
132     void ActivateTriggers(const std::vector<std::string>&) override;
133     void Sync(std::function<void()> callback) override;
134 
135     void OnTracingSetup();
136     void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
137     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
138     void StopDataSource(DataSourceInstanceID);
139     void Flush(FlushRequestID, const std::vector<DataSourceInstanceID>&);
140     void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
141     void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
142 
is_allowed_target_buffer(BufferID buffer_id)143     bool is_allowed_target_buffer(BufferID buffer_id) const {
144       return allowed_target_buffers_.count(buffer_id);
145     }
146 
buffer_id_for_writer(WriterID writer_id)147     std::optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
148       const auto it = writers_.find(writer_id);
149       if (it != writers_.end())
150         return it->second;
151       return std::nullopt;
152     }
153 
uid()154     uid_t uid() const { return uid_; }
pid()155     pid_t pid() const { return pid_; }
156 
157    private:
158     friend class TracingServiceImpl;
159     friend class TracingServiceImplTest;
160     friend class TracingIntegrationTest;
161     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
162     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
163 
164     ProducerID const id_;
165     const uid_t uid_;
166     const pid_t pid_;
167     TracingServiceImpl* const service_;
168     base::TaskRunner* const task_runner_;
169     Producer* producer_;
170     std::unique_ptr<SharedMemory> shared_memory_;
171     size_t shared_buffer_page_size_kb_ = 0;
172     SharedMemoryABI shmem_abi_;
173     size_t shmem_size_hint_bytes_ = 0;
174     size_t shmem_page_size_hint_bytes_ = 0;
175     bool is_shmem_provided_by_producer_ = false;
176     const std::string name_;
177     std::string sdk_version_;
178     bool in_process_;
179     bool smb_scraping_enabled_;
180 
181     // Set of the global target_buffer IDs that the producer is configured to
182     // write into in any active tracing session.
183     std::set<BufferID> allowed_target_buffers_;
184 
185     // Maps registered TraceWriter IDs to their target buffers as registered by
186     // the producer. Note that producers aren't required to register their
187     // writers, so we may see commits of chunks with WriterIDs that aren't
188     // contained in this map. However, if a producer does register a writer, the
189     // service will prevent the writer from writing into any other buffer than
190     // the one associated with it here. The BufferIDs stored in this map are
191     // untrusted, so need to be verified against |allowed_target_buffers_|
192     // before use.
193     std::map<WriterID, BufferID> writers_;
194 
195     // This is used only in in-process configurations.
196     // SharedMemoryArbiterImpl methods themselves are thread-safe.
197     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
198 
199     PERFETTO_THREAD_CHECKER(thread_checker_)
200     base::WeakPtrFactory<ProducerEndpointImpl> weak_ptr_factory_;  // Keep last.
201   };
202 
203   // The implementation behind the service endpoint exposed to each consumer.
204   class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
205    public:
206     ConsumerEndpointImpl(TracingServiceImpl*,
207                          base::TaskRunner*,
208                          Consumer*,
209                          uid_t uid);
210     ~ConsumerEndpointImpl() override;
211 
212     void NotifyOnTracingDisabled(const std::string& error);
213     void NotifyCloneSnapshotTrigger();
214 
215     // TracingService::ConsumerEndpoint implementation.
216     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
217     void ChangeTraceConfig(const TraceConfig& cfg) override;
218     void StartTracing() override;
219     void DisableTracing() override;
220     void ReadBuffers() override;
221     void FreeBuffers() override;
222     void Flush(uint32_t timeout_ms, FlushCallback) override;
223     void Detach(const std::string& key) override;
224     void Attach(const std::string& key) override;
225     void GetTraceStats() override;
226     void ObserveEvents(uint32_t enabled_event_types) override;
227     void QueryServiceState(QueryServiceStateCallback) override;
228     void QueryCapabilities(QueryCapabilitiesCallback) override;
229     void SaveTraceForBugreport(SaveTraceForBugreportCallback) override;
230     void CloneSession(TracingSessionID) override;
231 
232     // Will queue a task to notify the consumer about the state change.
233     void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
234                                          const DataSourceInstance&);
235     void OnAllDataSourcesStarted();
236 
GetWeakPtr()237     base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr() {
238       return weak_ptr_factory_.GetWeakPtr();
239     }
240 
241    private:
242     friend class TracingServiceImpl;
243     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
244     ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
245 
246     // Returns a pointer to an ObservableEvents object that the caller can fill
247     // and schedules a task to send the ObservableEvents to the consumer.
248     ObservableEvents* AddObservableEvents();
249 
250     base::TaskRunner* const task_runner_;
251     TracingServiceImpl* const service_;
252     Consumer* const consumer_;
253     uid_t const uid_;
254     TracingSessionID tracing_session_id_ = 0;
255 
256     // Whether the consumer is interested in DataSourceInstance state change
257     // events.
258     uint32_t observable_events_mask_ = 0;
259 
260     // ObservableEvents that will be sent to the consumer. If set, a task to
261     // flush the events to the consumer has been queued.
262     std::unique_ptr<ObservableEvents> observable_events_;
263 
264     PERFETTO_THREAD_CHECKER(thread_checker_)
265     base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
266   };
267 
268   explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
269                               base::TaskRunner*,
270                               InitOpts = {});
271   ~TracingServiceImpl() override;
272 
273   // Called by ProducerEndpointImpl.
274   void DisconnectProducer(ProducerID);
275   void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
276   void UpdateDataSource(ProducerID, const DataSourceDescriptor&);
277   void UnregisterDataSource(ProducerID, const std::string& name);
278   void CopyProducerPageIntoLogBuffer(ProducerID,
279                                      uid_t,
280                                      pid_t,
281                                      WriterID,
282                                      ChunkID,
283                                      BufferID,
284                                      uint16_t num_fragments,
285                                      uint8_t chunk_flags,
286                                      bool chunk_complete,
287                                      const uint8_t* src,
288                                      size_t size);
289   void ApplyChunkPatches(ProducerID,
290                          const std::vector<CommitDataRequest::ChunkToPatch>&);
291   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
292   void NotifyDataSourceStarted(ProducerID, const DataSourceInstanceID);
293   void NotifyDataSourceStopped(ProducerID, const DataSourceInstanceID);
294   void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);
295 
296   // Called by ConsumerEndpointImpl.
297   bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
298   bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
299   void DisconnectConsumer(ConsumerEndpointImpl*);
300   base::Status EnableTracing(ConsumerEndpointImpl*,
301                              const TraceConfig&,
302                              base::ScopedFile);
303   void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);
304 
305   base::Status StartTracing(TracingSessionID);
306   void DisableTracing(TracingSessionID, bool disable_immediately = false);
307   void Flush(TracingSessionID tsid,
308              uint32_t timeout_ms,
309              ConsumerEndpoint::FlushCallback);
310   void FlushAndDisableTracing(TracingSessionID);
311   void FlushAndCloneSession(ConsumerEndpointImpl*, TracingSessionID);
312 
313   // Starts reading the internal tracing buffers from the tracing session `tsid`
314   // and sends them to `*consumer` (which must be != nullptr).
315   //
316   // Only reads a limited amount of data in one call. If there's more data,
317   // immediately schedules itself on a PostTask.
318   //
319   // Returns false in case of error.
320   bool ReadBuffersIntoConsumer(TracingSessionID tsid,
321                                ConsumerEndpointImpl* consumer);
322 
323   // Reads all the tracing buffers from the tracing session `tsid` and writes
324   // them into the associated file.
325   //
326   // Reads all the data in the buffers (or until the file is full) before
327   // returning.
328   //
329   // If the tracing session write_period_ms is 0, the file is full or there has
330   // been an error, flushes the file and closes it. Otherwise, schedules itself
331   // to be executed after write_period_ms.
332   //
333   // Returns false in case of error.
334   bool ReadBuffersIntoFile(TracingSessionID);
335 
336   void FreeBuffers(TracingSessionID);
337 
338   // Service implementation.
339   std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
340       Producer*,
341       uid_t uid,
342       pid_t pid,
343       const std::string& producer_name,
344       size_t shared_memory_size_hint_bytes = 0,
345       bool in_process = false,
346       ProducerSMBScrapingMode smb_scraping_mode =
347           ProducerSMBScrapingMode::kDefault,
348       size_t shared_memory_page_size_hint_bytes = 0,
349       std::unique_ptr<SharedMemory> shm = nullptr,
350       const std::string& sdk_version = {}) override;
351 
352   std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
353       Consumer*,
354       uid_t) override;
355 
356   // Set whether SMB scraping should be enabled by default or not. Producers can
357   // override this setting for their own SMBs.
SetSMBScrapingEnabled(bool enabled)358   void SetSMBScrapingEnabled(bool enabled) override {
359     smb_scraping_enabled_ = enabled;
360   }
361 
362   // Exposed mainly for testing.
num_producers()363   size_t num_producers() const { return producers_.size(); }
364   ProducerEndpointImpl* GetProducer(ProducerID) const;
365 
366  private:
367   friend class TracingServiceImplTest;
368   friend class TracingIntegrationTest;
369 
// 24 h expressed in nanoseconds. The `ll` suffix on the first factor keeps the
// whole product in 64-bit arithmetic.
static constexpr int64_t kOneDayInNs = 24ll * 60 * 60 * 1000 * 1000 * 1000;
371 
// A (timestamp, name hash) record of a trigger.
struct TriggerHistory {
  int64_t timestamp_ns;
  uint64_t name_hash;

  // Orders entries by |timestamp_ns| only; |name_hash| does not take part in
  // the comparison, so entries with equal timestamps compare equivalent.
  bool operator<(const TriggerHistory& other) const {
    return timestamp_ns < other.timestamp_ns;
  }
};
380 
381   struct RegisteredDataSource {
382     ProducerID producer_id;
383     DataSourceDescriptor descriptor;
384   };
385 
386   // Represents an active data source for a tracing session.
387   struct DataSourceInstance {
DataSourceInstanceDataSourceInstance388     DataSourceInstance(DataSourceInstanceID id,
389                        const DataSourceConfig& cfg,
390                        const std::string& ds_name,
391                        bool notify_on_start,
392                        bool notify_on_stop,
393                        bool handles_incremental_state_invalidation)
394         : instance_id(id),
395           config(cfg),
396           data_source_name(ds_name),
397           will_notify_on_start(notify_on_start),
398           will_notify_on_stop(notify_on_stop),
399           handles_incremental_state_clear(
400               handles_incremental_state_invalidation) {}
401     DataSourceInstance(const DataSourceInstance&) = delete;
402     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
403 
404     DataSourceInstanceID instance_id;
405     DataSourceConfig config;
406     std::string data_source_name;
407     bool will_notify_on_start;
408     bool will_notify_on_stop;
409     bool handles_incremental_state_clear;
410 
411     enum DataSourceInstanceState {
412       CONFIGURED,
413       STARTING,
414       STARTED,
415       STOPPING,
416       STOPPED
417     };
418     DataSourceInstanceState state = CONFIGURED;
419   };
420 
421   struct PendingFlush {
422     std::set<ProducerID> producers;
423     ConsumerEndpoint::FlushCallback callback;
PendingFlushPendingFlush424     explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
425   };
426 
427   // Holds the state of a tracing session. A tracing session is uniquely bound
428   // a specific Consumer. Each Consumer can own one or more sessions.
429   struct TracingSession {
430     enum State {
431       DISABLED = 0,
432       CONFIGURED,
433       STARTED,
434       DISABLING_WAITING_STOP_ACKS,
435       CLONED_READ_ONLY,
436     };
437 
438     TracingSession(TracingSessionID,
439                    ConsumerEndpointImpl*,
440                    const TraceConfig&,
441                    base::TaskRunner*);
442     TracingSession(TracingSession&&) = delete;
443     TracingSession& operator=(TracingSession&&) = delete;
444 
num_buffersTracingSession445     size_t num_buffers() const { return buffers_index.size(); }
446 
delay_to_next_write_period_msTracingSession447     uint32_t delay_to_next_write_period_ms() const {
448       PERFETTO_DCHECK(write_period_ms > 0);
449       return write_period_ms -
450              static_cast<uint32_t>(base::GetWallTimeMs().count() %
451                                    write_period_ms);
452     }
453 
flush_timeout_msTracingSession454     uint32_t flush_timeout_ms() {
455       uint32_t timeout_ms = config.flush_timeout_ms();
456       return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
457     }
458 
data_source_stop_timeout_msTracingSession459     uint32_t data_source_stop_timeout_ms() {
460       uint32_t timeout_ms = config.data_source_stop_timeout_ms();
461       return timeout_ms ? timeout_ms : kDataSourceStopTimeoutMs;
462     }
463 
GetPacketSequenceIDTracingSession464     PacketSequenceID GetPacketSequenceID(ProducerID producer_id,
465                                          WriterID writer_id) {
466       auto key = std::make_pair(producer_id, writer_id);
467       auto it = packet_sequence_ids.find(key);
468       if (it != packet_sequence_ids.end())
469         return it->second;
470       // We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
471       // are limited to 1024).
472       static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
473                     "PacketSequenceID value space doesn't cover service "
474                     "sequence ID and all producer/writer ID combinations!");
475       PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
476       PacketSequenceID sequence_id = ++last_packet_sequence_id;
477       packet_sequence_ids[key] = sequence_id;
478       return sequence_id;
479     }
480 
GetDataSourceInstanceTracingSession481     DataSourceInstance* GetDataSourceInstance(
482         ProducerID producer_id,
483         DataSourceInstanceID instance_id) {
484       for (auto& inst_kv : data_source_instances) {
485         if (inst_kv.first != producer_id ||
486             inst_kv.second.instance_id != instance_id) {
487           continue;
488         }
489         return &inst_kv.second;
490       }
491       return nullptr;
492     }
493 
AllDataSourceInstancesStartedTracingSession494     bool AllDataSourceInstancesStarted() {
495       return std::all_of(
496           data_source_instances.begin(), data_source_instances.end(),
497           [](decltype(data_source_instances)::const_reference x) {
498             return x.second.state == DataSourceInstance::STARTED;
499           });
500     }
501 
AllDataSourceInstancesStoppedTracingSession502     bool AllDataSourceInstancesStopped() {
503       return std::all_of(
504           data_source_instances.begin(), data_source_instances.end(),
505           [](decltype(data_source_instances)::const_reference x) {
506             return x.second.state == DataSourceInstance::STOPPED;
507           });
508     }
509 
510     const TracingSessionID id;
511 
512     // The consumer that started the session.
513     // Can be nullptr if the consumer detached from the session.
514     ConsumerEndpointImpl* consumer_maybe_null;
515 
516     // Unix uid of the consumer. This is valid even after the consumer detaches
517     // and does not change for the entire duration of the session. It is used to
518     // prevent that a consumer re-attaches to a session from a different uid.
519     uid_t const consumer_uid;
520 
521     // The list of triggers this session received while alive and the time they
522     // were received at. This is used to insert 'fake' packets back to the
523     // consumer so they can tell when some event happened. The order matches the
524     // order they were received.
525     struct TriggerInfo {
526       uint64_t boot_time_ns;
527       std::string trigger_name;
528       std::string producer_name;
529       uid_t producer_uid;
530     };
531     std::vector<TriggerInfo> received_triggers;
532 
533     // The trace config provided by the Consumer when calling
534     // EnableTracing(), plus any updates performed by ChangeTraceConfig.
535     TraceConfig config;
536 
537     // List of data source instances that have been enabled on the various
538     // producers for this tracing session.
539     // TODO(rsavitski): at the time of writing, the map structure is unused
540     // (even when the calling code has a key). This is also an opportunity to
541     // consider an alternative data type, e.g. a map of vectors.
542     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
543 
544     // For each Flush(N) request, keeps track of the set of producers for which
545     // we are still awaiting a NotifyFlushComplete(N) ack.
546     std::map<FlushRequestID, PendingFlush> pending_flushes;
547 
548     // Maps a per-trace-session buffer index into the corresponding global
549     // BufferID (shared namespace amongst all consumers). This vector has as
550     // many entries as |config.buffers_size()|.
551     std::vector<BufferID> buffers_index;
552 
553     std::map<std::pair<ProducerID, WriterID>, PacketSequenceID>
554         packet_sequence_ids;
555     PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
556 
557     // Whether we should emit the trace stats next time we reach EOF while
558     // performing ReadBuffers.
559     bool should_emit_stats = false;
560 
561     // Whether we should emit the sync marker the next time ReadBuffers() is
562     // called.
563     bool should_emit_sync_marker = false;
564 
565     // Whether we mirrored the trace config back to the trace output yet.
566     bool did_emit_config = false;
567 
568     // Whether we put the system info into the trace output yet.
569     bool did_emit_system_info = false;
570 
571     // Whether we should compress TracePackets after reading them.
572     bool compress_deflate = false;
573 
574     // The number of received triggers we've emitted into the trace output.
575     size_t num_triggers_emitted_into_trace = 0;
576 
577     // Packets that failed validation of the TrustedPacket.
578     uint64_t invalid_packets = 0;
579 
580     // Flush() stats. See comments in trace_stats.proto for more.
581     uint64_t flushes_requested = 0;
582     uint64_t flushes_succeeded = 0;
583     uint64_t flushes_failed = 0;
584 
585     // Outcome of the final Flush() done by FlushAndDisableTracing().
586     protos::gen::TraceStats_FinalFlushOutcome final_flush_outcome{};
587 
588     // Set to true on the first call to MaybeNotifyAllDataSourcesStarted().
589     bool did_notify_all_data_source_started = false;
590 
591     // Stores all lifecycle events of a particular type (i.e. associated with a
592     // single field id in the TracingServiceEvent proto).
593     struct LifecycleEvent {
594       LifecycleEvent(uint32_t f_id, uint32_t m_size = 1)
field_idTracingSession::LifecycleEvent595           : field_id(f_id), max_size(m_size), timestamps(m_size) {}
596 
597       // The field id of the event in the TracingServiceEvent proto.
598       uint32_t field_id;
599 
600       // Stores the max size of |timestamps|. Set to 1 by default (in
601       // the constructor) but can be overriden in TraceSession constructor
602       // if a larger size is required.
603       uint32_t max_size;
604 
605       // Stores the timestamps emitted for each event type (in nanoseconds).
606       // Emitted into the trace and cleared when the consumer next calls
607       // ReadBuffers.
608       base::CircularQueue<int64_t> timestamps;
609     };
610     std::vector<LifecycleEvent> lifecycle_events;
611 
612     using ClockSnapshotData =
613         std::vector<std::pair<uint32_t /*clock_id*/, uint64_t /*ts*/>>;
614 
615     // Initial clock snapshot, captured at trace start time (when state goes to
616     // TracingSession::STARTED). Emitted into the trace when the consumer first
617     // calls ReadBuffers().
618     ClockSnapshotData initial_clock_snapshot;
619 
620     // Stores clock snapshots to emit into the trace as a ring buffer. This
621     // buffer is populated both periodically and when lifecycle events happen
622     // but only when significant clock drift is detected. Emitted into the trace
623     // and cleared when the consumer next calls ReadBuffers().
624     base::CircularQueue<ClockSnapshotData> clock_snapshot_ring_buffer;
625 
626     State state = DISABLED;
627 
628     // If the consumer detached the session, this variable defines the key used
629     // for identifying the session later when reattaching.
630     std::string detach_key;
631 
632     // This is set when the Consumer calls sets |write_into_file| == true in the
633     // TraceConfig. In this case this represents the file we should stream the
634     // trace packets into, rather than returning it to the consumer via
635     // OnTraceData().
636     base::ScopedFile write_into_file;
637     uint32_t write_period_ms = 0;
638     uint64_t max_file_size_bytes = 0;
639     uint64_t bytes_written_into_file = 0;
640 
641     // Periodic task for snapshotting service events (e.g. clocks, sync markers
642     // etc)
643     base::PeriodicTask snapshot_periodic_task;
644 
645     // Deferred task that stops the trace when |duration_ms| expires. This is
646     // to handle the case of |prefer_suspend_clock_for_duration| which cannot
647     // use PostDelayedTask.
648     base::PeriodicTask timed_stop_task;
649 
650     // When non-NULL the packets should be post-processed using the filter.
651     std::unique_ptr<protozero::MessageFilter> trace_filter;
652     uint64_t filter_input_packets = 0;
653     uint64_t filter_input_bytes = 0;
654     uint64_t filter_output_bytes = 0;
655     uint64_t filter_errors = 0;
656     uint64_t filter_time_taken_ns = 0;
657 
658     // A randomly generated trace identifier. Note that this does NOT always
659     // match the requested TraceConfig.trace_uuid_msb/lsb. Spcifically, it does
660     // until a gap-less snapshot is requested. Each snapshot re-generates the
661     // uuid to avoid emitting two different traces with the same uuid.
662     base::Uuid trace_uuid;
663 
664     // NOTE: when adding new fields here consider whether that state should be
665     // copied over in DoCloneSession() or not. Ask yourself: is this a
666     // "runtime state" (e.g. active data sources) or a "trace (meta)data state"?
667     // If the latter, it should be handled by DoCloneSession()).
668   };
669 
670   TracingServiceImpl(const TracingServiceImpl&) = delete;
671   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
672 
673   DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
674                                       const TraceConfig::ProducerConfig&,
675                                       const RegisteredDataSource&,
676                                       TracingSession*);
677 
678   // Returns the next available ProducerID that is not in |producers_|.
679   ProducerID GetNextProducerID();
680 
681   // Returns a pointer to the |tracing_sessions_| entry or nullptr if the
682   // session doesn't exists.
683   TracingSession* GetTracingSession(TracingSessionID);
684 
685   // Returns a pointer to the tracing session that has the highest
686   // TraceConfig.bugreport_score, if any, or nullptr.
687   TracingSession* FindTracingSessionWithMaxBugreportScore();
688 
689   // Returns a pointer to the |tracing_sessions_| entry, matching the given
690   // uid and detach key, or nullptr if no such session exists.
691   TracingSession* GetDetachedSession(uid_t, const std::string& key);
692 
693   // Update the memory guard rail by using the latest information from the
694   // shared memory and trace buffers.
695   void UpdateMemoryGuardrail();
696 
697   void StartDataSourceInstance(ProducerEndpointImpl*,
698                                TracingSession*,
699                                DataSourceInstance*);
700   void StopDataSourceInstance(ProducerEndpointImpl*,
701                               TracingSession*,
702                               DataSourceInstance*,
703                               bool disable_immediately);
704   void PeriodicSnapshotTask(TracingSessionID);
705   void MaybeSnapshotClocksIntoRingBuffer(TracingSession*);
706   bool SnapshotClocks(TracingSession::ClockSnapshotData*);
707   void SnapshotLifecyleEvent(TracingSession*,
708                              uint32_t field_id,
709                              bool snapshot_clocks);
710   void EmitClockSnapshot(TracingSession*,
711                          TracingSession::ClockSnapshotData,
712                          std::vector<TracePacket>*);
713   void EmitSyncMarker(std::vector<TracePacket>*);  // The Emit*/MaybeEmit* helpers take the output packet vector by pointer. NOTE(review): presumably they append to it - confirm.
714   void EmitStats(TracingSession*, std::vector<TracePacket>*);
715   TraceStats GetTraceStats(TracingSession*);  // Returns the stats by value.
716   void EmitLifecycleEvents(TracingSession*, std::vector<TracePacket>*);
717   void MaybeEmitUuidAndTraceConfig(TracingSession*, std::vector<TracePacket>*);
718   void MaybeEmitSystemInfo(TracingSession*, std::vector<TracePacket>*);
719   void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
720   void MaybeNotifyAllDataSourcesStarted(TracingSession*);
721   void OnFlushTimeout(TracingSessionID, FlushRequestID);  // Timeout/periodic handlers take a TracingSessionID, not a TracingSession*. NOTE(review): presumably because the session may be gone when they run - confirm.
722   void OnDisableTracingTimeout(TracingSessionID);
723   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
724   void PeriodicFlushTask(TracingSessionID, bool post_next_only);  // NOTE(review): |post_next_only| presumably only schedules the next run without acting now - confirm.
725   void CompleteFlush(TracingSessionID tsid,
726                      ConsumerEndpoint::FlushCallback callback,
727                      bool success);
728   void ScrapeSharedMemoryBuffers(TracingSession*, ProducerEndpointImpl*);
729   void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);  // Same |post_next_only| parameter as PeriodicFlushTask().
730   TraceBuffer* GetBufferByID(BufferID);  // NOTE(review): presumably a |buffers_| lookup that returns nullptr when absent - confirm.
731   base::Status DoCloneSession(ConsumerEndpointImpl*,
732                               TracingSessionID,
733                               bool final_flush_outcome,
734                               base::Uuid*);  // Non-const pointer out-param; success or failure is reported through the returned base::Status.
735 
736   // Returns true if `*tracing_session` is waiting for a trigger that hasn't
737   // happened. Static: it has no |this| and cannot read per-instance state.
738   static bool IsWaitingForTrigger(TracingSession* tracing_session);
739 
740   // Reads the buffers from `*tracing_session` and returns them (along with some
741   // metadata packets).
742   //
743   // Stops when the cumulative size of the returned packets exceeds
744   // `threshold` (so it's not a strict upper bound) and sets `*has_more` to
745   // true, or when there are no more packets (and sets `*has_more` to false).
746   std::vector<TracePacket> ReadBuffers(TracingSession* tracing_session,
747                                        size_t threshold,
748                                        bool* has_more);
749 
750   // If `*tracing_session` has a filter, applies it to `*packets`. Doesn't
751   // change the number of packets in `*packets`, only their content.
752   void MaybeFilterPackets(TracingSession* tracing_session,
753                           std::vector<TracePacket>* packets);
754 
755   // If `*tracing_session` has compression enabled, compresses `*packets`.
756   void MaybeCompressPackets(TracingSession* tracing_session,
757                             std::vector<TracePacket>* packets);
758 
759   // If `*tracing_session` is configured to write into a file, writes `packets`
760   // into the file.
761   //
762   // Returns true if the file should be closed (because it's full or there has
763   // been an error), false otherwise.
764   bool WriteIntoFile(TracingSession* tracing_session,
765                      std::vector<TracePacket> packets);  // Taken by value: the callee receives its own (copied or moved-in) vector.
766   void OnStartTriggersTimeout(TracingSessionID tsid);
767   void MaybeLogUploadEvent(const TraceConfig&,
768                            const base::Uuid&,
769                            PerfettoStatsdAtom atom,
770                            const std::string& trigger_name = "");  // Optional; defaults to the empty string.
771   void MaybeLogTriggerEvent(const TraceConfig&,
772                             PerfettoTriggerAtom atom,
773                             const std::string& trigger_name);  // Unlike MaybeLogUploadEvent(), |trigger_name| has no default here.
774   size_t PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,
775                                              uint64_t trigger_name_hash);  // NOTE(review): presumably purges |trigger_history_| entries older than |trigger_window_ns_| and counts those matching the hash - confirm.
776   static void StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl>,
777                                      TracingSessionID);  // Static and takes a WeakPtr instead of |this|. NOTE(review): presumably so it is safe to run after the service is destroyed - confirm.
778 
779   base::TaskRunner* const task_runner_;  // The pointer itself is const (never reseated). NOTE(review): presumably not owned - confirm.
780   const InitOpts init_opts_;  // Immutable after construction.
781   std::unique_ptr<SharedMemory::Factory> shm_factory_;  // Owned by this object.
782   ProducerID last_producer_id_ = 0;  // The last_*_id_ counters all start at 0. NOTE(review): presumably the most recently issued ID of each kind - confirm.
783   DataSourceInstanceID last_data_source_instance_id_ = 0;
784   TracingSessionID last_tracing_session_id_ = 0;
785   FlushRequestID last_flush_request_id_ = 0;
786   uid_t uid_ = 0;  // NOTE(review): presumably the uid the service runs as - confirm where it is assigned.
787 
788   // Buffer IDs are global across all consumers (because a Producer can produce
789   // data for more than one tracing session, hence more than one consumer).
790   IdAllocator<BufferID> buffer_ids_;
791 
792   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;  // Multimap: several registrations may share the same name.
793   std::map<ProducerID, ProducerEndpointImpl*> producers_;  // NOTE(review): raw pointers, presumably non-owning - confirm.
794   std::set<ConsumerEndpointImpl*> consumers_;  // NOTE(review): raw pointers, presumably non-owning - confirm.
795   std::map<TracingSessionID, TracingSession> tracing_sessions_;  // Sessions are stored by value inside the map.
796   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;  // Buffers are owned here (unique_ptr).
797   std::map<std::string, int64_t> session_to_last_trace_s_;  // NOTE(review): presumably session name -> time of last trace in seconds (per the _s suffix) - confirm.
798 
799   // Contains timestamps of triggers.
800   // The queue is sorted by timestamp and invocations older than
801   // |trigger_window_ns_| are purged when a trigger happens.
802   base::CircularQueue<TriggerHistory> trigger_history_;  // NOTE(review): purging presumably happens in PurgeExpiredAndCountTriggerInWindow() - confirm.
803 
804   bool smb_scraping_enabled_ = false;  // Off by default.
805   bool lockdown_mode_ = false;  // Off by default.
806   uint32_t min_write_period_ms_ = 100;       // Defaults to 100 ms. Overridable for testing.
807   int64_t trigger_window_ns_ = kOneDayInNs;  // Defaults to kOneDayInNs. Overridable for testing.
808 
809   std::minstd_rand trigger_probability_rand_;  // No initializer here; any seeding would happen in the constructor (not visible in this header section).
810   std::uniform_real_distribution<> trigger_probability_dist_;  // `<>` selects double; the range is [0, 1) if left default-constructed.
811   double trigger_rnd_override_for_testing_ = 0;  // Overridable for testing. NOTE(review): presumably 0 means "no override" - confirm.
812 
813   uint8_t sync_marker_packet_[32];  // Lazily initialized (no in-class initializer, so contents are indeterminate until then).
814   size_t sync_marker_packet_size_ = 0;  // NOTE(review): presumably stays 0 until |sync_marker_packet_| has been initialized - confirm.
815 
816   // Stats. Both counters start at 0.
817   uint64_t chunks_discarded_ = 0;
818   uint64_t patches_discarded_ = 0;
819 
820   PERFETTO_THREAD_CHECKER(thread_checker_)  // Macro-declared member; intentionally written without a trailing ';'. NOTE(review): presumably expands to nothing in some build modes - confirm.
821 
822   base::WeakPtrFactory<TracingServiceImpl>
823       weak_ptr_factory_;  // Keep at the end: members are destroyed in reverse declaration order, so the factory goes first (presumably invalidating outstanding WeakPtrs before the other members are torn down).
824 };
825 
826 }  // namespace perfetto
827 
828 #endif  // SRC_TRACING_CORE_TRACING_SERVICE_IMPL_H_
829