• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
18 #define SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
19 
20 #include <algorithm>
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <optional>
25 #include <random>
26 #include <set>
27 #include <utility>
28 #include <vector>
29 
30 #include "perfetto/base/logging.h"
31 #include "perfetto/base/status.h"
32 #include "perfetto/base/time.h"
33 #include "perfetto/ext/base/circular_queue.h"
34 #include "perfetto/ext/base/periodic_task.h"
35 #include "perfetto/ext/base/string_view.h"
36 #include "perfetto/ext/base/uuid.h"
37 #include "perfetto/ext/base/weak_ptr.h"
38 #include "perfetto/ext/tracing/core/basic_types.h"
39 #include "perfetto/ext/tracing/core/client_identity.h"
40 #include "perfetto/ext/tracing/core/commit_data_request.h"
41 #include "perfetto/ext/tracing/core/observable_events.h"
42 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
43 #include "perfetto/ext/tracing/core/trace_stats.h"
44 #include "perfetto/ext/tracing/core/tracing_service.h"
45 #include "perfetto/tracing/core/data_source_config.h"
46 #include "perfetto/tracing/core/data_source_descriptor.h"
47 #include "perfetto/tracing/core/forward_decls.h"
48 #include "perfetto/tracing/core/trace_config.h"
49 #include "src/android_stats/perfetto_atoms.h"
50 #include "src/tracing/core/id_allocator.h"
51 
52 namespace protozero {
53 class MessageFilter;
54 }
55 
56 namespace perfetto {
57 
58 namespace base {
59 class TaskRunner;
60 }  // namespace base
61 
62 namespace protos {
63 namespace gen {
64 enum TraceStats_FinalFlushOutcome : int;
65 }
66 }  // namespace protos
67 
68 class Consumer;
69 class Producer;
70 class SharedMemory;
71 class SharedMemoryArbiterImpl;
72 class TraceBuffer;
73 class TracePacket;
74 
75 // The tracing service business logic.
76 class TracingServiceImpl : public TracingService {
 private:
  // Forward declaration; the full definition is in the private section of
  // this class, further below.
  struct DataSourceInstance;

 public:
  // Upper bound for a producer's shared memory buffer, in bytes (32 MiB).
  // NOTE(review): presumably used to clamp the producer-provided size hint —
  // confirm in the .cc.
  static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;

  // Default data source stop timeout. TracingSession::
  // data_source_stop_timeout_ms() falls back to this value when the
  // TraceConfig leaves data_source_stop_timeout_ms unset (0).
  static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;

  // 16-byte sync marker. The service emits it into the trace when
  // TracingSession::should_emit_sync_marker is set (see also
  // TracingSession::snapshot_periodic_task).
  // NOTE(review): presumably lets trace readers re-synchronize on packet
  // boundaries — confirm.
  static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
                                            0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
                                            0x6d, 0x57, 0xa0, 0x79};
  static constexpr size_t kMaxTracePacketSliceSize =
      128 * 1024 - 512;  // This is ipc::kIPCBufferSize - 512, see assertion in
                         // tracing_integration_test.cc and b/195065199

  // This is a rough threshold to determine how many bytes to read from the
  // buffers on each iteration when writing into a file. Since filtering and
  // compression allocate memory, this effectively limits the amount of memory
  // allocated.
  static constexpr size_t kWriteIntoFileChunkSize = 1024 * 1024ul;
95 
96   // The implementation behind the service endpoint exposed to each producer.
97   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
98    public:
99     ProducerEndpointImpl(ProducerID,
100                          const ClientIdentity& client_identity,
101                          TracingServiceImpl*,
102                          base::TaskRunner*,
103                          Producer*,
104                          const std::string& producer_name,
105                          const std::string& sdk_version,
106                          bool in_process,
107                          bool smb_scraping_enabled);
108     ~ProducerEndpointImpl() override;
109 
110     // TracingService::ProducerEndpoint implementation.
111     void Disconnect() override;
112     void RegisterDataSource(const DataSourceDescriptor&) override;
113     void UpdateDataSource(const DataSourceDescriptor&) override;
114     void UnregisterDataSource(const std::string& name) override;
115     void RegisterTraceWriter(uint32_t writer_id,
116                              uint32_t target_buffer) override;
117     void UnregisterTraceWriter(uint32_t writer_id) override;
118     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
119     void SetupSharedMemory(std::unique_ptr<SharedMemory>,
120                            size_t page_size_bytes,
121                            bool provided_by_producer);
122     std::unique_ptr<TraceWriter> CreateTraceWriter(
123         BufferID,
124         BufferExhaustedPolicy) override;
125     SharedMemoryArbiter* MaybeSharedMemoryArbiter() override;
126     bool IsShmemProvidedByProducer() const override;
127     void NotifyFlushComplete(FlushRequestID) override;
128     void NotifyDataSourceStarted(DataSourceInstanceID) override;
129     void NotifyDataSourceStopped(DataSourceInstanceID) override;
130     SharedMemory* shared_memory() const override;
131     size_t shared_buffer_page_size_kb() const override;
132     void ActivateTriggers(const std::vector<std::string>&) override;
133     void Sync(std::function<void()> callback) override;
134 
135     void OnTracingSetup();
136     void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
137     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
138     void StopDataSource(DataSourceInstanceID);
139     void Flush(FlushRequestID,
140                const std::vector<DataSourceInstanceID>&,
141                FlushFlags);
142     void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
143     void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
144 
is_allowed_target_buffer(BufferID buffer_id)145     bool is_allowed_target_buffer(BufferID buffer_id) const {
146       return allowed_target_buffers_.count(buffer_id);
147     }
148 
buffer_id_for_writer(WriterID writer_id)149     std::optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
150       const auto it = writers_.find(writer_id);
151       if (it != writers_.end())
152         return it->second;
153       return std::nullopt;
154     }
155 
uid()156     uid_t uid() const { return client_identity_.uid(); }
pid()157     pid_t pid() const { return client_identity_.pid(); }
client_identity()158     const ClientIdentity& client_identity() const { return client_identity_; }
159 
160    private:
161     friend class TracingServiceImpl;
162     friend class TracingServiceImplTest;
163     friend class TracingIntegrationTest;
164     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
165     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
166 
167     ProducerID const id_;
168     ClientIdentity const client_identity_;
169     TracingServiceImpl* const service_;
170     base::TaskRunner* const task_runner_;
171     Producer* producer_;
172     std::unique_ptr<SharedMemory> shared_memory_;
173     size_t shared_buffer_page_size_kb_ = 0;
174     SharedMemoryABI shmem_abi_;
175     size_t shmem_size_hint_bytes_ = 0;
176     size_t shmem_page_size_hint_bytes_ = 0;
177     bool is_shmem_provided_by_producer_ = false;
178     const std::string name_;
179     std::string sdk_version_;
180     bool in_process_;
181     bool smb_scraping_enabled_;
182 
183     // Set of the global target_buffer IDs that the producer is configured to
184     // write into in any active tracing session.
185     std::set<BufferID> allowed_target_buffers_;
186 
187     // Maps registered TraceWriter IDs to their target buffers as registered by
188     // the producer. Note that producers aren't required to register their
189     // writers, so we may see commits of chunks with WriterIDs that aren't
190     // contained in this map. However, if a producer does register a writer, the
191     // service will prevent the writer from writing into any other buffer than
192     // the one associated with it here. The BufferIDs stored in this map are
193     // untrusted, so need to be verified against |allowed_target_buffers_|
194     // before use.
195     std::map<WriterID, BufferID> writers_;
196 
197     // This is used only in in-process configurations.
198     // SharedMemoryArbiterImpl methods themselves are thread-safe.
199     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
200 
201     PERFETTO_THREAD_CHECKER(thread_checker_)
202     base::WeakPtrFactory<ProducerEndpointImpl> weak_ptr_factory_;  // Keep last.
203   };
204 
  // The implementation behind the service endpoint exposed to each consumer.
  class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
   public:
    ConsumerEndpointImpl(TracingServiceImpl*,
                         base::TaskRunner*,
                         Consumer*,
                         uid_t uid);
    ~ConsumerEndpointImpl() override;

    // Service -> consumer notifications.
    void NotifyOnTracingDisabled(const std::string& error);
    void NotifyCloneSnapshotTrigger(const std::string& trigger_name);

    // TracingService::ConsumerEndpoint implementation.
    void EnableTracing(const TraceConfig&, base::ScopedFile) override;
    void ChangeTraceConfig(const TraceConfig& cfg) override;
    void StartTracing() override;
    void DisableTracing() override;
    void ReadBuffers() override;
    void FreeBuffers() override;
    void Flush(uint32_t timeout_ms, FlushCallback, FlushFlags) override;
    void Detach(const std::string& key) override;
    void Attach(const std::string& key) override;
    void GetTraceStats() override;
    void ObserveEvents(uint32_t enabled_event_types) override;
    void QueryServiceState(QueryServiceStateArgs,
                           QueryServiceStateCallback) override;
    void QueryCapabilities(QueryCapabilitiesCallback) override;
    void SaveTraceForBugreport(SaveTraceForBugreportCallback) override;
    void CloneSession(TracingSessionID, CloneSessionArgs) override;

    // Will queue a task to notify the consumer about the state change.
    void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
                                         const DataSourceInstance&);
    void OnAllDataSourcesStarted();

    // Returns a weak pointer to this endpoint, e.g. for callbacks that may
    // outlive it (see PendingClone::weak_consumer).
    base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr() {
      return weak_ptr_factory_.GetWeakPtr();
    }

   private:
    friend class TracingServiceImpl;
    ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
    ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;

    // Returns a pointer to an ObservableEvents object that the caller can fill
    // and schedules a task to send the ObservableEvents to the consumer.
    ObservableEvents* AddObservableEvents();

    base::TaskRunner* const task_runner_;
    TracingServiceImpl* const service_;
    Consumer* const consumer_;
    uid_t const uid_;
    // Session owned by this consumer; 0 means none.
    // NOTE(review): confirm that 0 is the "no session" sentinel.
    TracingSessionID tracing_session_id_ = 0;

    // Bitmask of the observable event types the consumer is interested in
    // (e.g. DataSourceInstance state changes). 0 = no events.
    // NOTE(review): presumably populated from ObserveEvents(
    // enabled_event_types) — confirm in the .cc.
    uint32_t observable_events_mask_ = 0;

    // ObservableEvents that will be sent to the consumer. If set, a task to
    // flush the events to the consumer has been queued.
    std::unique_ptr<ObservableEvents> observable_events_;

    PERFETTO_THREAD_CHECKER(thread_checker_)
    base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
  };
