• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
18 #define SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
19 
20 #include <algorithm>
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <optional>
25 #include <set>
26 #include <utility>
27 #include <vector>
28 
29 #include "perfetto/base/logging.h"
30 #include "perfetto/base/status.h"
31 #include "perfetto/base/time.h"
32 #include "perfetto/ext/base/circular_queue.h"
33 #include "perfetto/ext/base/clock_snapshots.h"
34 #include "perfetto/ext/base/periodic_task.h"
35 #include "perfetto/ext/base/uuid.h"
36 #include "perfetto/ext/base/weak_ptr.h"
37 #include "perfetto/ext/base/weak_runner.h"
38 #include "perfetto/ext/tracing/core/basic_types.h"
39 #include "perfetto/ext/tracing/core/client_identity.h"
40 #include "perfetto/ext/tracing/core/commit_data_request.h"
41 #include "perfetto/ext/tracing/core/observable_events.h"
42 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
43 #include "perfetto/ext/tracing/core/trace_stats.h"
44 #include "perfetto/ext/tracing/core/tracing_service.h"
45 #include "perfetto/tracing/core/data_source_config.h"
46 #include "perfetto/tracing/core/data_source_descriptor.h"
47 #include "perfetto/tracing/core/forward_decls.h"
48 #include "perfetto/tracing/core/trace_config.h"
49 #include "src/android_stats/perfetto_atoms.h"
50 #include "src/tracing/core/id_allocator.h"
51 #include "src/tracing/service/clock.h"
52 #include "src/tracing/service/dependencies.h"
53 #include "src/tracing/service/random.h"
54 
55 namespace protozero {
56 class MessageFilter;
57 }
58 
59 namespace perfetto {
60 
61 namespace protos {
62 namespace gen {
63 enum TraceStats_FinalFlushOutcome : int;
64 }
65 }  // namespace protos
66 
67 class Consumer;
68 class Producer;
69 class SharedMemory;
70 class SharedMemoryArbiterImpl;
71 class TraceBuffer;
72 class TracePacket;
73 
74 // The tracing service business logic.
75 class TracingServiceImpl : public TracingService {
76  private:
77   struct DataSourceInstance;
78   struct TriggerInfo;
79 
80  public:
81   static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
82   static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
83   static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
84                                             0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
85                                             0x6d, 0x57, 0xa0, 0x79};
86   static constexpr size_t kMaxTracePacketSliceSize =
87       128 * 1024 - 512;  // This is ipc::kIPCBufferSize - 512, see assertion in
88                          // tracing_integration_test.cc and b/195065199
89 
90   // This is a rough threshold to determine how many bytes to read from the
91   // buffers on each iteration when writing into a file. Since filtering and
92   // compression allocate memory, this effectively limits the amount of memory
93   // allocated.
94   static constexpr size_t kWriteIntoFileChunkSize = 1024 * 1024ul;
95 
96   // The implementation behind the service endpoint exposed to each producer.
97   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
98    public:
99     ProducerEndpointImpl(ProducerID,
100                          const ClientIdentity& client_identity,
101                          TracingServiceImpl*,
102                          base::TaskRunner*,
103                          Producer*,
104                          const std::string& producer_name,
105                          const std::string& sdk_version,
106                          bool in_process,
107                          bool smb_scraping_enabled);
108     ~ProducerEndpointImpl() override;
109 
110     // TracingService::ProducerEndpoint implementation.
111     void Disconnect() override;
112     void RegisterDataSource(const DataSourceDescriptor&) override;
113     void UpdateDataSource(const DataSourceDescriptor&) override;
114     void UnregisterDataSource(const std::string& name) override;
115     void RegisterTraceWriter(uint32_t writer_id,
116                              uint32_t target_buffer) override;
117     void UnregisterTraceWriter(uint32_t writer_id) override;
118     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
119     void SetupSharedMemory(std::unique_ptr<SharedMemory>,
120                            size_t page_size_bytes,
121                            bool provided_by_producer);
122     std::unique_ptr<TraceWriter> CreateTraceWriter(
123         BufferID,
124         BufferExhaustedPolicy) override;
125     SharedMemoryArbiter* MaybeSharedMemoryArbiter() override;
126     bool IsShmemProvidedByProducer() const override;
127     void NotifyFlushComplete(FlushRequestID) override;
128     void NotifyDataSourceStarted(DataSourceInstanceID) override;
129     void NotifyDataSourceStopped(DataSourceInstanceID) override;
130     SharedMemory* shared_memory() const override;
131     size_t shared_buffer_page_size_kb() const override;
132     void ActivateTriggers(const std::vector<std::string>&) override;
133     void Sync(std::function<void()> callback) override;
134 
135     void OnTracingSetup();
136     void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
137     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
138     void StopDataSource(DataSourceInstanceID);
139     void Flush(FlushRequestID,
140                const std::vector<DataSourceInstanceID>&,
141                FlushFlags);
142     void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
143     void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
144 
is_allowed_target_buffer(BufferID buffer_id)145     bool is_allowed_target_buffer(BufferID buffer_id) const {
146       return allowed_target_buffers_.count(buffer_id);
147     }
148 
buffer_id_for_writer(WriterID writer_id)149     std::optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
150       const auto it = writers_.find(writer_id);
151       if (it != writers_.end())
152         return it->second;
153       return std::nullopt;
154     }
155 
156     bool IsAndroidProcessFrozen();
uid()157     uid_t uid() const { return client_identity_.uid(); }
pid()158     pid_t pid() const { return client_identity_.pid(); }
client_identity()159     const ClientIdentity& client_identity() const { return client_identity_; }
160 
161    private:
162     friend class TracingServiceImpl;
163     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
164     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
165 
166     ProducerID const id_;
167     ClientIdentity const client_identity_;
168     TracingServiceImpl* const service_;
169     Producer* producer_;
170     std::unique_ptr<SharedMemory> shared_memory_;
171     size_t shared_buffer_page_size_kb_ = 0;
172     SharedMemoryABI shmem_abi_;
173     size_t shmem_size_hint_bytes_ = 0;
174     size_t shmem_page_size_hint_bytes_ = 0;
175     bool is_shmem_provided_by_producer_ = false;
176     const std::string name_;
177     std::string sdk_version_;
178     bool in_process_;
179     bool smb_scraping_enabled_;
180 
181     // Set of the global target_buffer IDs that the producer is configured to
182     // write into in any active tracing session.
183     std::set<BufferID> allowed_target_buffers_;
184 
185     // Maps registered TraceWriter IDs to their target buffers as registered by
186     // the producer. Note that producers aren't required to register their
187     // writers, so we may see commits of chunks with WriterIDs that aren't
188     // contained in this map. However, if a producer does register a writer, the
189     // service will prevent the writer from writing into any other buffer than
190     // the one associated with it here. The BufferIDs stored in this map are
191     // untrusted, so need to be verified against |allowed_target_buffers_|
192     // before use.
193     std::map<WriterID, BufferID> writers_;
194 
195     // This is used only in in-process configurations.
196     // SharedMemoryArbiterImpl methods themselves are thread-safe.
197     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
198 
199     PERFETTO_THREAD_CHECKER(thread_checker_)
200     base::WeakRunner weak_runner_;
201   };
202 
203   // The implementation behind the service endpoint exposed to each consumer.
204   class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
205    public:
206     ConsumerEndpointImpl(TracingServiceImpl*,
207                          base::TaskRunner*,
208                          Consumer*,
209                          uid_t uid);
210     ~ConsumerEndpointImpl() override;
211 
212     void NotifyOnTracingDisabled(const std::string& error);
213 
214     // TracingService::ConsumerEndpoint implementation.
215     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
216     void ChangeTraceConfig(const TraceConfig& cfg) override;
217     void StartTracing() override;
218     void DisableTracing() override;
219     void ReadBuffers() override;
220     void FreeBuffers() override;
221     void Flush(uint32_t timeout_ms, FlushCallback, FlushFlags) override;
222     void Detach(const std::string& key) override;
223     void Attach(const std::string& key) override;
224     void GetTraceStats() override;
225     void ObserveEvents(uint32_t enabled_event_types) override;
226     void QueryServiceState(QueryServiceStateArgs,
227                            QueryServiceStateCallback) override;
228     void QueryCapabilities(QueryCapabilitiesCallback) override;
229     void SaveTraceForBugreport(SaveTraceForBugreportCallback) override;
230     void CloneSession(CloneSessionArgs) override;
231 
232     // Will queue a task to notify the consumer about the state change.
233     void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
234                                          const DataSourceInstance&);
235     void OnAllDataSourcesStarted();
236 
GetWeakPtr()237     base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr() {
238       return weak_ptr_factory_.GetWeakPtr();
239     }
240 
241    private:
242     friend class TracingServiceImpl;
243     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
244     ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
245 
246     void NotifyCloneSnapshotTrigger(const TriggerInfo& trigger_name);
247 
248     // Returns a pointer to an ObservableEvents object that the caller can fill
249     // and schedules a task to send the ObservableEvents to the consumer.
250     ObservableEvents* AddObservableEvents();
251 
252     base::TaskRunner* const task_runner_;
253     TracingServiceImpl* const service_;
254     Consumer* const consumer_;
255     uid_t const uid_;
256     TracingSessionID tracing_session_id_ = 0;
257 
258     // Whether the consumer is interested in DataSourceInstance state change
259     // events.
260     uint32_t observable_events_mask_ = 0;
261 
262     // ObservableEvents that will be sent to the consumer. If set, a task to
263     // flush the events to the consumer has been queued.
264     std::unique_ptr<ObservableEvents> observable_events_;
265 
266     PERFETTO_THREAD_CHECKER(thread_checker_)
267     base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
268   };
269 
270   class RelayEndpointImpl : public TracingService::RelayEndpoint {
271    public:
272     using SyncMode = RelayEndpoint::SyncMode;
273 
274     struct SyncedClockSnapshots {
SyncedClockSnapshotsSyncedClockSnapshots275       SyncedClockSnapshots(SyncMode _sync_mode,
276                            base::ClockSnapshotVector _client_clocks,
277                            base::ClockSnapshotVector _host_clocks)
278           : sync_mode(_sync_mode),
279             client_clocks(std::move(_client_clocks)),
280             host_clocks(std::move(_host_clocks)) {}
281       SyncMode sync_mode;
282       base::ClockSnapshotVector client_clocks;
283       base::ClockSnapshotVector host_clocks;
284     };
285 
286     explicit RelayEndpointImpl(RelayClientID relay_client_id,
287                                TracingServiceImpl* service);
288     ~RelayEndpointImpl() override;
289     void SyncClocks(SyncMode sync_mode,
290                     base::ClockSnapshotVector client_clocks,
291                     base::ClockSnapshotVector host_clocks) override;
292     void Disconnect() override;
293 
machine_id()294     MachineID machine_id() const { return relay_client_id_.first; }
295 
synced_clocks()296     base::CircularQueue<SyncedClockSnapshots>& synced_clocks() {
297       return synced_clocks_;
298     }
299 
300    private:
301     RelayEndpointImpl(const RelayEndpointImpl&) = delete;
302     RelayEndpointImpl& operator=(const RelayEndpointImpl&) = delete;
303 
304     RelayClientID relay_client_id_;
305     TracingServiceImpl* const service_;
306     base::CircularQueue<SyncedClockSnapshots> synced_clocks_;
307 
308     PERFETTO_THREAD_CHECKER(thread_checker_)
309   };
310 
311   explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
312                               base::TaskRunner*,
313                               tracing_service::Dependencies,
314                               InitOpts = {});
315   ~TracingServiceImpl() override;
316 
317   // Called by ProducerEndpointImpl.
318   void DisconnectProducer(ProducerID);
319   void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
320   void UpdateDataSource(ProducerID, const DataSourceDescriptor&);
321   void UnregisterDataSource(ProducerID, const std::string& name);
322   void CopyProducerPageIntoLogBuffer(ProducerID,
323                                      const ClientIdentity&,
324                                      WriterID,
325                                      ChunkID,
326                                      BufferID,
327                                      uint16_t num_fragments,
328                                      uint8_t chunk_flags,
329                                      bool chunk_complete,
330                                      const uint8_t* src,
331                                      size_t size);
332   void ApplyChunkPatches(ProducerID,
333                          const std::vector<CommitDataRequest::ChunkToPatch>&);
334   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
335   void NotifyDataSourceStarted(ProducerID, DataSourceInstanceID);
336   void NotifyDataSourceStopped(ProducerID, DataSourceInstanceID);
337   void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);
338 
339   // Called by ConsumerEndpointImpl.
340   bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
341   bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
342   void DisconnectConsumer(ConsumerEndpointImpl*);
343   base::Status EnableTracing(ConsumerEndpointImpl*,
344                              const TraceConfig&,
345                              base::ScopedFile);
346   void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);
347 
348   void StartTracing(TracingSessionID);
349   void DisableTracing(TracingSessionID, bool disable_immediately = false);
350   void Flush(TracingSessionID tsid,
351              uint32_t timeout_ms,
352              ConsumerEndpoint::FlushCallback,
353              FlushFlags);
354   void FlushAndDisableTracing(TracingSessionID);
355   base::Status FlushAndCloneSession(ConsumerEndpointImpl*,
356                                     ConsumerEndpoint::CloneSessionArgs);
357 
358   // Starts reading the internal tracing buffers from the tracing session `tsid`
359   // and sends them to `*consumer` (which must be != nullptr).
360   //
361   // Only reads a limited amount of data in one call. If there's more data,
362   // immediately schedules itself on a PostTask.
363   //
364   // Returns false in case of error.
365   bool ReadBuffersIntoConsumer(TracingSessionID tsid,
366                                ConsumerEndpointImpl* consumer);
367 
368   // Reads all the tracing buffers from the tracing session `tsid` and writes
369   // them into the associated file.
370   //
371   // Reads all the data in the buffers (or until the file is full) before
372   // returning.
373   //
374   // If the tracing session write_period_ms is 0, the file is full or there has
375   // been an error, flushes the file and closes it. Otherwise, schedules itself
376   // to be executed after write_period_ms.
377   //
378   // Returns false in case of error.
379   bool ReadBuffersIntoFile(TracingSessionID);
380 
381   void FreeBuffers(TracingSessionID);
382 
383   // Service implementation.
384   std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
385       Producer*,
386       const ClientIdentity& client_identity,
387       const std::string& producer_name,
388       size_t shared_memory_size_hint_bytes = 0,
389       bool in_process = false,
390       ProducerSMBScrapingMode smb_scraping_mode =
391           ProducerSMBScrapingMode::kDefault,
392       size_t shared_memory_page_size_hint_bytes = 0,
393       std::unique_ptr<SharedMemory> shm = nullptr,
394       const std::string& sdk_version = {}) override;
395 
396   std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
397       Consumer*,
398       uid_t) override;
399 
400   std::unique_ptr<TracingService::RelayEndpoint> ConnectRelayClient(
401       RelayClientID) override;
402 
403   void DisconnectRelayClient(RelayClientID);
404 
405   // Set whether SMB scraping should be enabled by default or not. Producers can
406   // override this setting for their own SMBs.
SetSMBScrapingEnabled(bool enabled)407   void SetSMBScrapingEnabled(bool enabled) override {
408     smb_scraping_enabled_ = enabled;
409   }
410 
411   // Exposed mainly for testing.
num_producers()412   size_t num_producers() const { return producers_.size(); }
413   ProducerEndpointImpl* GetProducer(ProducerID) const;
414 
415  private:
416   struct TriggerHistory {
417     int64_t timestamp_ns;
418     uint64_t name_hash;
419 
420     bool operator<(const TriggerHistory& other) const {
421       return timestamp_ns < other.timestamp_ns;
422     }
423   };
424 
425   struct RegisteredDataSource {
426     ProducerID producer_id;
427     DataSourceDescriptor descriptor;
428   };
429 
430   // Represents an active data source for a tracing session.
431   struct DataSourceInstance {
DataSourceInstanceDataSourceInstance432     DataSourceInstance(DataSourceInstanceID id,
433                        const DataSourceConfig& cfg,
434                        const std::string& ds_name,
435                        bool notify_on_start,
436                        bool notify_on_stop,
437                        bool handles_incremental_state_invalidation,
438                        bool no_flush_)
439         : instance_id(id),
440           config(cfg),
441           data_source_name(ds_name),
442           will_notify_on_start(notify_on_start),
443           will_notify_on_stop(notify_on_stop),
444           handles_incremental_state_clear(
445               handles_incremental_state_invalidation),
446           no_flush(no_flush_) {}
447     DataSourceInstance(const DataSourceInstance&) = delete;
448     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
449 
450     DataSourceInstanceID instance_id;
451     DataSourceConfig config;
452     std::string data_source_name;
453     bool will_notify_on_start;
454     bool will_notify_on_stop;
455     bool handles_incremental_state_clear;
456     bool no_flush;
457 
458     enum DataSourceInstanceState {
459       CONFIGURED,
460       STARTING,
461       STARTED,
462       STOPPING,
463       STOPPED
464     };
465     DataSourceInstanceState state = CONFIGURED;
466   };
467 
468   struct PendingFlush {
469     std::set<ProducerID> producers;
470     ConsumerEndpoint::FlushCallback callback;
PendingFlushPendingFlush471     explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
472   };
473 
474   using PendingCloneID = uint64_t;
475 
476   struct TriggerInfo {
477     uint64_t boot_time_ns = 0;
478     std::string trigger_name;
479     std::string producer_name;
480     uid_t producer_uid = 0;
481   };
482 
483   struct PendingClone {
484     size_t pending_flush_cnt = 0;
485     // This vector might not be populated all at once. Some buffers might be
486     // nullptr while flushing is not done.
487     std::vector<std::unique_ptr<TraceBuffer>> buffers;
488     std::vector<int64_t> buffer_cloned_timestamps;
489     bool flush_failed = false;
490     base::WeakPtr<ConsumerEndpointImpl> weak_consumer;
491     bool skip_trace_filter = false;
492     std::optional<TriggerInfo> clone_trigger;
493     int64_t clone_started_timestamp_ns = 0;
494   };
495 
496   // Holds the state of a tracing session. A tracing session is uniquely bound
497   // a specific Consumer. Each Consumer can own one or more sessions.
498   struct TracingSession {
499     enum State {
500       DISABLED = 0,
501       CONFIGURED,
502       STARTED,
503       DISABLING_WAITING_STOP_ACKS,
504       CLONED_READ_ONLY,
505     };
506 
507     TracingSession(TracingSessionID,
508                    ConsumerEndpointImpl*,
509                    const TraceConfig&,
510                    base::TaskRunner*);
511     TracingSession(TracingSession&&) = delete;
512     TracingSession& operator=(TracingSession&&) = delete;
513 
num_buffersTracingSession514     size_t num_buffers() const { return buffers_index.size(); }
515 
flush_timeout_msTracingSession516     uint32_t flush_timeout_ms() {
517       uint32_t timeout_ms = config.flush_timeout_ms();
518       return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
519     }
520 
data_source_stop_timeout_msTracingSession521     uint32_t data_source_stop_timeout_ms() {
522       uint32_t timeout_ms = config.data_source_stop_timeout_ms();
523       return timeout_ms ? timeout_ms : kDataSourceStopTimeoutMs;
524     }
525 
GetPacketSequenceIDTracingSession526     PacketSequenceID GetPacketSequenceID(MachineID machine_id,
527                                          ProducerID producer_id,
528                                          WriterID writer_id) {
529       auto key = std::make_tuple(machine_id, producer_id, writer_id);
530       auto it = packet_sequence_ids.find(key);
531       if (it != packet_sequence_ids.end())
532         return it->second;
533       // We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
534       // are limited to 1024).
535       static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
536                     "PacketSequenceID value space doesn't cover service "
537                     "sequence ID and all producer/writer ID combinations!");
538       PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
539       PacketSequenceID sequence_id = ++last_packet_sequence_id;
540       packet_sequence_ids[key] = sequence_id;
541       return sequence_id;
542     }
543 
GetDataSourceInstanceTracingSession544     DataSourceInstance* GetDataSourceInstance(
545         ProducerID producer_id,
546         DataSourceInstanceID instance_id) {
547       for (auto& inst_kv : data_source_instances) {
548         if (inst_kv.first != producer_id ||
549             inst_kv.second.instance_id != instance_id) {
550           continue;
551         }
552         return &inst_kv.second;
553       }
554       return nullptr;
555     }
556 
AllDataSourceInstancesStartedTracingSession557     bool AllDataSourceInstancesStarted() {
558       return std::all_of(
559           data_source_instances.begin(), data_source_instances.end(),
560           [](decltype(data_source_instances)::const_reference x) {
561             return x.second.state == DataSourceInstance::STARTED;
562           });
563     }
564 
AllDataSourceInstancesStoppedTracingSession565     bool AllDataSourceInstancesStopped() {
566       return std::all_of(
567           data_source_instances.begin(), data_source_instances.end(),
568           [](decltype(data_source_instances)::const_reference x) {
569             return x.second.state == DataSourceInstance::STOPPED;
570           });
571     }
572 
573     // Checks whether |clone_uid| is allowed to clone the current tracing
574     // session.
575     bool IsCloneAllowed(uid_t clone_uid) const;
576 
577     const TracingSessionID id;
578 
579     // The consumer that started the session.
580     // Can be nullptr if the consumer detached from the session.
581     ConsumerEndpointImpl* consumer_maybe_null;
582 
583     // Unix uid of the consumer. This is valid even after the consumer detaches
584     // and does not change for the entire duration of the session. It is used to
585     // prevent that a consumer re-attaches to a session from a different uid.
586     uid_t const consumer_uid;
587 
588     // The list of triggers this session received while alive and the time they
589     // were received at. This is used to insert 'fake' packets back to the
590     // consumer so they can tell when some event happened. The order matches the
591     // order they were received.
592     std::vector<TriggerInfo> received_triggers;
593 
594     // The trace config provided by the Consumer when calling
595     // EnableTracing(), plus any updates performed by ChangeTraceConfig.
596     TraceConfig config;
597 
598     // List of data source instances that have been enabled on the various
599     // producers for this tracing session.
600     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
601 
602     // For each Flush(N) request, keeps track of the set of producers for which
603     // we are still awaiting a NotifyFlushComplete(N) ack.
604     std::map<FlushRequestID, PendingFlush> pending_flushes;
605 
606     // For each Clone request, keeps track of the flushes acknowledgement that
607     // we are still waiting for.
608     std::map<PendingCloneID, PendingClone> pending_clones;
609 
610     PendingCloneID last_pending_clone_id_ = 0;
611 
612     // Maps a per-trace-session buffer index into the corresponding global
613     // BufferID (shared namespace amongst all consumers). This vector has as
614     // many entries as |config.buffers_size()|.
615     std::vector<BufferID> buffers_index;
616 
617     std::map<std::tuple<MachineID, ProducerID, WriterID>, PacketSequenceID>
618         packet_sequence_ids;
619     PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
620 
621     // Whether we should emit the trace stats next time we reach EOF while
622     // performing ReadBuffers.
623     bool should_emit_stats = false;
624 
625     // Whether we should emit the sync marker the next time ReadBuffers() is
626     // called.
627     bool should_emit_sync_marker = false;
628 
629     // Whether we put the initial packets (trace config, system info,
630     // etc.) into the trace output yet.
631     bool did_emit_initial_packets = false;
632 
633     // Whether we emitted clock offsets for relay clients yet.
634     bool did_emit_remote_clock_sync_ = false;
635 
636     // Whether we should compress TracePackets after reading them.
637     bool compress_deflate = false;
638 
639     // The number of received triggers we've emitted into the trace output.
640     size_t num_triggers_emitted_into_trace = 0;
641 
642     // Packets that failed validation of the TrustedPacket.
643     uint64_t invalid_packets = 0;
644 
645     // Flush() stats. See comments in trace_stats.proto for more.
646     uint64_t flushes_requested = 0;
647     uint64_t flushes_succeeded = 0;
648     uint64_t flushes_failed = 0;
649 
650     // Outcome of the final Flush() done by FlushAndDisableTracing().
651     protos::gen::TraceStats_FinalFlushOutcome final_flush_outcome{};
652 
653     // Set to true on the first call to MaybeNotifyAllDataSourcesStarted().
654     bool did_notify_all_data_source_started = false;
655 
656     // Stores simple lifecycle events of a particular type (i.e. associated with
657     // a single field id in the TracingServiceEvent proto).
658     struct LifecycleEvent {
659       LifecycleEvent(uint32_t f_id, uint32_t m_size = 1)
field_idTracingSession::LifecycleEvent660           : field_id(f_id), max_size(m_size), timestamps(m_size) {}
661 
662       // The field id of the event in the TracingServiceEvent proto.
663       uint32_t field_id;
664 
665       // Stores the max size of |timestamps|. Set to 1 by default (in
666       // the constructor) but can be overriden in TraceSession constructor
667       // if a larger size is required.
668       uint32_t max_size;
669 
670       // Stores the timestamps emitted for each event type (in nanoseconds).
671       // Emitted into the trace and cleared when the consumer next calls
672       // ReadBuffers.
673       base::CircularQueue<int64_t> timestamps;
674     };
675     std::vector<LifecycleEvent> lifecycle_events;
676 
677     // Stores arbitrary lifecycle events that don't fit in lifecycle_events as
678     // serialized TracePacket protos.
679     struct ArbitraryLifecycleEvent {
680       int64_t timestamp;
681       std::vector<uint8_t> data;
682     };
683 
684     std::optional<ArbitraryLifecycleEvent> slow_start_event;
685 
686     std::vector<ArbitraryLifecycleEvent> last_flush_events;
687 
688     // If this is a cloned tracing session, the timestamp at which each buffer
689     // was cloned.
690     std::vector<int64_t> buffer_cloned_timestamps;
691 
692     using ClockSnapshotData = base::ClockSnapshotVector;
693 
694     // Initial clock snapshot, captured at trace start time (when state goes to
695     // TracingSession::STARTED). Emitted into the trace when the consumer first
696     // calls ReadBuffers().
697     ClockSnapshotData initial_clock_snapshot;
698 
699     // Stores clock snapshots to emit into the trace as a ring buffer. This
700     // buffer is populated both periodically and when lifecycle events happen
701     // but only when significant clock drift is detected. Emitted into the trace
702     // and cleared when the consumer next calls ReadBuffers().
703     base::CircularQueue<ClockSnapshotData> clock_snapshot_ring_buffer;
704 
705     State state = DISABLED;
706 
707     // If the consumer detached the session, this variable defines the key used
708     // for identifying the session later when reattaching.
709     std::string detach_key;
710 
711     // This is set when the Consumer calls sets |write_into_file| == true in the
712     // TraceConfig. In this case this represents the file we should stream the
713     // trace packets into, rather than returning it to the consumer via
714     // OnTraceData().
715     base::ScopedFile write_into_file;
716     uint32_t write_period_ms = 0;
717     uint64_t max_file_size_bytes = 0;
718     uint64_t bytes_written_into_file = 0;
719 
720     // Periodic task for snapshotting service events (e.g. clocks, sync markers
721     // etc)
722     base::PeriodicTask snapshot_periodic_task;
723 
724     // Deferred task that stops the trace when |duration_ms| expires. This is
725     // to handle the case of |prefer_suspend_clock_for_duration| which cannot
726     // use PostDelayedTask.
727     base::PeriodicTask timed_stop_task;
728 
729     // When non-NULL the packets should be post-processed using the filter.
730     std::unique_ptr<protozero::MessageFilter> trace_filter;
731     uint64_t filter_input_packets = 0;
732     uint64_t filter_input_bytes = 0;
733     uint64_t filter_output_bytes = 0;
734     uint64_t filter_errors = 0;
735     uint64_t filter_time_taken_ns = 0;
736     std::vector<uint64_t> filter_bytes_discarded_per_buffer;
737 
738     // A randomly generated trace identifier. Note that this does NOT always
739     // match the requested TraceConfig.trace_uuid_msb/lsb. Spcifically, it does
740     // until a gap-less snapshot is requested. Each snapshot re-generates the
741     // uuid to avoid emitting two different traces with the same uuid.
742     base::Uuid trace_uuid;
743 
744     // This is set when the clone operation was caused by a clone trigger.
745     std::optional<TriggerInfo> clone_trigger;
746 
747     // NOTE: when adding new fields here consider whether that state should be
748     // copied over in DoCloneSession() or not. Ask yourself: is this a
749     // "runtime state" (e.g. active data sources) or a "trace (meta)data state"?
750     // If the latter, it should be handled by DoCloneSession()).
751   };
752 
753   TracingServiceImpl(const TracingServiceImpl&) = delete;
754   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
755 
756   bool IsInitiatorPrivileged(const TracingSession&);
757 
758   DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
759                                       const TraceConfig::ProducerConfig&,
760                                       const RegisteredDataSource&,
761                                       TracingSession*);
762 
763   // Returns the next available ProducerID that is not in |producers_|.
764   ProducerID GetNextProducerID();
765 
766   // Returns a pointer to the |tracing_sessions_| entry or nullptr if the
767   // session doesn't exists.
768   TracingSession* GetTracingSession(TracingSessionID);
769 
770   // Returns a pointer to the |tracing_sessions_| entry with
771   // |unique_session_name| in the config (or nullptr if the
772   // session doesn't exists). CLONED_READ_ONLY sessions are ignored.
773   TracingSession* GetTracingSessionByUniqueName(
774       const std::string& unique_session_name);
775 
776   // Returns a pointer to the tracing session that has the highest
777   // TraceConfig.bugreport_score, if any, or nullptr.
778   TracingSession* FindTracingSessionWithMaxBugreportScore();
779 
780   // Returns a pointer to the |tracing_sessions_| entry, matching the given
781   // uid and detach key, or nullptr if no such session exists.
782   TracingSession* GetDetachedSession(uid_t, const std::string& key);
783 
784   // Update the memory guard rail by using the latest information from the
785   // shared memory and trace buffers.
786   void UpdateMemoryGuardrail();
787 
788   uint32_t DelayToNextWritePeriodMs(const TracingSession&);
789   void StartDataSourceInstance(ProducerEndpointImpl*,
790                                TracingSession*,
791                                DataSourceInstance*);
792   void StopDataSourceInstance(ProducerEndpointImpl*,
793                               TracingSession*,
794                               DataSourceInstance*,
795                               bool disable_immediately);
796   void PeriodicSnapshotTask(TracingSessionID);
797   void MaybeSnapshotClocksIntoRingBuffer(TracingSession*);
798   bool SnapshotClocks(TracingSession::ClockSnapshotData*);
799   // Records a lifecycle event of type |field_id| with the current timestamp.
800   void SnapshotLifecycleEvent(TracingSession*,
801                               uint32_t field_id,
802                               bool snapshot_clocks);
803   // Deletes all the lifecycle events of type |field_id| and records just one,
804   // that happened at time |boot_time_ns|.
805   void SetSingleLifecycleEvent(TracingSession*,
806                                uint32_t field_id,
807                                int64_t boot_time_ns);
808   void EmitClockSnapshot(TracingSession*,
809                          TracingSession::ClockSnapshotData,
810                          std::vector<TracePacket>*);
811   void EmitSyncMarker(std::vector<TracePacket>*);
812   void EmitStats(TracingSession*, std::vector<TracePacket>*);
813   TraceStats GetTraceStats(TracingSession*);
814   void EmitLifecycleEvents(TracingSession*, std::vector<TracePacket>*);
815   void EmitUuid(TracingSession*, std::vector<TracePacket>*);
816   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
817   void EmitSystemInfo(std::vector<TracePacket>*);
818   void MaybeEmitCloneTrigger(TracingSession*, std::vector<TracePacket>*);
819   void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
820   void MaybeEmitRemoteClockSync(TracingSession*, std::vector<TracePacket>*);
821   void MaybeNotifyAllDataSourcesStarted(TracingSession*);
822   void OnFlushTimeout(TracingSessionID, FlushRequestID, FlushFlags);
823   void OnDisableTracingTimeout(TracingSessionID);
824   void OnAllDataSourceStartedTimeout(TracingSessionID);
825   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
826   void PeriodicFlushTask(TracingSessionID, bool post_next_only);
827   void CompleteFlush(TracingSessionID tsid,
828                      ConsumerEndpoint::FlushCallback callback,
829                      bool success);
830   void ScrapeSharedMemoryBuffers(TracingSession*, ProducerEndpointImpl*);
831   void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);
832   TraceBuffer* GetBufferByID(BufferID);
833   void FlushDataSourceInstances(
834       TracingSession*,
835       uint32_t timeout_ms,
836       const std::map<ProducerID, std::vector<DataSourceInstanceID>>&,
837       ConsumerEndpoint::FlushCallback,
838       FlushFlags);
839   std::map<ProducerID, std::vector<DataSourceInstanceID>>
840   GetFlushableDataSourceInstancesForBuffers(TracingSession*,
841                                             const std::set<BufferID>&);
842   bool DoCloneBuffers(const TracingSession&,
843                       const std::set<BufferID>&,
844                       PendingClone*);
845   base::Status FinishCloneSession(ConsumerEndpointImpl*,
846                                   TracingSessionID,
847                                   std::vector<std::unique_ptr<TraceBuffer>>,
848                                   std::vector<int64_t> buf_cloned_timestamps,
849                                   bool skip_filter,
850                                   bool final_flush_outcome,
851                                   std::optional<TriggerInfo> clone_trigger,
852                                   base::Uuid*,
853                                   int64_t clone_started_timestamp_ns);
854   void OnFlushDoneForClone(TracingSessionID src_tsid,
855                            PendingCloneID clone_id,
856                            const std::set<BufferID>& buf_ids,
857                            bool final_flush_outcome);
858 
859   // Returns true if `*tracing_session` is waiting for a trigger that hasn't
860   // happened.
861   static bool IsWaitingForTrigger(TracingSession* tracing_session);
862 
863   // Reads the buffers from `*tracing_session` and returns them (along with some
864   // metadata packets).
865   //
866   // The function stops when the cumulative size of the return packets exceeds
867   // `threshold` (so it's not a strict upper bound) and sets `*has_more` to
868   // true, or when there are no more packets (and sets `*has_more` to false).
869   std::vector<TracePacket> ReadBuffers(TracingSession* tracing_session,
870                                        size_t threshold,
871                                        bool* has_more);
872 
873   // If `*tracing_session` has a filter, applies it to `*packets`. Doesn't
874   // change the number of `*packets`, only their content.
875   void MaybeFilterPackets(TracingSession* tracing_session,
876                           std::vector<TracePacket>* packets);
877 
878   // If `*tracing_session` has compression enabled, compress `*packets`.
879   void MaybeCompressPackets(TracingSession* tracing_session,
880                             std::vector<TracePacket>* packets);
881 
882   // If `*tracing_session` is configured to write into a file, writes `packets`
883   // into the file.
884   //
885   // Returns true if the file should be closed (because it's full or there has
886   // been an error), false otherwise.
887   bool WriteIntoFile(TracingSession* tracing_session,
888                      std::vector<TracePacket> packets);
889   void OnStartTriggersTimeout(TracingSessionID tsid);
890   void MaybeLogUploadEvent(const TraceConfig&,
891                            const base::Uuid&,
892                            PerfettoStatsdAtom atom,
893                            const std::string& trigger_name = "");
894   void MaybeLogTriggerEvent(const TraceConfig&,
895                             PerfettoTriggerAtom atom,
896                             const std::string& trigger_name);
897   size_t PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,
898                                              uint64_t trigger_name_hash);
899   void StopOnDurationMsExpiry(TracingSessionID);
900 
901   std::unique_ptr<tracing_service::Clock> clock_;
902   std::unique_ptr<tracing_service::Random> random_;
903   const InitOpts init_opts_;
904   std::unique_ptr<SharedMemory::Factory> shm_factory_;
905   ProducerID last_producer_id_ = 0;
906   DataSourceInstanceID last_data_source_instance_id_ = 0;
907   TracingSessionID last_tracing_session_id_ = 0;
908   FlushRequestID last_flush_request_id_ = 0;
909   uid_t uid_ = 0;
910 
911   // Buffer IDs are global across all consumers (because a Producer can produce
912   // data for more than one trace session, hence more than one consumer).
913   IdAllocator<BufferID> buffer_ids_;
914 
915   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
916   std::map<ProducerID, ProducerEndpointImpl*> producers_;
917   std::map<RelayClientID, RelayEndpointImpl*> relay_clients_;
918   std::map<TracingSessionID, TracingSession> tracing_sessions_;
919   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
920   std::map<std::string, int64_t> session_to_last_trace_s_;
921 
922   // Contains timestamps of triggers.
923   // The queue is sorted by timestamp and invocations older than 24 hours are
924   // purged when a trigger happens.
925   base::CircularQueue<TriggerHistory> trigger_history_;
926 
927   bool smb_scraping_enabled_ = false;
928   bool lockdown_mode_ = false;
929 
930   uint8_t sync_marker_packet_[32];  // Lazily initialized.
931   size_t sync_marker_packet_size_ = 0;
932 
933   // Stats.
934   uint64_t chunks_discarded_ = 0;
935   uint64_t patches_discarded_ = 0;
936 
937   PERFETTO_THREAD_CHECKER(thread_checker_)
938 
939   base::WeakRunner weak_runner_;
940 };
941 
942 }  // namespace perfetto
943 
944 #endif  // SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
945