• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/service/tracing_service_impl.h"
18 
19 #include <limits.h>
20 #include <string.h>
21 
22 #include <algorithm>
23 #include <cinttypes>
24 #include <cstdint>
25 #include <limits>
26 #include <optional>
27 #include <string>
28 #include <unordered_set>
29 
30 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
31     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
32 #include <sys/uio.h>
33 #include <sys/utsname.h>
34 #include <unistd.h>
35 #endif
36 
37 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
38     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
39 #include "src/android_internal/lazy_library_loader.h"    // nogncheck
40 #include "src/android_internal/tracing_service_proxy.h"  // nogncheck
41 #endif
42 
43 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
44     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
45     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
46 #define PERFETTO_HAS_CHMOD
47 #include <sys/stat.h>
48 #endif
49 
50 #include "perfetto/base/build_config.h"
51 #include "perfetto/base/status.h"
52 #include "perfetto/base/task_runner.h"
53 #include "perfetto/ext/base/android_utils.h"
54 #include "perfetto/ext/base/clock_snapshots.h"
55 #include "perfetto/ext/base/file_utils.h"
56 #include "perfetto/ext/base/metatrace.h"
57 #include "perfetto/ext/base/string_utils.h"
58 #include "perfetto/ext/base/string_view.h"
59 #include "perfetto/ext/base/sys_types.h"
60 #include "perfetto/ext/base/utils.h"
61 #include "perfetto/ext/base/uuid.h"
62 #include "perfetto/ext/base/version.h"
63 #include "perfetto/ext/base/watchdog.h"
64 #include "perfetto/ext/tracing/core/basic_types.h"
65 #include "perfetto/ext/tracing/core/client_identity.h"
66 #include "perfetto/ext/tracing/core/consumer.h"
67 #include "perfetto/ext/tracing/core/observable_events.h"
68 #include "perfetto/ext/tracing/core/producer.h"
69 #include "perfetto/ext/tracing/core/shared_memory.h"
70 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
71 #include "perfetto/ext/tracing/core/trace_packet.h"
72 #include "perfetto/ext/tracing/core/trace_writer.h"
73 #include "perfetto/protozero/scattered_heap_buffer.h"
74 #include "perfetto/protozero/static_buffer.h"
75 #include "perfetto/tracing/core/data_source_descriptor.h"
76 #include "perfetto/tracing/core/tracing_service_capabilities.h"
77 #include "perfetto/tracing/core/tracing_service_state.h"
78 #include "src/android_stats/statsd_logging_helper.h"
79 #include "src/protozero/filtering/message_filter.h"
80 #include "src/protozero/filtering/string_filter.h"
81 #include "src/tracing/core/shared_memory_arbiter_impl.h"
82 #include "src/tracing/service/packet_stream_validator.h"
83 #include "src/tracing/service/trace_buffer.h"
84 
85 #include "protos/perfetto/common/builtin_clock.gen.h"
86 #include "protos/perfetto/common/builtin_clock.pbzero.h"
87 #include "protos/perfetto/common/trace_stats.pbzero.h"
88 #include "protos/perfetto/config/trace_config.pbzero.h"
89 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
90 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
91 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
92 #include "protos/perfetto/trace/system_info.pbzero.h"
93 #include "protos/perfetto/trace/trace_packet.pbzero.h"
94 #include "protos/perfetto/trace/trace_uuid.pbzero.h"
95 #include "protos/perfetto/trace/trigger.pbzero.h"
96 
// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or that their arguments are sane / within bounds.
101 
102 // This is a macro because we want the call-site line number for the ELOG.
103 #define PERFETTO_SVC_ERR(...) \
104   (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
105 
106 namespace perfetto {
107 
108 namespace {
// Time unit helpers, expressed in milliseconds.
constexpr uint32_t kMillisPerHour = 3600000;
constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;

// Buffer and session count limits.
constexpr int kMaxBuffersPerConsumer = 128;
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;

// Periods, intervals and timeouts.
constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
constexpr int kMinWriteIntoFilePeriodMs = 100;
// NOTE(review): the name carries no unit; presumably milliseconds - confirm.
constexpr uint32_t kAllDataSourceStartedTimeout = 20000;
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;

// Longest trace duration accepted by the service (7 days).
constexpr uint32_t kMaxTracingDurationMillis = 7 * kMillisPerDay;

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;

constexpr size_t kMaxLifecycleEventsListedDataSources = 32;
128 
129 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
130 struct iovec {
131   void* iov_base;  // Address
132   size_t iov_len;  // Block size
133 };
134 
135 // Simple implementation of writev. Note that this does not give the atomicity
136 // guarantees of a real writev, but we don't depend on these (we aren't writing
137 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)138 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
139   ssize_t total_size = 0;
140   for (int i = 0; i < iovcnt; ++i) {
141     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
142     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
143       return -1;
144     total_size += current_size;
145   }
146   return total_size;
147 }
148 
149 #define IOV_MAX 1024  // Linux compatible limit.
150 
151 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_QNX)
152 #define IOV_MAX 1024  // Linux compatible limit.
153 #endif
154 
155 // Partially encodes a CommitDataRequest in an int32 for the purposes of
156 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
157 // (which is technically 16 bits wide).
158 //
159 // Format (by bit range):
160 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
161 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)162 int32_t EncodeCommitDataRequest(ProducerID producer_id,
163                                 const CommitDataRequest& req_untrusted) {
164   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
165   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
166   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
167 
168   uint32_t mask = (1 << 10) - 1;
169   uint32_t acc = 0;
170   acc |= has_flush_id << 30;
171   acc |= (cpatch & mask) << 20;
172   acc |= (cmov & mask) << 10;
173   acc |= (producer_id & mask);
174   return static_cast<int32_t>(acc);
175 }
176 
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)177 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
178                               std::vector<uint8_t> packet) {
179   Slice slice = Slice::Allocate(packet.size());
180   memcpy(slice.own_data(), packet.data(), packet.size());
181   packets->emplace_back();
182   packets->back().AddSlice(std::move(slice));
183 }
184 
EnsureValidShmSizes(size_t shm_size,size_t page_size)185 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
186     size_t shm_size,
187     size_t page_size) {
188   // Theoretically the max page size supported by the ABI is 64KB.
189   // However, the current implementation of TraceBuffer (the non-shared
190   // userspace buffer where the service copies data) supports at most
191   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
192   // but then causes the data to be discarded when copying it into
193   // TraceBuffer.
194   constexpr size_t kMaxPageSize = 32 * 1024;
195   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
196 
197   if (page_size == 0)
198     page_size = TracingServiceImpl::kDefaultShmPageSize;
199   if (shm_size == 0)
200     shm_size = TracingServiceImpl::kDefaultShmSize;
201 
202   page_size = std::min<size_t>(page_size, kMaxPageSize);
203   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
204 
205   // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
206   // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
207   // matter here, because the tracing page size is just a logical partitioning
208   // and does not have any dependencies on kernel mm syscalls (read: it's fine
209   // to have trace page sizes of 4K on a system where the kernel page size is
210   // 16K).
211   bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
212   page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
213 
214   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
215   size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
216   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
217 
218   if (!page_size_is_valid || shm_size < page_size ||
219       shm_size % page_size != 0) {
220     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
221                            TracingServiceImpl::kDefaultShmPageSize);
222   }
223   return std::make_tuple(shm_size, page_size);
224 }
225 
// Returns true if |name| passes the given filters.
// - If both filters are empty, every name is accepted.
// - Otherwise |name| must either be equal to one of the entries of
//   |name_filter| or fully match (std::regex_match, POSIX extended syntax) one
//   of the patterns in |name_regex_filter|.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  if (name_filter.empty() && name_regex_filter.empty())
    return true;

  // Try the cheap exact comparison first: building a std::regex is expensive,
  // so the patterns are compiled only when no exact match was found.
  if (std::find(name_filter.begin(), name_filter.end(), name) !=
      name_filter.end()) {
    return true;
  }

  return std::any_of(name_regex_filter.begin(), name_regex_filter.end(),
                     [&name](const std::string& pattern) {
                       return std::regex_match(
                           name, std::regex(pattern, std::regex::extended));
                     });
}
242 
243 // Used when TraceConfig.write_into_file == true and output_path is not empty.
CreateTraceFile(const std::string & path,bool overwrite)244 base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
245 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
246     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
247   // This is NOT trying to preserve any security property, SELinux does that.
248   // It just improves the actionability of the error when people try to save the
249   // trace in a location that is not SELinux-allowed (a generic "permission
250   // denied" vs "don't put it here, put it there").
251   // These are the only SELinux approved dir for trace files that are created
252   // directly by traced.
253   static const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
254   if (!base::StartsWith(path, kTraceDirBasePath)) {
255     PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
256                   path.c_str(), kTraceDirBasePath);
257     return base::ScopedFile();
258   }
259 #endif
260   // O_CREAT | O_EXCL will fail if the file exists already.
261   const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
262   auto fd = base::OpenFile(path, flags, 0600);
263   if (fd) {
264 #if defined(PERFETTO_HAS_CHMOD)
265     // Passing 0644 directly above won't work because of umask.
266     PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
267 #endif
268   } else {
269     PERFETTO_PLOG("Failed to create %s", path.c_str());
270   }
271   return fd;
272 }
273 
ShouldLogEvent(const TraceConfig & cfg)274 bool ShouldLogEvent(const TraceConfig& cfg) {
275   switch (cfg.statsd_logging()) {
276     case TraceConfig::STATSD_LOGGING_ENABLED:
277       return true;
278     case TraceConfig::STATSD_LOGGING_DISABLED:
279       return false;
280     case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
281       break;
282   }
283   // For backward compatibility with older versions of perfetto_cmd.
284   return cfg.enable_extra_guardrails();
285 }
286 
287 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
288 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)289 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
290                                size_t size,
291                                size_t max_slice_size,
292                                perfetto::TracePacket* packet) {
293   if (size <= max_slice_size) {
294     packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
295     return;
296   }
297   uint8_t* src_ptr = data.get();
298   for (size_t size_left = size; size_left > 0;) {
299     const size_t slice_size = std::min(size_left, max_slice_size);
300 
301     Slice slice = Slice::Allocate(slice_size);
302     memcpy(slice.own_data(), src_ptr, slice_size);
303     packet->AddSlice(std::move(slice));
304 
305     src_ptr += slice_size;
306     size_left -= slice_size;
307   }
308 }
309 
310 using TraceFilter = protos::gen::TraceConfig::TraceFilter;
ConvertPolicy(TraceFilter::StringFilterPolicy policy)311 std::optional<protozero::StringFilter::Policy> ConvertPolicy(
312     TraceFilter::StringFilterPolicy policy) {
313   switch (policy) {
314     case TraceFilter::SFP_UNSPECIFIED:
315       return std::nullopt;
316     case TraceFilter::SFP_MATCH_REDACT_GROUPS:
317       return protozero::StringFilter::Policy::kMatchRedactGroups;
318     case TraceFilter::SFP_ATRACE_MATCH_REDACT_GROUPS:
319       return protozero::StringFilter::Policy::kAtraceMatchRedactGroups;
320     case TraceFilter::SFP_MATCH_BREAK:
321       return protozero::StringFilter::Policy::kMatchBreak;
322     case TraceFilter::SFP_ATRACE_MATCH_BREAK:
323       return protozero::StringFilter::Policy::kAtraceMatchBreak;
324     case TraceFilter::SFP_ATRACE_REPEATED_SEARCH_REDACT_GROUPS:
325       return protozero::StringFilter::Policy::kAtraceRepeatedSearchRedactGroups;
326   }
327   return std::nullopt;
328 }
329 
330 }  // namespace
331 
332 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)333 std::unique_ptr<TracingService> TracingService::CreateInstance(
334     std::unique_ptr<SharedMemory::Factory> shm_factory,
335     base::TaskRunner* task_runner,
336     InitOpts init_opts) {
337   tracing_service::Dependencies deps;
338   deps.clock = std::make_unique<tracing_service::ClockImpl>();
339   uint32_t seed = static_cast<uint32_t>(deps.clock->GetWallTimeMs().count());
340   deps.random = std::make_unique<tracing_service::RandomImpl>(seed);
341   return std::unique_ptr<TracingService>(new TracingServiceImpl(
342       std::move(shm_factory), task_runner, std::move(deps), init_opts));
343 }
344 
TracingServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,tracing_service::Dependencies deps,InitOpts init_opts)345 TracingServiceImpl::TracingServiceImpl(
346     std::unique_ptr<SharedMemory::Factory> shm_factory,
347     base::TaskRunner* task_runner,
348     tracing_service::Dependencies deps,
349     InitOpts init_opts)
350     : clock_(std::move(deps.clock)),
351       random_(std::move(deps.random)),
352       init_opts_(init_opts),
353       shm_factory_(std::move(shm_factory)),
354       uid_(base::GetCurrentUserId()),
355       buffer_ids_(kMaxTraceBufferID),
356       weak_runner_(task_runner) {
357   PERFETTO_DCHECK(task_runner);
358 }
359 
~TracingServiceImpl()360 TracingServiceImpl::~TracingServiceImpl() {
361   // TODO(fmayer): handle teardown of all Producer.
362 }
363 
364 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,const ClientIdentity & client_identity,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,const std::string & sdk_version)365 TracingServiceImpl::ConnectProducer(Producer* producer,
366                                     const ClientIdentity& client_identity,
367                                     const std::string& producer_name,
368                                     size_t shared_memory_size_hint_bytes,
369                                     bool in_process,
370                                     ProducerSMBScrapingMode smb_scraping_mode,
371                                     size_t shared_memory_page_size_hint_bytes,
372                                     std::unique_ptr<SharedMemory> shm,
373                                     const std::string& sdk_version) {
374   PERFETTO_DCHECK_THREAD(thread_checker_);
375 
376   auto uid = client_identity.uid();
377   if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
378     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
379                   static_cast<unsigned long>(uid));
380     return nullptr;
381   }
382 
383   if (producers_.size() >= kMaxProducerID) {
384     PERFETTO_DFATAL("Too many producers.");
385     return nullptr;
386   }
387   const ProducerID id = GetNextProducerID();
388   PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
389                 static_cast<int>(uid));
390   bool smb_scraping_enabled = smb_scraping_enabled_;
391   switch (smb_scraping_mode) {
392     case ProducerSMBScrapingMode::kDefault:
393       break;
394     case ProducerSMBScrapingMode::kEnabled:
395       smb_scraping_enabled = true;
396       break;
397     case ProducerSMBScrapingMode::kDisabled:
398       smb_scraping_enabled = false;
399       break;
400   }
401 
402   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
403       id, client_identity, this, weak_runner_.task_runner(), producer,
404       producer_name, sdk_version, in_process, smb_scraping_enabled));
405   auto it_and_inserted = producers_.emplace(id, endpoint.get());
406   PERFETTO_DCHECK(it_and_inserted.second);
407   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
408   endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
409 
410   // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
411   // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
412   endpoint->weak_runner_.PostTask(
413       [endpoint = endpoint.get()] { endpoint->producer_->OnConnect(); });
414 
415   if (shm) {
416     // The producer supplied an SMB. This is used only by Chrome; in the most
417     // common cases the SMB is created by the service and passed via
418     // OnTracingSetup(). Verify that it is correctly sized before we attempt to
419     // use it. The transport layer has to verify the integrity of the SMB (e.g.
420     // ensure that the producer can't resize if after the fact).
421     size_t shm_size, page_size;
422     std::tie(shm_size, page_size) =
423         EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
424     if (shm_size == shm->size() &&
425         page_size == endpoint->shmem_page_size_hint_bytes_) {
426       PERFETTO_DLOG(
427           "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
428           shm_size / 1024, endpoint->name_.c_str());
429       endpoint->SetupSharedMemory(std::move(shm), page_size,
430                                   /*provided_by_producer=*/true);
431     } else {
432       PERFETTO_LOG(
433           "Discarding incorrectly sized producer-provided SMB for producer "
434           "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
435           "B total, %zu B page size; suggested corrected sizes: %zu B total, "
436           "%zu B page size",
437           endpoint->name_.c_str(), shm->size(),
438           endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
439       shm.reset();
440     }
441   }
442 
443   return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
444 }
445 
DisconnectProducer(ProducerID id)446 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
447   PERFETTO_DCHECK_THREAD(thread_checker_);
448   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
449   PERFETTO_DCHECK(producers_.count(id));
450 
451   // Scrape remaining chunks for this producer to ensure we don't lose data.
452   if (auto* producer = GetProducer(id)) {
453     for (auto& session_id_and_session : tracing_sessions_)
454       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
455   }
456 
457   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
458     auto next = it;
459     next++;
460     if (it->second.producer_id == id)
461       UnregisterDataSource(id, it->second.descriptor.name());
462     it = next;
463   }
464 
465   producers_.erase(id);
466   UpdateMemoryGuardrail();
467 }
468 
GetProducer(ProducerID id) const469 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
470     ProducerID id) const {
471   PERFETTO_DCHECK_THREAD(thread_checker_);
472   auto it = producers_.find(id);
473   if (it == producers_.end())
474     return nullptr;
475   return it->second;
476 }
477 
478 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)479 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
480   PERFETTO_DCHECK_THREAD(thread_checker_);
481   PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
482                 reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
483   std::unique_ptr<ConsumerEndpointImpl> endpoint(new ConsumerEndpointImpl(
484       this, weak_runner_.task_runner(), consumer, uid));
485   // Consumer might go away before we're able to send the connect notification,
486   // if that is the case just bail out.
487   auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
488   weak_runner_.task_runner()->PostTask([weak_ptr = std::move(weak_ptr)] {
489     if (weak_ptr)
490       weak_ptr->consumer_->OnConnect();
491   });
492   return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
493 }
494 
DisconnectConsumer(ConsumerEndpointImpl * consumer)495 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
496   PERFETTO_DCHECK_THREAD(thread_checker_);
497   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
498 
499   if (consumer->tracing_session_id_)
500     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
501 
502   // At this point no more pointers to |consumer| should be around.
503   PERFETTO_DCHECK(!std::any_of(
504       tracing_sessions_.begin(), tracing_sessions_.end(),
505       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
506         return kv.second.consumer_maybe_null == consumer;
507       }));
508 }
509 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)510 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
511                                         const std::string& key) {
512   PERFETTO_DCHECK_THREAD(thread_checker_);
513   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
514 
515   TracingSessionID tsid = consumer->tracing_session_id_;
516   TracingSession* tracing_session;
517   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
518     return false;
519 
520   if (GetDetachedSession(consumer->uid_, key)) {
521     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
522                   key.c_str());
523     return false;
524   }
525 
526   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
527   tracing_session->consumer_maybe_null = nullptr;
528   tracing_session->detach_key = key;
529   consumer->tracing_session_id_ = 0;
530   return true;
531 }
532 
533 std::unique_ptr<TracingService::RelayEndpoint>
ConnectRelayClient(RelayClientID relay_client_id)534 TracingServiceImpl::ConnectRelayClient(RelayClientID relay_client_id) {
535   PERFETTO_DCHECK_THREAD(thread_checker_);
536 
537   auto endpoint = std::make_unique<RelayEndpointImpl>(relay_client_id, this);
538   relay_clients_[relay_client_id] = endpoint.get();
539 
540   return std::move(endpoint);
541 }
542 
DisconnectRelayClient(RelayClientID relay_client_id)543 void TracingServiceImpl::DisconnectRelayClient(RelayClientID relay_client_id) {
544   PERFETTO_DCHECK_THREAD(thread_checker_);
545 
546   if (relay_clients_.find(relay_client_id) == relay_clients_.end())
547     return;
548   relay_clients_.erase(relay_client_id);
549 }
550 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)551 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
552                                         const std::string& key) {
553   PERFETTO_DCHECK_THREAD(thread_checker_);
554   PERFETTO_DLOG("Consumer %p attaching to session %s",
555                 reinterpret_cast<void*>(consumer), key.c_str());
556 
557   if (consumer->tracing_session_id_) {
558     PERFETTO_ELOG(
559         "Cannot reattach consumer to session %s"
560         " while it already attached tracing session ID %" PRIu64,
561         key.c_str(), consumer->tracing_session_id_);
562     return false;
563   }
564 
565   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
566   if (!tracing_session) {
567     PERFETTO_ELOG(
568         "Failed to attach consumer, session '%s' not found for uid %d",
569         key.c_str(), static_cast<int>(consumer->uid_));
570     return false;
571   }
572 
573   consumer->tracing_session_id_ = tracing_session->id;
574   tracing_session->consumer_maybe_null = consumer;
575   tracing_session->detach_key.clear();
576   return true;
577 }
578 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)579 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
580                                                const TraceConfig& cfg,
581                                                base::ScopedFile fd) {
582   PERFETTO_DCHECK_THREAD(thread_checker_);
583 
584   // If the producer is specifying a UUID, respect that (at least for the first
585   // snapshot). Otherwise generate a new UUID.
586   base::Uuid uuid(cfg.trace_uuid_lsb(), cfg.trace_uuid_msb());
587   if (!uuid)
588     uuid = base::Uuidv4();
589 
590   PERFETTO_DLOG("Enabling tracing for consumer %p, UUID: %s",
591                 reinterpret_cast<void*>(consumer),
592                 uuid.ToPrettyString().c_str());
593   MaybeLogUploadEvent(cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracing);
594   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
595     lockdown_mode_ = true;
596   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
597     lockdown_mode_ = false;
598 
599   // Scope |tracing_session| to this block to prevent accidental use of a null
600   // pointer later in this function.
601   {
602     TracingSession* tracing_session =
603         GetTracingSession(consumer->tracing_session_id_);
604     if (tracing_session) {
605       MaybeLogUploadEvent(
606           cfg, uuid,
607           PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
608       return PERFETTO_SVC_ERR(
609           "A Consumer is trying to EnableTracing() but another tracing "
610           "session is already active (forgot a call to FreeBuffers() ?)");
611     }
612   }
613 
614   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
615                                        ? kGuardrailsMaxTracingDurationMillis
616                                        : kMaxTracingDurationMillis;
617   if (cfg.duration_ms() > max_duration_ms) {
618     MaybeLogUploadEvent(cfg, uuid,
619                         PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
620     return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
621                             "ms  > %" PRIu32 " ms)",
622                             cfg.duration_ms(), max_duration_ms);
623   }
624 
625   const bool has_trigger_config =
626       GetTriggerMode(cfg) != TraceConfig::TriggerConfig::UNSPECIFIED;
627   if (has_trigger_config &&
628       (cfg.trigger_config().trigger_timeout_ms() == 0 ||
629        cfg.trigger_config().trigger_timeout_ms() > max_duration_ms)) {
630     MaybeLogUploadEvent(
631         cfg, uuid,
632         PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
633     return PERFETTO_SVC_ERR(
634         "Traces with START_TRACING triggers must provide a positive "
635         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
636         cfg.trigger_config().trigger_timeout_ms());
637   }
638 
639   // This check has been introduced in May 2023 after finding b/274931668.
640   if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
641       TraceConfig::TriggerConfig::TriggerMode_MAX) {
642     MaybeLogUploadEvent(
643         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
644     return PERFETTO_SVC_ERR(
645         "The trace config specified an invalid trigger_mode");
646   }
647 
648   if (cfg.trigger_config().use_clone_snapshot_if_available() &&
649       cfg.trigger_config().trigger_mode() !=
650           TraceConfig::TriggerConfig::STOP_TRACING) {
651     MaybeLogUploadEvent(
652         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
653     return PERFETTO_SVC_ERR(
654         "trigger_mode must be STOP_TRACING when "
655         "use_clone_snapshot_if_available=true");
656   }
657 
658   if (has_trigger_config && cfg.duration_ms() != 0) {
659     MaybeLogUploadEvent(
660         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
661     return PERFETTO_SVC_ERR(
662         "duration_ms was set, this must not be set for traces with triggers.");
663   }
664 
665   for (char c : cfg.bugreport_filename()) {
666     if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
667           (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.')) {
668       MaybeLogUploadEvent(
669           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidBrFilename);
670       return PERFETTO_SVC_ERR(
671           "bugreport_filename contains invalid chars. Use [a-zA-Z0-9-_.]+");
672     }
673   }
674 
675   if ((GetTriggerMode(cfg) == TraceConfig::TriggerConfig::STOP_TRACING ||
676        GetTriggerMode(cfg) == TraceConfig::TriggerConfig::CLONE_SNAPSHOT) &&
677       cfg.write_into_file()) {
678     // We don't support this usecase because there are subtle assumptions which
679     // break around TracingServiceEvents and windowed sorting (i.e. if we don't
680     // drain the events in ReadBuffersIntoFile because we are waiting for
681     // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
682     // emitting them wildly out of order breaking windowed sorting in trace
683     // processor).
684     MaybeLogUploadEvent(
685         cfg, uuid,
686         PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
687     return PERFETTO_SVC_ERR(
688         "Specifying trigger mode STOP_TRACING/CLONE_SNAPSHOT and "
689         "write_into_file together is unsupported");
690   }
691 
692   std::unordered_set<std::string> triggers;
693   for (const auto& trigger : cfg.trigger_config().triggers()) {
694     if (!triggers.insert(trigger.name()).second) {
695       MaybeLogUploadEvent(
696           cfg, uuid,
697           PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
698       return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
699                               trigger.name().c_str());
700     }
701   }
702 
703   if (cfg.enable_extra_guardrails()) {
704     if (cfg.deferred_start()) {
705       MaybeLogUploadEvent(
706           cfg, uuid,
707           PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
708       return PERFETTO_SVC_ERR(
709           "deferred_start=true is not supported in unsupervised traces");
710     }
711     uint64_t buf_size_sum = 0;
712     for (const auto& buf : cfg.buffers()) {
713       if (buf.size_kb() % 4 != 0) {
714         MaybeLogUploadEvent(
715             cfg, uuid,
716             PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
717         return PERFETTO_SVC_ERR(
718             "buffers.size_kb must be a multiple of 4, got %" PRIu32,
719             buf.size_kb());
720       }
721       buf_size_sum += buf.size_kb();
722     }
723 
724     uint32_t max_tracing_buffer_size_kb =
725         std::max(kGuardrailsMaxTracingBufferSizeKb,
726                  cfg.guardrail_overrides().max_tracing_buffer_size_kb());
727     if (buf_size_sum > max_tracing_buffer_size_kb) {
728       MaybeLogUploadEvent(
729           cfg, uuid,
730           PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
731       return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
732                               "kB  > %" PRIu32 " kB)",
733                               buf_size_sum, max_tracing_buffer_size_kb);
734     }
735   }
736 
737   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
738     MaybeLogUploadEvent(cfg, uuid,
739                         PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
740     return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
741                             cfg.buffers_size());
742   }
743   // Check that the config specifies all buffers for its data sources. This
744   // is also checked in SetupDataSource, but it is simpler to return a proper
745   // error to the consumer from here (and there will be less state to undo).
746   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
747     size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
748     size_t target_buffer = cfg_data_source.config().target_buffer();
749     if (target_buffer >= num_buffers) {
750       MaybeLogUploadEvent(
751           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
752       return PERFETTO_SVC_ERR(
753           "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
754           "%zu)",
755           cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
756     }
757   }
758 
759   if (!cfg.unique_session_name().empty()) {
760     const std::string& name = cfg.unique_session_name();
761     for (auto& kv : tracing_sessions_) {
762       if (kv.second.state == TracingSession::CLONED_READ_ONLY)
763         continue;  // Don't consider cloned sessions in uniqueness checks.
764       if (kv.second.config.unique_session_name() == name) {
765         MaybeLogUploadEvent(
766             cfg, uuid,
767             PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
768         static const char fmt[] =
769             "A trace with this unique session name (%s) already exists";
770         // This happens frequently, don't make it an "E"LOG.
771         PERFETTO_LOG(fmt, name.c_str());
772         return base::ErrStatus(fmt, name.c_str());
773       }
774     }
775   }
776 
777   if (!cfg.session_semaphores().empty()) {
778     struct SemaphoreSessionsState {
779       uint64_t smallest_max_other_session_count =
780           std::numeric_limits<uint64_t>::max();
781       uint64_t session_count = 0;
782     };
783     // For each semaphore, compute the number of active sessions and the
784     // MIN(limit).
785     std::unordered_map<std::string, SemaphoreSessionsState>
786         sem_to_sessions_state;
787     for (const auto& id_and_session : tracing_sessions_) {
788       const auto& session = id_and_session.second;
789       if (session.state == TracingSession::CLONED_READ_ONLY ||
790           session.state == TracingSession::DISABLED) {
791         // Don't consider cloned or disabled sessions in checks.
792         continue;
793       }
794       for (const auto& sem : session.config.session_semaphores()) {
795         auto& sessions_state = sem_to_sessions_state[sem.name()];
796         sessions_state.smallest_max_other_session_count =
797             std::min(sessions_state.smallest_max_other_session_count,
798                      sem.max_other_session_count());
799         sessions_state.session_count++;
800       }
801     }
802 
803     // Check if any of the semaphores declared by the config clashes with any of
804     // the currently active semaphores.
805     for (const auto& semaphore : cfg.session_semaphores()) {
806       auto it = sem_to_sessions_state.find(semaphore.name());
807       if (it == sem_to_sessions_state.end()) {
808         continue;
809       }
810       uint64_t max_other_session_count =
811           std::min(semaphore.max_other_session_count(),
812                    it->second.smallest_max_other_session_count);
813       if (it->second.session_count > max_other_session_count) {
814         MaybeLogUploadEvent(
815             cfg, uuid,
816             PerfettoStatsdAtom::
817                 kTracedEnableTracingFailedSessionSemaphoreCheck);
818         return PERFETTO_SVC_ERR(
819             "Semaphore \"%s\" exceeds maximum allowed other session count "
820             "(%" PRIu64 " > min(%" PRIu64 ", %" PRIu64 "))",
821             semaphore.name().c_str(), it->second.session_count,
822             semaphore.max_other_session_count(),
823             it->second.smallest_max_other_session_count);
824       }
825     }
826   }
827 
828   if (cfg.enable_extra_guardrails()) {
829     // unique_session_name can be empty
830     const std::string& name = cfg.unique_session_name();
831     int64_t now_s = clock_->GetBootTimeS().count();
832 
833     // Remove any entries where the time limit has passed so this map doesn't
834     // grow indefinitely:
835     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
836     for (auto it = sessions.cbegin(); it != sessions.cend();) {
837       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
838         it = sessions.erase(it);
839       } else {
840         ++it;
841       }
842     }
843 
844     int64_t& previous_s = session_to_last_trace_s_[name];
845     if (previous_s == 0) {
846       previous_s = now_s;
847     } else {
848       MaybeLogUploadEvent(
849           cfg, uuid,
850           PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
851       return PERFETTO_SVC_ERR(
852           "A trace with unique session name \"%s\" began less than %" PRId64
853           "s ago (%" PRId64 "s)",
854           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
855     }
856   }
857 
858   const int sessions_for_uid = static_cast<int>(std::count_if(
859       tracing_sessions_.begin(), tracing_sessions_.end(),
860       [consumer](const decltype(tracing_sessions_)::value_type& s) {
861         return s.second.consumer_uid == consumer->uid_;
862       }));
863 
864   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
865   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
866     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
867   }
868   if (sessions_for_uid >= per_uid_limit) {
869     MaybeLogUploadEvent(
870         cfg, uuid,
871         PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
872     return PERFETTO_SVC_ERR(
873         "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
874         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
875   }
876 
877   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
878   // in a state where it stalls by design by having more TraceWriterImpl
879   // instances than free pages in the buffer. This is really a bug in
880   // trace_probes and the way it handles stalls in the shmem buffer.
881   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
882     MaybeLogUploadEvent(
883         cfg, uuid,
884         PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
885     return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
886                             tracing_sessions_.size());
887   }
888 
889   // If the trace config provides a filter bytecode, setup the filter now.
890   // If the filter loading fails, abort the tracing session rather than running
891   // unfiltered.
892   std::unique_ptr<protozero::MessageFilter> trace_filter;
893   if (cfg.has_trace_filter()) {
894     const auto& filt = cfg.trace_filter();
895     trace_filter.reset(new protozero::MessageFilter());
896 
897     protozero::StringFilter& string_filter = trace_filter->string_filter();
898     for (const auto& rule : filt.string_filter_chain().rules()) {
899       auto opt_policy = ConvertPolicy(rule.policy());
900       if (!opt_policy.has_value()) {
901         MaybeLogUploadEvent(
902             cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
903         return PERFETTO_SVC_ERR(
904             "Trace filter has invalid string filtering rules, aborting");
905       }
906       string_filter.AddRule(*opt_policy, rule.regex_pattern(),
907                             rule.atrace_payload_starts_with());
908     }
909 
910     const std::string& bytecode_v1 = filt.bytecode();
911     const std::string& bytecode_v2 = filt.bytecode_v2();
912     const std::string& bytecode =
913         bytecode_v2.empty() ? bytecode_v1 : bytecode_v2;
914     if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
915       MaybeLogUploadEvent(
916           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
917       return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
918     }
919 
920     // The filter is created using perfetto.protos.Trace as root message
921     // (because that makes it possible to play around with the `proto_filter`
922     // tool on actual traces). Here in the service, however, we deal with
923     // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
924     // The IPC client (or the write_into_file logic in here) are responsible
925     // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
926     // the preamble is not there at ReadBuffer time. Hence we change the root of
927     // the filtering to start at the Trace.packet level.
928     if (!trace_filter->SetFilterRoot({TracePacket::kPacketFieldNumber})) {
929       MaybeLogUploadEvent(
930           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
931       return PERFETTO_SVC_ERR("Failed to set filter root.");
932     }
933   }
934 
935   const TracingSessionID tsid = ++last_tracing_session_id_;
936   TracingSession* tracing_session =
937       &tracing_sessions_
938            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
939                     std::forward_as_tuple(tsid, consumer, cfg,
940                                           weak_runner_.task_runner()))
941            .first->second;
942 
943   tracing_session->trace_uuid = uuid;
944 
945   if (trace_filter)
946     tracing_session->trace_filter = std::move(trace_filter);
947 
948   if (cfg.write_into_file()) {
949     if (!fd ^ !cfg.output_path().empty()) {
950       MaybeLogUploadEvent(
951           tracing_session->config, uuid,
952           PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
953       tracing_sessions_.erase(tsid);
954       return PERFETTO_SVC_ERR(
955           "When write_into_file==true either a FD needs to be passed or "
956           "output_path must be populated (but not both)");
957     }
958     if (!cfg.output_path().empty()) {
959       fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
960       if (!fd) {
961         MaybeLogUploadEvent(
962             tracing_session->config, uuid,
963             PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
964         tracing_sessions_.erase(tsid);
965         return PERFETTO_SVC_ERR("Failed to create the trace file %s",
966                                 cfg.output_path().c_str());
967       }
968     }
969     tracing_session->write_into_file = std::move(fd);
970     uint32_t write_period_ms = cfg.file_write_period_ms();
971     if (write_period_ms == 0)
972       write_period_ms = kDefaultWriteIntoFilePeriodMs;
973     if (write_period_ms < kMinWriteIntoFilePeriodMs)
974       write_period_ms = kMinWriteIntoFilePeriodMs;
975     tracing_session->write_period_ms = write_period_ms;
976     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
977     tracing_session->bytes_written_into_file = 0;
978   }
979 
980   if (cfg.compression_type() == TraceConfig::COMPRESSION_TYPE_DEFLATE) {
981     if (init_opts_.compressor_fn) {
982       tracing_session->compress_deflate = true;
983     } else {
984       PERFETTO_LOG(
985           "COMPRESSION_TYPE_DEFLATE is not supported in the current build "
986           "configuration. Skipping compression");
987     }
988   }
989 
990   // Initialize the log buffers.
991   bool did_allocate_all_buffers = true;
992   bool invalid_buffer_config = false;
993 
994   // Allocate the trace buffers. Also create a map to translate a consumer
995   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
996   // corresponding BufferID, which is a global ID namespace for the service and
997   // all producers.
998   size_t total_buf_size_kb = 0;
999   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
1000   tracing_session->buffers_index.reserve(num_buffers);
1001   for (size_t i = 0; i < num_buffers; i++) {
1002     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
1003     BufferID global_id = buffer_ids_.Allocate();
1004     if (!global_id) {
1005       did_allocate_all_buffers = false;  // We ran out of IDs.
1006       break;
1007     }
1008     tracing_session->buffers_index.push_back(global_id);
1009     // TraceBuffer size is limited to 32-bit.
1010     const uint32_t buf_size_kb = buffer_cfg.size_kb();
1011     const uint64_t buf_size_bytes = buf_size_kb * static_cast<uint64_t>(1024);
1012     const size_t buf_size = static_cast<size_t>(buf_size_bytes);
1013     if (buf_size_bytes == 0 ||
1014         buf_size_bytes > std::numeric_limits<uint32_t>::max() ||
1015         buf_size != buf_size_bytes) {
1016       invalid_buffer_config = true;
1017       did_allocate_all_buffers = false;
1018       break;
1019     }
1020     total_buf_size_kb += buf_size_kb;
1021     TraceBuffer::OverwritePolicy policy =
1022         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
1023             ? TraceBuffer::kDiscard
1024             : TraceBuffer::kOverwrite;
1025     auto it_and_inserted =
1026         buffers_.emplace(global_id, TraceBuffer::Create(buf_size, policy));
1027     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
1028     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
1029     if (!trace_buffer) {
1030       did_allocate_all_buffers = false;
1031       break;
1032     }
1033   }
1034 
1035   // This can happen if either:
1036   // - All the kMaxTraceBufferID slots are taken.
1037   // - OOM, or, more realistically, we exhausted virtual memory.
1038   // - The buffer size in the config is invalid.
1039   // In any case, free all the previously allocated buffers and abort.
1040   if (!did_allocate_all_buffers) {
1041     for (BufferID global_id : tracing_session->buffers_index) {
1042       buffer_ids_.Free(global_id);
1043       buffers_.erase(global_id);
1044     }
1045     MaybeLogUploadEvent(tracing_session->config, uuid,
1046                         PerfettoStatsdAtom::kTracedEnableTracingOom);
1047     tracing_sessions_.erase(tsid);
1048     if (invalid_buffer_config) {
1049       return PERFETTO_SVC_ERR(
1050           "Failed to allocate tracing buffers: Invalid buffer sizes");
1051     }
1052     return PERFETTO_SVC_ERR(
1053         "Failed to allocate tracing buffers: OOM or too many buffers");
1054   }
1055 
1056   UpdateMemoryGuardrail();
1057 
1058   consumer->tracing_session_id_ = tsid;
1059 
1060   // Setup the data sources on the producers without starting them.
1061   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
1062     // Scan all the registered data sources with a matching name.
1063     auto range = data_sources_.equal_range(cfg_data_source.config().name());
1064     for (auto it = range.first; it != range.second; it++) {
1065       TraceConfig::ProducerConfig producer_config;
1066       for (const auto& config : cfg.producers()) {
1067         if (GetProducer(it->second.producer_id)->name_ ==
1068             config.producer_name()) {
1069           producer_config = config;
1070           break;
1071         }
1072       }
1073       SetupDataSource(cfg_data_source, producer_config, it->second,
1074                       tracing_session);
1075     }
1076   }
1077 
1078   bool has_start_trigger = false;
1079   switch (GetTriggerMode(cfg)) {
1080     case TraceConfig::TriggerConfig::UNSPECIFIED:
1081       // no triggers are specified so this isn't a trace that is using triggers.
1082       PERFETTO_DCHECK(!has_trigger_config);
1083       break;
1084     case TraceConfig::TriggerConfig::START_TRACING:
1085       // For traces which use START_TRACE triggers we need to ensure that the
1086       // tracing session will be cleaned up when it times out.
1087       has_start_trigger = true;
1088       weak_runner_.PostDelayedTask(
1089           [tsid, this]() { OnStartTriggersTimeout(tsid); },
1090           cfg.trigger_config().trigger_timeout_ms());
1091       break;
1092     case TraceConfig::TriggerConfig::STOP_TRACING:
1093     case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1094       // Update the tracing_session's duration_ms to ensure that if no trigger
1095       // is received the session will end and be cleaned up equal to the
1096       // timeout.
1097       //
1098       // TODO(nuskos): Refactor this so that rather than modifying the config we
1099       // have a field we look at on the tracing_session.
1100       tracing_session->config.set_duration_ms(
1101           cfg.trigger_config().trigger_timeout_ms());
1102       break;
1103 
1104       // The case of unknown modes (coming from future versions of the service)
1105       // is handled few lines above (search for TriggerMode_MAX).
1106   }
1107 
1108   tracing_session->state = TracingSession::CONFIGURED;
1109   PERFETTO_LOG(
1110       "Configured tracing session %" PRIu64
1111       ", #sources:%zu, duration:%u ms%s, #buffers:%d, total "
1112       "buffer size:%zu KB, total sessions:%zu, uid:%u session name: \"%s\"",
1113       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
1114       tracing_session->config.prefer_suspend_clock_for_duration()
1115           ? " (suspend_clock)"
1116           : "",
1117       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
1118       static_cast<unsigned int>(consumer->uid_),
1119       cfg.unique_session_name().c_str());
1120 
1121   // Start the data sources, unless this is a case of early setup + fast
1122   // triggering, either through TraceConfig.deferred_start or
1123   // TraceConfig.trigger_config(). If both are specified which ever one occurs
1124   // first will initiate the trace.
1125   if (!cfg.deferred_start() && !has_start_trigger)
1126     StartTracing(tsid);
1127 
1128   return base::OkStatus();
1129 }
1130 
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)1131 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
1132                                            const TraceConfig& updated_cfg) {
1133   PERFETTO_DCHECK_THREAD(thread_checker_);
1134   TracingSession* tracing_session =
1135       GetTracingSession(consumer->tracing_session_id_);
1136   PERFETTO_DCHECK(tracing_session);
1137 
1138   if ((tracing_session->state != TracingSession::STARTED) &&
1139       (tracing_session->state != TracingSession::CONFIGURED)) {
1140     PERFETTO_ELOG(
1141         "ChangeTraceConfig() was called for a tracing session which isn't "
1142         "running.");
1143     return;
1144   }
1145 
1146   // We only support updating producer_name_{,regex}_filter (and pass-through
1147   // configs) for now; null out any changeable fields and make sure the rest are
1148   // identical.
1149   TraceConfig new_config_copy(updated_cfg);
1150   for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
1151     ds_cfg.clear_producer_name_filter();
1152     ds_cfg.clear_producer_name_regex_filter();
1153   }
1154 
1155   TraceConfig current_config_copy(tracing_session->config);
1156   for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
1157     ds_cfg.clear_producer_name_filter();
1158     ds_cfg.clear_producer_name_regex_filter();
1159   }
1160 
1161   if (new_config_copy != current_config_copy) {
1162     PERFETTO_LOG(
1163         "ChangeTraceConfig() was called with a config containing unsupported "
1164         "changes; only adding to the producer_name_{,regex}_filter is "
1165         "currently supported and will have an effect.");
1166   }
1167 
1168   for (TraceConfig::DataSource& cfg_data_source :
1169        *tracing_session->config.mutable_data_sources()) {
1170     // Find the updated producer_filter in the new config.
1171     std::vector<std::string> new_producer_name_filter;
1172     std::vector<std::string> new_producer_name_regex_filter;
1173     bool found_data_source = false;
1174     for (const auto& it : updated_cfg.data_sources()) {
1175       if (cfg_data_source.config().name() == it.config().name()) {
1176         new_producer_name_filter = it.producer_name_filter();
1177         new_producer_name_regex_filter = it.producer_name_regex_filter();
1178         found_data_source = true;
1179         break;
1180       }
1181     }
1182 
1183     // Bail out if data source not present in the new config.
1184     if (!found_data_source) {
1185       PERFETTO_ELOG(
1186           "ChangeTraceConfig() called without a current data source also "
1187           "present in the new config: %s",
1188           cfg_data_source.config().name().c_str());
1189       continue;
1190     }
1191 
1192     // TODO(oysteine): Just replacing the filter means that if
1193     // there are any filter entries which were present in the original config,
1194     // but removed from the config passed to ChangeTraceConfig, any matching
1195     // producers will keep producing but newly added producers after this
1196     // point will never start.
1197     *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
1198     *cfg_data_source.mutable_producer_name_regex_filter() =
1199         new_producer_name_regex_filter;
1200 
1201     // Get the list of producers that are already set up.
1202     std::unordered_set<uint16_t> set_up_producers;
1203     auto& ds_instances = tracing_session->data_source_instances;
1204     for (auto instance_it = ds_instances.begin();
1205          instance_it != ds_instances.end(); ++instance_it) {
1206       set_up_producers.insert(instance_it->first);
1207     }
1208 
1209     // Scan all the registered data sources with a matching name.
1210     auto range = data_sources_.equal_range(cfg_data_source.config().name());
1211     for (auto it = range.first; it != range.second; it++) {
1212       ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
1213       PERFETTO_DCHECK(producer);
1214 
1215       // Check if the producer name of this data source is present
1216       // in the name filters. We currently only support new filters, not
1217       // removing old ones.
1218       if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
1219                              new_producer_name_regex_filter)) {
1220         continue;
1221       }
1222 
1223       // If this producer is already set up, we assume that all datasources
1224       // in it started already.
1225       if (set_up_producers.count(it->second.producer_id))
1226         continue;
1227 
1228       // If it wasn't previously setup, set it up now.
1229       // (The per-producer config is optional).
1230       TraceConfig::ProducerConfig producer_config;
1231       for (const auto& config : tracing_session->config.producers()) {
1232         if (producer->name_ == config.producer_name()) {
1233           producer_config = config;
1234           break;
1235         }
1236       }
1237 
1238       DataSourceInstance* ds_inst = SetupDataSource(
1239           cfg_data_source, producer_config, it->second, tracing_session);
1240 
1241       if (ds_inst && tracing_session->state == TracingSession::STARTED)
1242         StartDataSourceInstance(producer, tracing_session, ds_inst);
1243     }
1244   }
1245 }
1246 
DelayToNextWritePeriodMs(const TracingSession & session)1247 uint32_t TracingServiceImpl::DelayToNextWritePeriodMs(
1248     const TracingSession& session) {
1249   PERFETTO_DCHECK(session.write_period_ms > 0);
1250   return session.write_period_ms -
1251          static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
1252                                session.write_period_ms);
1253 }
1254 
StartTracing(TracingSessionID tsid)1255 void TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1256   PERFETTO_DCHECK_THREAD(thread_checker_);
1257 
1258   TracingSession* tracing_session = GetTracingSession(tsid);
1259   if (!tracing_session) {
1260     PERFETTO_ELOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
1261     return;
1262   }
1263 
1264   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1265                       PerfettoStatsdAtom::kTracedStartTracing);
1266 
1267   if (tracing_session->state != TracingSession::CONFIGURED) {
1268     MaybeLogUploadEvent(
1269         tracing_session->config, tracing_session->trace_uuid,
1270         PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1271     PERFETTO_ELOG("StartTracing() failed, invalid session state: %d",
1272                   tracing_session->state);
1273     return;
1274   }
1275 
1276   tracing_session->state = TracingSession::STARTED;
1277 
1278   // We store the start of trace snapshot separately as it's important to make
1279   // sure we can interpret all the data in the trace and storing it in the ring
1280   // buffer means it could be overwritten by a later snapshot.
1281   if (!tracing_session->config.builtin_data_sources()
1282            .disable_clock_snapshotting()) {
1283     SnapshotClocks(&tracing_session->initial_clock_snapshot);
1284   }
1285 
1286   // We don't snapshot the clocks here because we just did this above.
1287   SnapshotLifecycleEvent(
1288       tracing_session,
1289       protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1290       false /* snapshot_clocks */);
1291 
1292   // Periodically snapshot clocks, stats, sync markers while the trace is
1293   // active. The snapshots are emitted on the future ReadBuffers() calls, which
1294   // means that:
1295   //  (a) If we're streaming to a file (or to a consumer) while tracing, we
1296   //      write snapshots periodically into the trace.
1297   //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1298   //      snapshot into the trace. For clock snapshots, we keep track of the
1299   //      snapshot recorded at the beginning of the session
1300   //      (initial_clock_snapshot above), as well as the most recent sampled
1301   //      snapshots that showed significant new drift between different clocks.
1302   //      The latter clock snapshots are sampled periodically and at lifecycle
1303   //      events.
1304   base::PeriodicTask::Args snapshot_task_args;
1305   snapshot_task_args.start_first_task_immediately = true;
1306   snapshot_task_args.use_suspend_aware_timer =
1307       tracing_session->config.builtin_data_sources()
1308           .prefer_suspend_clock_for_snapshot();
1309   snapshot_task_args.task = [this, tsid] { PeriodicSnapshotTask(tsid); };
1310   snapshot_task_args.period_ms =
1311       tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1312   if (!snapshot_task_args.period_ms)
1313     snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1314   tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1315 
1316   // Trigger delayed task if the trace is time limited.
1317   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1318   if (trace_duration_ms > 0) {
1319     auto stop_task =
1320         std::bind(&TracingServiceImpl::StopOnDurationMsExpiry, this, tsid);
1321     if (tracing_session->config.prefer_suspend_clock_for_duration()) {
1322       base::PeriodicTask::Args stop_args;
1323       stop_args.use_suspend_aware_timer = true;
1324       stop_args.period_ms = trace_duration_ms;
1325       stop_args.one_shot = true;
1326       stop_args.task = std::move(stop_task);
1327       tracing_session->timed_stop_task.Start(stop_args);
1328     } else {
1329       weak_runner_.PostDelayedTask(std::move(stop_task), trace_duration_ms);
1330     }
1331   }  // if (trace_duration_ms > 0).
1332 
1333   // Start the periodic drain tasks if we should to save the trace into a file.
1334   if (tracing_session->config.write_into_file()) {
1335     weak_runner_.PostDelayedTask([this, tsid] { ReadBuffersIntoFile(tsid); },
1336                                  DelayToNextWritePeriodMs(*tracing_session));
1337   }
1338 
1339   // Start the periodic flush tasks if the config specified a flush period.
1340   if (tracing_session->config.flush_period_ms())
1341     PeriodicFlushTask(tsid, /*post_next_only=*/true);
1342 
1343   // Start the periodic incremental state clear tasks if the config specified a
1344   // period.
1345   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1346     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1347   }
1348 
1349   for (auto& [prod_id, data_source] : tracing_session->data_source_instances) {
1350     ProducerEndpointImpl* producer = GetProducer(prod_id);
1351     if (!producer) {
1352       PERFETTO_DFATAL("Producer does not exist.");
1353       continue;
1354     }
1355     StartDataSourceInstance(producer, tracing_session, &data_source);
1356   }
1357 
1358   MaybeNotifyAllDataSourcesStarted(tracing_session);
1359 
1360   // `did_notify_all_data_source_started` is only set if a consumer is
1361   // connected.
1362   if (tracing_session->consumer_maybe_null) {
1363     weak_runner_.PostDelayedTask(
1364         [this, tsid] { OnAllDataSourceStartedTimeout(tsid); },
1365         kAllDataSourceStartedTimeout);
1366   }
1367 }
1368 
StopOnDurationMsExpiry(TracingSessionID tsid)1369 void TracingServiceImpl::StopOnDurationMsExpiry(TracingSessionID tsid) {
1370   auto* tracing_session_ptr = GetTracingSession(tsid);
1371   if (!tracing_session_ptr)
1372     return;
1373   // If this trace was using STOP_TRACING triggers and we've seen
1374   // one, then the trigger overrides the normal timeout. In this
1375   // case we just return and let the other task clean up this trace.
1376   if (GetTriggerMode(tracing_session_ptr->config) ==
1377           TraceConfig::TriggerConfig::STOP_TRACING &&
1378       !tracing_session_ptr->received_triggers.empty())
1379     return;
1380   // In all other cases (START_TRACING or no triggers) we flush
1381   // after |trace_duration_ms| unconditionally.
1382   FlushAndDisableTracing(tsid);
1383 }
1384 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1385 void TracingServiceImpl::StartDataSourceInstance(
1386     ProducerEndpointImpl* producer,
1387     TracingSession* tracing_session,
1388     TracingServiceImpl::DataSourceInstance* instance) {
1389   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1390 
1391   bool start_immediately = !instance->will_notify_on_start;
1392 
1393   if (producer->IsAndroidProcessFrozen()) {
1394     PERFETTO_DLOG(
1395         "skipping waiting of data source \"%s\" on producer \"%s\" (pid=%u) "
1396         "because it is frozen",
1397         instance->data_source_name.c_str(), producer->name_.c_str(),
1398         producer->pid());
1399     start_immediately = true;
1400   }
1401 
1402   if (!start_immediately) {
1403     instance->state = DataSourceInstance::STARTING;
1404   } else {
1405     instance->state = DataSourceInstance::STARTED;
1406   }
1407   if (tracing_session->consumer_maybe_null) {
1408     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1409         *producer, *instance);
1410   }
1411   producer->StartDataSource(instance->instance_id, instance->config);
1412 
1413   // If all data sources are started, notify the consumer.
1414   if (instance->state == DataSourceInstance::STARTED)
1415     MaybeNotifyAllDataSourcesStarted(tracing_session);
1416 }
1417 
1418 // DisableTracing just stops the data sources but doesn't free up any buffer.
1419 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1420 // and then drain the buffers. The actual teardown of the TracingSession happens
1421 // in FreeBuffers().
DisableTracing(TracingSessionID tsid,bool disable_immediately)1422 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
1423                                         bool disable_immediately) {
1424   PERFETTO_DCHECK_THREAD(thread_checker_);
1425   TracingSession* tracing_session = GetTracingSession(tsid);
1426   if (!tracing_session) {
1427     // Can happen if the consumer calls this before EnableTracing() or after
1428     // FreeBuffers().
1429     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
1430     return;
1431   }
1432 
1433   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1434                       PerfettoStatsdAtom::kTracedDisableTracing);
1435 
1436   switch (tracing_session->state) {
1437     // Spurious call to DisableTracing() while already disabled, nothing to do.
1438     case TracingSession::DISABLED:
1439       PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1440       return;
1441 
1442     case TracingSession::CLONED_READ_ONLY:
1443       return;
1444 
1445     // This is either:
1446     // A) The case of a graceful DisableTracing() call followed by a call to
1447     //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
1448     //    to forcefully transition in the disabled state without waiting for the
1449     //    outstanding acks because the buffers are going to be destroyed soon.
1450     // B) A spurious call, iff |disable_immediately| == false, in which case
1451     //    there is nothing to do.
1452     case TracingSession::DISABLING_WAITING_STOP_ACKS:
1453       PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1454       if (disable_immediately)
1455         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1456       return;
1457 
1458     // Continues below.
1459     case TracingSession::CONFIGURED:
1460       // If the session didn't even start there is no need to orchestrate a
1461       // graceful stop of data sources.
1462       disable_immediately = true;
1463       break;
1464 
1465     // This is the nominal case, continues below.
1466     case TracingSession::STARTED:
1467       break;
1468   }
1469 
1470   for (auto& data_source_inst : tracing_session->data_source_instances) {
1471     const ProducerID producer_id = data_source_inst.first;
1472     DataSourceInstance& instance = data_source_inst.second;
1473     ProducerEndpointImpl* producer = GetProducer(producer_id);
1474     PERFETTO_DCHECK(producer);
1475     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
1476                     instance.state == DataSourceInstance::STARTING ||
1477                     instance.state == DataSourceInstance::STARTED);
1478     StopDataSourceInstance(producer, tracing_session, &instance,
1479                            disable_immediately);
1480   }
1481 
1482   // If the periodic task is running, we can stop the periodic snapshot timer
1483   // here instead of waiting until FreeBuffers to prevent useless snapshots
1484   // which won't be read.
1485   tracing_session->snapshot_periodic_task.Reset();
1486 
1487   // Either this request is flagged with |disable_immediately| or there are no
1488   // data sources that are requesting a final handshake. In both cases just mark
1489   // the session as disabled immediately, notify the consumer and flush the
1490   // trace file (if used).
1491   if (tracing_session->AllDataSourceInstancesStopped())
1492     return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1493 
1494   tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
1495   weak_runner_.PostDelayedTask([this, tsid] { OnDisableTracingTimeout(tsid); },
1496                                tracing_session->data_source_stop_timeout_ms());
1497 
1498   // Deliberately NOT removing the session from |tracing_session_|, it's still
1499   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
1500 }
1501 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1502 void TracingServiceImpl::NotifyDataSourceStarted(
1503     ProducerID producer_id,
1504     DataSourceInstanceID instance_id) {
1505   PERFETTO_DCHECK_THREAD(thread_checker_);
1506   for (auto& kv : tracing_sessions_) {
1507     TracingSession& tracing_session = kv.second;
1508     DataSourceInstance* instance =
1509         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1510 
1511     if (!instance)
1512       continue;
1513 
1514     // If the tracing session was already stopped, ignore this notification.
1515     if (tracing_session.state != TracingSession::STARTED)
1516       continue;
1517 
1518     if (instance->state != DataSourceInstance::STARTING) {
1519       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1520                     instance->state);
1521       continue;
1522     }
1523 
1524     instance->state = DataSourceInstance::STARTED;
1525 
1526     ProducerEndpointImpl* producer = GetProducer(producer_id);
1527     PERFETTO_DCHECK(producer);
1528     if (tracing_session.consumer_maybe_null) {
1529       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1530           *producer, *instance);
1531     }
1532 
1533     // If all data sources are started, notify the consumer.
1534     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1535   }  // for (tracing_session)
1536 }
1537 
OnAllDataSourceStartedTimeout(TracingSessionID tsid)1538 void TracingServiceImpl::OnAllDataSourceStartedTimeout(TracingSessionID tsid) {
1539   PERFETTO_DCHECK_THREAD(thread_checker_);
1540   TracingSession* tracing_session = GetTracingSession(tsid);
1541   // It would be possible to check for `AllDataSourceInstancesStarted()` here,
1542   // but it doesn't make much sense, because a data source can be registered
1543   // after the session has started. Therefore this is tied to
1544   // `did_notify_all_data_source_started`: if that notification happened, do not
1545   // record slow data sources.
1546   if (!tracing_session || !tracing_session->consumer_maybe_null ||
1547       tracing_session->did_notify_all_data_source_started) {
1548     return;
1549   }
1550 
1551   int64_t timestamp = clock_->GetBootTimeNs().count();
1552 
1553   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
1554   packet->set_timestamp(static_cast<uint64_t>(timestamp));
1555   packet->set_trusted_uid(static_cast<int32_t>(uid_));
1556   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
1557 
1558   size_t i = 0;
1559   protos::pbzero::TracingServiceEvent::DataSources* slow_data_sources =
1560       packet->set_service_event()->set_slow_starting_data_sources();
1561   for (const auto& [producer_id, ds_instance] :
1562        tracing_session->data_source_instances) {
1563     if (ds_instance.state == DataSourceInstance::STARTED) {
1564       continue;
1565     }
1566     ProducerEndpointImpl* producer = GetProducer(producer_id);
1567     if (!producer) {
1568       continue;
1569     }
1570     if (++i > kMaxLifecycleEventsListedDataSources) {
1571       break;
1572     }
1573     auto* ds = slow_data_sources->add_data_source();
1574     ds->set_producer_name(producer->name_);
1575     ds->set_data_source_name(ds_instance.data_source_name);
1576     PERFETTO_LOG(
1577         "Data source failed to start within 20s data_source=\"%s\", "
1578         "producer=\"%s\", tsid=%" PRIu64,
1579         ds_instance.data_source_name.c_str(), producer->name_.c_str(), tsid);
1580   }
1581 
1582   tracing_session->slow_start_event = TracingSession::ArbitraryLifecycleEvent{
1583       timestamp, packet.SerializeAsArray()};
1584 }
1585 
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1586 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1587     TracingSession* tracing_session) {
1588   if (!tracing_session->consumer_maybe_null)
1589     return;
1590 
1591   if (!tracing_session->AllDataSourceInstancesStarted())
1592     return;
1593 
1594   // In some rare cases, we can get in this state more than once. Consider the
1595   // following scenario: 3 data sources are registered -> trace starts ->
1596   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1597   // Imagine now that a 4th data source registers while the trace is ongoing.
1598   // This would hit the AllDataSourceInstancesStarted() condition again.
1599   // In this case, however, we don't want to re-notify the consumer again.
1600   // That would be unexpected (even if, perhaps, technically correct) and
1601   // trigger bugs in the consumer.
1602   if (tracing_session->did_notify_all_data_source_started)
1603     return;
1604 
1605   PERFETTO_DLOG("All data sources started");
1606 
1607   SnapshotLifecycleEvent(
1608       tracing_session,
1609       protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1610       true /* snapshot_clocks */);
1611 
1612   tracing_session->did_notify_all_data_source_started = true;
1613   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1614 }
1615 
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1616 void TracingServiceImpl::NotifyDataSourceStopped(
1617     ProducerID producer_id,
1618     DataSourceInstanceID instance_id) {
1619   PERFETTO_DCHECK_THREAD(thread_checker_);
1620   for (auto& kv : tracing_sessions_) {
1621     TracingSession& tracing_session = kv.second;
1622     DataSourceInstance* instance =
1623         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1624 
1625     if (!instance)
1626       continue;
1627 
1628     if (instance->state != DataSourceInstance::STOPPING) {
1629       PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1630                     instance->state);
1631       continue;
1632     }
1633 
1634     instance->state = DataSourceInstance::STOPPED;
1635 
1636     ProducerEndpointImpl* producer = GetProducer(producer_id);
1637     PERFETTO_DCHECK(producer);
1638     if (tracing_session.consumer_maybe_null) {
1639       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1640           *producer, *instance);
1641     }
1642 
1643     if (!tracing_session.AllDataSourceInstancesStopped())
1644       continue;
1645 
1646     if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1647       continue;
1648 
1649     // All data sources acked the termination.
1650     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1651   }  // for (tracing_session)
1652 }
1653 
ActivateTriggers(ProducerID producer_id,const std::vector<std::string> & triggers)1654 void TracingServiceImpl::ActivateTriggers(
1655     ProducerID producer_id,
1656     const std::vector<std::string>& triggers) {
1657   PERFETTO_DCHECK_THREAD(thread_checker_);
1658   auto* producer = GetProducer(producer_id);
1659   PERFETTO_DCHECK(producer);
1660 
1661   int64_t now_ns = clock_->GetBootTimeNs().count();
1662   for (const auto& trigger_name : triggers) {
1663     PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
1664                   trigger_name.c_str());
1665     android_stats::MaybeLogTriggerEvent(PerfettoTriggerAtom::kTracedTrigger,
1666                                         trigger_name);
1667 
1668     base::Hasher hash;
1669     hash.Update(trigger_name.c_str(), trigger_name.size());
1670     std::string triggered_session_name;
1671     base::Uuid triggered_session_uuid;
1672     TracingSessionID triggered_session_id = 0;
1673     auto trigger_mode = TraceConfig::TriggerConfig::UNSPECIFIED;
1674 
1675     uint64_t trigger_name_hash = hash.digest();
1676     size_t count_in_window =
1677         PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);
1678 
1679     bool trigger_matched = false;
1680     bool trigger_activated = false;
1681     for (auto& id_and_tracing_session : tracing_sessions_) {
1682       auto& tracing_session = id_and_tracing_session.second;
1683       TracingSessionID tsid = id_and_tracing_session.first;
1684       auto iter = std::find_if(
1685           tracing_session.config.trigger_config().triggers().begin(),
1686           tracing_session.config.trigger_config().triggers().end(),
1687           [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
1688             return trigger.name() == trigger_name;
1689           });
1690       if (iter == tracing_session.config.trigger_config().triggers().end())
1691         continue;
1692       if (tracing_session.state == TracingSession::CLONED_READ_ONLY)
1693         continue;
1694 
1695       // If this trigger requires a certain producer to have sent it
1696       // (non-empty producer_name()) ensure the producer who sent this trigger
1697       // matches.
1698       if (!iter->producer_name_regex().empty() &&
1699           !std::regex_match(
1700               producer->name_,
1701               std::regex(iter->producer_name_regex(), std::regex::extended))) {
1702         continue;
1703       }
1704 
1705       // Use a random number between 0 and 1 to check if we should allow this
1706       // trigger through or not.
1707       double trigger_rnd = random_->GetValue();
1708       PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
1709       if (trigger_rnd < iter->skip_probability()) {
1710         MaybeLogTriggerEvent(tracing_session.config,
1711                              PerfettoTriggerAtom::kTracedLimitProbability,
1712                              trigger_name);
1713         continue;
1714       }
1715 
1716       // If we already triggered more times than the limit, silently ignore
1717       // this trigger.
1718       if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
1719         MaybeLogTriggerEvent(tracing_session.config,
1720                              PerfettoTriggerAtom::kTracedLimitMaxPer24h,
1721                              trigger_name);
1722         continue;
1723       }
1724       trigger_matched = true;
1725       triggered_session_id = tracing_session.id;
1726       triggered_session_name = tracing_session.config.unique_session_name();
1727       triggered_session_uuid.set_lsb_msb(tracing_session.trace_uuid.lsb(),
1728                                          tracing_session.trace_uuid.msb());
1729       trigger_mode = GetTriggerMode(tracing_session.config);
1730 
1731       const bool triggers_already_received =
1732           !tracing_session.received_triggers.empty();
1733       const TriggerInfo trigger = {static_cast<uint64_t>(now_ns), iter->name(),
1734                                    producer->name_, producer->uid()};
1735       tracing_session.received_triggers.push_back(trigger);
1736       switch (trigger_mode) {
1737         case TraceConfig::TriggerConfig::START_TRACING:
1738           // If the session has already been triggered and moved past
1739           // CONFIGURED then we don't need to repeat StartTracing. This would
1740           // work fine (StartTracing would return false) but would add error
1741           // logs.
1742           if (tracing_session.state != TracingSession::CONFIGURED)
1743             break;
1744 
1745           trigger_activated = true;
1746           MaybeLogUploadEvent(
1747               tracing_session.config, tracing_session.trace_uuid,
1748               PerfettoStatsdAtom::kTracedTriggerStartTracing, iter->name());
1749 
1750           // We override the trace duration to be the trigger's requested
1751           // value, this ensures that the trace will end after this amount
1752           // of time has passed.
1753           tracing_session.config.set_duration_ms(iter->stop_delay_ms());
1754           StartTracing(tsid);
1755           break;
1756         case TraceConfig::TriggerConfig::STOP_TRACING:
1757           // Only stop the trace once to avoid confusing log messages. I.E.
1758           // when we've already hit the first trigger we've already Posted the
1759           // task to FlushAndDisable. So all future triggers will just break
1760           // out.
1761           if (triggers_already_received)
1762             break;
1763 
1764           trigger_activated = true;
1765           MaybeLogUploadEvent(
1766               tracing_session.config, tracing_session.trace_uuid,
1767               PerfettoStatsdAtom::kTracedTriggerStopTracing, iter->name());
1768 
1769           // Now that we've seen a trigger we need to stop, flush, and disable
1770           // this session after the configured |stop_delay_ms|.
1771           weak_runner_.PostDelayedTask(
1772               [this, tsid] {
1773                 // Skip entirely the flush if the trace session doesn't exist
1774                 // anymore. This is to prevent misleading error messages to be
1775                 // logged.
1776                 if (GetTracingSession(tsid))
1777                   FlushAndDisableTracing(tsid);
1778               },
1779               // If this trigger is zero this will immediately executable and
1780               // will happen shortly.
1781               iter->stop_delay_ms());
1782           break;
1783 
1784         case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1785           trigger_activated = true;
1786           MaybeLogUploadEvent(
1787               tracing_session.config, tracing_session.trace_uuid,
1788               PerfettoStatsdAtom::kTracedTriggerCloneSnapshot, iter->name());
1789           weak_runner_.PostDelayedTask(
1790               [this, tsid, trigger] {
1791                 auto* tsess = GetTracingSession(tsid);
1792                 if (!tsess || !tsess->consumer_maybe_null)
1793                   return;
1794                 tsess->consumer_maybe_null->NotifyCloneSnapshotTrigger(trigger);
1795               },
1796               iter->stop_delay_ms());
1797           break;
1798 
1799         case TraceConfig::TriggerConfig::UNSPECIFIED:
1800           PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
1801           break;
1802       }
1803     }  // for (.. : tracing_sessions_)
1804 
1805     if (trigger_matched) {
1806       trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
1807     }
1808 
1809     if (trigger_activated) {
1810       // Log only the trigger that actually caused a trace stop/start, don't log
1811       // the follow-up ones, even if they matched.
1812       PERFETTO_LOG(
1813           "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
1814           "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
1815           trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
1816           triggered_session_uuid.ToPrettyString().c_str(),
1817           triggered_session_id);
1818     }
1819   }  // for (trigger_name : triggers)
1820 }
1821 
1822 // Always invoked TraceConfig.data_source_stop_timeout_ms (by default
1823 // kDataSourceStopTimeoutMs) after DisableTracing(). In nominal conditions all
1824 // data sources should have acked the stop and this will early out.
OnDisableTracingTimeout(TracingSessionID tsid)1825 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1826   PERFETTO_DCHECK_THREAD(thread_checker_);
1827   TracingSession* tracing_session = GetTracingSession(tsid);
1828   if (!tracing_session ||
1829       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1830     return;  // Tracing session was successfully disabled.
1831   }
1832 
1833   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1834                 tsid);
1835   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1836   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1837 }
1838 
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1839 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1840     TracingSession* tracing_session) {
1841   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1842   for (auto& inst_kv : tracing_session->data_source_instances) {
1843     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1844       continue;
1845     inst_kv.second.state = DataSourceInstance::STOPPED;
1846     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1847     PERFETTO_DCHECK(producer);
1848     if (tracing_session->consumer_maybe_null) {
1849       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1850           *producer, inst_kv.second);
1851     }
1852   }
1853   tracing_session->state = TracingSession::DISABLED;
1854 
1855   // Scrape any remaining chunks that weren't flushed by the producers.
1856   for (auto& producer_id_and_producer : producers_)
1857     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1858 
1859   SnapshotLifecycleEvent(
1860       tracing_session,
1861       protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1862       true /* snapshot_clocks */);
1863 
1864   if (tracing_session->write_into_file) {
1865     tracing_session->write_period_ms = 0;
1866     ReadBuffersIntoFile(tracing_session->id);
1867   }
1868 
1869   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1870                       PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1871 
1872   if (tracing_session->consumer_maybe_null)
1873     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1874 }
1875 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1876 void TracingServiceImpl::Flush(TracingSessionID tsid,
1877                                uint32_t timeout_ms,
1878                                ConsumerEndpoint::FlushCallback callback,
1879                                FlushFlags flush_flags) {
1880   PERFETTO_DCHECK_THREAD(thread_checker_);
1881   TracingSession* tracing_session = GetTracingSession(tsid);
1882   if (!tracing_session) {
1883     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1884     return;
1885   }
1886 
1887   SnapshotLifecycleEvent(
1888       tracing_session,
1889       protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
1890       false /* snapshot_clocks */);
1891 
1892   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
1893   for (const auto& [producer_id, ds_inst] :
1894        tracing_session->data_source_instances) {
1895     if (ds_inst.no_flush)
1896       continue;
1897     data_source_instances[producer_id].push_back(ds_inst.instance_id);
1898   }
1899   FlushDataSourceInstances(tracing_session, timeout_ms, data_source_instances,
1900                            std::move(callback), flush_flags);
1901 }
1902 
FlushDataSourceInstances(TracingSession * tracing_session,uint32_t timeout_ms,const std::map<ProducerID,std::vector<DataSourceInstanceID>> & data_source_instances,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1903 void TracingServiceImpl::FlushDataSourceInstances(
1904     TracingSession* tracing_session,
1905     uint32_t timeout_ms,
1906     const std::map<ProducerID, std::vector<DataSourceInstanceID>>&
1907         data_source_instances,
1908     ConsumerEndpoint::FlushCallback callback,
1909     FlushFlags flush_flags) {
1910   PERFETTO_DCHECK_THREAD(thread_checker_);
1911   if (!timeout_ms)
1912     timeout_ms = tracing_session->flush_timeout_ms();
1913 
1914   if (tracing_session->pending_flushes.size() > 1000) {
1915     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1916                   tracing_session->pending_flushes.size());
1917     callback(false);
1918     return;
1919   }
1920 
1921   if (tracing_session->state != TracingSession::STARTED) {
1922     PERFETTO_LOG("Flush() called, but tracing has not been started");
1923     callback(false);
1924     return;
1925   }
1926 
1927   tracing_session->last_flush_events.clear();
1928 
1929   ++tracing_session->flushes_requested;
1930   FlushRequestID flush_request_id = ++last_flush_request_id_;
1931   PendingFlush& pending_flush =
1932       tracing_session->pending_flushes
1933           .emplace_hint(tracing_session->pending_flushes.end(),
1934                         flush_request_id, PendingFlush(std::move(callback)))
1935           ->second;
1936 
1937   // Send a flush request to each producer involved in the tracing session. In
1938   // order to issue a flush request we have to build a map of all data source
1939   // instance ids enabled for each producer.
1940 
1941   for (const auto& [producer_id, data_sources] : data_source_instances) {
1942     ProducerEndpointImpl* producer = GetProducer(producer_id);
1943     producer->Flush(flush_request_id, data_sources, flush_flags);
1944     if (!producer->IsAndroidProcessFrozen()) {
1945       pending_flush.producers.insert(producer_id);
1946     } else {
1947       PERFETTO_DLOG(
1948           "skipping waiting flush for on producer \"%s\" (pid=%" PRIu32
1949           ") because it is frozen",
1950           producer->name_.c_str(), static_cast<uint32_t>(producer->pid()));
1951     }
1952   }
1953 
1954   // If there are no producers to flush (realistically this happens only in
1955   // some tests) fire OnFlushTimeout() straight away, without waiting.
1956   if (data_source_instances.empty())
1957     timeout_ms = 0;
1958 
1959   weak_runner_.PostDelayedTask(
1960       [this, tsid = tracing_session->id, flush_request_id, flush_flags] {
1961         OnFlushTimeout(tsid, flush_request_id, flush_flags);
1962       },
1963       timeout_ms);
1964 }
1965 
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1966 void TracingServiceImpl::NotifyFlushDoneForProducer(
1967     ProducerID producer_id,
1968     FlushRequestID flush_request_id) {
1969   for (auto& kv : tracing_sessions_) {
1970     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1971     auto& pending_flushes = kv.second.pending_flushes;
1972     auto end_it = pending_flushes.upper_bound(flush_request_id);
1973     for (auto it = pending_flushes.begin(); it != end_it;) {
1974       PendingFlush& pending_flush = it->second;
1975       pending_flush.producers.erase(producer_id);
1976       if (pending_flush.producers.empty()) {
1977         TracingSessionID tsid = kv.first;
1978         auto callback = std::move(pending_flush.callback);
1979         weak_runner_.PostTask([this, tsid, callback = std::move(callback)]() {
1980           CompleteFlush(tsid, std::move(callback),
1981                         /*success=*/true);
1982         });
1983         it = pending_flushes.erase(it);
1984       } else {
1985         it++;
1986       }
1987     }  // for (pending_flushes)
1988   }    // for (tracing_session)
1989 }
1990 
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id,FlushFlags flush_flags)1991 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1992                                         FlushRequestID flush_request_id,
1993                                         FlushFlags flush_flags) {
1994   TracingSession* tracing_session = GetTracingSession(tsid);
1995   if (!tracing_session)
1996     return;
1997   auto it = tracing_session->pending_flushes.find(flush_request_id);
1998   if (it == tracing_session->pending_flushes.end())
1999     return;  // Nominal case: flush was completed and acked on time.
2000 
2001   PendingFlush& pending_flush = it->second;
2002 
2003   // If there were no producers to flush, consider it a success.
2004   bool success = pending_flush.producers.empty();
2005   auto callback = std::move(pending_flush.callback);
2006   // If flush failed and this is a "final" flush, log which data sources were
2007   // slow.
2008   if ((flush_flags.reason() == FlushFlags::Reason::kTraceClone ||
2009        flush_flags.reason() == FlushFlags::Reason::kTraceStop) &&
2010       !success) {
2011     int64_t timestamp = clock_->GetBootTimeNs().count();
2012 
2013     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2014     packet->set_timestamp(static_cast<uint64_t>(timestamp));
2015     packet->set_trusted_uid(static_cast<int32_t>(uid_));
2016     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2017 
2018     size_t i = 0;
2019     protos::pbzero::TracingServiceEvent::DataSources* event =
2020         packet->set_service_event()->set_last_flush_slow_data_sources();
2021     for (const auto& producer_id : pending_flush.producers) {
2022       ProducerEndpointImpl* producer = GetProducer(producer_id);
2023       if (!producer) {
2024         continue;
2025       }
2026       if (++i > kMaxLifecycleEventsListedDataSources) {
2027         break;
2028       }
2029 
2030       auto ds_id_range =
2031           tracing_session->data_source_instances.equal_range(producer_id);
2032       for (auto ds_it = ds_id_range.first; ds_it != ds_id_range.second;
2033            ds_it++) {
2034         auto* ds = event->add_data_source();
2035         ds->set_producer_name(producer->name_);
2036         ds->set_data_source_name(ds_it->second.data_source_name);
2037         if (++i > kMaxLifecycleEventsListedDataSources) {
2038           break;
2039         }
2040       }
2041     }
2042 
2043     tracing_session->last_flush_events.push_back(
2044         {timestamp, packet.SerializeAsArray()});
2045   }
2046   tracing_session->pending_flushes.erase(it);
2047   CompleteFlush(tsid, std::move(callback), success);
2048 }
2049 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)2050 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
2051                                        ConsumerEndpoint::FlushCallback callback,
2052                                        bool success) {
2053   TracingSession* tracing_session = GetTracingSession(tsid);
2054   if (!tracing_session) {
2055     callback(false);
2056     return;
2057   }
2058   // Producers may not have been able to flush all their data, even if they
2059   // indicated flush completion. If possible, also collect uncommitted chunks
2060   // to make sure we have everything they wrote so far.
2061   for (auto& producer_id_and_producer : producers_) {
2062     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
2063   }
2064   SnapshotLifecycleEvent(
2065       tracing_session,
2066       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
2067       true /* snapshot_clocks */);
2068 
2069   tracing_session->flushes_succeeded += success ? 1 : 0;
2070   tracing_session->flushes_failed += success ? 0 : 1;
2071   callback(success);
2072 }
2073 
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)2074 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
2075     TracingSession* tracing_session,
2076     ProducerEndpointImpl* producer) {
2077   if (!producer->smb_scraping_enabled_)
2078     return;
2079 
2080   // Can't copy chunks if we don't know about any trace writers.
2081   if (producer->writers_.empty())
2082     return;
2083 
2084   // Performance optimization: On flush or session disconnect, this method is
2085   // called for each producer. If the producer doesn't participate in the
2086   // session, there's no need to scape its chunks right now. We can tell if a
2087   // producer participates in the session by checking if the producer is allowed
2088   // to write into the session's log buffers.
2089   const auto& session_buffers = tracing_session->buffers_index;
2090   bool producer_in_session =
2091       std::any_of(session_buffers.begin(), session_buffers.end(),
2092                   [producer](BufferID buffer_id) {
2093                     return producer->allowed_target_buffers_.count(buffer_id);
2094                   });
2095   if (!producer_in_session)
2096     return;
2097 
2098   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
2099 
2100   // Find and copy any uncommitted chunks from the SMB.
2101   //
2102   // In nominal conditions, the page header bitmap of the used SMB pages should
2103   // never change because the service is the only one who is supposed to modify
2104   // used pages (to make them free again).
2105   //
2106   // However, the code here needs to deal with the case of a malicious producer
2107   // altering the SMB in unpredictable ways. Thankfully the SMB size is
2108   // immutable, so a chunk will always point to some valid memory, even if the
2109   // producer alters the intended layout and chunk header concurrently.
2110   // Ultimately a malicious producer altering the SMB's chunk header bitamp
2111   // while we are iterating in this function is not any different from the case
2112   // of a malicious producer asking to commit a chunk made of random data,
2113   // which is something this class has to deal with regardless.
2114   //
2115   // The only legitimate mutations that can happen from sane producers,
2116   // concurrently to this function, are:
2117   //   A. free pages being partitioned,
2118   //   B. free chunks being migrated to kChunkBeingWritten,
2119   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
2120 
2121   SharedMemoryABI* abi = &producer->shmem_abi_;
2122   // num_pages() is immutable after the SMB is initialized and cannot be changed
2123   // even by a producer even if malicious.
2124   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
2125     uint32_t header_bitmap = abi->GetPageHeaderBitmap(page_idx);
2126 
2127     uint32_t used_chunks =
2128         abi->GetUsedChunks(header_bitmap);  // Returns a bitmap.
2129     // Skip empty pages.
2130     if (used_chunks == 0)
2131       continue;
2132 
2133     // Scrape the chunks that are currently used. These should be either in
2134     // state kChunkBeingWritten or kChunkComplete.
2135     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
2136       if (!(used_chunks & 1))
2137         continue;
2138 
2139       SharedMemoryABI::ChunkState state =
2140           SharedMemoryABI::GetChunkStateFromHeaderBitmap(header_bitmap,
2141                                                          chunk_idx);
2142       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
2143                       state == SharedMemoryABI::kChunkComplete);
2144       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
2145 
2146       SharedMemoryABI::Chunk chunk =
2147           abi->GetChunkUnchecked(page_idx, header_bitmap, chunk_idx);
2148 
2149       uint16_t packet_count;
2150       uint8_t flags;
2151       // GetPacketCountAndFlags has acquire_load semantics.
2152       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
2153 
2154       // It only makes sense to copy an incomplete chunk if there's at least
2155       // one full packet available. (The producer may not have completed the
2156       // last packet in it yet, so we need at least 2.)
2157       if (!chunk_complete && packet_count < 2)
2158         continue;
2159 
2160       // At this point, it is safe to access the remaining header fields of
2161       // the chunk. Even if the chunk was only just transferred from
2162       // kChunkFree into kChunkBeingWritten state, the header should be
2163       // written completely once the packet count increased above 1 (it was
2164       // reset to 0 by the service when the chunk was freed).
2165 
2166       WriterID writer_id = chunk.writer_id();
2167       std::optional<BufferID> target_buffer_id =
2168           producer->buffer_id_for_writer(writer_id);
2169 
2170       // We can only scrape this chunk if we know which log buffer to copy it
2171       // into.
2172       if (!target_buffer_id)
2173         continue;
2174 
2175       // Skip chunks that don't belong to the requested tracing session.
2176       bool target_buffer_belongs_to_session =
2177           std::find(session_buffers.begin(), session_buffers.end(),
2178                     *target_buffer_id) != session_buffers.end();
2179       if (!target_buffer_belongs_to_session)
2180         continue;
2181 
2182       uint32_t chunk_id =
2183           chunk.header()->chunk_id.load(std::memory_order_relaxed);
2184 
2185       CopyProducerPageIntoLogBuffer(
2186           producer->id_, producer->client_identity_, writer_id, chunk_id,
2187           *target_buffer_id, packet_count, flags, chunk_complete,
2188           chunk.payload_begin(), chunk.payload_size());
2189     }
2190   }
2191 }
2192 
FlushAndDisableTracing(TracingSessionID tsid)2193 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
2194   PERFETTO_DCHECK_THREAD(thread_checker_);
2195   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
2196   Flush(
2197       tsid, 0,
2198       [this, tsid](bool success) {
2199         // This was a DLOG up to Jun 2021 (v16, Android S).
2200         PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d",
2201                      tsid, success);
2202         TracingSession* session = GetTracingSession(tsid);
2203         if (!session) {
2204           return;
2205         }
2206         session->final_flush_outcome = success
2207                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
2208                                            : TraceStats::FINAL_FLUSH_FAILED;
2209         if (session->consumer_maybe_null) {
2210           // If the consumer is still attached, just disable the session but
2211           // give it a chance to read the contents.
2212           DisableTracing(tsid);
2213         } else {
2214           // If the consumer detached, destroy the session. If the consumer did
2215           // start the session in long-tracing mode, the service will have saved
2216           // the contents to the passed file. If not, the contents will be
2217           // destroyed.
2218           FreeBuffers(tsid);
2219         }
2220       },
2221       FlushFlags(FlushFlags::Initiator::kTraced,
2222                  FlushFlags::Reason::kTraceStop));
2223 }
2224 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)2225 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
2226                                            bool post_next_only) {
2227   PERFETTO_DCHECK_THREAD(thread_checker_);
2228   TracingSession* tracing_session = GetTracingSession(tsid);
2229   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2230     return;
2231 
2232   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
2233   weak_runner_.PostDelayedTask(
2234       [this, tsid] { PeriodicFlushTask(tsid, /*post_next_only=*/false); },
2235       flush_period_ms - static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
2236                                               flush_period_ms));
2237 
2238   if (post_next_only)
2239     return;
2240 
2241   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
2242   Flush(
2243       tsid, 0,
2244       [](bool success) {
2245         if (!success)
2246           PERFETTO_ELOG("Periodic flush timed out");
2247       },
2248       FlushFlags(FlushFlags::Initiator::kTraced,
2249                  FlushFlags::Reason::kPeriodic));
2250 }
2251 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)2252 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
2253     TracingSessionID tsid,
2254     bool post_next_only) {
2255   PERFETTO_DCHECK_THREAD(thread_checker_);
2256   TracingSession* tracing_session = GetTracingSession(tsid);
2257   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2258     return;
2259 
2260   uint32_t clear_period_ms =
2261       tracing_session->config.incremental_state_config().clear_period_ms();
2262   weak_runner_.PostDelayedTask(
2263       [this, tsid] {
2264         PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/false);
2265       },
2266       clear_period_ms - static_cast<uint32_t>(clock_->GetWallTimeMs().count() %
2267                                               clear_period_ms));
2268 
2269   if (post_next_only)
2270     return;
2271 
2272   PERFETTO_DLOG(
2273       "Performing periodic incremental state clear for trace session %" PRIu64,
2274       tsid);
2275 
2276   // Queue the IPCs to producers with active data sources that opted in.
2277   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
2278   for (const auto& kv : tracing_session->data_source_instances) {
2279     ProducerID producer_id = kv.first;
2280     const DataSourceInstance& data_source = kv.second;
2281     if (data_source.handles_incremental_state_clear) {
2282       clear_map[producer_id].push_back(data_source.instance_id);
2283     }
2284   }
2285 
2286   for (const auto& kv : clear_map) {
2287     ProducerID producer_id = kv.first;
2288     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
2289     ProducerEndpointImpl* producer = GetProducer(producer_id);
2290     if (!producer) {
2291       PERFETTO_DFATAL("Producer does not exist.");
2292       continue;
2293     }
2294     producer->ClearIncrementalState(data_sources);
2295   }
2296 }
2297 
ReadBuffersIntoConsumer(TracingSessionID tsid,ConsumerEndpointImpl * consumer)2298 bool TracingServiceImpl::ReadBuffersIntoConsumer(
2299     TracingSessionID tsid,
2300     ConsumerEndpointImpl* consumer) {
2301   PERFETTO_DCHECK(consumer);
2302   PERFETTO_DCHECK_THREAD(thread_checker_);
2303   TracingSession* tracing_session = GetTracingSession(tsid);
2304   if (!tracing_session) {
2305     PERFETTO_DLOG(
2306         "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
2307     return false;
2308   }
2309 
2310   if (tracing_session->write_into_file) {
2311     // If the consumer enabled tracing and asked to save the contents into the
2312     // passed file makes little sense to also try to read the buffers over IPC,
2313     // as that would just steal data from the periodic draining task.
2314     PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
2315     return false;
2316   }
2317 
2318   if (IsWaitingForTrigger(tracing_session))
2319     return false;
2320 
2321   // This is a rough threshold to determine how much to read from the buffer in
2322   // each task. This is to avoid executing a single huge sending task for too
2323   // long and risk to hit the watchdog. This is *not* an upper bound: we just
2324   // stop accumulating new packets and PostTask *after* we cross this threshold.
2325   // This constant essentially balances the PostTask and IPC overhead vs the
2326   // responsiveness of the service. An extremely small value will cause one IPC
2327   // and one PostTask for each slice but will keep the service extremely
2328   // responsive. An extremely large value will batch the send for the full
2329   // buffer in one large task, will hit the blocking send() once the socket
2330   // buffers are full and hang the service for a bit (until the consumer
2331   // catches up).
2332   static constexpr size_t kApproxBytesPerTask = 32768;
2333   bool has_more;
2334   std::vector<TracePacket> packets =
2335       ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);
2336 
2337   if (has_more) {
2338     auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
2339     weak_runner_.PostTask(
2340         [this, weak_consumer = std::move(weak_consumer), tsid] {
2341           if (!weak_consumer)
2342             return;
2343           ReadBuffersIntoConsumer(tsid, weak_consumer.get());
2344         });
2345   }
2346 
2347   // Keep this as tail call, just in case the consumer re-enters.
2348   consumer->consumer_->OnTraceData(std::move(packets), has_more);
2349   return true;
2350 }
2351 
ReadBuffersIntoFile(TracingSessionID tsid)2352 bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
2353   PERFETTO_DCHECK_THREAD(thread_checker_);
2354   TracingSession* tracing_session = GetTracingSession(tsid);
2355   if (!tracing_session) {
2356     // This will be hit systematically from the PostDelayedTask. Avoid logging,
2357     // it would be just spam.
2358     return false;
2359   }
2360 
2361   // This can happen if the file is closed by a previous task because it reaches
2362   // |max_file_size_bytes|.
2363   if (!tracing_session->write_into_file)
2364     return false;
2365 
2366   if (IsWaitingForTrigger(tracing_session))
2367     return false;
2368 
2369   // ReadBuffers() can allocate memory internally, for filtering. By limiting
2370   // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
2371   // we limit the amount of memory used on each iteration.
2372   //
2373   // It would be tempting to split this into multiple tasks like in
2374   // ReadBuffersIntoConsumer, but that's not currently possible.
2375   // ReadBuffersIntoFile has to read the whole available data before returning,
2376   // to support the disable_immediately=true code paths.
2377   bool has_more = true;
2378   bool stop_writing_into_file = false;
2379   do {
2380     std::vector<TracePacket> packets =
2381         ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);
2382 
2383     stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
2384   } while (has_more && !stop_writing_into_file);
2385 
2386   if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
2387     // Ensure all data was written to the file before we close it.
2388     base::FlushFile(tracing_session->write_into_file.get());
2389     tracing_session->write_into_file.reset();
2390     tracing_session->write_period_ms = 0;
2391     if (tracing_session->state == TracingSession::STARTED)
2392       DisableTracing(tsid);
2393     return true;
2394   }
2395 
2396   weak_runner_.PostDelayedTask([this, tsid] { ReadBuffersIntoFile(tsid); },
2397                                DelayToNextWritePeriodMs(*tracing_session));
2398   return true;
2399 }
2400 
IsWaitingForTrigger(TracingSession * tracing_session)2401 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2402   // Ignore the logic below for cloned tracing sessions. In this case we
2403   // actually want to read the (cloned) trace buffers even if no trigger was
2404   // hit.
2405   if (tracing_session->state == TracingSession::CLONED_READ_ONLY) {
2406     return false;
2407   }
2408 
2409   // When a tracing session is waiting for a trigger, it is considered empty. If
2410   // a tracing session finishes and moves into DISABLED without ever receiving a
2411   // trigger, the trace should never return any data. This includes the
2412   // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2413   // early and let the consumer know there is no data.
2414   if (!tracing_session->config.trigger_config().triggers().empty() &&
2415       tracing_session->received_triggers.empty()) {
2416     PERFETTO_DLOG(
2417         "ReadBuffers(): tracing session has not received a trigger yet.");
2418     return true;
2419   }
2420 
2421   // Traces with CLONE_SNAPSHOT triggers are a special case of the above. They
2422   // can be read only via a CloneSession() request. This is to keep the
2423   // behavior consistent with the STOP_TRACING+triggers case and avoid periodic
2424   // finalizations and uploads of the main CLONE_SNAPSHOT triggers.
2425   if (GetTriggerMode(tracing_session->config) ==
2426       TraceConfig::TriggerConfig::CLONE_SNAPSHOT) {
2427     PERFETTO_DLOG(
2428         "ReadBuffers(): skipping because the tracing session has "
2429         "CLONE_SNAPSHOT triggers defined");
2430     return true;
2431   }
2432 
2433   return false;
2434 }
2435 
ReadBuffers(TracingSession * tracing_session,size_t threshold,bool * has_more)2436 std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
2437     TracingSession* tracing_session,
2438     size_t threshold,
2439     bool* has_more) {
2440   PERFETTO_DCHECK_THREAD(thread_checker_);
2441   PERFETTO_DCHECK(tracing_session);
2442   *has_more = false;
2443 
2444   std::vector<TracePacket> packets;
2445   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
2446 
2447   if (!tracing_session->initial_clock_snapshot.empty()) {
2448     EmitClockSnapshot(tracing_session,
2449                       std::move(tracing_session->initial_clock_snapshot),
2450                       &packets);
2451   }
2452 
2453   for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
2454     PERFETTO_DCHECK(!snapshot.empty());
2455     EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
2456   }
2457   tracing_session->clock_snapshot_ring_buffer.clear();
2458 
2459   if (tracing_session->should_emit_sync_marker) {
2460     EmitSyncMarker(&packets);
2461     tracing_session->should_emit_sync_marker = false;
2462   }
2463 
2464   if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
2465     MaybeEmitTraceConfig(tracing_session, &packets);
2466     MaybeEmitCloneTrigger(tracing_session, &packets);
2467     MaybeEmitReceivedTriggers(tracing_session, &packets);
2468   }
2469   if (!tracing_session->did_emit_initial_packets) {
2470     EmitUuid(tracing_session, &packets);
2471     if (!tracing_session->config.builtin_data_sources().disable_system_info())
2472       EmitSystemInfo(&packets);
2473   }
2474   tracing_session->did_emit_initial_packets = true;
2475 
2476   // Note that in the proto comment, we guarantee that the tracing_started
2477   // lifecycle event will be emitted before any data packets so make sure to
2478   // keep this before reading the tracing buffers.
2479   if (!tracing_session->config.builtin_data_sources().disable_service_events())
2480     EmitLifecycleEvents(tracing_session, &packets);
2481 
2482   // In a multi-machine tracing session, emit clock synchronization messages for
2483   // remote machines.
2484   if (!relay_clients_.empty())
2485     MaybeEmitRemoteClockSync(tracing_session, &packets);
2486 
2487   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
2488 
2489   // Add up size for packets added by the Maybe* calls above.
2490   for (const TracePacket& packet : packets) {
2491     packets_bytes += packet.size();
2492   }
2493 
2494   bool did_hit_threshold = false;
2495 
2496   for (size_t buf_idx = 0;
2497        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
2498        buf_idx++) {
2499     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
2500     if (tbuf_iter == buffers_.end()) {
2501       PERFETTO_DFATAL("Buffer not found.");
2502       continue;
2503     }
2504     TraceBuffer& tbuf = *tbuf_iter->second;
2505     tbuf.BeginRead();
2506     while (!did_hit_threshold) {
2507       TracePacket packet;
2508       TraceBuffer::PacketSequenceProperties sequence_properties{};
2509       bool previous_packet_dropped;
2510       if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
2511                                     &previous_packet_dropped)) {
2512         break;
2513       }
2514       packet.set_buffer_index_for_stats(static_cast<uint32_t>(buf_idx));
2515       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
2516       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
2517       PERFETTO_DCHECK(sequence_properties.client_identity_trusted.has_uid());
2518       // Not checking sequence_properties.client_identity_trusted.has_pid():
2519       // it is false if the platform doesn't support it.
2520 
2521       PERFETTO_DCHECK(packet.size() > 0);
2522       if (!PacketStreamValidator::Validate(packet.slices())) {
2523         tracing_session->invalid_packets++;
2524         PERFETTO_DLOG("Dropping invalid packet");
2525         continue;
2526       }
2527 
2528       // Append a slice with the trusted field data. This can't be spoofed
2529       // because above we validated that the existing slices don't contain any
2530       // trusted fields. For added safety we append instead of prepending
2531       // because according to protobuf semantics, if the same field is
2532       // encountered multiple times the last instance takes priority. Note that
2533       // truncated packets are also rejected, so the producer can't give us a
2534       // partial packet (e.g., a truncated string) which only becomes valid when
2535       // the trusted data is appended here.
2536       Slice slice = Slice::Allocate(32);
2537       protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
2538           slice.own_data(), slice.size);
2539       const auto& client_identity_trusted =
2540           sequence_properties.client_identity_trusted;
2541       trusted_packet->set_trusted_uid(
2542           static_cast<int32_t>(client_identity_trusted.uid()));
2543       trusted_packet->set_trusted_packet_sequence_id(
2544           tracing_session->GetPacketSequenceID(
2545               client_identity_trusted.machine_id(),
2546               sequence_properties.producer_id_trusted,
2547               sequence_properties.writer_id));
2548       if (client_identity_trusted.has_pid()) {
2549         // Not supported on all platforms.
2550         trusted_packet->set_trusted_pid(
2551             static_cast<int32_t>(client_identity_trusted.pid()));
2552       }
2553       if (client_identity_trusted.has_non_default_machine_id()) {
2554         trusted_packet->set_machine_id(client_identity_trusted.machine_id());
2555       }
2556       if (previous_packet_dropped)
2557         trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
2558       slice.size = trusted_packet.Finalize();
2559       packet.AddSlice(std::move(slice));
2560 
2561       // Append the packet (inclusive of the trusted uid) to |packets|.
2562       packets_bytes += packet.size();
2563       did_hit_threshold = packets_bytes >= threshold;
2564       packets.emplace_back(std::move(packet));
2565     }  // for(packets...)
2566   }    // for(buffers...)
2567 
2568   *has_more = did_hit_threshold;
2569 
2570   // Only emit the "read complete" lifetime event when there is no more trace
2571   // data available to read. These events are used as safe points to limit
2572   // sorting in trace processor: the code shouldn't emit the event unless the
2573   // buffers are empty.
2574   if (!*has_more && !tracing_session->config.builtin_data_sources()
2575                          .disable_service_events()) {
2576     // We don't bother snapshotting clocks here because we wouldn't be able to
2577     // emit it and we shouldn't have significant drift from the last snapshot in
2578     // any case.
2579     SnapshotLifecycleEvent(tracing_session,
2580                            protos::pbzero::TracingServiceEvent::
2581                                kReadTracingBuffersCompletedFieldNumber,
2582                            false /* snapshot_clocks */);
2583     EmitLifecycleEvents(tracing_session, &packets);
2584   }
2585 
2586   // Only emit the stats when there is no more trace data is available to read.
2587   // That way, any problems that occur while reading from the buffers are
2588   // reflected in the emitted stats. This is particularly important for use
2589   // cases where ReadBuffers is only ever called after the tracing session is
2590   // stopped.
2591   if (!*has_more && tracing_session->should_emit_stats) {
2592     EmitStats(tracing_session, &packets);
2593     tracing_session->should_emit_stats = false;
2594   }
2595 
2596   MaybeFilterPackets(tracing_session, &packets);
2597 
2598   MaybeCompressPackets(tracing_session, &packets);
2599 
2600   if (!*has_more) {
2601     // We've observed some extremely high memory usage by scudo after
2602     // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
2603     // now, but this code asks scudo to release memory just in case.
2604     base::MaybeReleaseAllocatorMemToOS();
2605   }
2606 
2607   return packets;
2608 }
2609 
MaybeFilterPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2610 void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
2611                                             std::vector<TracePacket>* packets) {
2612   // If the tracing session specified a filter, run all packets through the
2613   // filter and replace them with the filter results.
2614   // The process below mantains the cardinality of input packets. Even if an
2615   // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
2616   // makes debugging and reasoning about the trace stats easier.
2617   // This place swaps the contents of each |packets| entry in place.
2618   if (!tracing_session->trace_filter) {
2619     return;
2620   }
2621   protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
2622   // The filter root should be reset from protos.Trace to protos.TracePacket
2623   // by the earlier call to SetFilterRoot() in EnableTracing().
2624   PERFETTO_DCHECK(trace_filter.config().root_msg_index() != 0);
2625   std::vector<protozero::MessageFilter::InputSlice> filter_input;
2626   auto start = clock_->GetWallTimeNs();
2627   for (TracePacket& packet : *packets) {
2628     const auto& packet_slices = packet.slices();
2629     const size_t input_packet_size = packet.size();
2630     filter_input.clear();
2631     filter_input.resize(packet_slices.size());
2632     ++tracing_session->filter_input_packets;
2633     tracing_session->filter_input_bytes += input_packet_size;
2634     for (size_t i = 0; i < packet_slices.size(); ++i)
2635       filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
2636     auto filtered_packet = trace_filter.FilterMessageFragments(
2637         &filter_input[0], filter_input.size());
2638 
2639     // Replace the packet in-place with the filtered one (unless failed).
2640     std::optional<uint32_t> maybe_buffer_idx = packet.buffer_index_for_stats();
2641     packet = TracePacket();
2642     if (filtered_packet.error) {
2643       ++tracing_session->filter_errors;
2644       PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
2645                     tracing_session->filter_input_packets);
2646       continue;
2647     }
2648     tracing_session->filter_output_bytes += filtered_packet.size;
2649     if (maybe_buffer_idx.has_value()) {
2650       // Keep the per-buffer stats updated. Also propagate the
2651       // buffer_index_for_stats in the output packet to allow accounting by
2652       // other parts of the ReadBuffer pipeline.
2653       uint32_t buffer_idx = maybe_buffer_idx.value();
2654       packet.set_buffer_index_for_stats(buffer_idx);
2655       auto& vec = tracing_session->filter_bytes_discarded_per_buffer;
2656       if (static_cast<size_t>(buffer_idx) >= vec.size())
2657         vec.resize(buffer_idx + 1);
2658       PERFETTO_DCHECK(input_packet_size >= filtered_packet.size);
2659       size_t bytes_filtered_out = input_packet_size - filtered_packet.size;
2660       vec[buffer_idx] += bytes_filtered_out;
2661     }
2662     AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
2663                               filtered_packet.size, kMaxTracePacketSliceSize,
2664                               &packet);
2665   }
2666   auto end = clock_->GetWallTimeNs();
2667   tracing_session->filter_time_taken_ns +=
2668       static_cast<uint64_t>((end - start).count());
2669 }
2670 
MaybeCompressPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2671 void TracingServiceImpl::MaybeCompressPackets(
2672     TracingSession* tracing_session,
2673     std::vector<TracePacket>* packets) {
2674   if (!tracing_session->compress_deflate) {
2675     return;
2676   }
2677 
2678   init_opts_.compressor_fn(packets);
2679 }
2680 
WriteIntoFile(TracingSession * tracing_session,std::vector<TracePacket> packets)2681 bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
2682                                        std::vector<TracePacket> packets) {
2683   if (!tracing_session->write_into_file) {
2684     return false;
2685   }
2686   const uint64_t max_size = tracing_session->max_file_size_bytes
2687                                 ? tracing_session->max_file_size_bytes
2688                                 : std::numeric_limits<size_t>::max();
2689 
2690   size_t total_slices = 0;
2691   for (const TracePacket& packet : packets) {
2692     total_slices += packet.slices().size();
2693   }
2694   // When writing into a file, the file should look like a root trace.proto
2695   // message. Each packet should be prepended with a proto preamble stating
2696   // its field id (within trace.proto) and size. Hence the addition below.
2697   const size_t max_iovecs = total_slices + packets.size();
2698 
2699   size_t num_iovecs = 0;
2700   bool stop_writing_into_file = false;
2701   std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
2702   size_t num_iovecs_at_last_packet = 0;
2703   uint64_t bytes_about_to_be_written = 0;
2704   for (TracePacket& packet : packets) {
2705     std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
2706         packet.GetProtoPreamble();
2707     bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
2708     num_iovecs++;
2709     for (const Slice& slice : packet.slices()) {
2710       // writev() doesn't change the passed pointer. However, struct iovec
2711       // take a non-const ptr because it's the same struct used by readv().
2712       // Hence the const_cast here.
2713       char* start = static_cast<char*>(const_cast<void*>(slice.start));
2714       bytes_about_to_be_written += slice.size;
2715       iovecs[num_iovecs++] = {start, slice.size};
2716     }
2717 
2718     if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
2719         max_size) {
2720       stop_writing_into_file = true;
2721       num_iovecs = num_iovecs_at_last_packet;
2722       break;
2723     }
2724 
2725     num_iovecs_at_last_packet = num_iovecs;
2726   }
2727   PERFETTO_DCHECK(num_iovecs <= max_iovecs);
2728   int fd = *tracing_session->write_into_file;
2729 
2730   uint64_t total_wr_size = 0;
2731 
2732   // writev() can take at most IOV_MAX entries per call. Batch them.
2733   constexpr size_t kIOVMax = IOV_MAX;
2734   for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
2735     int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
2736     ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
2737     if (wr_size <= 0) {
2738       PERFETTO_PLOG("writev() failed");
2739       stop_writing_into_file = true;
2740       break;
2741     }
2742     total_wr_size += static_cast<size_t>(wr_size);
2743   }
2744 
2745   tracing_session->bytes_written_into_file += total_wr_size;
2746 
2747   PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
2748                 (total_wr_size + 1023) / 1024, stop_writing_into_file);
2749   return stop_writing_into_file;
2750 }
2751 
FreeBuffers(TracingSessionID tsid)2752 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
2753   PERFETTO_DCHECK_THREAD(thread_checker_);
2754   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
2755   TracingSession* tracing_session = GetTracingSession(tsid);
2756   if (!tracing_session) {
2757     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
2758     return;  // TODO(primiano): signal failure?
2759   }
2760   DisableTracing(tsid, /*disable_immediately=*/true);
2761 
2762   PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
2763   tracing_session->data_source_instances.clear();
2764 
2765   for (auto& producer_entry : producers_) {
2766     ProducerEndpointImpl* producer = producer_entry.second;
2767     producer->OnFreeBuffers(tracing_session->buffers_index);
2768   }
2769 
2770   for (BufferID buffer_id : tracing_session->buffers_index) {
2771     buffer_ids_.Free(buffer_id);
2772     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
2773     buffers_.erase(buffer_id);
2774   }
2775   bool notify_traceur =
2776       tracing_session->config.notify_traceur() &&
2777       tracing_session->state != TracingSession::CLONED_READ_ONLY;
2778   bool is_long_trace =
2779       (tracing_session->config.write_into_file() &&
2780        tracing_session->config.file_write_period_ms() < kMillisPerDay);
2781   auto pending_clones = std::move(tracing_session->pending_clones);
2782   tracing_sessions_.erase(tsid);
2783   tracing_session = nullptr;
2784   UpdateMemoryGuardrail();
2785 
2786   for (const auto& id_to_clone_op : pending_clones) {
2787     const PendingClone& clone_op = id_to_clone_op.second;
2788     if (clone_op.weak_consumer) {
2789       weak_runner_.task_runner()->PostTask(
2790           [weak_consumer = clone_op.weak_consumer] {
2791             if (weak_consumer) {
2792               weak_consumer->consumer_->OnSessionCloned(
2793                   {false, "Original session ended", {}});
2794             }
2795           });
2796     }
2797   }
2798 
2799   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
2800                tracing_sessions_.size());
2801 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
2802     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2803   if (notify_traceur && is_long_trace) {
2804     PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
2805     if (!notify_fn || !notify_fn(/*session_stolen=*/false))
2806       PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
2807   }
2808 #else
2809   base::ignore_result(notify_traceur);
2810   base::ignore_result(is_long_trace);
2811 #endif
2812 }
2813 
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)2814 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
2815                                             const DataSourceDescriptor& desc) {
2816   PERFETTO_DCHECK_THREAD(thread_checker_);
2817   if (desc.name().empty()) {
2818     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2819     return;
2820   }
2821 
2822   ProducerEndpointImpl* producer = GetProducer(producer_id);
2823   if (!producer) {
2824     PERFETTO_DFATAL("Producer not found.");
2825     return;
2826   }
2827 
2828   // Check that the producer doesn't register two data sources with the same ID.
2829   // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
2830   // field didn't exist.
2831   for (const auto& kv : data_sources_) {
2832     if (desc.id() && kv.second.producer_id == producer_id &&
2833         kv.second.descriptor.id() == desc.id()) {
2834       PERFETTO_ELOG(
2835           "Failed to register data source \"%s\". A data source with the same "
2836           "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
2837           desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
2838           producer_id);
2839       return;
2840     }
2841   }
2842 
2843   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
2844                 producer_id, desc.name().c_str());
2845 
2846   auto reg_ds = data_sources_.emplace(desc.name(),
2847                                       RegisteredDataSource{producer_id, desc});
2848 
2849   // If there are existing tracing sessions, we need to check if the new
2850   // data source is enabled by any of them.
2851   for (auto& iter : tracing_sessions_) {
2852     TracingSession& tracing_session = iter.second;
2853     if (tracing_session.state != TracingSession::STARTED &&
2854         tracing_session.state != TracingSession::CONFIGURED) {
2855       continue;
2856     }
2857 
2858     TraceConfig::ProducerConfig producer_config;
2859     for (const auto& config : tracing_session.config.producers()) {
2860       if (producer->name_ == config.producer_name()) {
2861         producer_config = config;
2862         break;
2863       }
2864     }
2865     for (const TraceConfig::DataSource& cfg_data_source :
2866          tracing_session.config.data_sources()) {
2867       if (cfg_data_source.config().name() != desc.name())
2868         continue;
2869       DataSourceInstance* ds_inst = SetupDataSource(
2870           cfg_data_source, producer_config, reg_ds->second, &tracing_session);
2871       if (ds_inst && tracing_session.state == TracingSession::STARTED)
2872         StartDataSourceInstance(producer, &tracing_session, ds_inst);
2873     }
2874   }  // for(iter : tracing_sessions_)
2875 }
2876 
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2877 void TracingServiceImpl::UpdateDataSource(
2878     ProducerID producer_id,
2879     const DataSourceDescriptor& new_desc) {
2880   if (new_desc.id() == 0) {
2881     PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2882     return;
2883   }
2884 
2885   // If this producer has already registered a matching descriptor name and id,
2886   // just update the descriptor.
2887   RegisteredDataSource* data_source = nullptr;
2888   auto range = data_sources_.equal_range(new_desc.name());
2889   for (auto it = range.first; it != range.second; ++it) {
2890     if (it->second.producer_id == producer_id &&
2891         it->second.descriptor.id() == new_desc.id()) {
2892       data_source = &it->second;
2893       break;
2894     }
2895   }
2896 
2897   if (!data_source) {
2898     PERFETTO_ELOG(
2899         "UpdateDataSource() failed, could not find an existing data source "
2900         "with name=\"%s\" id=%" PRIu64,
2901         new_desc.name().c_str(), new_desc.id());
2902     return;
2903   }
2904 
2905   data_source->descriptor = new_desc;
2906 }
2907 
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2908 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2909                                                 TracingSession* tracing_session,
2910                                                 DataSourceInstance* instance,
2911                                                 bool disable_immediately) {
2912   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2913   if (producer->IsAndroidProcessFrozen()) {
2914     PERFETTO_DLOG(
2915         "skipping waiting of data source \"%s\" on producer \"%s\" (pid=%u) "
2916         "because it is frozen",
2917         instance->data_source_name.c_str(), producer->name_.c_str(),
2918         producer->pid());
2919     disable_immediately = true;
2920   }
2921   if (instance->will_notify_on_stop && !disable_immediately) {
2922     instance->state = DataSourceInstance::STOPPING;
2923   } else {
2924     instance->state = DataSourceInstance::STOPPED;
2925   }
2926   if (tracing_session->consumer_maybe_null) {
2927     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2928         *producer, *instance);
2929   }
2930   producer->StopDataSource(ds_inst_id);
2931 }
2932 
UnregisterDataSource(ProducerID producer_id,const std::string & name)2933 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
2934                                               const std::string& name) {
2935   PERFETTO_DCHECK_THREAD(thread_checker_);
2936   PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
2937                 producer_id, name.c_str());
2938   PERFETTO_CHECK(producer_id);
2939   ProducerEndpointImpl* producer = GetProducer(producer_id);
2940   PERFETTO_DCHECK(producer);
2941   for (auto& kv : tracing_sessions_) {
2942     auto& ds_instances = kv.second.data_source_instances;
2943     bool removed = false;
2944     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
2945       if (it->first == producer_id && it->second.data_source_name == name) {
2946         DataSourceInstanceID ds_inst_id = it->second.instance_id;
2947         if (it->second.state != DataSourceInstance::STOPPED) {
2948           if (it->second.state != DataSourceInstance::STOPPING) {
2949             StopDataSourceInstance(producer, &kv.second, &it->second,
2950                                    /* disable_immediately = */ false);
2951           }
2952 
2953           // Mark the instance as stopped immediately, since we are
2954           // unregistering it below.
2955           //
2956           //  The StopDataSourceInstance above might have set the state to
2957           //  STOPPING so this condition isn't an else.
2958           if (it->second.state == DataSourceInstance::STOPPING)
2959             NotifyDataSourceStopped(producer_id, ds_inst_id);
2960         }
2961         it = ds_instances.erase(it);
2962         removed = true;
2963       } else {
2964         ++it;
2965       }
2966     }  // for (data_source_instances)
2967     if (removed)
2968       MaybeNotifyAllDataSourcesStarted(&kv.second);
2969   }  // for (tracing_session)
2970 
2971   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
2972     if (it->second.producer_id == producer_id &&
2973         it->second.descriptor.name() == name) {
2974       data_sources_.erase(it);
2975       return;
2976     }
2977   }
2978 
2979   PERFETTO_DFATAL(
2980       "Tried to unregister a non-existent data source \"%s\" for "
2981       "producer %" PRIu16,
2982       name.c_str(), producer_id);
2983 }
2984 
IsInitiatorPrivileged(const TracingSession & tracing_session)2985 bool TracingServiceImpl::IsInitiatorPrivileged(
2986     const TracingSession& tracing_session) {
2987 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2988   if (tracing_session.consumer_uid == 1066 /* AID_STATSD */ &&
2989       tracing_session.config.statsd_metadata().triggering_config_uid() !=
2990           2000 /* AID_SHELL */
2991       && tracing_session.config.statsd_metadata().triggering_config_uid() !=
2992              0 /* AID_ROOT */) {
2993     // StatsD can be triggered either by shell, root or an app that has DUMP and
2994     // USAGE_STATS permission. When triggered by shell or root, we do not want
2995     // to consider the trace a trusted system trace, as it was initiated by the
2996     // user. Otherwise, it has to come from an app with DUMP and
2997     // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
2998     // system.
2999     // Check for shell / root: https://bit.ly/3b7oZNi
3000     // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
3001     return true;
3002   }
3003   if (tracing_session.consumer_uid == 1000 /* AID_SYSTEM */) {
3004     // AID_SYSTEM is considered a privileged initiator so that system_server can
3005     // profile apps that are not profileable by shell. Other AID_SYSTEM
3006     // processes are not allowed by SELinux to connect to the consumer socket or
3007     // to exec perfetto.
3008     return true;
3009   }
3010 #else
3011   base::ignore_result(tracing_session);
3012 #endif
3013   return false;
3014 }
3015 
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)3016 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
3017     const TraceConfig::DataSource& cfg_data_source,
3018     const TraceConfig::ProducerConfig& producer_config,
3019     const RegisteredDataSource& data_source,
3020     TracingSession* tracing_session) {
3021   PERFETTO_DCHECK_THREAD(thread_checker_);
3022   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
3023   PERFETTO_DCHECK(producer);
3024   // An existing producer that is not ftrace could have registered itself as
3025   // ftrace, we must not enable it in that case.
3026   if (lockdown_mode_ && producer->uid() != uid_) {
3027     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
3028     return nullptr;
3029   }
3030   // TODO(primiano): Add tests for registration ordering (data sources vs
3031   // consumers).
3032   if (!NameMatchesFilter(producer->name_,
3033                          cfg_data_source.producer_name_filter(),
3034                          cfg_data_source.producer_name_regex_filter())) {
3035     PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
3036                   cfg_data_source.config().name().c_str(),
3037                   producer->name_.c_str());
3038     return nullptr;
3039   }
3040 
3041   auto relative_buffer_id = cfg_data_source.config().target_buffer();
3042   if (relative_buffer_id >= tracing_session->num_buffers()) {
3043     PERFETTO_LOG(
3044         "The TraceConfig for DataSource %s specified a target_buffer out of "
3045         "bound (%u). Skipping it.",
3046         cfg_data_source.config().name().c_str(), relative_buffer_id);
3047     return nullptr;
3048   }
3049 
3050   // Create a copy of the DataSourceConfig specified in the trace config. This
3051   // will be passed to the producer after translating the |target_buffer| id.
3052   // The |target_buffer| parameter passed by the consumer in the trace config is
3053   // relative to the buffers declared in the same trace config. This has to be
3054   // translated to the global BufferID before passing it to the producers, which
3055   // don't know anything about tracing sessions and consumers.
3056 
3057   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
3058   auto insert_iter = tracing_session->data_source_instances.emplace(
3059       std::piecewise_construct,  //
3060       std::forward_as_tuple(producer->id_),
3061       std::forward_as_tuple(
3062           inst_id,
3063           cfg_data_source.config(),  //  Deliberate copy.
3064           data_source.descriptor.name(),
3065           data_source.descriptor.will_notify_on_start(),
3066           data_source.descriptor.will_notify_on_stop(),
3067           data_source.descriptor.handles_incremental_state_clear(),
3068           data_source.descriptor.no_flush()));
3069   DataSourceInstance* ds_instance = &insert_iter->second;
3070 
3071   // New data source instance starts out in CONFIGURED state.
3072   if (tracing_session->consumer_maybe_null) {
3073     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
3074         *producer, *ds_instance);
3075   }
3076 
3077   DataSourceConfig& ds_config = ds_instance->config;
3078   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
3079 
3080   // Rationale for `if (prefer) set_prefer(true)`, rather than `set(prefer)`:
3081   // ComputeStartupConfigHash() in tracing_muxer_impl.cc compares hashes of the
3082   // DataSourceConfig and expects to know (and clear) the fields generated by
3083   // the tracing service. Unconditionally adding a new field breaks backward
3084   // compatibility of startup tracing with older SDKs, because the serialization
3085   // also propagates unkonwn fields, breaking the hash matching check.
3086   if (tracing_session->config.prefer_suspend_clock_for_duration())
3087     ds_config.set_prefer_suspend_clock_for_duration(true);
3088 
3089   ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
3090   ds_config.set_enable_extra_guardrails(
3091       tracing_session->config.enable_extra_guardrails());
3092   if (IsInitiatorPrivileged(*tracing_session)) {
3093     ds_config.set_session_initiator(
3094         DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
3095   } else {
3096     // Unset in case the consumer set it.
3097     // We need to be able to trust this field.
3098     ds_config.set_session_initiator(
3099         DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
3100   }
3101   ds_config.set_tracing_session_id(tracing_session->id);
3102   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
3103   PERFETTO_DCHECK(global_id);
3104   ds_config.set_target_buffer(global_id);
3105 
3106   PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
3107                 ds_config.name().c_str(), global_id);
3108   if (!producer->shared_memory()) {
3109     // Determine the SMB page size. Must be an integer multiple of 4k.
3110     // As for the SMB size below, the decision tree is as follows:
3111     // 1. Give priority to what is defined in the trace config.
3112     // 2. If unset give priority to the hint passed by the producer.
3113     // 3. Keep within bounds and ensure it's a multiple of 4k.
3114     size_t page_size = producer_config.page_size_kb() * 1024;
3115     if (page_size == 0)
3116       page_size = producer->shmem_page_size_hint_bytes_;
3117 
3118     // Determine the SMB size. Must be an integer multiple of the SMB page size.
3119     // The decision tree is as follows:
3120     // 1. Give priority to what defined in the trace config.
3121     // 2. If unset give priority to the hint passed by the producer.
3122     // 3. Keep within bounds and ensure it's a multiple of the page size.
3123     size_t shm_size = producer_config.shm_size_kb() * 1024;
3124     if (shm_size == 0)
3125       shm_size = producer->shmem_size_hint_bytes_;
3126 
3127     auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
3128     if (valid_sizes != std::tie(shm_size, page_size)) {
3129       PERFETTO_DLOG(
3130           "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
3131           "back to shm_size %zu page_size %zu.",
3132           shm_size, page_size, std::get<0>(valid_sizes),
3133           std::get<1>(valid_sizes));
3134     }
3135     std::tie(shm_size, page_size) = valid_sizes;
3136 
3137     // TODO(primiano): right now Create() will suicide in case of OOM if the
3138     // mmap fails. We should instead gracefully fail the request and tell the
3139     // client to go away.
3140     PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
3141                   producer->name_.c_str());
3142     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
3143     producer->SetupSharedMemory(std::move(shared_memory), page_size,
3144                                 /*provided_by_producer=*/false);
3145   }
3146   producer->SetupDataSource(inst_id, ds_config);
3147   return ds_instance;
3148 }
3149 
3150 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
3151 // might be lying / returning garbage contents. |src| and |size| can be trusted
3152 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,const ClientIdentity & client_identity_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)3153 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
3154     ProducerID producer_id_trusted,
3155     const ClientIdentity& client_identity_trusted,
3156     WriterID writer_id,
3157     ChunkID chunk_id,
3158     BufferID buffer_id,
3159     uint16_t num_fragments,
3160     uint8_t chunk_flags,
3161     bool chunk_complete,
3162     const uint8_t* src,
3163     size_t size) {
3164   PERFETTO_DCHECK_THREAD(thread_checker_);
3165 
3166   ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
3167   if (!producer) {
3168     PERFETTO_DFATAL("Producer not found.");
3169     chunks_discarded_++;
3170     return;
3171   }
3172 
3173   TraceBuffer* buf = GetBufferByID(buffer_id);
3174   if (!buf) {
3175     PERFETTO_DLOG("Could not find target buffer %" PRIu16
3176                   " for producer %" PRIu16,
3177                   buffer_id, producer_id_trusted);
3178     chunks_discarded_++;
3179     return;
3180   }
3181 
3182   // Verify that the producer is actually allowed to write into the target
3183   // buffer specified in the request. This prevents a malicious producer from
3184   // injecting data into a log buffer that belongs to a tracing session the
3185   // producer is not part of.
3186   if (!producer->is_allowed_target_buffer(buffer_id)) {
3187     PERFETTO_ELOG("Producer %" PRIu16
3188                   " tried to write into forbidden target buffer %" PRIu16,
3189                   producer_id_trusted, buffer_id);
3190     PERFETTO_DFATAL("Forbidden target buffer");
3191     chunks_discarded_++;
3192     return;
3193   }
3194 
3195   // If the writer was registered by the producer, it should only write into the
3196   // buffer it was registered with.
3197   std::optional<BufferID> associated_buffer =
3198       producer->buffer_id_for_writer(writer_id);
3199   if (associated_buffer && *associated_buffer != buffer_id) {
3200     PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
3201                   " was registered to write into target buffer %" PRIu16
3202                   ", but tried to write into buffer %" PRIu16,
3203                   writer_id, producer_id_trusted, *associated_buffer,
3204                   buffer_id);
3205     PERFETTO_DFATAL("Wrong target buffer");
3206     chunks_discarded_++;
3207     return;
3208   }
3209 
3210   buf->CopyChunkUntrusted(producer_id_trusted, client_identity_trusted,
3211                           writer_id, chunk_id, num_fragments, chunk_flags,
3212                           chunk_complete, src, size);
3213 }
3214 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)3215 void TracingServiceImpl::ApplyChunkPatches(
3216     ProducerID producer_id_trusted,
3217     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
3218   PERFETTO_DCHECK_THREAD(thread_checker_);
3219 
3220   for (const auto& chunk : chunks_to_patch) {
3221     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
3222     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
3223     TraceBuffer* buf =
3224         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
3225     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
3226                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
3227     if (!writer_id || writer_id > kMaxWriterID || !buf) {
3228       // This can genuinely happen when the trace is stopped. The producers
3229       // might see the stop signal with some delay and try to keep sending
3230       // patches left soon after.
3231       PERFETTO_DLOG(
3232           "Received invalid chunks_to_patch request from Producer: %" PRIu16
3233           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
3234           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
3235       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3236       continue;
3237     }
3238 
3239     // Note, there's no need to validate that the producer is allowed to write
3240     // to the specified buffer ID (or that it's the correct buffer ID for a
3241     // registered TraceWriter). That's because TraceBuffer uses the producer ID
3242     // and writer ID to look up the chunk to patch. If the producer specifies an
3243     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
3244     // patches. Because the producer ID is trusted, there's also no way for a
3245     // malicious producer to patch another producer's data.
3246 
3247     // Speculate on the fact that there are going to be a limited amount of
3248     // patches per request, so we can allocate the |patches| array on the stack.
3249     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
3250     if (chunk.patches().size() > patches.size()) {
3251       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
3252                     patches.size());
3253       PERFETTO_DFATAL("Too many patches");
3254       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3255       continue;
3256     }
3257 
3258     size_t i = 0;
3259     for (const auto& patch : chunk.patches()) {
3260       const std::string& patch_data = patch.data();
3261       if (patch_data.size() != patches[i].data.size()) {
3262         PERFETTO_ELOG("Received patch from producer: %" PRIu16
3263                       " of unexpected size %zu",
3264                       producer_id_trusted, patch_data.size());
3265         patches_discarded_++;
3266         continue;
3267       }
3268       patches[i].offset_untrusted = patch.offset();
3269       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
3270       i++;
3271     }
3272     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
3273                                &patches[0], i, chunk.has_more_patches());
3274   }
3275 }
3276 
GetDetachedSession(uid_t uid,const std::string & key)3277 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
3278     uid_t uid,
3279     const std::string& key) {
3280   PERFETTO_DCHECK_THREAD(thread_checker_);
3281   for (auto& kv : tracing_sessions_) {
3282     TracingSession* session = &kv.second;
3283     if (session->consumer_uid == uid && session->detach_key == key) {
3284       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
3285       return session;
3286     }
3287   }
3288   return nullptr;
3289 }
3290 
GetTracingSession(TracingSessionID tsid)3291 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
3292     TracingSessionID tsid) {
3293   PERFETTO_DCHECK_THREAD(thread_checker_);
3294   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
3295   if (it == tracing_sessions_.end())
3296     return nullptr;
3297   return &it->second;
3298 }
3299 
3300 TracingServiceImpl::TracingSession*
GetTracingSessionByUniqueName(const std::string & unique_session_name)3301 TracingServiceImpl::GetTracingSessionByUniqueName(
3302     const std::string& unique_session_name) {
3303   PERFETTO_DCHECK_THREAD(thread_checker_);
3304   if (unique_session_name.empty()) {
3305     return nullptr;
3306   }
3307   for (auto& session_id_and_session : tracing_sessions_) {
3308     TracingSession& session = session_id_and_session.second;
3309     if (session.state == TracingSession::CLONED_READ_ONLY) {
3310       continue;
3311     }
3312     if (session.config.unique_session_name() == unique_session_name) {
3313       return &session;
3314     }
3315   }
3316   return nullptr;
3317 }
3318 
3319 TracingServiceImpl::TracingSession*
FindTracingSessionWithMaxBugreportScore()3320 TracingServiceImpl::FindTracingSessionWithMaxBugreportScore() {
3321   TracingSession* max_session = nullptr;
3322   for (auto& session_id_and_session : tracing_sessions_) {
3323     auto& session = session_id_and_session.second;
3324     const int32_t score = session.config.bugreport_score();
3325     // Exclude sessions with 0 (or below) score. By default tracing sessions
3326     // should NOT be eligible to be attached to bugreports.
3327     if (score <= 0 || session.state != TracingSession::STARTED)
3328       continue;
3329 
3330     if (!max_session || score > max_session->config.bugreport_score())
3331       max_session = &session;
3332   }
3333   return max_session;
3334 }
3335 
GetNextProducerID()3336 ProducerID TracingServiceImpl::GetNextProducerID() {
3337   PERFETTO_DCHECK_THREAD(thread_checker_);
3338   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
3339   do {
3340     ++last_producer_id_;
3341   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
3342   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
3343   return last_producer_id_;
3344 }
3345 
GetBufferByID(BufferID buffer_id)3346 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
3347   auto buf_iter = buffers_.find(buffer_id);
3348   if (buf_iter == buffers_.end())
3349     return nullptr;
3350   return &*buf_iter->second;
3351 }
3352 
OnStartTriggersTimeout(TracingSessionID tsid)3353 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
3354   // Skip entirely the flush if the trace session doesn't exist anymore.
3355   // This is to prevent misleading error messages to be logged.
3356   //
3357   // if the trace has started from the trigger we rely on
3358   // the |stop_delay_ms| from the trigger so don't flush and
3359   // disable if we've moved beyond a CONFIGURED state
3360   auto* tracing_session_ptr = GetTracingSession(tsid);
3361   if (tracing_session_ptr &&
3362       tracing_session_ptr->state == TracingSession::CONFIGURED) {
3363     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
3364                   " since no triggers activated.",
3365                   tsid);
3366     // No data should be returned from ReadBuffers() regardless of if we
3367     // call FreeBuffers() or DisableTracing(). This is because in
3368     // STOP_TRACING we need this promise in either case, and using
3369     // DisableTracing() allows a graceful shutdown. Consumers can follow
3370     // their normal path and check the buffers through ReadBuffers() and
3371     // the code won't hang because the tracing session will still be
3372     // alive just disabled.
3373     DisableTracing(tsid);
3374   }
3375 }
3376 
UpdateMemoryGuardrail()3377 void TracingServiceImpl::UpdateMemoryGuardrail() {
3378 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
3379   uint64_t total_buffer_bytes = 0;
3380 
3381   // Sum up all the shared memory buffers.
3382   for (const auto& id_to_producer : producers_) {
3383     if (id_to_producer.second->shared_memory())
3384       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
3385   }
3386 
3387   // Sum up all the trace buffers.
3388   for (const auto& id_to_buffer : buffers_) {
3389     total_buffer_bytes += id_to_buffer.second->size();
3390   }
3391 
3392   // Sum up all the cloned traced buffers.
3393   for (const auto& id_to_ts : tracing_sessions_) {
3394     const TracingSession& ts = id_to_ts.second;
3395     for (const auto& id_to_clone_op : ts.pending_clones) {
3396       const PendingClone& clone_op = id_to_clone_op.second;
3397       for (const std::unique_ptr<TraceBuffer>& buf : clone_op.buffers) {
3398         if (buf) {
3399           total_buffer_bytes += buf->size();
3400         }
3401       }
3402     }
3403   }
3404 
3405   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
3406   // interval.
3407   uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
3408   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
3409 #endif
3410 }
3411 
PeriodicSnapshotTask(TracingSessionID tsid)3412 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
3413   auto* tracing_session = GetTracingSession(tsid);
3414   if (!tracing_session)
3415     return;
3416   if (tracing_session->state != TracingSession::STARTED)
3417     return;
3418   tracing_session->should_emit_sync_marker = true;
3419   tracing_session->should_emit_stats = true;
3420   MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3421 }
3422 
SnapshotLifecycleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)3423 void TracingServiceImpl::SnapshotLifecycleEvent(TracingSession* tracing_session,
3424                                                 uint32_t field_id,
3425                                                 bool snapshot_clocks) {
3426   // field_id should be an id of a field in TracingServiceEvent.
3427   auto& lifecycle_events = tracing_session->lifecycle_events;
3428   auto event_it =
3429       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3430                    [field_id](const TracingSession::LifecycleEvent& event) {
3431                      return event.field_id == field_id;
3432                    });
3433 
3434   TracingSession::LifecycleEvent* event;
3435   if (event_it == lifecycle_events.end()) {
3436     lifecycle_events.emplace_back(field_id);
3437     event = &lifecycle_events.back();
3438   } else {
3439     event = &*event_it;
3440   }
3441 
3442   // Snapshot the clocks before capturing the timestamp for the event so we can
3443   // use this snapshot to resolve the event timestamp if necessary.
3444   if (snapshot_clocks)
3445     MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3446 
3447   // Erase before emplacing to prevent a unncessary doubling of memory if
3448   // not needed.
3449   if (event->timestamps.size() >= event->max_size) {
3450     event->timestamps.erase_front(1 + event->timestamps.size() -
3451                                   event->max_size);
3452   }
3453   event->timestamps.emplace_back(clock_->GetBootTimeNs().count());
3454 }
3455 
SetSingleLifecycleEvent(TracingSession * tracing_session,uint32_t field_id,int64_t boot_timestamp_ns)3456 void TracingServiceImpl::SetSingleLifecycleEvent(
3457     TracingSession* tracing_session,
3458     uint32_t field_id,
3459     int64_t boot_timestamp_ns) {
3460   // field_id should be an id of a field in TracingServiceEvent.
3461   auto& lifecycle_events = tracing_session->lifecycle_events;
3462   auto event_it =
3463       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3464                    [field_id](const TracingSession::LifecycleEvent& event) {
3465                      return event.field_id == field_id;
3466                    });
3467 
3468   TracingSession::LifecycleEvent* event;
3469   if (event_it == lifecycle_events.end()) {
3470     lifecycle_events.emplace_back(field_id);
3471     event = &lifecycle_events.back();
3472   } else {
3473     event = &*event_it;
3474   }
3475 
3476   event->timestamps.clear();
3477   event->timestamps.emplace_back(boot_timestamp_ns);
3478 }
3479 
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3480 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3481     TracingSession* tracing_session) {
3482   if (tracing_session->config.builtin_data_sources()
3483           .disable_clock_snapshotting()) {
3484     return;
3485   }
3486 
3487   // We are making an explicit copy of the latest snapshot (if it exists)
3488   // because SnapshotClocks reads this data and computes the drift based on its
3489   // content. If the clock drift is high enough, it will update the contents of
3490   // |snapshot| and return true. Otherwise, it will return false.
3491   TracingSession::ClockSnapshotData snapshot =
3492       tracing_session->clock_snapshot_ring_buffer.empty()
3493           ? TracingSession::ClockSnapshotData()
3494           : tracing_session->clock_snapshot_ring_buffer.back();
3495   bool did_update = SnapshotClocks(&snapshot);
3496   if (did_update) {
3497     // This means clocks drifted enough since last snapshot. See the comment
3498     // in SnapshotClocks.
3499     auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3500 
3501     // Erase before emplacing to prevent a unncessary doubling of memory if
3502     // not needed.
3503     static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3504     if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3505       snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3506                                    kClockSnapshotRingBufferSize);
3507     }
3508     snapshot_buffer->emplace_back(std::move(snapshot));
3509   }
3510 }
3511 
3512 // Returns true when the data in |snapshot_data| is updated with the new state
3513 // of the clocks and false otherwise.
SnapshotClocks(TracingSession::ClockSnapshotData * snapshot_data)3514 bool TracingServiceImpl::SnapshotClocks(
3515     TracingSession::ClockSnapshotData* snapshot_data) {
3516   // Minimum drift that justifies replacing a prior clock snapshot that hasn't
3517   // been emitted into the trace yet (see comment below).
3518   static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms
3519 
3520   TracingSession::ClockSnapshotData new_snapshot_data =
3521       base::CaptureClockSnapshots();
3522   // If we're about to update a session's latest clock snapshot that hasn't been
3523   // emitted into the trace yet, check whether the clocks have drifted enough to
3524   // warrant overriding the current snapshot values. The older snapshot would be
3525   // valid for a larger part of the currently buffered trace data because the
3526   // clock sync protocol in trace processor uses the latest clock <= timestamp
3527   // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
3528   // we try to keep it if we can.
3529   if (!snapshot_data->empty()) {
3530     PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
3531     PERFETTO_DCHECK((*snapshot_data)[0].clock_id ==
3532                     protos::gen::BUILTIN_CLOCK_BOOTTIME);
3533 
3534     bool update_snapshot = false;
3535     uint64_t old_boot_ns = (*snapshot_data)[0].timestamp;
3536     uint64_t new_boot_ns = new_snapshot_data[0].timestamp;
3537     int64_t boot_diff =
3538         static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
3539 
3540     for (size_t i = 1; i < snapshot_data->size(); i++) {
3541       uint64_t old_ns = (*snapshot_data)[i].timestamp;
3542       uint64_t new_ns = new_snapshot_data[i].timestamp;
3543 
3544       int64_t diff =
3545           static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
3546 
3547       // Compare the boottime delta against the delta of this clock.
3548       if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
3549         update_snapshot = true;
3550         break;
3551       }
3552     }
3553     if (!update_snapshot)
3554       return false;
3555     snapshot_data->clear();
3556   }
3557 
3558   *snapshot_data = std::move(new_snapshot_data);
3559   return true;
3560 }
3561 
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,std::vector<TracePacket> * packets)3562 void TracingServiceImpl::EmitClockSnapshot(
3563     TracingSession* tracing_session,
3564     TracingSession::ClockSnapshotData snapshot_data,
3565     std::vector<TracePacket>* packets) {
3566   PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3567                        .disable_clock_snapshotting());
3568 
3569   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3570   auto* snapshot = packet->set_clock_snapshot();
3571 
3572   protos::gen::BuiltinClock trace_clock =
3573       tracing_session->config.builtin_data_sources().primary_trace_clock();
3574   if (!trace_clock)
3575     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3576   snapshot->set_primary_trace_clock(
3577       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3578 
3579   for (auto& clock_id_and_ts : snapshot_data) {
3580     auto* c = snapshot->add_clocks();
3581     c->set_clock_id(clock_id_and_ts.clock_id);
3582     c->set_timestamp(clock_id_and_ts.timestamp);
3583   }
3584 
3585   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3586   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3587   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3588 }
3589 
EmitSyncMarker(std::vector<TracePacket> * packets)3590 void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
3591   // The sync marks are used to tokenize large traces efficiently.
3592   // See description in trace_packet.proto.
3593   if (sync_marker_packet_size_ == 0) {
3594     // The marker ABI expects that the marker is written after the uid.
3595     // Protozero guarantees that fields are written in the same order of the
3596     // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
3597     protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
3598         &sync_marker_packet_[0], sizeof(sync_marker_packet_));
3599     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3600     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3601 
3602     // Keep this last.
3603     packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
3604     sync_marker_packet_size_ = packet.Finalize();
3605   }
3606   packets->emplace_back();
3607   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
3608 }
3609 
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3610 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3611                                    std::vector<TracePacket>* packets) {
3612   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3613   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3614   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3615   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3616   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3617 }
3618 
GetTraceStats(TracingSession * tracing_session)3619 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
3620   TraceStats trace_stats;
3621   trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
3622   trace_stats.set_producers_seen(last_producer_id_);
3623   trace_stats.set_data_sources_registered(
3624       static_cast<uint32_t>(data_sources_.size()));
3625   trace_stats.set_data_sources_seen(last_data_source_instance_id_);
3626   trace_stats.set_tracing_sessions(
3627       static_cast<uint32_t>(tracing_sessions_.size()));
3628   trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
3629   trace_stats.set_chunks_discarded(chunks_discarded_);
3630   trace_stats.set_patches_discarded(patches_discarded_);
3631   trace_stats.set_invalid_packets(tracing_session->invalid_packets);
3632   trace_stats.set_flushes_requested(tracing_session->flushes_requested);
3633   trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
3634   trace_stats.set_flushes_failed(tracing_session->flushes_failed);
3635   trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);
3636 
3637   if (tracing_session->trace_filter) {
3638     auto* filt_stats = trace_stats.mutable_filter_stats();
3639     filt_stats->set_input_packets(tracing_session->filter_input_packets);
3640     filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
3641     filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
3642     filt_stats->set_errors(tracing_session->filter_errors);
3643     filt_stats->set_time_taken_ns(tracing_session->filter_time_taken_ns);
3644     for (uint64_t value : tracing_session->filter_bytes_discarded_per_buffer)
3645       filt_stats->add_bytes_discarded_per_buffer(value);
3646   }
3647 
3648   for (BufferID buf_id : tracing_session->buffers_index) {
3649     TraceBuffer* buf = GetBufferByID(buf_id);
3650     if (!buf) {
3651       PERFETTO_DFATAL("Buffer not found.");
3652       continue;
3653     }
3654     *trace_stats.add_buffer_stats() = buf->stats();
3655   }  // for (buf in session).
3656 
3657   if (!tracing_session->config.builtin_data_sources()
3658            .disable_chunk_usage_histograms()) {
3659     // Emit chunk usage stats broken down by sequence ID (i.e. by trace-writer).
3660     // Writer stats are updated by each TraceBuffer object at ReadBuffers time,
3661     // and there can be >1 buffer per session. A trace writer never writes to
3662     // more than one buffer (it's technically allowed but doesn't happen in the
3663     // current impl of the tracing SDK).
3664 
3665     bool has_written_bucket_definition = false;
3666     uint32_t buf_idx = static_cast<uint32_t>(-1);
3667     for (const BufferID buf_id : tracing_session->buffers_index) {
3668       ++buf_idx;
3669       const TraceBuffer* buf = GetBufferByID(buf_id);
3670       if (!buf)
3671         continue;
3672       for (auto it = buf->writer_stats().GetIterator(); it; ++it) {
3673         const auto& hist = it.value().used_chunk_hist;
3674         ProducerID p;
3675         WriterID w;
3676         GetProducerAndWriterID(it.key(), &p, &w);
3677         if (!has_written_bucket_definition) {
3678           // Serialize one-off the histogram bucket definition, which is the
3679           // same for all entries in the map.
3680           has_written_bucket_definition = true;
3681           // The -1 in the loop below is to skip the implicit overflow bucket.
3682           for (size_t i = 0; i < hist.num_buckets() - 1; ++i) {
3683             trace_stats.add_chunk_payload_histogram_def(hist.GetBucketThres(i));
3684           }
3685         }  // if(!has_written_bucket_definition)
3686         auto* wri_stats = trace_stats.add_writer_stats();
3687         wri_stats->set_sequence_id(
3688             tracing_session->GetPacketSequenceID(kDefaultMachineID, p, w));
3689         wri_stats->set_buffer(buf_idx);
3690         for (size_t i = 0; i < hist.num_buckets(); ++i) {
3691           wri_stats->add_chunk_payload_histogram_counts(hist.GetBucketCount(i));
3692           wri_stats->add_chunk_payload_histogram_sum(hist.GetBucketSum(i));
3693         }
3694       }  // for each sequence (writer).
3695     }    // for each buffer.
3696   }      // if (!disable_chunk_usage_histograms)
3697 
3698   return trace_stats;
3699 }
3700 
EmitUuid(TracingSession * tracing_session,std::vector<TracePacket> * packets)3701 void TracingServiceImpl::EmitUuid(TracingSession* tracing_session,
3702                                   std::vector<TracePacket>* packets) {
3703   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3704   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3705   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3706   auto* uuid = packet->set_trace_uuid();
3707   uuid->set_lsb(tracing_session->trace_uuid.lsb());
3708   uuid->set_msb(tracing_session->trace_uuid.msb());
3709   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3710 }
3711 
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)3712 void TracingServiceImpl::MaybeEmitTraceConfig(
3713     TracingSession* tracing_session,
3714     std::vector<TracePacket>* packets) {
3715   if (tracing_session->did_emit_initial_packets)
3716     return;
3717   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3718   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3719   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3720   tracing_session->config.Serialize(packet->set_trace_config());
3721   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3722 }
3723 
EmitSystemInfo(std::vector<TracePacket> * packets)3724 void TracingServiceImpl::EmitSystemInfo(std::vector<TracePacket>* packets) {
3725   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3726   auto* info = packet->set_system_info();
3727   info->set_tracing_service_version(base::GetVersionString());
3728 
3729   std::optional<int32_t> tzoff = base::GetTimezoneOffsetMins();
3730   if (tzoff.has_value())
3731     info->set_timezone_off_mins(*tzoff);
3732 
3733 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
3734     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3735   struct utsname uname_info;
3736   if (uname(&uname_info) == 0) {
3737     auto* utsname_info = info->set_utsname();
3738     utsname_info->set_sysname(uname_info.sysname);
3739     utsname_info->set_version(uname_info.version);
3740     utsname_info->set_machine(uname_info.machine);
3741     utsname_info->set_release(uname_info.release);
3742   }
3743   info->set_page_size(static_cast<uint32_t>(sysconf(_SC_PAGESIZE)));
3744   info->set_num_cpus(static_cast<uint32_t>(sysconf(_SC_NPROCESSORS_CONF)));
3745 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
3746 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3747   std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
3748   if (!fingerprint_value.empty()) {
3749     info->set_android_build_fingerprint(fingerprint_value);
3750   } else {
3751     PERFETTO_ELOG("Unable to read ro.build.fingerprint");
3752   }
3753 
3754   std::string device_manufacturer_value =
3755       base::GetAndroidProp("ro.product.manufacturer");
3756   if (!device_manufacturer_value.empty()) {
3757     info->set_android_device_manufacturer(device_manufacturer_value);
3758   } else {
3759     PERFETTO_ELOG("Unable to read ro.product.manufacturer");
3760   }
3761 
3762   std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
3763   std::optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
3764   if (sdk_value.has_value()) {
3765     info->set_android_sdk_version(*sdk_value);
3766   } else {
3767     PERFETTO_ELOG("Unable to read ro.build.version.sdk");
3768   }
3769 
3770   std::string soc_model_value = base::GetAndroidProp("ro.soc.model");
3771   if (!soc_model_value.empty()) {
3772     info->set_android_soc_model(soc_model_value);
3773   } else {
3774     PERFETTO_ELOG("Unable to read ro.soc.model");
3775   }
3776 
3777   // guest_soc model is not always present
3778   std::string guest_soc_model_value =
3779       base::GetAndroidProp("ro.boot.guest_soc.model");
3780   if (!guest_soc_model_value.empty()) {
3781     info->set_android_guest_soc_model(guest_soc_model_value);
3782   }
3783 
3784   std::string hw_rev_value = base::GetAndroidProp("ro.boot.hardware.revision");
3785   if (!hw_rev_value.empty()) {
3786     info->set_android_hardware_revision(hw_rev_value);
3787   } else {
3788     PERFETTO_ELOG("Unable to read ro.boot.hardware.revision");
3789   }
3790 
3791   std::string hw_ufs_value = base::GetAndroidProp("ro.boot.hardware.ufs");
3792   if (!hw_ufs_value.empty()) {
3793     info->set_android_storage_model(hw_ufs_value);
3794   } else {
3795     PERFETTO_ELOG("Unable to read ro.boot.hardware.ufs");
3796   }
3797 
3798   std::string hw_ddr_value = base::GetAndroidProp("ro.boot.hardware.ddr");
3799   if (!hw_ddr_value.empty()) {
3800     info->set_android_ram_model(hw_ddr_value);
3801   } else {
3802     PERFETTO_ELOG("Unable to read ro.boot.hardware.ddr");
3803   }
3804 
3805 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3806   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3807   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3808   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3809 }
3810 
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3811 void TracingServiceImpl::EmitLifecycleEvents(
3812     TracingSession* tracing_session,
3813     std::vector<TracePacket>* packets) {
3814   using TimestampedPacket =
3815       std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3816 
3817   std::vector<TimestampedPacket> timestamped_packets;
3818   for (auto& event : tracing_session->lifecycle_events) {
3819     for (int64_t ts : event.timestamps) {
3820       protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3821       packet->set_timestamp(static_cast<uint64_t>(ts));
3822       packet->set_trusted_uid(static_cast<int32_t>(uid_));
3823       packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3824 
3825       auto* service_event = packet->set_service_event();
3826       service_event->AppendVarInt(event.field_id, 1);
3827       timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3828     }
3829     event.timestamps.clear();
3830   }
3831 
3832   if (tracing_session->slow_start_event.has_value()) {
3833     const TracingSession::ArbitraryLifecycleEvent& event =
3834         *tracing_session->slow_start_event;
3835     timestamped_packets.emplace_back(event.timestamp, std::move(event.data));
3836   }
3837   tracing_session->slow_start_event.reset();
3838 
3839   for (auto& event : tracing_session->last_flush_events) {
3840     timestamped_packets.emplace_back(event.timestamp, std::move(event.data));
3841   }
3842   tracing_session->last_flush_events.clear();
3843 
3844   for (size_t i = 0; i < tracing_session->buffer_cloned_timestamps.size();
3845        i++) {
3846     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3847     int64_t ts = tracing_session->buffer_cloned_timestamps[i];
3848     packet->set_timestamp(static_cast<uint64_t>(ts));
3849     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3850     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3851 
3852     auto* service_event = packet->set_service_event();
3853     service_event->set_buffer_cloned(static_cast<uint32_t>(i));
3854 
3855     timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3856   }
3857   tracing_session->buffer_cloned_timestamps.clear();
3858 
3859   // We sort by timestamp here to ensure that the "sequence" of lifecycle
3860   // packets has monotonic timestamps like other sequences in the trace.
3861   // Note that these events could still be out of order with respect to other
3862   // events on the service packet sequence (e.g. trigger received packets).
3863   std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3864             [](const TimestampedPacket& a, const TimestampedPacket& b) {
3865               return a.first < b.first;
3866             });
3867 
3868   for (auto& pair : timestamped_packets)
3869     SerializeAndAppendPacket(packets, std::move(pair.second));
3870 }
3871 
MaybeEmitRemoteClockSync(TracingSession * tracing_session,std::vector<TracePacket> * packets)3872 void TracingServiceImpl::MaybeEmitRemoteClockSync(
3873     TracingSession* tracing_session,
3874     std::vector<TracePacket>* packets) {
3875   if (tracing_session->did_emit_remote_clock_sync_)
3876     return;
3877 
3878   std::unordered_set<MachineID> did_emit_machines;
3879   for (const auto& id_and_relay_client : relay_clients_) {
3880     const auto& relay_client = id_and_relay_client.second;
3881     auto machine_id = relay_client->machine_id();
3882     if (did_emit_machines.find(machine_id) != did_emit_machines.end())
3883       continue;  // Already emitted for the machine (e.g. multiple clients).
3884 
3885     auto& sync_clock_snapshots = relay_client->synced_clocks();
3886     if (sync_clock_snapshots.empty()) {
3887       PERFETTO_DLOG("Clock not synchronized for machine ID = %" PRIu32,
3888                     machine_id);
3889       continue;
3890     }
3891 
3892     // Don't emit twice for the same machine.
3893     did_emit_machines.insert(machine_id);
3894 
3895     protozero::HeapBuffered<protos::pbzero::TracePacket> sync_packet;
3896     sync_packet->set_machine_id(machine_id);
3897     sync_packet->set_trusted_uid(static_cast<int32_t>(uid_));
3898     auto* remote_clock_sync = sync_packet->set_remote_clock_sync();
3899     for (const auto& sync_exchange : relay_client->synced_clocks()) {
3900       auto* sync_exchange_msg = remote_clock_sync->add_synced_clocks();
3901 
3902       auto* client_snapshots = sync_exchange_msg->set_client_clocks();
3903       for (const auto& client_clock : sync_exchange.client_clocks) {
3904         auto* clock = client_snapshots->add_clocks();
3905         clock->set_clock_id(client_clock.clock_id);
3906         clock->set_timestamp(client_clock.timestamp);
3907       }
3908 
3909       auto* host_snapshots = sync_exchange_msg->set_host_clocks();
3910       for (const auto& host_clock : sync_exchange.host_clocks) {
3911         auto* clock = host_snapshots->add_clocks();
3912         clock->set_clock_id(host_clock.clock_id);
3913         clock->set_timestamp(host_clock.timestamp);
3914       }
3915     }
3916 
3917     SerializeAndAppendPacket(packets, sync_packet.SerializeAsArray());
3918   }
3919 
3920   tracing_session->did_emit_remote_clock_sync_ = true;
3921 }
3922 
MaybeEmitCloneTrigger(TracingSession * tracing_session,std::vector<TracePacket> * packets)3923 void TracingServiceImpl::MaybeEmitCloneTrigger(
3924     TracingSession* tracing_session,
3925     std::vector<TracePacket>* packets) {
3926   if (tracing_session->clone_trigger.has_value()) {
3927     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3928     auto* trigger = packet->set_clone_snapshot_trigger();
3929     const auto& info = tracing_session->clone_trigger.value();
3930     trigger->set_trigger_name(info.trigger_name);
3931     trigger->set_producer_name(info.producer_name);
3932     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3933 
3934     packet->set_timestamp(info.boot_time_ns);
3935     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3936     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3937     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3938   }
3939 }
3940 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)3941 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3942     TracingSession* tracing_session,
3943     std::vector<TracePacket>* packets) {
3944   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3945                   tracing_session->received_triggers.size());
3946   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3947        i < tracing_session->received_triggers.size(); ++i) {
3948     const auto& info = tracing_session->received_triggers[i];
3949     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3950     auto* trigger = packet->set_trigger();
3951     trigger->set_trigger_name(info.trigger_name);
3952     trigger->set_producer_name(info.producer_name);
3953     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3954 
3955     packet->set_timestamp(info.boot_time_ns);
3956     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3957     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3958     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3959     ++tracing_session->num_triggers_emitted_into_trace;
3960   }
3961 }
3962 
MaybeLogUploadEvent(const TraceConfig & cfg,const base::Uuid & uuid,PerfettoStatsdAtom atom,const std::string & trigger_name)3963 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3964                                              const base::Uuid& uuid,
3965                                              PerfettoStatsdAtom atom,
3966                                              const std::string& trigger_name) {
3967   if (!ShouldLogEvent(cfg))
3968     return;
3969 
3970   PERFETTO_DCHECK(uuid);  // The UUID must be set at this point.
3971   android_stats::MaybeLogUploadEvent(atom, uuid.lsb(), uuid.msb(),
3972                                      trigger_name);
3973 }
3974 
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3975 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3976                                               PerfettoTriggerAtom atom,
3977                                               const std::string& trigger_name) {
3978   if (!ShouldLogEvent(cfg))
3979     return;
3980   android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3981 }
3982 
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3983 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3984     int64_t now_ns,
3985     uint64_t trigger_name_hash) {
3986   constexpr int64_t kOneDayInNs = 24ll * 60 * 60 * 1000 * 1000 * 1000;
3987   PERFETTO_DCHECK(
3988       std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3989   size_t remove_count = 0;
3990   size_t trigger_count = 0;
3991   for (const TriggerHistory& h : trigger_history_) {
3992     if (h.timestamp_ns < now_ns - kOneDayInNs) {
3993       remove_count++;
3994     } else if (h.name_hash == trigger_name_hash) {
3995       trigger_count++;
3996     }
3997   }
3998   trigger_history_.erase_front(remove_count);
3999   return trigger_count;
4000 }
4001 
FlushAndCloneSession(ConsumerEndpointImpl * consumer,ConsumerEndpoint::CloneSessionArgs args)4002 base::Status TracingServiceImpl::FlushAndCloneSession(
4003     ConsumerEndpointImpl* consumer,
4004     ConsumerEndpoint::CloneSessionArgs args) {
4005   PERFETTO_DCHECK_THREAD(thread_checker_);
4006   auto clone_target = FlushFlags::CloneTarget::kUnknown;
4007 
4008   TracingSession* session = nullptr;
4009   if (args.for_bugreport) {
4010     clone_target = FlushFlags::CloneTarget::kBugreport;
4011   }
4012   if (args.tsid != 0) {
4013     if (args.tsid == kBugreportSessionId) {
4014       // This branch is only here to support the legacy protocol where we could
4015       // clone only a single session using the magic ID kBugreportSessionId.
4016       // The newer perfetto --clone-all-for-bugreport first queries the existing
4017       // sessions and then issues individual clone requests specifying real
4018       // session IDs, setting args.{for_bugreport,skip_trace_filter}=true.
4019       PERFETTO_LOG("Looking for sessions for bugreport");
4020       session = FindTracingSessionWithMaxBugreportScore();
4021       if (!session) {
4022         return base::ErrStatus(
4023             "No tracing sessions eligible for bugreport found");
4024       }
4025       args.tsid = session->id;
4026       clone_target = FlushFlags::CloneTarget::kBugreport;
4027       args.skip_trace_filter = true;
4028     } else {
4029       session = GetTracingSession(args.tsid);
4030     }
4031   } else if (!args.unique_session_name.empty()) {
4032     session = GetTracingSessionByUniqueName(args.unique_session_name);
4033   }
4034 
4035   if (!session) {
4036     return base::ErrStatus("Tracing session not found");
4037   }
4038 
4039   // Skip the UID check for sessions marked with a bugreport_score > 0.
4040   // Those sessions, by design, can be stolen by any other consumer for the
4041   // sake of creating snapshots for bugreports.
4042   if (!session->IsCloneAllowed(consumer->uid_)) {
4043     return PERFETTO_SVC_ERR("Not allowed to clone a session from another UID");
4044   }
4045 
4046   // If any of the buffers are marked as clear_before_clone, reset them before
4047   // issuing the Flush(kCloneReason).
4048   size_t buf_idx = 0;
4049   for (BufferID src_buf_id : session->buffers_index) {
4050     if (!session->config.buffers()[buf_idx++].clear_before_clone())
4051       continue;
4052     auto buf_iter = buffers_.find(src_buf_id);
4053     PERFETTO_CHECK(buf_iter != buffers_.end());
4054     std::unique_ptr<TraceBuffer>& buf = buf_iter->second;
4055 
4056     // No need to reset the buffer if nothing has been written into it yet.
4057     // This is the canonical case if producers behive nicely and don't timeout
4058     // the handling of writes during the flush.
4059     // This check avoids a useless re-mmap upon every Clone() if the buffer is
4060     // already empty (when used in combination with `transfer_on_clone`).
4061     if (!buf->has_data())
4062       continue;
4063 
4064     // Some leftover data was left in the buffer. Recreate it to empty it.
4065     const auto buf_policy = buf->overwrite_policy();
4066     const auto buf_size = buf->size();
4067     std::unique_ptr<TraceBuffer> old_buf = std::move(buf);
4068     buf = TraceBuffer::Create(buf_size, buf_policy);
4069     if (!buf) {
4070       // This is extremely rare but could happen on 32-bit. If the new buffer
4071       // allocation failed, put back the buffer where it was and fail the clone.
4072       // We cannot leave the original tracing session buffer-less as it would
4073       // cause crashes when data sources commit new data.
4074       buf = std::move(old_buf);
4075       return base::ErrStatus(
4076           "Buffer allocation failed while attempting to clone");
4077     }
4078   }
4079 
4080   auto weak_consumer = consumer->GetWeakPtr();
4081 
4082   const PendingCloneID clone_id = session->last_pending_clone_id_++;
4083 
4084   auto& clone_op = session->pending_clones[clone_id];
4085   clone_op.pending_flush_cnt = 0;
4086   // Pre-initialize these vectors just as an optimization to avoid reallocations
4087   // in DoCloneBuffers().
4088   clone_op.buffers.reserve(session->buffers_index.size());
4089   clone_op.buffer_cloned_timestamps.reserve(session->buffers_index.size());
4090   clone_op.weak_consumer = weak_consumer;
4091   clone_op.skip_trace_filter = args.skip_trace_filter;
4092   if (!args.clone_trigger_name.empty()) {
4093     clone_op.clone_trigger = {args.clone_trigger_boot_time_ns,
4094                               args.clone_trigger_name,
4095                               args.clone_trigger_producer_name,
4096                               args.clone_trigger_trusted_producer_uid};
4097   }
4098 
4099   // Issue separate flush requests for separate buffer groups. The buffer marked
4100   // as transfer_on_clone will be flushed and cloned separately: even if they're
4101   // slower (like in the case of Winscope tracing), they will not delay the
4102   // snapshot of the other buffers.
4103   //
4104   // In the future we might want to split the buffer into more groups and maybe
4105   // allow this to be configurable.
4106   std::array<std::set<BufferID>, 2> bufs_groups;
4107   for (size_t i = 0; i < session->buffers_index.size(); i++) {
4108     if (session->config.buffers()[i].transfer_on_clone()) {
4109       bufs_groups[0].insert(session->buffers_index[i]);
4110     } else {
4111       bufs_groups[1].insert(session->buffers_index[i]);
4112     }
4113   }
4114 
4115   SnapshotLifecycleEvent(
4116       session, protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
4117       false /* snapshot_clocks */);
4118   clone_op.pending_flush_cnt = bufs_groups.size();
4119   clone_op.clone_started_timestamp_ns = clock_->GetBootTimeNs().count();
4120   for (const std::set<BufferID>& buf_group : bufs_groups) {
4121     FlushDataSourceInstances(
4122         session, 0,
4123         GetFlushableDataSourceInstancesForBuffers(session, buf_group),
4124         [tsid = session->id, clone_id, buf_group, this](bool final_flush) {
4125           OnFlushDoneForClone(tsid, clone_id, buf_group, final_flush);
4126         },
4127         FlushFlags(FlushFlags::Initiator::kTraced,
4128                    FlushFlags::Reason::kTraceClone, clone_target));
4129   }
4130 
4131   return base::OkStatus();
4132 }
4133 
4134 std::map<ProducerID, std::vector<DataSourceInstanceID>>
GetFlushableDataSourceInstancesForBuffers(TracingSession * session,const std::set<BufferID> & bufs)4135 TracingServiceImpl::GetFlushableDataSourceInstancesForBuffers(
4136     TracingSession* session,
4137     const std::set<BufferID>& bufs) {
4138   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
4139 
4140   for (const auto& [producer_id, ds_inst] : session->data_source_instances) {
4141     // TODO(ddiproietto): Consider if we should skip instances if ds_inst.state
4142     // != DataSourceInstance::STARTED
4143     if (ds_inst.no_flush) {
4144       continue;
4145     }
4146     if (!bufs.count(static_cast<BufferID>(ds_inst.config.target_buffer()))) {
4147       continue;
4148     }
4149     data_source_instances[producer_id].push_back(ds_inst.instance_id);
4150   }
4151 
4152   return data_source_instances;
4153 }
4154 
OnFlushDoneForClone(TracingSessionID tsid,PendingCloneID clone_id,const std::set<BufferID> & buf_ids,bool final_flush_outcome)4155 void TracingServiceImpl::OnFlushDoneForClone(TracingSessionID tsid,
4156                                              PendingCloneID clone_id,
4157                                              const std::set<BufferID>& buf_ids,
4158                                              bool final_flush_outcome) {
4159   TracingSession* src = GetTracingSession(tsid);
4160   // The session might be gone by the time we try to clone it.
4161   if (!src) {
4162     return;
4163   }
4164 
4165   auto it = src->pending_clones.find(clone_id);
4166   if (it == src->pending_clones.end()) {
4167     return;
4168   }
4169   auto& clone_op = it->second;
4170 
4171   if (final_flush_outcome == false) {
4172     clone_op.flush_failed = true;
4173   }
4174 
4175   base::Status result;
4176   base::Uuid uuid;
4177 
4178   // First clone the flushed TraceBuffer(s). This can fail because of ENOMEM. If
4179   // it happens bail out early before creating any session.
4180   if (!DoCloneBuffers(*src, buf_ids, &clone_op)) {
4181     result = PERFETTO_SVC_ERR("Buffer allocation failed");
4182   }
4183 
4184   if (result.ok()) {
4185     UpdateMemoryGuardrail();
4186 
4187     if (--clone_op.pending_flush_cnt != 0) {
4188       // Wait for more pending flushes.
4189       return;
4190     }
4191 
4192     PERFETTO_LOG("FlushAndCloneSession(%" PRIu64 ") started, success=%d", tsid,
4193                  final_flush_outcome);
4194 
4195     if (clone_op.weak_consumer) {
4196       result = FinishCloneSession(
4197           &*clone_op.weak_consumer, tsid, std::move(clone_op.buffers),
4198           std::move(clone_op.buffer_cloned_timestamps),
4199           clone_op.skip_trace_filter, !clone_op.flush_failed,
4200           clone_op.clone_trigger, &uuid, clone_op.clone_started_timestamp_ns);
4201     }
4202   }  // if (result.ok())
4203 
4204   if (clone_op.weak_consumer) {
4205     clone_op.weak_consumer->consumer_->OnSessionCloned(
4206         {result.ok(), result.message(), uuid});
4207   }
4208 
4209   src->pending_clones.erase(it);
4210   UpdateMemoryGuardrail();
4211 }
4212 
DoCloneBuffers(const TracingSession & src,const std::set<BufferID> & buf_ids,PendingClone * clone_op)4213 bool TracingServiceImpl::DoCloneBuffers(const TracingSession& src,
4214                                         const std::set<BufferID>& buf_ids,
4215                                         PendingClone* clone_op) {
4216   PERFETTO_DCHECK(src.num_buffers() == src.config.buffers().size());
4217   clone_op->buffers.resize(src.buffers_index.size());
4218   clone_op->buffer_cloned_timestamps.resize(src.buffers_index.size());
4219 
4220   int64_t now = clock_->GetBootTimeNs().count();
4221 
4222   for (size_t buf_idx = 0; buf_idx < src.buffers_index.size(); buf_idx++) {
4223     BufferID src_buf_id = src.buffers_index[buf_idx];
4224     if (buf_ids.count(src_buf_id) == 0)
4225       continue;
4226     auto buf_iter = buffers_.find(src_buf_id);
4227     PERFETTO_CHECK(buf_iter != buffers_.end());
4228     std::unique_ptr<TraceBuffer>& src_buf = buf_iter->second;
4229     std::unique_ptr<TraceBuffer> new_buf;
4230     if (src.config.buffers()[buf_idx].transfer_on_clone()) {
4231       const auto buf_policy = src_buf->overwrite_policy();
4232       const auto buf_size = src_buf->size();
4233       new_buf = std::move(src_buf);
4234       src_buf = TraceBuffer::Create(buf_size, buf_policy);
4235       if (!src_buf) {
4236         // If the allocation fails put the buffer back and let the code below
4237         // handle the failure gracefully.
4238         src_buf = std::move(new_buf);
4239       }
4240     } else {
4241       new_buf = src_buf->CloneReadOnly();
4242     }
4243     if (!new_buf.get()) {
4244       return false;
4245     }
4246     clone_op->buffers[buf_idx] = std::move(new_buf);
4247     clone_op->buffer_cloned_timestamps[buf_idx] = now;
4248   }
4249   return true;
4250 }
4251 
FinishCloneSession(ConsumerEndpointImpl * consumer,TracingSessionID src_tsid,std::vector<std::unique_ptr<TraceBuffer>> buf_snaps,std::vector<int64_t> buf_cloned_timestamps,bool skip_trace_filter,bool final_flush_outcome,std::optional<TriggerInfo> clone_trigger,base::Uuid * new_uuid,int64_t clone_started_timestamp_ns)4252 base::Status TracingServiceImpl::FinishCloneSession(
4253     ConsumerEndpointImpl* consumer,
4254     TracingSessionID src_tsid,
4255     std::vector<std::unique_ptr<TraceBuffer>> buf_snaps,
4256     std::vector<int64_t> buf_cloned_timestamps,
4257     bool skip_trace_filter,
4258     bool final_flush_outcome,
4259     std::optional<TriggerInfo> clone_trigger,
4260     base::Uuid* new_uuid,
4261     int64_t clone_started_timestamp_ns) {
4262   PERFETTO_DLOG("CloneSession(%" PRIu64
4263                 ", skip_trace_filter=%d) started, consumer uid: %d",
4264                 src_tsid, skip_trace_filter, static_cast<int>(consumer->uid_));
4265 
4266   TracingSession* src = GetTracingSession(src_tsid);
4267 
4268   // The session might be gone by the time we try to clone it.
4269   if (!src)
4270     return PERFETTO_SVC_ERR("session not found");
4271 
4272   if (consumer->tracing_session_id_) {
4273     return PERFETTO_SVC_ERR(
4274         "The consumer is already attached to another tracing session");
4275   }
4276 
4277   std::vector<BufferID> buf_ids =
4278       buffer_ids_.AllocateMultiple(buf_snaps.size());
4279   if (buf_ids.size() != buf_snaps.size()) {
4280     return PERFETTO_SVC_ERR("Buffer id allocation failed");
4281   }
4282 
4283   PERFETTO_CHECK(std::none_of(
4284       buf_snaps.begin(), buf_snaps.end(),
4285       [](const std::unique_ptr<TraceBuffer>& buf) { return buf == nullptr; }));
4286 
4287   const TracingSessionID tsid = ++last_tracing_session_id_;
4288   TracingSession* cloned_session =
4289       &tracing_sessions_
4290            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
4291                     std::forward_as_tuple(tsid, consumer, src->config,
4292                                           weak_runner_.task_runner()))
4293            .first->second;
4294 
4295   // Generate a new UUID for the cloned session, but preserve the LSB. In some
4296   // contexts the LSB is used to tie the trace back to the statsd subscription
4297   // that triggered it. See the corresponding code in perfetto_cmd.cc which
4298   // reads at triggering_subscription_id().
4299   const int64_t orig_uuid_lsb = src->trace_uuid.lsb();
4300   cloned_session->state = TracingSession::CLONED_READ_ONLY;
4301   cloned_session->trace_uuid = base::Uuidv4();
4302   cloned_session->trace_uuid.set_lsb(orig_uuid_lsb);
4303   *new_uuid = cloned_session->trace_uuid;
4304 
4305   for (size_t i = 0; i < buf_snaps.size(); i++) {
4306     BufferID buf_global_id = buf_ids[i];
4307     std::unique_ptr<TraceBuffer>& buf = buf_snaps[i];
4308     // This is only needed for transfer_on_clone. Other buffers are already
4309     // marked as read-only by CloneReadOnly(). We cannot do this early because
4310     // in case of an allocation failure we will put std::move() the original
4311     // buffer back in its place and in that case should not be made read-only.
4312     buf->set_read_only();
4313     buffers_.emplace(buf_global_id, std::move(buf));
4314     cloned_session->buffers_index.emplace_back(buf_global_id);
4315   }
4316   UpdateMemoryGuardrail();
4317 
4318   // Copy over relevant state that we want to persist in the cloned session.
4319   // Mostly stats and metadata that is emitted in the trace file by the service.
4320   // Also clear the received trigger list in the main tracing session. A
4321   // CLONE_SNAPSHOT session can go in ring buffer mode for several hours and get
4322   // snapshotted several times. This causes two issues with `received_triggers`:
4323   // 1. Adding noise in the cloned trace emitting triggers that happened too
4324   //    far back (see b/290799105).
4325   // 2. Bloating memory (see b/290798988).
4326   cloned_session->should_emit_stats = true;
4327   cloned_session->clone_trigger = clone_trigger;
4328   cloned_session->received_triggers = std::move(src->received_triggers);
4329   src->received_triggers.clear();
4330   src->num_triggers_emitted_into_trace = 0;
4331   cloned_session->lifecycle_events =
4332       std::vector<TracingSession::LifecycleEvent>(src->lifecycle_events);
4333   cloned_session->slow_start_event = src->slow_start_event;
4334   cloned_session->last_flush_events = src->last_flush_events;
4335   cloned_session->initial_clock_snapshot = src->initial_clock_snapshot;
4336   cloned_session->clock_snapshot_ring_buffer = src->clock_snapshot_ring_buffer;
4337   cloned_session->invalid_packets = src->invalid_packets;
4338   cloned_session->flushes_requested = src->flushes_requested;
4339   cloned_session->flushes_succeeded = src->flushes_succeeded;
4340   cloned_session->flushes_failed = src->flushes_failed;
4341   cloned_session->compress_deflate = src->compress_deflate;
4342   if (src->trace_filter && !skip_trace_filter) {
4343     // Copy the trace filter, unless it's a clone-for-bugreport (b/317065412).
4344     cloned_session->trace_filter.reset(
4345         new protozero::MessageFilter(src->trace_filter->config()));
4346   }
4347 
4348   cloned_session->buffer_cloned_timestamps = std::move(buf_cloned_timestamps);
4349 
4350   SetSingleLifecycleEvent(
4351       cloned_session,
4352       protos::pbzero::TracingServiceEvent::kCloneStartedFieldNumber,
4353       clone_started_timestamp_ns);
4354 
4355   SnapshotLifecycleEvent(
4356       cloned_session,
4357       protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
4358       true /* snapshot_clocks */);
4359 
4360   PERFETTO_DLOG("Consumer (uid:%d) cloned tracing session %" PRIu64
4361                 " -> %" PRIu64,
4362                 static_cast<int>(consumer->uid_), src_tsid, tsid);
4363 
4364   consumer->tracing_session_id_ = tsid;
4365   cloned_session->final_flush_outcome = final_flush_outcome
4366                                             ? TraceStats::FINAL_FLUSH_SUCCEEDED
4367                                             : TraceStats::FINAL_FLUSH_FAILED;
4368   return base::OkStatus();
4369 }
4370 
IsCloneAllowed(uid_t clone_uid) const4371 bool TracingServiceImpl::TracingSession::IsCloneAllowed(uid_t clone_uid) const {
4372   if (clone_uid == 0)
4373     return true;  // Root is always allowed to clone everything.
4374   if (clone_uid == this->consumer_uid)
4375     return true;  // Allow cloning if the uids match.
4376 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
4377   // On Android allow shell to clone sessions marked as exported for bugreport.
4378   // Dumpstate (invoked by adb bugreport) invokes commands as shell.
4379   if (clone_uid == AID_SHELL && this->config.bugreport_score() > 0)
4380     return true;
4381 #endif
4382   return false;
4383 }
4384 
4385 ////////////////////////////////////////////////////////////////////////////////
4386 // TracingServiceImpl::ConsumerEndpointImpl implementation
4387 ////////////////////////////////////////////////////////////////////////////////
4388 
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)4389 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
4390     TracingServiceImpl* service,
4391     base::TaskRunner* task_runner,
4392     Consumer* consumer,
4393     uid_t uid)
4394     : task_runner_(task_runner),
4395       service_(service),
4396       consumer_(consumer),
4397       uid_(uid),
4398       weak_ptr_factory_(this) {}
4399 
~ConsumerEndpointImpl()4400 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
4401   service_->DisconnectConsumer(this);
4402   consumer_->OnDisconnect();
4403 }
4404 
NotifyOnTracingDisabled(const std::string & error)4405 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
4406     const std::string& error) {
4407   PERFETTO_DCHECK_THREAD(thread_checker_);
4408   task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr(),
4409                           error /* deliberate copy */] {
4410     if (weak_this)
4411       weak_this->consumer_->OnTracingDisabled(error);
4412   });
4413 }
4414 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)4415 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
4416     const TraceConfig& cfg,
4417     base::ScopedFile fd) {
4418   PERFETTO_DCHECK_THREAD(thread_checker_);
4419   auto status = service_->EnableTracing(this, cfg, std::move(fd));
4420   if (!status.ok())
4421     NotifyOnTracingDisabled(status.message());
4422 }
4423 
ChangeTraceConfig(const TraceConfig & cfg)4424 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
4425     const TraceConfig& cfg) {
4426   if (!tracing_session_id_) {
4427     PERFETTO_LOG(
4428         "Consumer called ChangeTraceConfig() but tracing was "
4429         "not active");
4430     return;
4431   }
4432   service_->ChangeTraceConfig(this, cfg);
4433 }
4434 
StartTracing()4435 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
4436   PERFETTO_DCHECK_THREAD(thread_checker_);
4437   if (!tracing_session_id_) {
4438     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
4439     return;
4440   }
4441   service_->StartTracing(tracing_session_id_);
4442 }
4443 
DisableTracing()4444 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
4445   PERFETTO_DCHECK_THREAD(thread_checker_);
4446   if (!tracing_session_id_) {
4447     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
4448     return;
4449   }
4450   service_->DisableTracing(tracing_session_id_);
4451 }
4452 
ReadBuffers()4453 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
4454   PERFETTO_DCHECK_THREAD(thread_checker_);
4455   if (!tracing_session_id_) {
4456     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
4457     consumer_->OnTraceData({}, /* has_more = */ false);
4458     return;
4459   }
4460   if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
4461     consumer_->OnTraceData({}, /* has_more = */ false);
4462   }
4463 }
4464 
FreeBuffers()4465 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
4466   PERFETTO_DCHECK_THREAD(thread_checker_);
4467   if (!tracing_session_id_) {
4468     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
4469     return;
4470   }
4471   service_->FreeBuffers(tracing_session_id_);
4472   tracing_session_id_ = 0;
4473 }
4474 
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)4475 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
4476                                                      FlushCallback callback,
4477                                                      FlushFlags flush_flags) {
4478   PERFETTO_DCHECK_THREAD(thread_checker_);
4479   if (!tracing_session_id_) {
4480     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
4481     return;
4482   }
4483   service_->Flush(tracing_session_id_, timeout_ms, callback, flush_flags);
4484 }
4485 
Detach(const std::string & key)4486 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
4487   PERFETTO_DCHECK_THREAD(thread_checker_);
4488   bool success = service_->DetachConsumer(this, key);
4489   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4490   task_runner_->PostTask([weak_this = std::move(weak_this), success] {
4491     if (weak_this)
4492       weak_this->consumer_->OnDetach(success);
4493   });
4494 }
4495 
Attach(const std::string & key)4496 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
4497   PERFETTO_DCHECK_THREAD(thread_checker_);
4498   bool success = service_->AttachConsumer(this, key);
4499   task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr(), success] {
4500     if (!weak_this)
4501       return;
4502     Consumer* consumer = weak_this->consumer_;
4503     TracingSession* session =
4504         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
4505     if (!session) {
4506       consumer->OnAttach(false, TraceConfig());
4507       return;
4508     }
4509     consumer->OnAttach(success, session->config);
4510   });
4511 }
4512 
GetTraceStats()4513 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
4514   PERFETTO_DCHECK_THREAD(thread_checker_);
4515   bool success = false;
4516   TraceStats stats;
4517   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4518   if (session) {
4519     success = true;
4520     stats = service_->GetTraceStats(session);
4521   }
4522   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4523   task_runner_->PostTask(
4524       [weak_this = std::move(weak_this), success, stats = std::move(stats)] {
4525         if (weak_this)
4526           weak_this->consumer_->OnTraceStats(success, stats);
4527       });
4528 }
4529 
ObserveEvents(uint32_t events_mask)4530 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
4531     uint32_t events_mask) {
4532   PERFETTO_DCHECK_THREAD(thread_checker_);
4533   observable_events_mask_ = events_mask;
4534   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4535   if (!session)
4536     return;
4537 
4538   if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
4539     // Issue initial states.
4540     for (const auto& kv : session->data_source_instances) {
4541       ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
4542       PERFETTO_DCHECK(producer);
4543       OnDataSourceInstanceStateChange(*producer, kv.second);
4544     }
4545   }
4546 
4547   // If the ObserveEvents() call happens after data sources have acked already
4548   // notify immediately.
4549   if (observable_events_mask_ &
4550       ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
4551     service_->MaybeNotifyAllDataSourcesStarted(session);
4552   }
4553 }
4554 
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)4555 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
4556     const ProducerEndpointImpl& producer,
4557     const DataSourceInstance& instance) {
4558   if (!(observable_events_mask_ &
4559         ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
4560     return;
4561   }
4562 
4563   if (instance.state != DataSourceInstance::CONFIGURED &&
4564       instance.state != DataSourceInstance::STARTED &&
4565       instance.state != DataSourceInstance::STOPPED) {
4566     return;
4567   }
4568 
4569   auto* observable_events = AddObservableEvents();
4570   auto* change = observable_events->add_instance_state_changes();
4571   change->set_producer_name(producer.name_);
4572   change->set_data_source_name(instance.data_source_name);
4573   if (instance.state == DataSourceInstance::STARTED) {
4574     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
4575   } else {
4576     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
4577   }
4578 }
4579 
OnAllDataSourcesStarted()4580 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
4581   if (!(observable_events_mask_ &
4582         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
4583     return;
4584   }
4585   auto* observable_events = AddObservableEvents();
4586   observable_events->set_all_data_sources_started(true);
4587 }
4588 
NotifyCloneSnapshotTrigger(const TriggerInfo & trigger)4589 void TracingServiceImpl::ConsumerEndpointImpl::NotifyCloneSnapshotTrigger(
4590     const TriggerInfo& trigger) {
4591   if (!(observable_events_mask_ & ObservableEvents::TYPE_CLONE_TRIGGER_HIT)) {
4592     return;
4593   }
4594   auto* observable_events = AddObservableEvents();
4595   auto* clone_trig = observable_events->mutable_clone_trigger_hit();
4596   clone_trig->set_tracing_session_id(static_cast<int64_t>(tracing_session_id_));
4597   clone_trig->set_trigger_name(trigger.trigger_name);
4598   clone_trig->set_producer_name(trigger.producer_name);
4599   clone_trig->set_producer_uid(static_cast<uint32_t>(trigger.producer_uid));
4600   clone_trig->set_boot_time_ns(trigger.boot_time_ns);
4601 }
4602 
4603 ObservableEvents*
AddObservableEvents()4604 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
4605   PERFETTO_DCHECK_THREAD(thread_checker_);
4606   if (!observable_events_) {
4607     observable_events_.reset(new ObservableEvents());
4608     task_runner_->PostTask([weak_this = weak_ptr_factory_.GetWeakPtr()] {
4609       if (!weak_this)
4610         return;
4611 
4612       // Move into a temporary to allow reentrancy in OnObservableEvents.
4613       auto observable_events = std::move(weak_this->observable_events_);
4614       weak_this->consumer_->OnObservableEvents(*observable_events);
4615     });
4616   }
4617   return observable_events_.get();
4618 }
4619 
QueryServiceState(QueryServiceStateArgs args,QueryServiceStateCallback callback)4620 void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
4621     QueryServiceStateArgs args,
4622     QueryServiceStateCallback callback) {
4623   PERFETTO_DCHECK_THREAD(thread_checker_);
4624   TracingServiceState svc_state;
4625 
4626   const auto& sessions = service_->tracing_sessions_;
4627   svc_state.set_tracing_service_version(base::GetVersionString());
4628   svc_state.set_num_sessions(static_cast<int>(sessions.size()));
4629 
4630   int num_started = 0;
4631   for (const auto& kv : sessions)
4632     num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
4633   svc_state.set_num_sessions_started(num_started);
4634 
4635   for (const auto& kv : service_->producers_) {
4636     if (args.sessions_only)
4637       break;
4638     auto* producer = svc_state.add_producers();
4639     producer->set_id(static_cast<int>(kv.first));
4640     producer->set_name(kv.second->name_);
4641     producer->set_sdk_version(kv.second->sdk_version_);
4642     producer->set_uid(static_cast<int32_t>(kv.second->uid()));
4643     producer->set_pid(static_cast<int32_t>(kv.second->pid()));
4644     producer->set_frozen(kv.second->IsAndroidProcessFrozen());
4645   }
4646 
4647   for (const auto& kv : service_->data_sources_) {
4648     if (args.sessions_only)
4649       break;
4650     const auto& registered_data_source = kv.second;
4651     auto* data_source = svc_state.add_data_sources();
4652     *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
4653     data_source->set_producer_id(
4654         static_cast<int>(registered_data_source.producer_id));
4655   }
4656 
4657   svc_state.set_supports_tracing_sessions(true);
4658   for (const auto& kv : service_->tracing_sessions_) {
4659     const TracingSession& s = kv.second;
4660     if (!s.IsCloneAllowed(uid_))
4661       continue;
4662     auto* session = svc_state.add_tracing_sessions();
4663     session->set_id(s.id);
4664     session->set_consumer_uid(static_cast<int>(s.consumer_uid));
4665     session->set_duration_ms(s.config.duration_ms());
4666     session->set_num_data_sources(
4667         static_cast<uint32_t>(s.data_source_instances.size()));
4668     session->set_unique_session_name(s.config.unique_session_name());
4669     if (s.config.has_bugreport_score())
4670       session->set_bugreport_score(s.config.bugreport_score());
4671     if (s.config.has_bugreport_filename())
4672       session->set_bugreport_filename(s.config.bugreport_filename());
4673     for (const auto& snap_kv : s.initial_clock_snapshot) {
4674       if (snap_kv.clock_id == protos::pbzero::BUILTIN_CLOCK_REALTIME)
4675         session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.timestamp));
4676     }
4677     for (const auto& buf : s.config.buffers())
4678       session->add_buffer_size_kb(buf.size_kb());
4679 
4680     switch (s.state) {
4681       case TracingSession::State::DISABLED:
4682         session->set_state("DISABLED");
4683         break;
4684       case TracingSession::State::CONFIGURED:
4685         session->set_state("CONFIGURED");
4686         break;
4687       case TracingSession::State::STARTED:
4688         session->set_is_started(true);
4689         session->set_state("STARTED");
4690         break;
4691       case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
4692         session->set_state("STOP_WAIT");
4693         break;
4694       case TracingSession::State::CLONED_READ_ONLY:
4695         session->set_state("CLONED_READ_ONLY");
4696         break;
4697     }
4698   }
4699   callback(/*success=*/true, svc_state);
4700 }
4701 
QueryCapabilities(QueryCapabilitiesCallback callback)4702 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
4703     QueryCapabilitiesCallback callback) {
4704   PERFETTO_DCHECK_THREAD(thread_checker_);
4705   TracingServiceCapabilities caps;
4706   caps.set_has_query_capabilities(true);
4707   caps.set_has_trace_config_output_path(true);
4708   caps.set_has_clone_session(true);
4709   caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
4710   caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
4711   caps.add_observable_events(ObservableEvents::TYPE_CLONE_TRIGGER_HIT);
4712   static_assert(
4713       ObservableEvents::Type_MAX == ObservableEvents::TYPE_CLONE_TRIGGER_HIT,
4714       "");
4715   callback(caps);
4716 }
4717 
SaveTraceForBugreport(SaveTraceForBugreportCallback consumer_callback)4718 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
4719     SaveTraceForBugreportCallback consumer_callback) {
4720   consumer_callback(false,
4721                     "SaveTraceForBugreport is deprecated. Use "
4722                     "CloneSession(kBugreportSessionId) instead.");
4723 }
4724 
CloneSession(CloneSessionArgs args)4725 void TracingServiceImpl::ConsumerEndpointImpl::CloneSession(
4726     CloneSessionArgs args) {
4727   PERFETTO_DCHECK_THREAD(thread_checker_);
4728   // FlushAndCloneSession will call OnSessionCloned after the async flush.
4729   base::Status result = service_->FlushAndCloneSession(this, std::move(args));
4730 
4731   if (!result.ok()) {
4732     consumer_->OnSessionCloned({false, result.message(), {}});
4733   }
4734 }
4735 
4736 ////////////////////////////////////////////////////////////////////////////////
4737 // TracingServiceImpl::ProducerEndpointImpl implementation
4738 ////////////////////////////////////////////////////////////////////////////////
4739 
ProducerEndpointImpl(ProducerID id,const ClientIdentity & client_identity,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,const std::string & sdk_version,bool in_process,bool smb_scraping_enabled)4740 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
4741     ProducerID id,
4742     const ClientIdentity& client_identity,
4743     TracingServiceImpl* service,
4744     base::TaskRunner* task_runner,
4745     Producer* producer,
4746     const std::string& producer_name,
4747     const std::string& sdk_version,
4748     bool in_process,
4749     bool smb_scraping_enabled)
4750     : id_(id),
4751       client_identity_(client_identity),
4752       service_(service),
4753       producer_(producer),
4754       name_(producer_name),
4755       sdk_version_(sdk_version),
4756       in_process_(in_process),
4757       smb_scraping_enabled_(smb_scraping_enabled),
4758       weak_runner_(task_runner) {}
4759 
~ProducerEndpointImpl()4760 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
4761   service_->DisconnectProducer(id_);
4762   producer_->OnDisconnect();
4763 }
4764 
Disconnect()4765 void TracingServiceImpl::ProducerEndpointImpl::Disconnect() {
4766   PERFETTO_DCHECK_THREAD(thread_checker_);
4767   // Disconnection is only supported via destroying the ProducerEndpoint.
4768   PERFETTO_FATAL("Not supported");
4769 }
4770 
RegisterDataSource(const DataSourceDescriptor & desc)4771 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
4772     const DataSourceDescriptor& desc) {
4773   PERFETTO_DCHECK_THREAD(thread_checker_);
4774   service_->RegisterDataSource(id_, desc);
4775 }
4776 
UpdateDataSource(const DataSourceDescriptor & desc)4777 void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
4778     const DataSourceDescriptor& desc) {
4779   PERFETTO_DCHECK_THREAD(thread_checker_);
4780   service_->UpdateDataSource(id_, desc);
4781 }
4782 
UnregisterDataSource(const std::string & name)4783 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
4784     const std::string& name) {
4785   PERFETTO_DCHECK_THREAD(thread_checker_);
4786   service_->UnregisterDataSource(id_, name);
4787 }
4788 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)4789 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
4790     uint32_t writer_id,
4791     uint32_t target_buffer) {
4792   PERFETTO_DCHECK_THREAD(thread_checker_);
4793   writers_[static_cast<WriterID>(writer_id)] =
4794       static_cast<BufferID>(target_buffer);
4795 }
4796 
UnregisterTraceWriter(uint32_t writer_id)4797 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
4798     uint32_t writer_id) {
4799   PERFETTO_DCHECK_THREAD(thread_checker_);
4800   writers_.erase(static_cast<WriterID>(writer_id));
4801 }
4802 
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)4803 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
4804     const CommitDataRequest& req_untrusted,
4805     CommitDataCallback callback) {
4806   PERFETTO_DCHECK_THREAD(thread_checker_);
4807 
4808   if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
4809     PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
4810                                EncodeCommitDataRequest(id_, req_untrusted));
4811   }
4812 
4813   if (!shared_memory_) {
4814     PERFETTO_DLOG(
4815         "Attempted to commit data before the shared memory was allocated.");
4816     return;
4817   }
4818   PERFETTO_DCHECK(shmem_abi_.is_valid());
4819   for (const auto& entry : req_untrusted.chunks_to_move()) {
4820     const uint32_t page_idx = entry.page();
4821     if (page_idx >= shmem_abi_.num_pages())
4822       continue;  // A buggy or malicious producer.
4823 
4824     SharedMemoryABI::Chunk chunk;
4825     bool commit_data_over_ipc = entry.has_data();
4826     if (PERFETTO_UNLIKELY(commit_data_over_ipc)) {
4827       // Chunk data is passed over the wire. Create a chunk using the serialized
4828       // protobuf message.
4829       const std::string& data = entry.data();
4830       if (data.size() > SharedMemoryABI::Chunk::kMaxSize) {
4831         PERFETTO_DFATAL("IPC data commit too large: %zu", data.size());
4832         continue;  // A malicious or buggy producer
4833       }
4834       // |data| is not altered, but we need to const_cast becasue Chunk data
4835       // members are non-const.
4836       chunk = SharedMemoryABI::MakeChunkFromSerializedData(
4837           reinterpret_cast<uint8_t*>(const_cast<char*>(data.data())),
4838           static_cast<uint16_t>(entry.data().size()),
4839           static_cast<uint8_t>(entry.chunk()));
4840     } else
4841       chunk = shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
4842     if (!chunk.is_valid()) {
4843       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
4844                     entry.page(), entry.chunk());
4845       continue;
4846     }
4847 
4848     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
4849     // the ABI contract expects the producer to not touch the chunk anymore
4850     // (until the service marks that as free). This is why all the reads below
4851     // are just memory_order_relaxed. Also, the code here assumes that all this
4852     // data can be malicious and just gives up if anything is malformed.
4853     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
4854     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
4855     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
4856     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
4857     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
4858     uint16_t num_fragments = packets.count;
4859     uint8_t chunk_flags = packets.flags;
4860 
4861     service_->CopyProducerPageIntoLogBuffer(
4862         id_, client_identity_, writer_id, chunk_id, buffer_id, num_fragments,
4863         chunk_flags,
4864         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
4865 
4866     if (!commit_data_over_ipc) {
4867       // This one has release-store semantics.
4868       shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
4869     }
4870   }  // for(chunks_to_move)
4871 
4872   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
4873 
4874   if (req_untrusted.flush_request_id()) {
4875     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
4876   }
4877 
4878   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
4879   // callback being invoked within the same callstack and not posted. If this
4880   // changes, the code there needs to be changed accordingly.
4881   if (callback)
4882     callback();
4883 }
4884 
SetupSharedMemory(std::unique_ptr<SharedMemory> shared_memory,size_t page_size_bytes,bool provided_by_producer)4885 void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
4886     std::unique_ptr<SharedMemory> shared_memory,
4887     size_t page_size_bytes,
4888     bool provided_by_producer) {
4889   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
4890   PERFETTO_DCHECK(page_size_bytes % 1024 == 0);
4891 
4892   shared_memory_ = std::move(shared_memory);
4893   shared_buffer_page_size_kb_ = page_size_bytes / 1024;
4894   is_shmem_provided_by_producer_ = provided_by_producer;
4895 
4896   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
4897                         shared_memory_->size(),
4898                         shared_buffer_page_size_kb() * 1024,
4899                         SharedMemoryABI::ShmemMode::kDefault);
4900   if (in_process_) {
4901     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
4902         shared_memory_->start(), shared_memory_->size(),
4903         SharedMemoryABI::ShmemMode::kDefault,
4904         shared_buffer_page_size_kb_ * 1024, this, weak_runner_.task_runner()));
4905     inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
4906   }
4907 
4908   OnTracingSetup();
4909   service_->UpdateMemoryGuardrail();
4910 }
4911 
shared_memory() const4912 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
4913   PERFETTO_DCHECK_THREAD(thread_checker_);
4914   return shared_memory_.get();
4915 }
4916 
shared_buffer_page_size_kb() const4917 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
4918     const {
4919   return shared_buffer_page_size_kb_;
4920 }
4921 
ActivateTriggers(const std::vector<std::string> & triggers)4922 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
4923     const std::vector<std::string>& triggers) {
4924   service_->ActivateTriggers(id_, triggers);
4925 }
4926 
StopDataSource(DataSourceInstanceID ds_inst_id)4927 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
4928     DataSourceInstanceID ds_inst_id) {
4929   // TODO(primiano): When we'll support tearing down the SMB, at this point we
4930   // should send the Producer a TearDownTracing if all its data sources have
4931   // been disabled (see b/77532839 and aosp/655179 PS1).
4932   PERFETTO_DCHECK_THREAD(thread_checker_);
4933   weak_runner_.PostTask(
4934       [this, ds_inst_id] { producer_->StopDataSource(ds_inst_id); });
4935 }
4936 
4937 SharedMemoryArbiter*
MaybeSharedMemoryArbiter()4938 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
4939   if (!inproc_shmem_arbiter_) {
4940     PERFETTO_FATAL(
4941         "The in-process SharedMemoryArbiter can only be used when "
4942         "CreateProducer has been called with in_process=true and after tracing "
4943         "has started.");
4944   }
4945 
4946   PERFETTO_DCHECK(in_process_);
4947   return inproc_shmem_arbiter_.get();
4948 }
4949 
IsShmemProvidedByProducer() const4950 bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
4951     const {
4952   return is_shmem_provided_by_producer_;
4953 }
4954 
4955 // Can be called on any thread.
4956 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id,BufferExhaustedPolicy buffer_exhausted_policy)4957 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
4958     BufferID buf_id,
4959     BufferExhaustedPolicy buffer_exhausted_policy) {
4960   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4961   return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
4962                                                        buffer_exhausted_policy);
4963 }
4964 
NotifyFlushComplete(FlushRequestID id)4965 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
4966     FlushRequestID id) {
4967   PERFETTO_DCHECK_THREAD(thread_checker_);
4968   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4969   return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
4970 }
4971 
OnTracingSetup()4972 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4973   weak_runner_.PostTask([this] { producer_->OnTracingSetup(); });
4974 }
4975 
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources,FlushFlags flush_flags)4976 void TracingServiceImpl::ProducerEndpointImpl::Flush(
4977     FlushRequestID flush_request_id,
4978     const std::vector<DataSourceInstanceID>& data_sources,
4979     FlushFlags flush_flags) {
4980   PERFETTO_DCHECK_THREAD(thread_checker_);
4981   weak_runner_.PostTask([this, flush_request_id, data_sources, flush_flags] {
4982     producer_->Flush(flush_request_id, data_sources.data(), data_sources.size(),
4983                      flush_flags);
4984   });
4985 }
4986 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4987 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4988     DataSourceInstanceID ds_id,
4989     const DataSourceConfig& config) {
4990   PERFETTO_DCHECK_THREAD(thread_checker_);
4991   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4992   weak_runner_.PostTask([this, ds_id, config] {
4993     producer_->SetupDataSource(ds_id, std::move(config));
4994   });
4995 }
4996 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4997 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4998     DataSourceInstanceID ds_id,
4999     const DataSourceConfig& config) {
5000   PERFETTO_DCHECK_THREAD(thread_checker_);
5001   weak_runner_.PostTask([this, ds_id, config] {
5002     producer_->StartDataSource(ds_id, std::move(config));
5003   });
5004 }
5005 
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)5006 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
5007     DataSourceInstanceID data_source_id) {
5008   PERFETTO_DCHECK_THREAD(thread_checker_);
5009   service_->NotifyDataSourceStarted(id_, data_source_id);
5010 }
5011 
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)5012 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
5013     DataSourceInstanceID data_source_id) {
5014   PERFETTO_DCHECK_THREAD(thread_checker_);
5015   service_->NotifyDataSourceStopped(id_, data_source_id);
5016 }
5017 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)5018 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
5019     const std::vector<BufferID>& target_buffers) {
5020   if (allowed_target_buffers_.empty())
5021     return;
5022   for (BufferID buffer : target_buffers)
5023     allowed_target_buffers_.erase(buffer);
5024 }
5025 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)5026 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
5027     const std::vector<DataSourceInstanceID>& data_sources) {
5028   PERFETTO_DCHECK_THREAD(thread_checker_);
5029   weak_runner_.PostTask([this, data_sources] {
5030     base::StringView producer_name(name_);
5031     producer_->ClearIncrementalState(data_sources.data(), data_sources.size());
5032   });
5033 }
5034 
Sync(std::function<void ()> callback)5035 void TracingServiceImpl::ProducerEndpointImpl::Sync(
5036     std::function<void()> callback) {
5037   weak_runner_.task_runner()->PostTask(callback);
5038 }
5039 
IsAndroidProcessFrozen()5040 bool TracingServiceImpl::ProducerEndpointImpl::IsAndroidProcessFrozen() {
5041 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
5042   if (in_process_ || uid() == base::kInvalidUid || pid() == base::kInvalidPid)
5043     return false;
5044 
5045   // As per aosp/3406861, there are three possible mount points for the cgroup.
5046   // Look at all of them.
5047   // - Historically everything was in /uid_xxx/pid_yyy (and still is if
5048   //   PRODUCT_CGROUP_V2_SYS_APP_ISOLATION_ENABLED = false)
5049   // - cgroup isolation introduces /apps /system subdirectories.
5050   base::StackString<255> path_v1(
5051       "/sys/fs/cgroup/uid_%" PRIu32 "/pid_%" PRIu32 "/cgroup.freeze",
5052       static_cast<uint32_t>(uid()), static_cast<uint32_t>(pid()));
5053   base::StackString<255> path_v2_app(
5054       "/sys/fs/cgroup/apps/uid_%" PRIu32 "/pid_%" PRIu32 "/cgroup.freeze",
5055       static_cast<uint32_t>(uid()), static_cast<uint32_t>(pid()));
5056   base::StackString<255> path_v2_system(
5057       "/sys/fs/cgroup/system/uid_%" PRIu32 "/pid_%" PRIu32 "/cgroup.freeze",
5058       static_cast<uint32_t>(uid()), static_cast<uint32_t>(pid()));
5059   const char* paths[] = {path_v1.c_str(), path_v2_app.c_str(),
5060                          path_v2_system.c_str()};
5061 
5062   for (const char* path : paths) {
5063     char frozen = '0';
5064     auto fd = base::OpenFile(path, O_RDONLY);
5065     ssize_t rsize = 0;
5066     if (fd) {
5067       rsize = base::Read(*fd, &frozen, sizeof(frozen));
5068       if (rsize > 0) {
5069         return frozen == '1';
5070       }
5071     }
5072   }
5073   PERFETTO_DLOG("Failed to read cgroup.freeze from [%s, %s, %s]",
5074                 path_v1.c_str(), path_v2_app.c_str(), path_v2_system.c_str());
5075 
5076 #endif
5077   return false;
5078 }
5079 
5080 ////////////////////////////////////////////////////////////////////////////////
5081 // TracingServiceImpl::TracingSession implementation
5082 ////////////////////////////////////////////////////////////////////////////////
5083 
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config,base::TaskRunner * task_runner)5084 TracingServiceImpl::TracingSession::TracingSession(
5085     TracingSessionID session_id,
5086     ConsumerEndpointImpl* consumer,
5087     const TraceConfig& new_config,
5088     base::TaskRunner* task_runner)
5089     : id(session_id),
5090       consumer_maybe_null(consumer),
5091       consumer_uid(consumer->uid_),
5092       config(new_config),
5093       snapshot_periodic_task(task_runner),
5094       timed_stop_task(task_runner) {
5095   // all_data_sources_flushed (and flush_started) is special because we store up
5096   // to 64 events of this type. Other events will go through the default case in
5097   // SnapshotLifecycleEvent() where they will be given a max history of 1.
5098   lifecycle_events.emplace_back(
5099       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
5100       64 /* max_size */);
5101   lifecycle_events.emplace_back(
5102       protos::pbzero::TracingServiceEvent::kFlushStartedFieldNumber,
5103       64 /* max_size */);
5104 }
5105 
5106 ////////////////////////////////////////////////////////////////////////////////
5107 // TracingServiceImpl::RelayEndpointImpl implementation
5108 ////////////////////////////////////////////////////////////////////////////////
RelayEndpointImpl(RelayClientID relay_client_id,TracingServiceImpl * service)5109 TracingServiceImpl::RelayEndpointImpl::RelayEndpointImpl(
5110     RelayClientID relay_client_id,
5111     TracingServiceImpl* service)
5112     : relay_client_id_(relay_client_id), service_(service) {}
5113 TracingServiceImpl::RelayEndpointImpl::~RelayEndpointImpl() = default;
5114 
SyncClocks(SyncMode sync_mode,base::ClockSnapshotVector client_clocks,base::ClockSnapshotVector host_clocks)5115 void TracingServiceImpl::RelayEndpointImpl::SyncClocks(
5116     SyncMode sync_mode,
5117     base::ClockSnapshotVector client_clocks,
5118     base::ClockSnapshotVector host_clocks) {
5119   // We keep only the most recent 5 clock sync snapshots.
5120   static constexpr size_t kNumSyncClocks = 5;
5121   if (synced_clocks_.size() >= kNumSyncClocks)
5122     synced_clocks_.pop_front();
5123 
5124   synced_clocks_.emplace_back(sync_mode, std::move(client_clocks),
5125                               std::move(host_clocks));
5126 }
5127 
Disconnect()5128 void TracingServiceImpl::RelayEndpointImpl::Disconnect() {
5129   service_->DisconnectRelayClient(relay_client_id_);
5130 }
5131 
5132 }  // namespace perfetto
5133