270 
  // Service-side endpoint for a relay client, identified by RelayClientID.
  // Keeps the clock snapshot pairs the relay reports through SyncClocks().
  // NOTE(review): SyncClocks() presumably appends to |synced_clocks_| —
  // confirm in the .cc.
  class RelayEndpointImpl : public TracingService::RelayEndpoint {
   public:
    using SyncMode = RelayEndpoint::SyncMode;

    // A pair of clock snapshots (client side and host side), plus the sync
    // mode they were taken with.
    struct SyncedClockSnapshots {
      SyncedClockSnapshots(SyncMode _sync_mode,
                           ClockSnapshotVector _client_clocks,
                           ClockSnapshotVector _host_clocks)
          : sync_mode(_sync_mode),
            client_clocks(std::move(_client_clocks)),
            host_clocks(std::move(_host_clocks)) {}
      SyncMode sync_mode;
      ClockSnapshotVector client_clocks;
      ClockSnapshotVector host_clocks;
    };

    explicit RelayEndpointImpl(RelayClientID relay_client_id,
                               TracingServiceImpl* service);
    ~RelayEndpointImpl() override;
    void SyncClocks(SyncMode sync_mode,
                    ClockSnapshotVector client_clocks,
                    ClockSnapshotVector host_clocks) override;
    void Disconnect() override;

    // The MachineID component (first element) of this endpoint's
    // RelayClientID.
    MachineID machine_id() const { return relay_client_id_.first; }

    // Mutable access to the queue of recorded clock snapshot pairs.
    base::CircularQueue<SyncedClockSnapshots>& synced_clocks() {
      return synced_clocks_;
    }

   private:
    RelayEndpointImpl(const RelayEndpointImpl&) = delete;
    RelayEndpointImpl& operator=(const RelayEndpointImpl&) = delete;

    RelayClientID relay_client_id_;
    TracingServiceImpl* const service_;
    base::CircularQueue<SyncedClockSnapshots> synced_clocks_;

    PERFETTO_THREAD_CHECKER(thread_checker_)
  };
311 
  explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
                              base::TaskRunner*,
                              InitOpts = {});
  ~TracingServiceImpl() override;

  // Called by ProducerEndpointImpl.
  void DisconnectProducer(ProducerID);
  void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
  void UpdateDataSource(ProducerID, const DataSourceDescriptor&);
  void UnregisterDataSource(ProducerID, const std::string& name);
  // Copies a committed chunk (|size| bytes at |src|) from the producer's
  // shared memory into the service-side buffer |BufferID|.
  // NOTE(review): presumably also validates that the producer may write into
  // that buffer — confirm in the .cc.
  void CopyProducerPageIntoLogBuffer(ProducerID,
                                     const ClientIdentity&,
                                     WriterID,
                                     ChunkID,
                                     BufferID,
                                     uint16_t num_fragments,
                                     uint8_t chunk_flags,
                                     bool chunk_complete,
                                     const uint8_t* src,
                                     size_t size);
  void ApplyChunkPatches(ProducerID,
                         const std::vector<CommitDataRequest::ChunkToPatch>&);
  void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
  void NotifyDataSourceStarted(ProducerID, DataSourceInstanceID);
  void NotifyDataSourceStopped(ProducerID, DataSourceInstanceID);
  void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);

  // Called by ConsumerEndpointImpl.
  bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
  bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
  void DisconnectConsumer(ConsumerEndpointImpl*);
  base::Status EnableTracing(ConsumerEndpointImpl*,
                             const TraceConfig&,
                             base::ScopedFile);
  void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);

  base::Status StartTracing(TracingSessionID);
  // NOTE(review): |disable_immediately| presumably skips the
  // DISABLING_WAITING_STOP_ACKS phase instead of waiting for data sources to
  // ack the stop — confirm in the .cc.
  void DisableTracing(TracingSessionID, bool disable_immediately = false);
  void Flush(TracingSessionID tsid,
             uint32_t timeout_ms,
             ConsumerEndpoint::FlushCallback,
             FlushFlags);
  void FlushAndDisableTracing(TracingSessionID);
  base::Status FlushAndCloneSession(ConsumerEndpointImpl*,
                                    TracingSessionID,
                                    bool skip_filter,
                                    bool for_bugreport);

  // Starts reading the internal tracing buffers from the tracing session `tsid`
  // and sends them to `*consumer` (which must be != nullptr).
  //
  // Only reads a limited amount of data in one call. If there's more data,
  // immediately schedules itself on a PostTask.
  //
  // Returns false in case of error.
  bool ReadBuffersIntoConsumer(TracingSessionID tsid,
                               ConsumerEndpointImpl* consumer);

  // Reads all the tracing buffers from the tracing session `tsid` and writes
  // them into the associated file.
  //
  // Reads all the data in the buffers (or until the file is full) before
  // returning.
  //
  // If the tracing session write_period_ms is 0, the file is full or there has
  // been an error, flushes the file and closes it. Otherwise, schedules itself
  // to be executed after write_period_ms.
  //
  // Returns false in case of error.
  bool ReadBuffersIntoFile(TracingSessionID);

  void FreeBuffers(TracingSessionID);
384 
  // Service implementation.

  // Creates the service-side endpoint for a new producer. Everything after
  // |producer_name| is optional: shared memory size/page-size hints,
  // in-process mode, the SMB scraping override, a pre-allocated shared memory
  // region (|shm|) and the producer's SDK version.
  std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
      Producer*,
      const ClientIdentity& client_identity,
      const std::string& producer_name,
      size_t shared_memory_size_hint_bytes = 0,
      bool in_process = false,
      ProducerSMBScrapingMode smb_scraping_mode =
          ProducerSMBScrapingMode::kDefault,
      size_t shared_memory_page_size_hint_bytes = 0,
      std::unique_ptr<SharedMemory> shm = nullptr,
      const std::string& sdk_version = {}) override;

  // Creates the service-side endpoint for a new consumer running as |uid_t|.
  std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
      Consumer*,
      uid_t) override;

  // Creates the service-side endpoint for a relay client.
  std::unique_ptr<TracingService::RelayEndpoint> ConnectRelayClient(
      RelayClientID) override;

  // NOTE(review): presumably called by RelayEndpointImpl::Disconnect() —
  // confirm in the .cc.
  void DisconnectRelayClient(RelayClientID);
406 
407   // Set whether SMB scraping should be enabled by default or not. Producers can
408   // override this setting for their own SMBs.
SetSMBScrapingEnabled(bool enabled)409   void SetSMBScrapingEnabled(bool enabled) override {
410     smb_scraping_enabled_ = enabled;
411   }
412 
413   // Exposed mainly for testing.
num_producers()414   size_t num_producers() const { return producers_.size(); }
415   ProducerEndpointImpl* GetProducer(ProducerID) const;
416 
 private:
  friend class TracingServiceImplTest;
  friend class TracingIntegrationTest;

  // One day in nanoseconds: 24 h * 60 min * 60 s * 10^9 ns. Computed in
  // 64-bit (`ll` suffix) to avoid int overflow.
  static constexpr int64_t kOneDayInNs = 24ll * 60 * 60 * 1000 * 1000 * 1000;
422 
423   struct TriggerHistory {
424     int64_t timestamp_ns;
425     uint64_t name_hash;
426 
427     bool operator<(const TriggerHistory& other) const {
428       return timestamp_ns < other.timestamp_ns;
429     }
430   };
431 
  // A data source that a producer has registered with the service.
  struct RegisteredDataSource {
    // The producer that registered the data source.
    ProducerID producer_id;
    // The descriptor supplied by the producer.
    // NOTE(review): presumably the one passed to RegisterDataSource() /
    // UpdateDataSource() — confirm.
    DataSourceDescriptor descriptor;
  };
436 
437   // Represents an active data source for a tracing session.
438   struct DataSourceInstance {
DataSourceInstanceDataSourceInstance439     DataSourceInstance(DataSourceInstanceID id,
440                        const DataSourceConfig& cfg,
441                        const std::string& ds_name,
442                        bool notify_on_start,
443                        bool notify_on_stop,
444                        bool handles_incremental_state_invalidation,
445                        bool no_flush_)
446         : instance_id(id),
447           config(cfg),
448           data_source_name(ds_name),
449           will_notify_on_start(notify_on_start),
450           will_notify_on_stop(notify_on_stop),
451           handles_incremental_state_clear(
452               handles_incremental_state_invalidation),
453           no_flush(no_flush_) {}
454     DataSourceInstance(const DataSourceInstance&) = delete;
455     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
456 
457     DataSourceInstanceID instance_id;
458     DataSourceConfig config;
459     std::string data_source_name;
460     bool will_notify_on_start;
461     bool will_notify_on_stop;
462     bool handles_incremental_state_clear;
463     bool no_flush;
464 
465     enum DataSourceInstanceState {
466       CONFIGURED,
467       STARTING,
468       STARTED,
469       STOPPING,
470       STOPPED
471     };
472     DataSourceInstanceState state = CONFIGURED;
473   };
474 
475   struct PendingFlush {
476     std::set<ProducerID> producers;
477     ConsumerEndpoint::FlushCallback callback;
PendingFlushPendingFlush478     explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
479   };
480 
  // Key of TracingSession::pending_clones.
  using PendingCloneID = uint64_t;

  // State of an in-flight clone request while its flushes are outstanding.
  struct PendingClone {
    // NOTE(review): presumably the number of flushes not yet completed for
    // this clone — confirm in the .cc.
    size_t pending_flush_cnt = 0;
    // This vector might not be populated all at once. Some buffers might be
    // nullptr while flushing is not done.
    std::vector<std::unique_ptr<TraceBuffer>> buffers;
    // NOTE(review): presumably set if any flush backing this clone failed.
    bool flush_failed = false;
    // Consumer that requested the clone. Held weakly, so it becomes invalid
    // if the ConsumerEndpointImpl is destroyed first.
    base::WeakPtr<ConsumerEndpointImpl> weak_consumer;
    // NOTE(review): presumably mirrors FlushAndCloneSession(skip_filter).
    bool skip_trace_filter = false;
  };
492 
493   // Holds the state of a tracing session. A tracing session is uniquely bound
494   // a specific Consumer. Each Consumer can own one or more sessions.
495   struct TracingSession {
496     enum State {
497       DISABLED = 0,
498       CONFIGURED,
499       STARTED,
500       DISABLING_WAITING_STOP_ACKS,
501       CLONED_READ_ONLY,
502     };
503 
504     TracingSession(TracingSessionID,
505                    ConsumerEndpointImpl*,
506                    const TraceConfig&,
507                    base::TaskRunner*);
508     TracingSession(TracingSession&&) = delete;
509     TracingSession& operator=(TracingSession&&) = delete;
510 
num_buffersTracingSession511     size_t num_buffers() const { return buffers_index.size(); }
512 
delay_to_next_write_period_msTracingSession513     uint32_t delay_to_next_write_period_ms() const {
514       PERFETTO_DCHECK(write_period_ms > 0);
515       return write_period_ms -
516              static_cast<uint32_t>(base::GetWallTimeMs().count() %
517                                    write_period_ms);
518     }
519 
flush_timeout_msTracingSession520     uint32_t flush_timeout_ms() {
521       uint32_t timeout_ms = config.flush_timeout_ms();
522       return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
523     }
524 
data_source_stop_timeout_msTracingSession525     uint32_t data_source_stop_timeout_ms() {
526       uint32_t timeout_ms = config.data_source_stop_timeout_ms();
527       return timeout_ms ? timeout_ms : kDataSourceStopTimeoutMs;
528     }
529 
GetPacketSequenceIDTracingSession530     PacketSequenceID GetPacketSequenceID(MachineID machine_id,
531                                          ProducerID producer_id,
532                                          WriterID writer_id) {
533       auto key = std::make_tuple(machine_id, producer_id, writer_id);
534       auto it = packet_sequence_ids.find(key);
535       if (it != packet_sequence_ids.end())
536         return it->second;
537       // We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
538       // are limited to 1024).
539       static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
540                     "PacketSequenceID value space doesn't cover service "
541                     "sequence ID and all producer/writer ID combinations!");
542       PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
543       PacketSequenceID sequence_id = ++last_packet_sequence_id;
544       packet_sequence_ids[key] = sequence_id;
545       return sequence_id;
546     }
547 
GetDataSourceInstanceTracingSession548     DataSourceInstance* GetDataSourceInstance(
549         ProducerID producer_id,
550         DataSourceInstanceID instance_id) {
551       for (auto& inst_kv : data_source_instances) {
552         if (inst_kv.first != producer_id ||
553             inst_kv.second.instance_id != instance_id) {
554           continue;
555         }
556         return &inst_kv.second;
557       }
558       return nullptr;
559     }
560 
AllDataSourceInstancesStartedTracingSession561     bool AllDataSourceInstancesStarted() {
562       return std::all_of(
563           data_source_instances.begin(), data_source_instances.end(),
564           [](decltype(data_source_instances)::const_reference x) {
565             return x.second.state == DataSourceInstance::STARTED;
566           });
567     }
568 
AllDataSourceInstancesStoppedTracingSession569     bool AllDataSourceInstancesStopped() {
570       return std::all_of(
571           data_source_instances.begin(), data_source_instances.end(),
572           [](decltype(data_source_instances)::const_reference x) {
573             return x.second.state == DataSourceInstance::STOPPED;
574           });
575     }
576 
577     // Checks whether |clone_uid| is allowed to clone the current tracing
578     // session.
579     bool IsCloneAllowed(uid_t clone_uid) const;
580 
581     const TracingSessionID id;
582 
583     // The consumer that started the session.
584     // Can be nullptr if the consumer detached from the session.
585     ConsumerEndpointImpl* consumer_maybe_null;
586 
587     // Unix uid of the consumer. This is valid even after the consumer detaches
588     // and does not change for the entire duration of the session. It is used to
589     // prevent that a consumer re-attaches to a session from a different uid.
590     uid_t const consumer_uid;
591 
592     // The list of triggers this session received while alive and the time they
593     // were received at. This is used to insert 'fake' packets back to the
594     // consumer so they can tell when some event happened. The order matches the
595     // order they were received.
596     struct TriggerInfo {
597       uint64_t boot_time_ns;
598       std::string trigger_name;
599       std::string producer_name;
600       uid_t producer_uid;
601     };
602     std::vector<TriggerInfo> received_triggers;
603 
604     // The trace config provided by the Consumer when calling
605     // EnableTracing(), plus any updates performed by ChangeTraceConfig.
606     TraceConfig config;
607 
608     // List of data source instances that have been enabled on the various
609     // producers for this tracing session.
610     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
611 
612     // For each Flush(N) request, keeps track of the set of producers for which
613     // we are still awaiting a NotifyFlushComplete(N) ack.
614     std::map<FlushRequestID, PendingFlush> pending_flushes;
615 
616     // For each Clone request, keeps track of the flushes acknowledgement that
617     // we are still waiting for.
618     std::map<PendingCloneID, PendingClone> pending_clones;
619 
620     PendingCloneID last_pending_clone_id_ = 0;
621 
622     // Maps a per-trace-session buffer index into the corresponding global
623     // BufferID (shared namespace amongst all consumers). This vector has as
624     // many entries as |config.buffers_size()|.
625     std::vector<BufferID> buffers_index;
626 
627     std::map<std::tuple<MachineID, ProducerID, WriterID>, PacketSequenceID>
628         packet_sequence_ids;
629     PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
630 
631     // Whether we should emit the trace stats next time we reach EOF while
632     // performing ReadBuffers.
633     bool should_emit_stats = false;
634 
635     // Whether we should emit the sync marker the next time ReadBuffers() is
636     // called.
637     bool should_emit_sync_marker = false;
638 
639     // Whether we put the initial packets (trace config, system info,
640     // etc.) into the trace output yet.
641     bool did_emit_initial_packets = false;
642 
643     // Whether we emitted clock offsets for relay clients yet.
644     bool did_emit_remote_clock_sync_ = false;
645 
646     // Whether we should compress TracePackets after reading them.
647     bool compress_deflate = false;
648 
649     // The number of received triggers we've emitted into the trace output.
650     size_t num_triggers_emitted_into_trace = 0;
651 
652     // Packets that failed validation of the TrustedPacket.
653     uint64_t invalid_packets = 0;
654 
655     // Flush() stats. See comments in trace_stats.proto for more.
656     uint64_t flushes_requested = 0;
657     uint64_t flushes_succeeded = 0;
658     uint64_t flushes_failed = 0;
659 
660     // Outcome of the final Flush() done by FlushAndDisableTracing().
661     protos::gen::TraceStats_FinalFlushOutcome final_flush_outcome{};
662 
663     // Set to true on the first call to MaybeNotifyAllDataSourcesStarted().
664     bool did_notify_all_data_source_started = false;
665 
666     // Stores all lifecycle events of a particular type (i.e. associated with a
667     // single field id in the TracingServiceEvent proto).
668     struct LifecycleEvent {
669       LifecycleEvent(uint32_t f_id, uint32_t m_size = 1)
field_idTracingSession::LifecycleEvent670           : field_id(f_id), max_size(m_size), timestamps(m_size) {}
671 
672       // The field id of the event in the TracingServiceEvent proto.
673       uint32_t field_id;
674 
675       // Stores the max size of |timestamps|. Set to 1 by default (in
676       // the constructor) but can be overriden in TraceSession constructor
677       // if a larger size is required.
678       uint32_t max_size;
679 
680       // Stores the timestamps emitted for each event type (in nanoseconds).
681       // Emitted into the trace and cleared when the consumer next calls
682       // ReadBuffers.
683       base::CircularQueue<int64_t> timestamps;
684     };
685     std::vector<LifecycleEvent> lifecycle_events;
686 
687     using ClockSnapshotData = ClockSnapshotVector;
688 
689     // Initial clock snapshot, captured at trace start time (when state goes to
690     // TracingSession::STARTED). Emitted into the trace when the consumer first
691     // calls ReadBuffers().
692     ClockSnapshotData initial_clock_snapshot;
693 
694     // Stores clock snapshots to emit into the trace as a ring buffer. This
695     // buffer is populated both periodically and when lifecycle events happen
696     // but only when significant clock drift is detected. Emitted into the trace
697     // and cleared when the consumer next calls ReadBuffers().
698     base::CircularQueue<ClockSnapshotData> clock_snapshot_ring_buffer;
699 
700     State state = DISABLED;
701 
702     // If the consumer detached the session, this variable defines the key used
703     // for identifying the session later when reattaching.
704     std::string detach_key;
705 
706     // This is set when the Consumer calls sets |write_into_file| == true in the
707     // TraceConfig. In this case this represents the file we should stream the
708     // trace packets into, rather than returning it to the consumer via
709     // OnTraceData().
710     base::ScopedFile write_into_file;
711     uint32_t write_period_ms = 0;
712     uint64_t max_file_size_bytes = 0;
713     uint64_t bytes_written_into_file = 0;
714 
715     // Periodic task for snapshotting service events (e.g. clocks, sync markers
716     // etc)
717     base::PeriodicTask snapshot_periodic_task;
718 
719     // Deferred task that stops the trace when |duration_ms| expires. This is
720     // to handle the case of |prefer_suspend_clock_for_duration| which cannot
721     // use PostDelayedTask.
722     base::PeriodicTask timed_stop_task;
723 
724     // When non-NULL the packets should be post-processed using the filter.
725     std::unique_ptr<protozero::MessageFilter> trace_filter;
726     uint64_t filter_input_packets = 0;
727     uint64_t filter_input_bytes = 0;
728     uint64_t filter_output_bytes = 0;
729     uint64_t filter_errors = 0;
730     uint64_t filter_time_taken_ns = 0;
731     std::vector<uint64_t> filter_bytes_discarded_per_buffer;
732 
    // A randomly generated trace identifier. Note that this does NOT always
    // match the requested TraceConfig.trace_uuid_msb/lsb. Specifically, it
    // matches only until a gap-less snapshot is requested: each snapshot
    // re-generates the uuid to avoid emitting two different traces with the
    // same uuid.
737     base::Uuid trace_uuid;
738 
    // NOTE: when adding new fields here, consider whether that state should be
    // copied over in DoCloneSession() or not. Ask yourself: is this a
    // "runtime state" (e.g. active data sources) or a "trace (meta)data state"?
    // If the latter, it should be handled by DoCloneSession().
743   };
744 
745   TracingServiceImpl(const TracingServiceImpl&) = delete;
746   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
747 
748   bool IsInitiatorPrivileged(const TracingSession&);
749 
750   DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
751                                       const TraceConfig::ProducerConfig&,
752                                       const RegisteredDataSource&,
753                                       TracingSession*);
754 
755   // Returns the next available ProducerID that is not in |producers_|.
756   ProducerID GetNextProducerID();
757 
758   // Returns a pointer to the |tracing_sessions_| entry or nullptr if the
759   // session doesn't exists.
760   TracingSession* GetTracingSession(TracingSessionID);
761 
762   // Returns a pointer to the tracing session that has the highest
763   // TraceConfig.bugreport_score, if any, or nullptr.
764   TracingSession* FindTracingSessionWithMaxBugreportScore();
765 
766   // Returns a pointer to the |tracing_sessions_| entry, matching the given
767   // uid and detach key, or nullptr if no such session exists.
768   TracingSession* GetDetachedSession(uid_t, const std::string& key);
769 
770   // Update the memory guard rail by using the latest information from the
771   // shared memory and trace buffers.
772   void UpdateMemoryGuardrail();
773 
774   void StartDataSourceInstance(ProducerEndpointImpl*,
775                                TracingSession*,
776                                DataSourceInstance*);
777   void StopDataSourceInstance(ProducerEndpointImpl*,
778                               TracingSession*,
779                               DataSourceInstance*,
780                               bool disable_immediately);
781   void PeriodicSnapshotTask(TracingSessionID);
782   void MaybeSnapshotClocksIntoRingBuffer(TracingSession*);
783   bool SnapshotClocks(TracingSession::ClockSnapshotData*);
784   void SnapshotLifecyleEvent(TracingSession*,
785                              uint32_t field_id,
786                              bool snapshot_clocks);
787   void EmitClockSnapshot(TracingSession*,
788                          TracingSession::ClockSnapshotData,
789                          std::vector<TracePacket>*);
790   void EmitSyncMarker(std::vector<TracePacket>*);
791   void EmitStats(TracingSession*, std::vector<TracePacket>*);
792   TraceStats GetTraceStats(TracingSession*);
793   void EmitLifecycleEvents(TracingSession*, std::vector<TracePacket>*);
794   void EmitUuid(TracingSession*, std::vector<TracePacket>*);
795   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
796   void EmitSystemInfo(std::vector<TracePacket>*);
797   void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
798   void MaybeEmitRemoteClockSync(TracingSession*, std::vector<TracePacket>*);
799   void MaybeNotifyAllDataSourcesStarted(TracingSession*);
800   void OnFlushTimeout(TracingSessionID, FlushRequestID);
801   void OnDisableTracingTimeout(TracingSessionID);
802   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
803   void PeriodicFlushTask(TracingSessionID, bool post_next_only);
804   void CompleteFlush(TracingSessionID tsid,
805                      ConsumerEndpoint::FlushCallback callback,
806                      bool success);
807   void ScrapeSharedMemoryBuffers(TracingSession*, ProducerEndpointImpl*);
808   void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);
809   TraceBuffer* GetBufferByID(BufferID);
810   void FlushDataSourceInstances(
811       TracingSession*,
812       uint32_t timeout_ms,
813       const std::map<ProducerID, std::vector<DataSourceInstanceID>>&,
814       ConsumerEndpoint::FlushCallback,
815       FlushFlags);
816   std::map<ProducerID, std::vector<DataSourceInstanceID>>
817   GetFlushableDataSourceInstancesForBuffers(TracingSession*,
818                                             const std::set<BufferID>&);
819   bool DoCloneBuffers(TracingSession*,
820                       const std::set<BufferID>&,
821                       std::vector<std::unique_ptr<TraceBuffer>>*);
822   base::Status FinishCloneSession(ConsumerEndpointImpl*,
823                                   TracingSessionID,
824                                   std::vector<std::unique_ptr<TraceBuffer>>,
825                                   bool skip_filter,
826                                   bool final_flush_outcome,
827                                   base::Uuid*);
828   void OnFlushDoneForClone(TracingSessionID src_tsid,
829                            PendingCloneID clone_id,
830                            const std::set<BufferID>& buf_ids,
831                            bool final_flush_outcome);
832 
833   // Returns true if `*tracing_session` is waiting for a trigger that hasn't
834   // happened.
835   static bool IsWaitingForTrigger(TracingSession* tracing_session);
836 
837   // Reads the buffers from `*tracing_session` and returns them (along with some
838   // metadata packets).
839   //
840   // The function stops when the cumulative size of the return packets exceeds
841   // `threshold` (so it's not a strict upper bound) and sets `*has_more` to
842   // true, or when there are no more packets (and sets `*has_more` to false).
843   std::vector<TracePacket> ReadBuffers(TracingSession* tracing_session,
844                                        size_t threshold,
845                                        bool* has_more);
846 
847   // If `*tracing_session` has a filter, applies it to `*packets`. Doesn't
848   // change the number of `*packets`, only their content.
849   void MaybeFilterPackets(TracingSession* tracing_session,
850                           std::vector<TracePacket>* packets);
851 
852   // If `*tracing_session` has compression enabled, compress `*packets`.
853   void MaybeCompressPackets(TracingSession* tracing_session,
854                             std::vector<TracePacket>* packets);
855 
856   // If `*tracing_session` is configured to write into a file, writes `packets`
857   // into the file.
858   //
859   // Returns true if the file should be closed (because it's full or there has
860   // been an error), false otherwise.
861   bool WriteIntoFile(TracingSession* tracing_session,
862                      std::vector<TracePacket> packets);
863   void OnStartTriggersTimeout(TracingSessionID tsid);
864   void MaybeLogUploadEvent(const TraceConfig&,
865                            const base::Uuid&,
866                            PerfettoStatsdAtom atom,
867                            const std::string& trigger_name = "");
868   void MaybeLogTriggerEvent(const TraceConfig&,
869                             PerfettoTriggerAtom atom,
870                             const std::string& trigger_name);
871   size_t PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,
872                                              uint64_t trigger_name_hash);
873   static void StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl>,
874                                      TracingSessionID);
875 
876   base::TaskRunner* const task_runner_;
877   const InitOpts init_opts_;
878   std::unique_ptr<SharedMemory::Factory> shm_factory_;
879   ProducerID last_producer_id_ = 0;
880   DataSourceInstanceID last_data_source_instance_id_ = 0;
881   TracingSessionID last_tracing_session_id_ = 0;
882   FlushRequestID last_flush_request_id_ = 0;
883   uid_t uid_ = 0;
884 
885   // Buffer IDs are global across all consumers (because a Producer can produce
886   // data for more than one trace session, hence more than one consumer).
887   IdAllocator<BufferID> buffer_ids_;
888 
889   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
890   std::map<ProducerID, ProducerEndpointImpl*> producers_;
891   std::set<ConsumerEndpointImpl*> consumers_;
892   std::map<RelayClientID, RelayEndpointImpl*> relay_clients_;
893   std::map<TracingSessionID, TracingSession> tracing_sessions_;
894   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
895   std::map<std::string, int64_t> session_to_last_trace_s_;
896 
897   // Contains timestamps of triggers.
898   // The queue is sorted by timestamp and invocations older than
899   // |trigger_window_ns_| are purged when a trigger happens.
900   base::CircularQueue<TriggerHistory> trigger_history_;
901 
902   bool smb_scraping_enabled_ = false;
903   bool lockdown_mode_ = false;
904   uint32_t min_write_period_ms_ = 100;       // Overridable for testing.
905   int64_t trigger_window_ns_ = kOneDayInNs;  // Overridable for testing.
906 
907   std::minstd_rand trigger_probability_rand_;
908   std::uniform_real_distribution<> trigger_probability_dist_;
909   double trigger_rnd_override_for_testing_ = 0;  // Overridable for testing.
910 
911   uint8_t sync_marker_packet_[32];  // Lazily initialized.
912   size_t sync_marker_packet_size_ = 0;
913 
914   // Stats.
915   uint64_t chunks_discarded_ = 0;
916   uint64_t patches_discarded_ = 0;
917 
918   PERFETTO_THREAD_CHECKER(thread_checker_)
919 
920   base::WeakPtrFactory<TracingServiceImpl>
921       weak_ptr_factory_;  // Keep at the end.
922 };
923 
924 }  // namespace perfetto
925 
926 #endif  // SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
927