• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/service/tracing_service_impl.h"
18 
19 #include <limits.h>
20 #include <string.h>
21 
22 #include <cinttypes>
23 #include <cstdint>
24 #include <limits>
25 #include <optional>
26 #include <regex>
27 #include <string>
28 #include <unordered_set>
29 #include "perfetto/base/time.h"
30 #include "perfetto/ext/tracing/core/client_identity.h"
31 #include "perfetto/tracing/core/clock_snapshots.h"
32 
33 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
34     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
35 #include <sys/uio.h>
36 #include <sys/utsname.h>
37 #include <unistd.h>
38 #endif
39 
40 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
41     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
42 #include "src/android_internal/lazy_library_loader.h"    // nogncheck
43 #include "src/android_internal/tracing_service_proxy.h"  // nogncheck
44 #endif
45 
46 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
47     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
48     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
49 #define PERFETTO_HAS_CHMOD
50 #include <sys/stat.h>
51 #endif
52 
53 #include <algorithm>
54 
55 #include "perfetto/base/build_config.h"
56 #include "perfetto/base/status.h"
57 #include "perfetto/base/task_runner.h"
58 #include "perfetto/ext/base/android_utils.h"
59 #include "perfetto/ext/base/file_utils.h"
60 #include "perfetto/ext/base/metatrace.h"
61 #include "perfetto/ext/base/string_utils.h"
62 #include "perfetto/ext/base/string_view.h"
63 #include "perfetto/ext/base/sys_types.h"
64 #include "perfetto/ext/base/utils.h"
65 #include "perfetto/ext/base/uuid.h"
66 #include "perfetto/ext/base/version.h"
67 #include "perfetto/ext/base/watchdog.h"
68 #include "perfetto/ext/tracing/core/basic_types.h"
69 #include "perfetto/ext/tracing/core/consumer.h"
70 #include "perfetto/ext/tracing/core/observable_events.h"
71 #include "perfetto/ext/tracing/core/producer.h"
72 #include "perfetto/ext/tracing/core/shared_memory.h"
73 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
74 #include "perfetto/ext/tracing/core/trace_packet.h"
75 #include "perfetto/ext/tracing/core/trace_writer.h"
76 #include "perfetto/protozero/scattered_heap_buffer.h"
77 #include "perfetto/protozero/static_buffer.h"
78 #include "perfetto/tracing/core/data_source_descriptor.h"
79 #include "perfetto/tracing/core/tracing_service_capabilities.h"
80 #include "perfetto/tracing/core/tracing_service_state.h"
81 #include "src/android_stats/statsd_logging_helper.h"
82 #include "src/protozero/filtering/message_filter.h"
83 #include "src/protozero/filtering/string_filter.h"
84 #include "src/tracing/core/shared_memory_arbiter_impl.h"
85 #include "src/tracing/service/packet_stream_validator.h"
86 #include "src/tracing/service/trace_buffer.h"
87 
88 #include "protos/perfetto/common/builtin_clock.gen.h"
89 #include "protos/perfetto/common/builtin_clock.pbzero.h"
90 #include "protos/perfetto/common/trace_stats.pbzero.h"
91 #include "protos/perfetto/config/trace_config.pbzero.h"
92 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
93 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
94 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
95 #include "protos/perfetto/trace/system_info.pbzero.h"
96 #include "protos/perfetto/trace/trace_packet.pbzero.h"
97 #include "protos/perfetto/trace/trace_uuid.pbzero.h"
98 #include "protos/perfetto/trace/trigger.pbzero.h"
99 
100 // General note: this class must assume that Producers are malicious and will
101 // try to crash / exploit this class. We can trust pointers because they come
102 // from the IPC layer, but we should never assume that that the producer calls
103 // come in the right order or their arguments are sane / within bounds.
104 
105 // This is a macro because we want the call-site line number for the ELOG.
106 #define PERFETTO_SVC_ERR(...) \
107   (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
108 
109 namespace perfetto {
110 
111 namespace {
// Guardrail limits used by the tracing service. See the call sites below for
// where each one is enforced.
constexpr int kMaxBuffersPerConsumer = 128;
constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;  // 10 s.
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;  // 5 minutes.

constexpr uint32_t kMillisPerHour = 3600000;
constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;  // 7d.

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
127 
128 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
// Minimal stand-in for the POSIX struct iovec on platforms (Windows, NaCl)
// that do not provide <sys/uio.h>. Consumed by the writev() shim below.
struct iovec {
  void* iov_base;  // Address
  size_t iov_len;  // Block size
};
133 
134 // Simple implementation of writev. Note that this does not give the atomicity
135 // guarantees of a real writev, but we don't depend on these (we aren't writing
136 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)137 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
138   ssize_t total_size = 0;
139   for (int i = 0; i < iovcnt; ++i) {
140     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
141     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
142       return -1;
143     total_size += current_size;
144   }
145   return total_size;
146 }
147 
148 #define IOV_MAX 1024  // Linux compatible limit.
149 
150 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
151         // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
152 
153 // Partially encodes a CommitDataRequest in an int32 for the purposes of
154 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
155 // (which is technically 16 bits wide).
156 //
157 // Format (by bit range):
158 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
159 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)160 int32_t EncodeCommitDataRequest(ProducerID producer_id,
161                                 const CommitDataRequest& req_untrusted) {
162   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
163   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
164   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
165 
166   uint32_t mask = (1 << 10) - 1;
167   uint32_t acc = 0;
168   acc |= has_flush_id << 30;
169   acc |= (cpatch & mask) << 20;
170   acc |= (cmov & mask) << 10;
171   acc |= (producer_id & mask);
172   return static_cast<int32_t>(acc);
173 }
174 
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)175 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
176                               std::vector<uint8_t> packet) {
177   Slice slice = Slice::Allocate(packet.size());
178   memcpy(slice.own_data(), packet.data(), packet.size());
179   packets->emplace_back();
180   packets->back().AddSlice(std::move(slice));
181 }
182 
EnsureValidShmSizes(size_t shm_size,size_t page_size)183 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
184     size_t shm_size,
185     size_t page_size) {
186   // Theoretically the max page size supported by the ABI is 64KB.
187   // However, the current implementation of TraceBuffer (the non-shared
188   // userspace buffer where the service copies data) supports at most
189   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
190   // but then causes the data to be discarded when copying it into
191   // TraceBuffer.
192   constexpr size_t kMaxPageSize = 32 * 1024;
193   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
194 
195   if (page_size == 0)
196     page_size = TracingServiceImpl::kDefaultShmPageSize;
197   if (shm_size == 0)
198     shm_size = TracingServiceImpl::kDefaultShmSize;
199 
200   page_size = std::min<size_t>(page_size, kMaxPageSize);
201   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
202 
203   // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
204   // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
205   // matter here, because the tracing page size is just a logical partitioning
206   // and does not have any dependencies on kernel mm syscalls (read: it's fine
207   // to have trace page sizes of 4K on a system where the kernel page size is
208   // 16K).
209   bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
210   page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
211 
212   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
213   size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
214   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
215 
216   if (!page_size_is_valid || shm_size < page_size ||
217       shm_size % page_size != 0) {
218     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
219                            TracingServiceImpl::kDefaultShmPageSize);
220   }
221   return std::make_tuple(shm_size, page_size);
222 }
223 
NameMatchesFilter(const std::string & name,const std::vector<std::string> & name_filter,const std::vector<std::string> & name_regex_filter)224 bool NameMatchesFilter(const std::string& name,
225                        const std::vector<std::string>& name_filter,
226                        const std::vector<std::string>& name_regex_filter) {
227   bool filter_is_set = !name_filter.empty() || !name_regex_filter.empty();
228   if (!filter_is_set)
229     return true;
230   bool filter_matches = std::find(name_filter.begin(), name_filter.end(),
231                                   name) != name_filter.end();
232   bool filter_regex_matches =
233       std::find_if(name_regex_filter.begin(), name_regex_filter.end(),
234                    [&](const std::string& regex) {
235                      return std::regex_match(
236                          name, std::regex(regex, std::regex::extended));
237                    }) != name_regex_filter.end();
238   return filter_matches || filter_regex_matches;
239 }
240 
// Used when TraceConfig.write_into_file == true and output_path is not empty.
// Creates (or, if `overwrite` is set, truncates) the trace file at `path` and
// returns its fd. Returns an invalid ScopedFile on failure.
base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
    PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
  // This is NOT trying to preserve any security property, SELinux does that.
  // It just improves the actionability of the error when people try to save the
  // trace in a location that is not SELinux-allowed (a generic "permission
  // denied" vs "don't put it here, put it there").
  // These are the only SELinux approved dir for trace files that are created
  // directly by traced.
  static const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
  if (!base::StartsWith(path, kTraceDirBasePath)) {
    PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
                  path.c_str(), kTraceDirBasePath);
    return base::ScopedFile();
  }
#endif
  // O_CREAT | O_EXCL will fail if the file exists already.
  const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
  auto fd = base::OpenFile(path, flags, 0600);
  if (fd) {
#if defined(PERFETTO_HAS_CHMOD)
    // Passing 0644 directly above won't work because of umask.
    PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
#endif
  } else {
    PERFETTO_PLOG("Failed to create %s", path.c_str());
  }
  return fd;
}
271 
ShouldLogEvent(const TraceConfig & cfg)272 bool ShouldLogEvent(const TraceConfig& cfg) {
273   switch (cfg.statsd_logging()) {
274     case TraceConfig::STATSD_LOGGING_ENABLED:
275       return true;
276     case TraceConfig::STATSD_LOGGING_DISABLED:
277       return false;
278     case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
279       break;
280   }
281   // For backward compatibility with older versions of perfetto_cmd.
282   return cfg.enable_extra_guardrails();
283 }
284 
285 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
286 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)287 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
288                                size_t size,
289                                size_t max_slice_size,
290                                perfetto::TracePacket* packet) {
291   if (size <= max_slice_size) {
292     packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
293     return;
294   }
295   uint8_t* src_ptr = data.get();
296   for (size_t size_left = size; size_left > 0;) {
297     const size_t slice_size = std::min(size_left, max_slice_size);
298 
299     Slice slice = Slice::Allocate(slice_size);
300     memcpy(slice.own_data(), src_ptr, slice_size);
301     packet->AddSlice(std::move(slice));
302 
303     src_ptr += slice_size;
304     size_left -= slice_size;
305   }
306 }
307 
308 using TraceFilter = protos::gen::TraceConfig::TraceFilter;
ConvertPolicy(TraceFilter::StringFilterPolicy policy)309 std::optional<protozero::StringFilter::Policy> ConvertPolicy(
310     TraceFilter::StringFilterPolicy policy) {
311   switch (policy) {
312     case TraceFilter::SFP_UNSPECIFIED:
313       return std::nullopt;
314     case TraceFilter::SFP_MATCH_REDACT_GROUPS:
315       return protozero::StringFilter::Policy::kMatchRedactGroups;
316     case TraceFilter::SFP_ATRACE_MATCH_REDACT_GROUPS:
317       return protozero::StringFilter::Policy::kAtraceMatchRedactGroups;
318     case TraceFilter::SFP_MATCH_BREAK:
319       return protozero::StringFilter::Policy::kMatchBreak;
320     case TraceFilter::SFP_ATRACE_MATCH_BREAK:
321       return protozero::StringFilter::Policy::kAtraceMatchBreak;
322     case TraceFilter::SFP_ATRACE_REPEATED_SEARCH_REDACT_GROUPS:
323       return protozero::StringFilter::Policy::kAtraceRepeatedSearchRedactGroups;
324   }
325   return std::nullopt;
326 }
327 
328 }  // namespace
329 
330 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)331 std::unique_ptr<TracingService> TracingService::CreateInstance(
332     std::unique_ptr<SharedMemory::Factory> shm_factory,
333     base::TaskRunner* task_runner,
334     InitOpts init_opts) {
335   return std::unique_ptr<TracingService>(
336       new TracingServiceImpl(std::move(shm_factory), task_runner, init_opts));
337 }
338 
TracingServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)339 TracingServiceImpl::TracingServiceImpl(
340     std::unique_ptr<SharedMemory::Factory> shm_factory,
341     base::TaskRunner* task_runner,
342     InitOpts init_opts)
343     : task_runner_(task_runner),
344       init_opts_(init_opts),
345       shm_factory_(std::move(shm_factory)),
346       uid_(base::GetCurrentUserId()),
347       buffer_ids_(kMaxTraceBufferID),
348       trigger_probability_rand_(
349           static_cast<uint32_t>(base::GetWallTimeNs().count())),
350       weak_ptr_factory_(this) {
351   PERFETTO_DCHECK(task_runner_);
352 }
353 
~TracingServiceImpl()354 TracingServiceImpl::~TracingServiceImpl() {
355   // TODO(fmayer): handle teardown of all Producer.
356 }
357 
// Registers a new producer and returns its endpoint, or nullptr on rejection
// (lockdown-mode UID mismatch or too many producers). If the producer
// supplied its own shared memory buffer (`shm`), it is adopted only when its
// sizes validate; otherwise the service falls back to creating the SMB itself
// later via OnTracingSetup().
std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
                                    const ClientIdentity& client_identity,
                                    const std::string& producer_name,
                                    size_t shared_memory_size_hint_bytes,
                                    bool in_process,
                                    ProducerSMBScrapingMode smb_scraping_mode,
                                    size_t shared_memory_page_size_hint_bytes,
                                    std::unique_ptr<SharedMemory> shm,
                                    const std::string& sdk_version) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto uid = client_identity.uid();
  // In lockdown mode only producers running as the service's own UID may
  // connect.
  if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
    PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
                  static_cast<unsigned long>(uid));
    return nullptr;
  }

  if (producers_.size() >= kMaxProducerID) {
    PERFETTO_DFATAL("Too many producers.");
    return nullptr;
  }
  const ProducerID id = GetNextProducerID();
  PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
                static_cast<int>(uid));
  // Resolve the effective SMB scraping setting: a per-producer mode, when
  // specified, overrides the service-wide default.
  bool smb_scraping_enabled = smb_scraping_enabled_;
  switch (smb_scraping_mode) {
    case ProducerSMBScrapingMode::kDefault:
      break;
    case ProducerSMBScrapingMode::kEnabled:
      smb_scraping_enabled = true;
      break;
    case ProducerSMBScrapingMode::kDisabled:
      smb_scraping_enabled = false;
      break;
  }

  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
      id, client_identity, this, task_runner_, producer, producer_name,
      sdk_version, in_process, smb_scraping_enabled));
  auto it_and_inserted = producers_.emplace(id, endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
  endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;

  // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
  // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->producer_->OnConnect();
  });

  if (shm) {
    // The producer supplied an SMB. This is used only by Chrome; in the most
    // common cases the SMB is created by the service and passed via
    // OnTracingSetup(). Verify that it is correctly sized before we attempt to
    // use it. The transport layer has to verify the integrity of the SMB (e.g.
    // ensure that the producer can't resize if after the fact).
    size_t shm_size, page_size;
    std::tie(shm_size, page_size) =
        EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
    if (shm_size == shm->size() &&
        page_size == endpoint->shmem_page_size_hint_bytes_) {
      // The requested sizes survived sanitization unchanged: adopt the SMB.
      PERFETTO_DLOG(
          "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
          shm_size / 1024, endpoint->name_.c_str());
      endpoint->SetupSharedMemory(std::move(shm), page_size,
                                  /*provided_by_producer=*/true);
    } else {
      PERFETTO_LOG(
          "Discarding incorrectly sized producer-provided SMB for producer "
          "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
          "B total, %zu B page size; suggested corrected sizes: %zu B total, "
          "%zu B page size",
          endpoint->name_.c_str(), shm->size(),
          endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
      shm.reset();
    }
  }

  return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}
442 
DisconnectProducer(ProducerID id)443 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
444   PERFETTO_DCHECK_THREAD(thread_checker_);
445   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
446   PERFETTO_DCHECK(producers_.count(id));
447 
448   // Scrape remaining chunks for this producer to ensure we don't lose data.
449   if (auto* producer = GetProducer(id)) {
450     for (auto& session_id_and_session : tracing_sessions_)
451       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
452   }
453 
454   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
455     auto next = it;
456     next++;
457     if (it->second.producer_id == id)
458       UnregisterDataSource(id, it->second.descriptor.name());
459     it = next;
460   }
461 
462   producers_.erase(id);
463   UpdateMemoryGuardrail();
464 }
465 
GetProducer(ProducerID id) const466 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
467     ProducerID id) const {
468   PERFETTO_DCHECK_THREAD(thread_checker_);
469   auto it = producers_.find(id);
470   if (it == producers_.end())
471     return nullptr;
472   return it->second;
473 }
474 
// Registers a new consumer and posts its OnConnect() notification.
std::unique_ptr<TracingService::ConsumerEndpoint>
TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
                reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
  std::unique_ptr<ConsumerEndpointImpl> endpoint(
      new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
  auto it_and_inserted = consumers_.emplace(endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Consumer might go away before we're able to send the connect notification,
  // if that is the case just bail out.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->consumer_->OnConnect();
  });
  return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
}
493 
DisconnectConsumer(ConsumerEndpointImpl * consumer)494 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
495   PERFETTO_DCHECK_THREAD(thread_checker_);
496   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
497   PERFETTO_DCHECK(consumers_.count(consumer));
498 
499   if (consumer->tracing_session_id_)
500     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
501   consumers_.erase(consumer);
502 
503   // At this point no more pointers to |consumer| should be around.
504   PERFETTO_DCHECK(!std::any_of(
505       tracing_sessions_.begin(), tracing_sessions_.end(),
506       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
507         return kv.second.consumer_maybe_null == consumer;
508       }));
509 }
510 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)511 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
512                                         const std::string& key) {
513   PERFETTO_DCHECK_THREAD(thread_checker_);
514   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
515   PERFETTO_DCHECK(consumers_.count(consumer));
516 
517   TracingSessionID tsid = consumer->tracing_session_id_;
518   TracingSession* tracing_session;
519   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
520     return false;
521 
522   if (GetDetachedSession(consumer->uid_, key)) {
523     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
524                   key.c_str());
525     return false;
526   }
527 
528   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
529   tracing_session->consumer_maybe_null = nullptr;
530   tracing_session->detach_key = key;
531   consumer->tracing_session_id_ = 0;
532   return true;
533 }
534 
535 std::unique_ptr<TracingService::RelayEndpoint>
ConnectRelayClient(RelayClientID relay_client_id)536 TracingServiceImpl::ConnectRelayClient(RelayClientID relay_client_id) {
537   PERFETTO_DCHECK_THREAD(thread_checker_);
538 
539   auto endpoint = std::make_unique<RelayEndpointImpl>(relay_client_id, this);
540   relay_clients_[relay_client_id] = endpoint.get();
541 
542   return std::move(endpoint);
543 }
544 
DisconnectRelayClient(RelayClientID relay_client_id)545 void TracingServiceImpl::DisconnectRelayClient(RelayClientID relay_client_id) {
546   PERFETTO_DCHECK_THREAD(thread_checker_);
547 
548   if (relay_clients_.find(relay_client_id) == relay_clients_.end())
549     return;
550   relay_clients_.erase(relay_client_id);
551 }
552 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)553 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
554                                         const std::string& key) {
555   PERFETTO_DCHECK_THREAD(thread_checker_);
556   PERFETTO_DLOG("Consumer %p attaching to session %s",
557                 reinterpret_cast<void*>(consumer), key.c_str());
558   PERFETTO_DCHECK(consumers_.count(consumer));
559 
560   if (consumer->tracing_session_id_) {
561     PERFETTO_ELOG(
562         "Cannot reattach consumer to session %s"
563         " while it already attached tracing session ID %" PRIu64,
564         key.c_str(), consumer->tracing_session_id_);
565     return false;
566   }
567 
568   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
569   if (!tracing_session) {
570     PERFETTO_ELOG(
571         "Failed to attach consumer, session '%s' not found for uid %d",
572         key.c_str(), static_cast<int>(consumer->uid_));
573     return false;
574   }
575 
576   consumer->tracing_session_id_ = tracing_session->id;
577   tracing_session->consumer_maybe_null = consumer;
578   tracing_session->detach_key.clear();
579   return true;
580 }
581 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)582 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
583                                                const TraceConfig& cfg,
584                                                base::ScopedFile fd) {
585   PERFETTO_DCHECK_THREAD(thread_checker_);
586 
587   // If the producer is specifying a UUID, respect that (at least for the first
588   // snapshot). Otherwise generate a new UUID.
589   base::Uuid uuid(cfg.trace_uuid_lsb(), cfg.trace_uuid_msb());
590   if (!uuid)
591     uuid = base::Uuidv4();
592 
593   PERFETTO_DLOG("Enabling tracing for consumer %p, UUID: %s",
594                 reinterpret_cast<void*>(consumer),
595                 uuid.ToPrettyString().c_str());
596   MaybeLogUploadEvent(cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracing);
597   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
598     lockdown_mode_ = true;
599   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
600     lockdown_mode_ = false;
601 
602   // Scope |tracing_session| to this block to prevent accidental use of a null
603   // pointer later in this function.
604   {
605     TracingSession* tracing_session =
606         GetTracingSession(consumer->tracing_session_id_);
607     if (tracing_session) {
608       MaybeLogUploadEvent(
609           cfg, uuid,
610           PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
611       return PERFETTO_SVC_ERR(
612           "A Consumer is trying to EnableTracing() but another tracing "
613           "session is already active (forgot a call to FreeBuffers() ?)");
614     }
615   }
616 
617   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
618                                        ? kGuardrailsMaxTracingDurationMillis
619                                        : kMaxTracingDurationMillis;
620   if (cfg.duration_ms() > max_duration_ms) {
621     MaybeLogUploadEvent(cfg, uuid,
622                         PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
623     return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
624                             "ms  > %" PRIu32 " ms)",
625                             cfg.duration_ms(), max_duration_ms);
626   }
627 
628   const bool has_trigger_config =
629       GetTriggerMode(cfg) != TraceConfig::TriggerConfig::UNSPECIFIED;
630   if (has_trigger_config &&
631       (cfg.trigger_config().trigger_timeout_ms() == 0 ||
632        cfg.trigger_config().trigger_timeout_ms() > max_duration_ms)) {
633     MaybeLogUploadEvent(
634         cfg, uuid,
635         PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
636     return PERFETTO_SVC_ERR(
637         "Traces with START_TRACING triggers must provide a positive "
638         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
639         cfg.trigger_config().trigger_timeout_ms());
640   }
641 
642   // This check has been introduced in May 2023 after finding b/274931668.
643   if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
644       TraceConfig::TriggerConfig::TriggerMode_MAX) {
645     MaybeLogUploadEvent(
646         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
647     return PERFETTO_SVC_ERR(
648         "The trace config specified an invalid trigger_mode");
649   }
650 
651   if (cfg.trigger_config().use_clone_snapshot_if_available() &&
652       cfg.trigger_config().trigger_mode() !=
653           TraceConfig::TriggerConfig::STOP_TRACING) {
654     MaybeLogUploadEvent(
655         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
656     return PERFETTO_SVC_ERR(
657         "trigger_mode must be STOP_TRACING when "
658         "use_clone_snapshot_if_available=true");
659   }
660 
661   if (has_trigger_config && cfg.duration_ms() != 0) {
662     MaybeLogUploadEvent(
663         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
664     return PERFETTO_SVC_ERR(
665         "duration_ms was set, this must not be set for traces with triggers.");
666   }
667 
668   for (char c : cfg.bugreport_filename()) {
669     if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
670           (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.')) {
671       MaybeLogUploadEvent(
672           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidBrFilename);
673       return PERFETTO_SVC_ERR(
674           "bugreport_filename contains invalid chars. Use [a-zA-Z0-9-_.]+");
675     }
676   }
677 
678   if ((GetTriggerMode(cfg) == TraceConfig::TriggerConfig::STOP_TRACING ||
679        GetTriggerMode(cfg) == TraceConfig::TriggerConfig::CLONE_SNAPSHOT) &&
680       cfg.write_into_file()) {
681     // We don't support this usecase because there are subtle assumptions which
682     // break around TracingServiceEvents and windowed sorting (i.e. if we don't
683     // drain the events in ReadBuffersIntoFile because we are waiting for
684     // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
685     // emitting them wildy out of order breaking windowed sorting in trace
686     // processor).
687     MaybeLogUploadEvent(
688         cfg, uuid,
689         PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
690     return PERFETTO_SVC_ERR(
691         "Specifying trigger mode STOP_TRACING/CLONE_SNAPSHOT and "
692         "write_into_file together is unsupported");
693   }
694 
695   std::unordered_set<std::string> triggers;
696   for (const auto& trigger : cfg.trigger_config().triggers()) {
697     if (!triggers.insert(trigger.name()).second) {
698       MaybeLogUploadEvent(
699           cfg, uuid,
700           PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
701       return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
702                               trigger.name().c_str());
703     }
704   }
705 
706   if (cfg.enable_extra_guardrails()) {
707     if (cfg.deferred_start()) {
708       MaybeLogUploadEvent(
709           cfg, uuid,
710           PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
711       return PERFETTO_SVC_ERR(
712           "deferred_start=true is not supported in unsupervised traces");
713     }
714     uint64_t buf_size_sum = 0;
715     for (const auto& buf : cfg.buffers()) {
716       if (buf.size_kb() % 4 != 0) {
717         MaybeLogUploadEvent(
718             cfg, uuid,
719             PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
720         return PERFETTO_SVC_ERR(
721             "buffers.size_kb must be a multiple of 4, got %" PRIu32,
722             buf.size_kb());
723       }
724       buf_size_sum += buf.size_kb();
725     }
726 
727     uint32_t max_tracing_buffer_size_kb =
728         std::max(kGuardrailsMaxTracingBufferSizeKb,
729                  cfg.guardrail_overrides().max_tracing_buffer_size_kb());
730     if (buf_size_sum > max_tracing_buffer_size_kb) {
731       MaybeLogUploadEvent(
732           cfg, uuid,
733           PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
734       return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
735                               "kB  > %" PRIu32 " kB)",
736                               buf_size_sum, max_tracing_buffer_size_kb);
737     }
738   }
739 
740   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
741     MaybeLogUploadEvent(cfg, uuid,
742                         PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
743     return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
744                             cfg.buffers_size());
745   }
746   // Check that the config specifies all buffers for its data sources. This
747   // is also checked in SetupDataSource, but it is simpler to return a proper
748   // error to the consumer from here (and there will be less state to undo).
749   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
750     size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
751     size_t target_buffer = cfg_data_source.config().target_buffer();
752     if (target_buffer >= num_buffers) {
753       MaybeLogUploadEvent(
754           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
755       return PERFETTO_SVC_ERR(
756           "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
757           "%zu)",
758           cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
759     }
760   }
761 
762   if (!cfg.unique_session_name().empty()) {
763     const std::string& name = cfg.unique_session_name();
764     for (auto& kv : tracing_sessions_) {
765       if (kv.second.state == TracingSession::CLONED_READ_ONLY)
766         continue;  // Don't consider cloned sessions in uniqueness checks.
767       if (kv.second.config.unique_session_name() == name) {
768         MaybeLogUploadEvent(
769             cfg, uuid,
770             PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
771         static const char fmt[] =
772             "A trace with this unique session name (%s) already exists";
773         // This happens frequently, don't make it an "E"LOG.
774         PERFETTO_LOG(fmt, name.c_str());
775         return base::ErrStatus(fmt, name.c_str());
776       }
777     }
778   }
779 
780   if (!cfg.session_semaphores().empty()) {
781     struct SemaphoreSessionsState {
782       uint64_t smallest_max_other_session_count =
783           std::numeric_limits<uint64_t>::max();
784       uint64_t session_count = 0;
785     };
786     // For each semaphore, compute the number of active sessions and the
787     // MIN(limit).
788     std::unordered_map<std::string, SemaphoreSessionsState>
789         sem_to_sessions_state;
790     for (const auto& id_and_session : tracing_sessions_) {
791       const auto& session = id_and_session.second;
792       if (session.state == TracingSession::CLONED_READ_ONLY ||
793           session.state == TracingSession::DISABLED) {
794         // Don't consider cloned or disabled sessions in checks.
795         continue;
796       }
797       for (const auto& sem : session.config.session_semaphores()) {
798         auto& sessions_state = sem_to_sessions_state[sem.name()];
799         sessions_state.smallest_max_other_session_count =
800             std::min(sessions_state.smallest_max_other_session_count,
801                      sem.max_other_session_count());
802         sessions_state.session_count++;
803       }
804     }
805 
806     // Check if any of the semaphores declared by the config clashes with any of
807     // the currently active semaphores.
808     for (const auto& semaphore : cfg.session_semaphores()) {
809       auto it = sem_to_sessions_state.find(semaphore.name());
810       if (it == sem_to_sessions_state.end()) {
811         continue;
812       }
813       uint64_t max_other_session_count =
814           std::min(semaphore.max_other_session_count(),
815                    it->second.smallest_max_other_session_count);
816       if (it->second.session_count > max_other_session_count) {
817         MaybeLogUploadEvent(
818             cfg, uuid,
819             PerfettoStatsdAtom::
820                 kTracedEnableTracingFailedSessionSemaphoreCheck);
821         return PERFETTO_SVC_ERR(
822             "Semaphore \"%s\" exceeds maximum allowed other session count "
823             "(%" PRIu64 " > min(%" PRIu64 ", %" PRIu64 "))",
824             semaphore.name().c_str(), it->second.session_count,
825             semaphore.max_other_session_count(),
826             it->second.smallest_max_other_session_count);
827       }
828     }
829   }
830 
831   if (cfg.enable_extra_guardrails()) {
832     // unique_session_name can be empty
833     const std::string& name = cfg.unique_session_name();
834     int64_t now_s = base::GetBootTimeS().count();
835 
836     // Remove any entries where the time limit has passed so this map doesn't
837     // grow indefinitely:
838     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
839     for (auto it = sessions.cbegin(); it != sessions.cend();) {
840       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
841         it = sessions.erase(it);
842       } else {
843         ++it;
844       }
845     }
846 
847     int64_t& previous_s = session_to_last_trace_s_[name];
848     if (previous_s == 0) {
849       previous_s = now_s;
850     } else {
851       MaybeLogUploadEvent(
852           cfg, uuid,
853           PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
854       return PERFETTO_SVC_ERR(
855           "A trace with unique session name \"%s\" began less than %" PRId64
856           "s ago (%" PRId64 "s)",
857           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
858     }
859   }
860 
861   const int sessions_for_uid = static_cast<int>(std::count_if(
862       tracing_sessions_.begin(), tracing_sessions_.end(),
863       [consumer](const decltype(tracing_sessions_)::value_type& s) {
864         return s.second.consumer_uid == consumer->uid_;
865       }));
866 
867   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
868   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
869     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
870   }
871   if (sessions_for_uid >= per_uid_limit) {
872     MaybeLogUploadEvent(
873         cfg, uuid,
874         PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
875     return PERFETTO_SVC_ERR(
876         "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
877         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
878   }
879 
880   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
881   // in a state where it stalls by design by having more TraceWriterImpl
882   // instances than free pages in the buffer. This is really a bug in
883   // trace_probes and the way it handles stalls in the shmem buffer.
884   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
885     MaybeLogUploadEvent(
886         cfg, uuid,
887         PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
888     return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
889                             tracing_sessions_.size());
890   }
891 
892   // If the trace config provides a filter bytecode, setup the filter now.
893   // If the filter loading fails, abort the tracing session rather than running
894   // unfiltered.
895   std::unique_ptr<protozero::MessageFilter> trace_filter;
896   if (cfg.has_trace_filter()) {
897     const auto& filt = cfg.trace_filter();
898     trace_filter.reset(new protozero::MessageFilter());
899 
900     protozero::StringFilter& string_filter = trace_filter->string_filter();
901     for (const auto& rule : filt.string_filter_chain().rules()) {
902       auto opt_policy = ConvertPolicy(rule.policy());
903       if (!opt_policy.has_value()) {
904         MaybeLogUploadEvent(
905             cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
906         return PERFETTO_SVC_ERR(
907             "Trace filter has invalid string filtering rules, aborting");
908       }
909       string_filter.AddRule(*opt_policy, rule.regex_pattern(),
910                             rule.atrace_payload_starts_with());
911     }
912 
913     const std::string& bytecode_v1 = filt.bytecode();
914     const std::string& bytecode_v2 = filt.bytecode_v2();
915     const std::string& bytecode =
916         bytecode_v2.empty() ? bytecode_v1 : bytecode_v2;
917     if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
918       MaybeLogUploadEvent(
919           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
920       return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
921     }
922 
923     // The filter is created using perfetto.protos.Trace as root message
924     // (because that makes it possible to play around with the `proto_filter`
925     // tool on actual traces). Here in the service, however, we deal with
926     // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
927     // The IPC client (or the write_into_filte logic in here) are responsible
928     // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
929     // the preamble is not there at ReadBuffer time. Hence we change the root of
930     // the filtering to start at the Trace.packet level.
931     if (!trace_filter->SetFilterRoot({TracePacket::kPacketFieldNumber})) {
932       MaybeLogUploadEvent(
933           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
934       return PERFETTO_SVC_ERR("Failed to set filter root.");
935     }
936   }
937 
938   const TracingSessionID tsid = ++last_tracing_session_id_;
939   TracingSession* tracing_session =
940       &tracing_sessions_
941            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
942                     std::forward_as_tuple(tsid, consumer, cfg, task_runner_))
943            .first->second;
944 
945   tracing_session->trace_uuid = uuid;
946 
947   if (trace_filter)
948     tracing_session->trace_filter = std::move(trace_filter);
949 
950   if (cfg.write_into_file()) {
951     if (!fd ^ !cfg.output_path().empty()) {
952       MaybeLogUploadEvent(
953           tracing_session->config, uuid,
954           PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
955       tracing_sessions_.erase(tsid);
956       return PERFETTO_SVC_ERR(
957           "When write_into_file==true either a FD needs to be passed or "
958           "output_path must be populated (but not both)");
959     }
960     if (!cfg.output_path().empty()) {
961       fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
962       if (!fd) {
963         MaybeLogUploadEvent(
964             tracing_session->config, uuid,
965             PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
966         tracing_sessions_.erase(tsid);
967         return PERFETTO_SVC_ERR("Failed to create the trace file %s",
968                                 cfg.output_path().c_str());
969       }
970     }
971     tracing_session->write_into_file = std::move(fd);
972     uint32_t write_period_ms = cfg.file_write_period_ms();
973     if (write_period_ms == 0)
974       write_period_ms = kDefaultWriteIntoFilePeriodMs;
975     if (write_period_ms < min_write_period_ms_)
976       write_period_ms = min_write_period_ms_;
977     tracing_session->write_period_ms = write_period_ms;
978     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
979     tracing_session->bytes_written_into_file = 0;
980   }
981 
982   if (cfg.compression_type() == TraceConfig::COMPRESSION_TYPE_DEFLATE) {
983     if (init_opts_.compressor_fn) {
984       tracing_session->compress_deflate = true;
985     } else {
986       PERFETTO_LOG(
987           "COMPRESSION_TYPE_DEFLATE is not supported in the current build "
988           "configuration. Skipping compression");
989     }
990   }
991 
992   // Initialize the log buffers.
993   bool did_allocate_all_buffers = true;
994   bool invalid_buffer_config = false;
995 
996   // Allocate the trace buffers. Also create a map to translate a consumer
997   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
998   // corresponding BufferID, which is a global ID namespace for the service and
999   // all producers.
1000   size_t total_buf_size_kb = 0;
1001   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
1002   tracing_session->buffers_index.reserve(num_buffers);
1003   for (size_t i = 0; i < num_buffers; i++) {
1004     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
1005     BufferID global_id = buffer_ids_.Allocate();
1006     if (!global_id) {
1007       did_allocate_all_buffers = false;  // We ran out of IDs.
1008       break;
1009     }
1010     tracing_session->buffers_index.push_back(global_id);
1011     // TraceBuffer size is limited to 32-bit.
1012     const uint32_t buf_size_kb = buffer_cfg.size_kb();
1013     const uint64_t buf_size_bytes = buf_size_kb * static_cast<uint64_t>(1024);
1014     const size_t buf_size = static_cast<size_t>(buf_size_bytes);
1015     if (buf_size_bytes == 0 ||
1016         buf_size_bytes > std::numeric_limits<uint32_t>::max() ||
1017         buf_size != buf_size_bytes) {
1018       invalid_buffer_config = true;
1019       did_allocate_all_buffers = false;
1020       break;
1021     }
1022     total_buf_size_kb += buf_size_kb;
1023     TraceBuffer::OverwritePolicy policy =
1024         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
1025             ? TraceBuffer::kDiscard
1026             : TraceBuffer::kOverwrite;
1027     auto it_and_inserted =
1028         buffers_.emplace(global_id, TraceBuffer::Create(buf_size, policy));
1029     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
1030     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
1031     if (!trace_buffer) {
1032       did_allocate_all_buffers = false;
1033       break;
1034     }
1035   }
1036 
1037   // This can happen if either:
1038   // - All the kMaxTraceBufferID slots are taken.
1039   // - OOM, or, more realistically, we exhausted virtual memory.
1040   // - The buffer size in the config is invalid.
1041   // In any case, free all the previously allocated buffers and abort.
1042   if (!did_allocate_all_buffers) {
1043     for (BufferID global_id : tracing_session->buffers_index) {
1044       buffer_ids_.Free(global_id);
1045       buffers_.erase(global_id);
1046     }
1047     MaybeLogUploadEvent(tracing_session->config, uuid,
1048                         PerfettoStatsdAtom::kTracedEnableTracingOom);
1049     tracing_sessions_.erase(tsid);
1050     if (invalid_buffer_config) {
1051       return PERFETTO_SVC_ERR(
1052           "Failed to allocate tracing buffers: Invalid buffer sizes");
1053     }
1054     return PERFETTO_SVC_ERR(
1055         "Failed to allocate tracing buffers: OOM or too many buffers");
1056   }
1057 
1058   UpdateMemoryGuardrail();
1059 
1060   consumer->tracing_session_id_ = tsid;
1061 
1062   // Setup the data sources on the producers without starting them.
1063   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
1064     // Scan all the registered data sources with a matching name.
1065     auto range = data_sources_.equal_range(cfg_data_source.config().name());
1066     for (auto it = range.first; it != range.second; it++) {
1067       TraceConfig::ProducerConfig producer_config;
1068       for (const auto& config : cfg.producers()) {
1069         if (GetProducer(it->second.producer_id)->name_ ==
1070             config.producer_name()) {
1071           producer_config = config;
1072           break;
1073         }
1074       }
1075       SetupDataSource(cfg_data_source, producer_config, it->second,
1076                       tracing_session);
1077     }
1078   }
1079 
1080   bool has_start_trigger = false;
1081   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1082   switch (GetTriggerMode(cfg)) {
1083     case TraceConfig::TriggerConfig::UNSPECIFIED:
1084       // no triggers are specified so this isn't a trace that is using triggers.
1085       PERFETTO_DCHECK(!has_trigger_config);
1086       break;
1087     case TraceConfig::TriggerConfig::START_TRACING:
1088       // For traces which use START_TRACE triggers we need to ensure that the
1089       // tracing session will be cleaned up when it times out.
1090       has_start_trigger = true;
1091       task_runner_->PostDelayedTask(
1092           [weak_this, tsid]() {
1093             if (weak_this)
1094               weak_this->OnStartTriggersTimeout(tsid);
1095           },
1096           cfg.trigger_config().trigger_timeout_ms());
1097       break;
1098     case TraceConfig::TriggerConfig::STOP_TRACING:
1099     case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1100       // Update the tracing_session's duration_ms to ensure that if no trigger
1101       // is received the session will end and be cleaned up equal to the
1102       // timeout.
1103       //
1104       // TODO(nuskos): Refactor this so that rather then modifying the config we
1105       // have a field we look at on the tracing_session.
1106       tracing_session->config.set_duration_ms(
1107           cfg.trigger_config().trigger_timeout_ms());
1108       break;
1109 
1110       // The case of unknown modes (coming from future versions of the service)
1111       // is handled few lines above (search for TriggerMode_MAX).
1112   }
1113 
1114   tracing_session->state = TracingSession::CONFIGURED;
1115   PERFETTO_LOG(
1116       "Configured tracing session %" PRIu64
1117       ", #sources:%zu, duration:%d ms%s, #buffers:%d, total "
1118       "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
1119       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
1120       tracing_session->config.prefer_suspend_clock_for_duration()
1121           ? " (suspend_clock)"
1122           : "",
1123       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
1124       static_cast<unsigned int>(consumer->uid_),
1125       cfg.unique_session_name().c_str());
1126 
1127   // Start the data sources, unless this is a case of early setup + fast
1128   // triggering, either through TraceConfig.deferred_start or
1129   // TraceConfig.trigger_config(). If both are specified which ever one occurs
1130   // first will initiate the trace.
1131   if (!cfg.deferred_start() && !has_start_trigger)
1132     return StartTracing(tsid);
1133 
1134   return base::OkStatus();
1135 }
1136 
// Applies a config update to an already-created tracing session. Only
// additions to producer_name_filter / producer_name_regex_filter take
// effect; every other field of |updated_cfg| is ignored (a log message is
// emitted if anything else differs). Newly-matching producers get their
// data sources set up and, if the session is STARTED, started immediately.
void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
                                           const TraceConfig& updated_cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  PERFETTO_DCHECK(tracing_session);

  // Config changes are only meaningful while the session is live (either
  // configured-but-not-started or actively started).
  if ((tracing_session->state != TracingSession::STARTED) &&
      (tracing_session->state != TracingSession::CONFIGURED)) {
    PERFETTO_ELOG(
        "ChangeTraceConfig() was called for a tracing session which isn't "
        "running.");
    return;
  }

  // We only support updating producer_name_{,regex}_filter (and pass-through
  // configs) for now; null out any changeable fields and make sure the rest are
  // identical.
  TraceConfig new_config_copy(updated_cfg);
  for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  TraceConfig current_config_copy(tracing_session->config);
  for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  // With the mutable fields cleared on both sides, any remaining difference
  // is an unsupported change. We warn but still proceed with the filter
  // update below.
  if (new_config_copy != current_config_copy) {
    PERFETTO_LOG(
        "ChangeTraceConfig() was called with a config containing unsupported "
        "changes; only adding to the producer_name_{,regex}_filter is "
        "currently supported and will have an effect.");
  }

  for (TraceConfig::DataSource& cfg_data_source :
       *tracing_session->config.mutable_data_sources()) {
    // Find the updated producer_filter in the new config.
    std::vector<std::string> new_producer_name_filter;
    std::vector<std::string> new_producer_name_regex_filter;
    bool found_data_source = false;
    for (const auto& it : updated_cfg.data_sources()) {
      if (cfg_data_source.config().name() == it.config().name()) {
        new_producer_name_filter = it.producer_name_filter();
        new_producer_name_regex_filter = it.producer_name_regex_filter();
        found_data_source = true;
        break;
      }
    }

    // Bail out if data source not present in the new config.
    if (!found_data_source) {
      PERFETTO_ELOG(
          "ChangeTraceConfig() called without a current data source also "
          "present in the new config: %s",
          cfg_data_source.config().name().c_str());
      continue;
    }

    // TODO(oysteine): Just replacing the filter means that if
    // there are any filter entries which were present in the original config,
    // but removed from the config passed to ChangeTraceConfig, any matching
    // producers will keep producing but newly added producers after this
    // point will never start.
    *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
    *cfg_data_source.mutable_producer_name_regex_filter() =
        new_producer_name_regex_filter;

    // Get the list of producers that are already set up.
    std::unordered_set<uint16_t> set_up_producers;
    auto& ds_instances = tracing_session->data_source_instances;
    for (auto instance_it = ds_instances.begin();
         instance_it != ds_instances.end(); ++instance_it) {
      set_up_producers.insert(instance_it->first);
    }

    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
      PERFETTO_DCHECK(producer);

      // Check if the producer name of this data source is present
      // in the name filters. We currently only support new filters, not
      // removing old ones.
      if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
                             new_producer_name_regex_filter)) {
        continue;
      }

      // If this producer is already set up, we assume that all datasources
      // in it started already.
      if (set_up_producers.count(it->second.producer_id))
        continue;

      // If it wasn't previously setup, set it up now.
      // (The per-producer config is optional).
      TraceConfig::ProducerConfig producer_config;
      for (const auto& config : tracing_session->config.producers()) {
        if (producer->name_ == config.producer_name()) {
          producer_config = config;
          break;
        }
      }

      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, it->second, tracing_session);

      // Only auto-start the instance if the session as a whole is already
      // running; for a CONFIGURED session StartTracing() will do it later.
      if (ds_inst && tracing_session->state == TracingSession::STARTED)
        StartDataSourceInstance(producer, tracing_session, ds_inst);
    }
  }
}
1252 
// Transitions a CONFIGURED session to STARTED: records the initial clock
// snapshot, schedules the periodic snapshot / file-drain / flush /
// incremental-clear tasks, arms the duration_ms stop timer and starts every
// data source instance that was set up for the session.
//
// Returns an error status (without side effects on the session state) if the
// session id is invalid or the session is not in the CONFIGURED state.
base::Status TracingServiceImpl::StartTracing(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    return PERFETTO_SVC_ERR(
        "StartTracing() failed, invalid session ID %" PRIu64, tsid);
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedStartTracing);

  if (tracing_session->state != TracingSession::CONFIGURED) {
    MaybeLogUploadEvent(
        tracing_session->config, tracing_session->trace_uuid,
        PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
    return PERFETTO_SVC_ERR("StartTracing() failed, invalid session state: %d",
                            tracing_session->state);
  }

  tracing_session->state = TracingSession::STARTED;

  // We store the start of trace snapshot separately as it's important to make
  // sure we can interpret all the data in the trace and storing it in the ring
  // buffer means it could be overwritten by a later snapshot.
  if (!tracing_session->config.builtin_data_sources()
           .disable_clock_snapshotting()) {
    SnapshotClocks(&tracing_session->initial_clock_snapshot);
  }

  // We don't snapshot the clocks here because we just did this above.
  SnapshotLifecyleEvent(
      tracing_session,
      protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
      false /* snapshot_clocks */);

  // Periodically snapshot clocks, stats, sync markers while the trace is
  // active. The snapshots are emitted on the future ReadBuffers() calls, which
  // means that:
  //  (a) If we're streaming to a file (or to a consumer) while tracing, we
  //      write snapshots periodically into the trace.
  //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
  //      snapshot into the trace. For clock snapshots, we keep track of the
  //      snapshot recorded at the beginning of the session
  //      (initial_clock_snapshot above), as well as the most recent sampled
  //      snapshots that showed significant new drift between different clocks.
  //      The latter clock snapshots are sampled periodically and at lifecycle
  //      events.
  base::PeriodicTask::Args snapshot_task_args;
  snapshot_task_args.start_first_task_immediately = true;
  snapshot_task_args.use_suspend_aware_timer =
      tracing_session->config.builtin_data_sources()
          .prefer_suspend_clock_for_snapshot();
  // The task holds only a weak pointer: the service may be destroyed before
  // the next tick fires.
  snapshot_task_args.task = [weak_this, tsid] {
    if (weak_this)
      weak_this->PeriodicSnapshotTask(tsid);
  };
  snapshot_task_args.period_ms =
      tracing_session->config.builtin_data_sources().snapshot_interval_ms();
  if (!snapshot_task_args.period_ms)
    snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
  tracing_session->snapshot_periodic_task.Start(snapshot_task_args);

  // Trigger delayed task if the trace is time limited.
  const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
  if (trace_duration_ms > 0) {
    auto stop_task =
        std::bind(&TracingServiceImpl::StopOnDurationMsExpiry, weak_this, tsid);
    if (tracing_session->config.prefer_suspend_clock_for_duration()) {
      // A suspend-aware one-shot PeriodicTask keeps counting trace time
      // across device suspend, unlike a plain delayed task.
      base::PeriodicTask::Args stop_args;
      stop_args.use_suspend_aware_timer = true;
      stop_args.period_ms = trace_duration_ms;
      stop_args.one_shot = true;
      stop_args.task = std::move(stop_task);
      tracing_session->timed_stop_task.Start(stop_args);
    } else {
      task_runner_->PostDelayedTask(std::move(stop_task), trace_duration_ms);
    }
  }  // if (trace_duration_ms > 0).

  // Start the periodic drain tasks if we are saving the trace into a file.
  if (tracing_session->config.write_into_file()) {
    task_runner_->PostDelayedTask(
        [weak_this, tsid] {
          if (weak_this)
            weak_this->ReadBuffersIntoFile(tsid);
        },
        tracing_session->delay_to_next_write_period_ms());
  }

  // Start the periodic flush tasks if the config specified a flush period.
  if (tracing_session->config.flush_period_ms())
    PeriodicFlushTask(tsid, /*post_next_only=*/true);

  // Start the periodic incremental state clear tasks if the config specified a
  // period.
  if (tracing_session->config.incremental_state_config().clear_period_ms()) {
    PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
  }

  for (auto& [prod_id, data_source] : tracing_session->data_source_instances) {
    ProducerEndpointImpl* producer = GetProducer(prod_id);
    if (!producer) {
      // An instance without a live producer indicates a bookkeeping bug;
      // skip it rather than crash in release builds.
      PERFETTO_DFATAL("Producer does not exist.");
      continue;
    }
    StartDataSourceInstance(producer, tracing_session, &data_source);
  }

  // Covers the corner case of a session with zero data sources (or all
  // already acked): the "all started" notification must still fire.
  MaybeNotifyAllDataSourcesStarted(tracing_session);
  return base::OkStatus();
}
1366 
1367 // static
StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl> weak_this,TracingSessionID tsid)1368 void TracingServiceImpl::StopOnDurationMsExpiry(
1369     base::WeakPtr<TracingServiceImpl> weak_this,
1370     TracingSessionID tsid) {
1371   // Skip entirely the flush if the trace session doesn't exist anymore.
1372   // This is to prevent misleading error messages to be logged.
1373   if (!weak_this)
1374     return;
1375   auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
1376   if (!tracing_session_ptr)
1377     return;
1378   // If this trace was using STOP_TRACING triggers and we've seen
1379   // one, then the trigger overrides the normal timeout. In this
1380   // case we just return and let the other task clean up this trace.
1381   if (GetTriggerMode(tracing_session_ptr->config) ==
1382           TraceConfig::TriggerConfig::STOP_TRACING &&
1383       !tracing_session_ptr->received_triggers.empty())
1384     return;
1385   // In all other cases (START_TRACING or no triggers) we flush
1386   // after |trace_duration_ms| unconditionally.
1387   weak_this->FlushAndDisableTracing(tsid);
1388 }
1389 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1390 void TracingServiceImpl::StartDataSourceInstance(
1391     ProducerEndpointImpl* producer,
1392     TracingSession* tracing_session,
1393     TracingServiceImpl::DataSourceInstance* instance) {
1394   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1395   if (instance->will_notify_on_start) {
1396     instance->state = DataSourceInstance::STARTING;
1397   } else {
1398     instance->state = DataSourceInstance::STARTED;
1399   }
1400   if (tracing_session->consumer_maybe_null) {
1401     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1402         *producer, *instance);
1403   }
1404   producer->StartDataSource(instance->instance_id, instance->config);
1405 
1406   // If all data sources are started, notify the consumer.
1407   if (instance->state == DataSourceInstance::STARTED)
1408     MaybeNotifyAllDataSourcesStarted(tracing_session);
1409 }
1410 
// DisableTracing just stops the data sources but doesn't free up any buffer.
// This is to allow the consumer to freeze the buffers (by stopping the trace)
// and then drain the buffers. The actual teardown of the TracingSession happens
// in FreeBuffers().
//
// |disable_immediately|: when true, the session transitions to DISABLED right
// away instead of waiting for data sources to ack the stop.
void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
                                        bool disable_immediately) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // Can happen if the consumer calls this before EnableTracing() or after
    // FreeBuffers().
    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedDisableTracing);

  switch (tracing_session->state) {
    // Spurious call to DisableTracing() while already disabled, nothing to do.
    case TracingSession::DISABLED:
      PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
      return;

    case TracingSession::CLONED_READ_ONLY:
      PERFETTO_DLOG("DisableTracing() cannot be called on a cloned session");
      return;

    // This is either:
    // A) The case of a graceful DisableTracing() call followed by a call to
    //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    //    to forcefully transition in the disabled state without waiting for the
    //    outstanding acks because the buffers are going to be destroyed soon.
    // B) A spurious call, iff |disable_immediately| == false, in which case
    //    there is nothing to do.
    case TracingSession::DISABLING_WAITING_STOP_ACKS:
      PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
      if (disable_immediately)
        DisableTracingNotifyConsumerAndFlushFile(tracing_session);
      return;

    // Continues below.
    case TracingSession::CONFIGURED:
      // If the session didn't even start there is no need to orchestrate a
      // graceful stop of data sources.
      disable_immediately = true;
      break;

    // This is the nominal case, continues below.
    case TracingSession::STARTED:
      break;
  }

  // Ask every data source instance of the session to stop (forcefully if
  // |disable_immediately| is set, which is propagated to each instance).
  for (auto& data_source_inst : tracing_session->data_source_instances) {
    const ProducerID producer_id = data_source_inst.first;
    DataSourceInstance& instance = data_source_inst.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                    instance.state == DataSourceInstance::STARTING ||
                    instance.state == DataSourceInstance::STARTED);
    StopDataSourceInstance(producer, tracing_session, &instance,
                           disable_immediately);
  }

  // If the periodic task is running, we can stop the periodic snapshot timer
  // here instead of waiting until FreeBuffers to prevent useless snapshots
  // which won't be read.
  tracing_session->snapshot_periodic_task.Reset();

  // Either this request is flagged with |disable_immediately| or there are no
  // data sources that are requesting a final handshake. In both cases just mark
  // the session as disabled immediately, notify the consumer and flush the
  // trace file (if used).
  if (tracing_session->AllDataSourceInstancesStopped())
    return DisableTracingNotifyConsumerAndFlushFile(tracing_session);

  tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
  // Watchdog: if some data source never acks the stop, OnDisableTracingTimeout
  // forcefully completes the disable handshake after the configured timeout.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->OnDisableTracingTimeout(tsid);
      },
      tracing_session->data_source_stop_timeout_ms());

  // Deliberately NOT removing the session from |tracing_session_|, it's still
  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}
1500 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1501 void TracingServiceImpl::NotifyDataSourceStarted(
1502     ProducerID producer_id,
1503     DataSourceInstanceID instance_id) {
1504   PERFETTO_DCHECK_THREAD(thread_checker_);
1505   for (auto& kv : tracing_sessions_) {
1506     TracingSession& tracing_session = kv.second;
1507     DataSourceInstance* instance =
1508         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1509 
1510     if (!instance)
1511       continue;
1512 
1513     // If the tracing session was already stopped, ignore this notification.
1514     if (tracing_session.state != TracingSession::STARTED)
1515       continue;
1516 
1517     if (instance->state != DataSourceInstance::STARTING) {
1518       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1519                     instance->state);
1520       continue;
1521     }
1522 
1523     instance->state = DataSourceInstance::STARTED;
1524 
1525     ProducerEndpointImpl* producer = GetProducer(producer_id);
1526     PERFETTO_DCHECK(producer);
1527     if (tracing_session.consumer_maybe_null) {
1528       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1529           *producer, *instance);
1530     }
1531 
1532     // If all data sources are started, notify the consumer.
1533     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1534   }  // for (tracing_session)
1535 }
1536 
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1537 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1538     TracingSession* tracing_session) {
1539   if (!tracing_session->consumer_maybe_null)
1540     return;
1541 
1542   if (!tracing_session->AllDataSourceInstancesStarted())
1543     return;
1544 
1545   // In some rare cases, we can get in this state more than once. Consider the
1546   // following scenario: 3 data sources are registered -> trace starts ->
1547   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1548   // Imagine now that a 4th data source registers while the trace is ongoing.
1549   // This would hit the AllDataSourceInstancesStarted() condition again.
1550   // In this case, however, we don't want to re-notify the consumer again.
1551   // That would be unexpected (even if, perhaps, technically correct) and
1552   // trigger bugs in the consumer.
1553   if (tracing_session->did_notify_all_data_source_started)
1554     return;
1555 
1556   PERFETTO_DLOG("All data sources started");
1557 
1558   SnapshotLifecyleEvent(
1559       tracing_session,
1560       protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1561       true /* snapshot_clocks */);
1562 
1563   tracing_session->did_notify_all_data_source_started = true;
1564   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1565 }
1566 
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1567 void TracingServiceImpl::NotifyDataSourceStopped(
1568     ProducerID producer_id,
1569     DataSourceInstanceID instance_id) {
1570   PERFETTO_DCHECK_THREAD(thread_checker_);
1571   for (auto& kv : tracing_sessions_) {
1572     TracingSession& tracing_session = kv.second;
1573     DataSourceInstance* instance =
1574         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1575 
1576     if (!instance)
1577       continue;
1578 
1579     if (instance->state != DataSourceInstance::STOPPING) {
1580       PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1581                     instance->state);
1582       continue;
1583     }
1584 
1585     instance->state = DataSourceInstance::STOPPED;
1586 
1587     ProducerEndpointImpl* producer = GetProducer(producer_id);
1588     PERFETTO_DCHECK(producer);
1589     if (tracing_session.consumer_maybe_null) {
1590       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1591           *producer, *instance);
1592     }
1593 
1594     if (!tracing_session.AllDataSourceInstancesStopped())
1595       continue;
1596 
1597     if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1598       continue;
1599 
1600     // All data sources acked the termination.
1601     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1602   }  // for (tracing_session)
1603 }
1604 
// Matches each incoming trigger name against the trigger configs of all active
// tracing sessions and, depending on the configured trigger mode, starts or
// stops the matching session(s) or notifies the consumer for a clone snapshot.
void TracingServiceImpl::ActivateTriggers(
    ProducerID producer_id,
    const std::vector<std::string>& triggers) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);

  int64_t now_ns = base::GetBootTimeNs().count();
  for (const auto& trigger_name : triggers) {
    PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
                  trigger_name.c_str());
    base::Hasher hash;
    hash.Update(trigger_name.c_str(), trigger_name.size());
    // Details of the session that (last) matched this trigger, recorded only
    // for the final "Trace trigger activated" log statement below.
    std::string triggered_session_name;
    base::Uuid triggered_session_uuid;
    TracingSessionID triggered_session_id = 0;
    auto trigger_mode = TraceConfig::TriggerConfig::UNSPECIFIED;

    // |trigger_history_| stores hashed trigger names; the count of prior
    // activations in the window feeds the max_per_24_h rate limit below.
    uint64_t trigger_name_hash = hash.digest();
    size_t count_in_window =
        PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);

    bool trigger_matched = false;
    bool trigger_activated = false;
    for (auto& id_and_tracing_session : tracing_sessions_) {
      auto& tracing_session = id_and_tracing_session.second;
      TracingSessionID tsid = id_and_tracing_session.first;
      auto iter = std::find_if(
          tracing_session.config.trigger_config().triggers().begin(),
          tracing_session.config.trigger_config().triggers().end(),
          [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
            return trigger.name() == trigger_name;
          });
      if (iter == tracing_session.config.trigger_config().triggers().end())
        continue;
      // Cloned sessions are read-only snapshots and cannot be (re)triggered.
      if (tracing_session.state == TracingSession::CLONED_READ_ONLY)
        continue;

      // If this trigger requires a certain producer to have sent it
      // (non-empty producer_name_regex()) ensure the producer who sent this
      // trigger matches.
      // NOTE(review): std::regex construction throws std::regex_error on an
      // invalid pattern — presumably the config is validated upstream; verify.
      if (!iter->producer_name_regex().empty() &&
          !std::regex_match(
              producer->name_,
              std::regex(iter->producer_name_regex(), std::regex::extended))) {
        continue;
      }

      // Use a random number between 0 and 1 to check if we should allow this
      // trigger through or not (probabilistic throttling via
      // skip_probability). Tests can pin the value via
      // |trigger_rnd_override_for_testing_|.
      double trigger_rnd =
          trigger_rnd_override_for_testing_ > 0
              ? trigger_rnd_override_for_testing_
              : trigger_probability_dist_(trigger_probability_rand_);
      PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
      if (trigger_rnd < iter->skip_probability()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitProbability,
                             trigger_name);
        continue;
      }

      // If we already triggered more times than the limit, silently ignore
      // this trigger.
      if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitMaxPer24h,
                             trigger_name);
        continue;
      }
      trigger_matched = true;
      triggered_session_id = tracing_session.id;
      triggered_session_name = tracing_session.config.unique_session_name();
      triggered_session_uuid.set_lsb_msb(tracing_session.trace_uuid.lsb(),
                                         tracing_session.trace_uuid.msb());
      trigger_mode = GetTriggerMode(tracing_session.config);

      // Must be computed before the push_back below: it has to reflect whether
      // a trigger had been received *before* this one.
      const bool triggers_already_received =
          !tracing_session.received_triggers.empty();
      tracing_session.received_triggers.push_back(
          {static_cast<uint64_t>(now_ns), iter->name(), producer->name_,
           producer->uid()});
      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      switch (trigger_mode) {
        case TraceConfig::TriggerConfig::START_TRACING:
          // If the session has already been triggered and moved past
          // CONFIGURED then we don't need to repeat StartTracing. This would
          // work fine (StartTracing would return false) but would add error
          // logs.
          if (tracing_session.state != TracingSession::CONFIGURED)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStartTracing, iter->name());

          // We override the trace duration to be the trigger's requested
          // value, this ensures that the trace will end after this amount
          // of time has passed.
          tracing_session.config.set_duration_ms(iter->stop_delay_ms());
          StartTracing(tsid);
          break;
        case TraceConfig::TriggerConfig::STOP_TRACING:
          // Only stop the trace once to avoid confusing log messages. I.E.
          // when we've already hit the first trigger we've already Posted the
          // task to FlushAndDisable. So all future triggers will just break
          // out.
          if (triggers_already_received)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStopTracing, iter->name());

          // Now that we've seen a trigger we need to stop, flush, and disable
          // this session after the configured |stop_delay_ms|.
          task_runner_->PostDelayedTask(
              [weak_this, tsid] {
                // Skip entirely the flush if the trace session doesn't exist
                // anymore. This is to prevent misleading error messages to be
                // logged.
                if (weak_this && weak_this->GetTracingSession(tsid))
                  weak_this->FlushAndDisableTracing(tsid);
              },
              // A zero |stop_delay_ms| makes the task effectively immediate.
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
          // The clone itself is not performed here: the consumer is notified
          // and drives the snapshot operation.
          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerCloneSnapshot, iter->name());
          task_runner_->PostDelayedTask(
              [weak_this, tsid, trigger_name = iter->name()] {
                if (!weak_this)
                  return;
                auto* tsess = weak_this->GetTracingSession(tsid);
                if (!tsess || !tsess->consumer_maybe_null)
                  return;
                tsess->consumer_maybe_null->NotifyCloneSnapshotTrigger(
                    trigger_name);
              },
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::UNSPECIFIED:
          PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
          break;
      }
    }  // for (.. : tracing_sessions_)

    if (trigger_matched) {
      trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
    }

    if (trigger_activated) {
      // Log only the trigger that actually caused a trace stop/start, don't log
      // the follow-up ones, even if they matched.
      PERFETTO_LOG(
          "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
          "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
          trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
          triggered_session_uuid.ToPrettyString().c_str(),
          triggered_session_id);
    }
  }  // for (trigger_name : triggers)
}
1776 
1777 // Always invoked TraceConfig.data_source_stop_timeout_ms (by default
1778 // kDataSourceStopTimeoutMs) after DisableTracing(). In nominal conditions all
1779 // data sources should have acked the stop and this will early out.
OnDisableTracingTimeout(TracingSessionID tsid)1780 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1781   PERFETTO_DCHECK_THREAD(thread_checker_);
1782   TracingSession* tracing_session = GetTracingSession(tsid);
1783   if (!tracing_session ||
1784       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1785     return;  // Tracing session was successfully disabled.
1786   }
1787 
1788   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1789                 tsid);
1790   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1791   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1792 }
1793 
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1794 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1795     TracingSession* tracing_session) {
1796   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1797   for (auto& inst_kv : tracing_session->data_source_instances) {
1798     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1799       continue;
1800     inst_kv.second.state = DataSourceInstance::STOPPED;
1801     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1802     PERFETTO_DCHECK(producer);
1803     if (tracing_session->consumer_maybe_null) {
1804       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1805           *producer, inst_kv.second);
1806     }
1807   }
1808   tracing_session->state = TracingSession::DISABLED;
1809 
1810   // Scrape any remaining chunks that weren't flushed by the producers.
1811   for (auto& producer_id_and_producer : producers_)
1812     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1813 
1814   SnapshotLifecyleEvent(
1815       tracing_session,
1816       protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1817       true /* snapshot_clocks */);
1818 
1819   if (tracing_session->write_into_file) {
1820     tracing_session->write_period_ms = 0;
1821     ReadBuffersIntoFile(tracing_session->id);
1822   }
1823 
1824   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1825                       PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1826 
1827   if (tracing_session->consumer_maybe_null)
1828     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1829 }
1830 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1831 void TracingServiceImpl::Flush(TracingSessionID tsid,
1832                                uint32_t timeout_ms,
1833                                ConsumerEndpoint::FlushCallback callback,
1834                                FlushFlags flush_flags) {
1835   PERFETTO_DCHECK_THREAD(thread_checker_);
1836   TracingSession* tracing_session = GetTracingSession(tsid);
1837   if (!tracing_session) {
1838     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1839     return;
1840   }
1841 
1842   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
1843   for (const auto& [producer_id, ds_inst] :
1844        tracing_session->data_source_instances) {
1845     if (ds_inst.no_flush)
1846       continue;
1847     data_source_instances[producer_id].push_back(ds_inst.instance_id);
1848   }
1849   FlushDataSourceInstances(tracing_session, timeout_ms, data_source_instances,
1850                            std::move(callback), flush_flags);
1851 }
1852 
FlushDataSourceInstances(TracingSession * tracing_session,uint32_t timeout_ms,const std::map<ProducerID,std::vector<DataSourceInstanceID>> & data_source_instances,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1853 void TracingServiceImpl::FlushDataSourceInstances(
1854     TracingSession* tracing_session,
1855     uint32_t timeout_ms,
1856     const std::map<ProducerID, std::vector<DataSourceInstanceID>>&
1857         data_source_instances,
1858     ConsumerEndpoint::FlushCallback callback,
1859     FlushFlags flush_flags) {
1860   PERFETTO_DCHECK_THREAD(thread_checker_);
1861   if (!timeout_ms)
1862     timeout_ms = tracing_session->flush_timeout_ms();
1863 
1864   if (tracing_session->pending_flushes.size() > 1000) {
1865     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1866                   tracing_session->pending_flushes.size());
1867     callback(false);
1868     return;
1869   }
1870 
1871   if (tracing_session->state != TracingSession::STARTED) {
1872     PERFETTO_LOG("Flush() called, but tracing has not been started");
1873     callback(false);
1874     return;
1875   }
1876 
1877   ++tracing_session->flushes_requested;
1878   FlushRequestID flush_request_id = ++last_flush_request_id_;
1879   PendingFlush& pending_flush =
1880       tracing_session->pending_flushes
1881           .emplace_hint(tracing_session->pending_flushes.end(),
1882                         flush_request_id, PendingFlush(std::move(callback)))
1883           ->second;
1884 
1885   // Send a flush request to each producer involved in the tracing session. In
1886   // order to issue a flush request we have to build a map of all data source
1887   // instance ids enabled for each producer.
1888 
1889   for (const auto& [producer_id, data_sources] : data_source_instances) {
1890     ProducerEndpointImpl* producer = GetProducer(producer_id);
1891     producer->Flush(flush_request_id, data_sources, flush_flags);
1892     pending_flush.producers.insert(producer_id);
1893   }
1894 
1895   // If there are no producers to flush (realistically this happens only in
1896   // some tests) fire OnFlushTimeout() straight away, without waiting.
1897   if (data_source_instances.empty())
1898     timeout_ms = 0;
1899 
1900   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1901   task_runner_->PostDelayedTask(
1902       [weak_this, tsid = tracing_session->id, flush_request_id] {
1903         if (weak_this)
1904           weak_this->OnFlushTimeout(tsid, flush_request_id);
1905       },
1906       timeout_ms);
1907 }
1908 
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1909 void TracingServiceImpl::NotifyFlushDoneForProducer(
1910     ProducerID producer_id,
1911     FlushRequestID flush_request_id) {
1912   for (auto& kv : tracing_sessions_) {
1913     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1914     auto& pending_flushes = kv.second.pending_flushes;
1915     auto end_it = pending_flushes.upper_bound(flush_request_id);
1916     for (auto it = pending_flushes.begin(); it != end_it;) {
1917       PendingFlush& pending_flush = it->second;
1918       pending_flush.producers.erase(producer_id);
1919       if (pending_flush.producers.empty()) {
1920         auto weak_this = weak_ptr_factory_.GetWeakPtr();
1921         TracingSessionID tsid = kv.first;
1922         auto callback = std::move(pending_flush.callback);
1923         task_runner_->PostTask([weak_this, tsid, callback]() {
1924           if (weak_this) {
1925             weak_this->CompleteFlush(tsid, std::move(callback),
1926                                      /*success=*/true);
1927           }
1928         });
1929         it = pending_flushes.erase(it);
1930       } else {
1931         it++;
1932       }
1933     }  // for (pending_flushes)
1934   }  // for (tracing_session)
1935 }
1936 
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1937 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1938                                         FlushRequestID flush_request_id) {
1939   TracingSession* tracing_session = GetTracingSession(tsid);
1940   if (!tracing_session)
1941     return;
1942   auto it = tracing_session->pending_flushes.find(flush_request_id);
1943   if (it == tracing_session->pending_flushes.end())
1944     return;  // Nominal case: flush was completed and acked on time.
1945 
1946   // If there were no producers to flush, consider it a success.
1947   bool success = it->second.producers.empty();
1948   auto callback = std::move(it->second.callback);
1949   tracing_session->pending_flushes.erase(it);
1950   CompleteFlush(tsid, std::move(callback), success);
1951 }
1952 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1953 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1954                                        ConsumerEndpoint::FlushCallback callback,
1955                                        bool success) {
1956   TracingSession* tracing_session = GetTracingSession(tsid);
1957   if (!tracing_session) {
1958     callback(false);
1959     return;
1960   }
1961   // Producers may not have been able to flush all their data, even if they
1962   // indicated flush completion. If possible, also collect uncommitted chunks
1963   // to make sure we have everything they wrote so far.
1964   for (auto& producer_id_and_producer : producers_) {
1965     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1966   }
1967   SnapshotLifecyleEvent(
1968       tracing_session,
1969       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
1970       true /* snapshot_clocks */);
1971 
1972   tracing_session->flushes_succeeded += success ? 1 : 0;
1973   tracing_session->flushes_failed += success ? 0 : 1;
1974   callback(success);
1975 }
1976 
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)1977 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1978     TracingSession* tracing_session,
1979     ProducerEndpointImpl* producer) {
1980   if (!producer->smb_scraping_enabled_)
1981     return;
1982 
1983   // Can't copy chunks if we don't know about any trace writers.
1984   if (producer->writers_.empty())
1985     return;
1986 
1987   // Performance optimization: On flush or session disconnect, this method is
1988   // called for each producer. If the producer doesn't participate in the
  // session, there's no need to scrape its chunks right now. We can tell if a
1990   // producer participates in the session by checking if the producer is allowed
1991   // to write into the session's log buffers.
1992   const auto& session_buffers = tracing_session->buffers_index;
1993   bool producer_in_session =
1994       std::any_of(session_buffers.begin(), session_buffers.end(),
1995                   [producer](BufferID buffer_id) {
1996                     return producer->allowed_target_buffers_.count(buffer_id);
1997                   });
1998   if (!producer_in_session)
1999     return;
2000 
2001   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
2002 
2003   // Find and copy any uncommitted chunks from the SMB.
2004   //
2005   // In nominal conditions, the page layout of the used SMB pages should never
2006   // change because the service is the only one who is supposed to modify used
2007   // pages (to make them free again).
2008   //
2009   // However, the code here needs to deal with the case of a malicious producer
2010   // altering the SMB in unpredictable ways. Thankfully the SMB size is
2011   // immutable, so a chunk will always point to some valid memory, even if the
2012   // producer alters the intended layout and chunk header concurrently.
2013   // Ultimately a malicious producer altering the SMB's chunk layout while we
2014   // are iterating in this function is not any different from the case of a
2015   // malicious producer asking to commit a chunk made of random data, which is
2016   // something this class has to deal with regardless.
2017   //
2018   // The only legitimate mutations that can happen from sane producers,
2019   // concurrently to this function, are:
2020   //   A. free pages being partitioned,
2021   //   B. free chunks being migrated to kChunkBeingWritten,
2022   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
2023 
2024   SharedMemoryABI* abi = &producer->shmem_abi_;
2025   // num_pages() is immutable after the SMB is initialized and cannot be changed
2026   // even by a producer even if malicious.
2027   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
2028     uint32_t layout = abi->GetPageLayout(page_idx);
2029 
2030     uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
2031     // Skip empty pages.
2032     if (used_chunks == 0)
2033       continue;
2034 
2035     // Scrape the chunks that are currently used. These should be either in
2036     // state kChunkBeingWritten or kChunkComplete.
2037     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
2038       if (!(used_chunks & 1))
2039         continue;
2040 
2041       SharedMemoryABI::ChunkState state =
2042           SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
2043       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
2044                       state == SharedMemoryABI::kChunkComplete);
2045       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
2046 
2047       SharedMemoryABI::Chunk chunk =
2048           abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
2049 
2050       uint16_t packet_count;
2051       uint8_t flags;
2052       // GetPacketCountAndFlags has acquire_load semantics.
2053       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
2054 
2055       // It only makes sense to copy an incomplete chunk if there's at least
2056       // one full packet available. (The producer may not have completed the
2057       // last packet in it yet, so we need at least 2.)
2058       if (!chunk_complete && packet_count < 2)
2059         continue;
2060 
2061       // At this point, it is safe to access the remaining header fields of
2062       // the chunk. Even if the chunk was only just transferred from
2063       // kChunkFree into kChunkBeingWritten state, the header should be
2064       // written completely once the packet count increased above 1 (it was
2065       // reset to 0 by the service when the chunk was freed).
2066 
2067       WriterID writer_id = chunk.writer_id();
2068       std::optional<BufferID> target_buffer_id =
2069           producer->buffer_id_for_writer(writer_id);
2070 
2071       // We can only scrape this chunk if we know which log buffer to copy it
2072       // into.
2073       if (!target_buffer_id)
2074         continue;
2075 
2076       // Skip chunks that don't belong to the requested tracing session.
2077       bool target_buffer_belongs_to_session =
2078           std::find(session_buffers.begin(), session_buffers.end(),
2079                     *target_buffer_id) != session_buffers.end();
2080       if (!target_buffer_belongs_to_session)
2081         continue;
2082 
2083       uint32_t chunk_id =
2084           chunk.header()->chunk_id.load(std::memory_order_relaxed);
2085 
2086       CopyProducerPageIntoLogBuffer(
2087           producer->id_, producer->client_identity_, writer_id, chunk_id,
2088           *target_buffer_id, packet_count, flags, chunk_complete,
2089           chunk.payload_begin(), chunk.payload_size());
2090     }
2091   }
2092 }
2093 
FlushAndDisableTracing(TracingSessionID tsid)2094 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
2095   PERFETTO_DCHECK_THREAD(thread_checker_);
2096   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
2097   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2098   Flush(
2099       tsid, 0,
2100       [weak_this, tsid](bool success) {
2101         // This was a DLOG up to Jun 2021 (v16, Android S).
2102         PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d",
2103                      tsid, success);
2104         if (!weak_this)
2105           return;
2106         TracingSession* session = weak_this->GetTracingSession(tsid);
2107         if (!session) {
2108           return;
2109         }
2110         session->final_flush_outcome = success
2111                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
2112                                            : TraceStats::FINAL_FLUSH_FAILED;
2113         if (session->consumer_maybe_null) {
2114           // If the consumer is still attached, just disable the session but
2115           // give it a chance to read the contents.
2116           weak_this->DisableTracing(tsid);
2117         } else {
2118           // If the consumer detached, destroy the session. If the consumer did
2119           // start the session in long-tracing mode, the service will have saved
2120           // the contents to the passed file. If not, the contents will be
2121           // destroyed.
2122           weak_this->FreeBuffers(tsid);
2123         }
2124       },
2125       FlushFlags(FlushFlags::Initiator::kTraced,
2126                  FlushFlags::Reason::kTraceStop));
2127 }
2128 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)2129 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
2130                                            bool post_next_only) {
2131   PERFETTO_DCHECK_THREAD(thread_checker_);
2132   TracingSession* tracing_session = GetTracingSession(tsid);
2133   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2134     return;
2135 
2136   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
2137   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2138   task_runner_->PostDelayedTask(
2139       [weak_this, tsid] {
2140         if (weak_this)
2141           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
2142       },
2143       flush_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2144                                               flush_period_ms));
2145 
2146   if (post_next_only)
2147     return;
2148 
2149   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
2150   Flush(
2151       tsid, 0,
2152       [](bool success) {
2153         if (!success)
2154           PERFETTO_ELOG("Periodic flush timed out");
2155       },
2156       FlushFlags(FlushFlags::Initiator::kTraced,
2157                  FlushFlags::Reason::kPeriodic));
2158 }
2159 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)2160 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
2161     TracingSessionID tsid,
2162     bool post_next_only) {
2163   PERFETTO_DCHECK_THREAD(thread_checker_);
2164   TracingSession* tracing_session = GetTracingSession(tsid);
2165   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2166     return;
2167 
2168   uint32_t clear_period_ms =
2169       tracing_session->config.incremental_state_config().clear_period_ms();
2170   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2171   task_runner_->PostDelayedTask(
2172       [weak_this, tsid] {
2173         if (weak_this)
2174           weak_this->PeriodicClearIncrementalStateTask(
2175               tsid, /*post_next_only=*/false);
2176       },
2177       clear_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2178                                               clear_period_ms));
2179 
2180   if (post_next_only)
2181     return;
2182 
2183   PERFETTO_DLOG(
2184       "Performing periodic incremental state clear for trace session %" PRIu64,
2185       tsid);
2186 
2187   // Queue the IPCs to producers with active data sources that opted in.
2188   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
2189   for (const auto& kv : tracing_session->data_source_instances) {
2190     ProducerID producer_id = kv.first;
2191     const DataSourceInstance& data_source = kv.second;
2192     if (data_source.handles_incremental_state_clear) {
2193       clear_map[producer_id].push_back(data_source.instance_id);
2194     }
2195   }
2196 
2197   for (const auto& kv : clear_map) {
2198     ProducerID producer_id = kv.first;
2199     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
2200     ProducerEndpointImpl* producer = GetProducer(producer_id);
2201     if (!producer) {
2202       PERFETTO_DFATAL("Producer does not exist.");
2203       continue;
2204     }
2205     producer->ClearIncrementalState(data_sources);
2206   }
2207 }
2208 
// Streams the contents of the session's trace buffers to |consumer| over IPC.
// Reads roughly kApproxBytesPerTask bytes per invocation and re-posts itself
// until the buffers are drained, to avoid one giant task tripping the
// watchdog. Returns false if the request cannot be serviced at all (unknown
// session, write_into_file session, or session still gated on a trigger).
bool TracingServiceImpl::ReadBuffersIntoConsumer(
    TracingSessionID tsid,
    ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK(consumer);
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG(
        "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
    return false;
  }

  if (tracing_session->write_into_file) {
    // If the consumer enabled tracing and asked to save the contents into the
    // passed file makes little sense to also try to read the buffers over IPC,
    // as that would just steal data from the periodic draining task.
    PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
    return false;
  }

  if (IsWaitingForTrigger(tracing_session))
    return false;

  // This is a rough threshold to determine how much to read from the buffer in
  // each task. This is to avoid executing a single huge sending task for too
  // long and risk to hit the watchdog. This is *not* an upper bound: we just
  // stop accumulating new packets and PostTask *after* we cross this threshold.
  // This constant essentially balances the PostTask and IPC overhead vs the
  // responsiveness of the service. An extremely small value will cause one IPC
  // and one PostTask for each slice but will keep the service extremely
  // responsive. An extremely large value will batch the send for the full
  // buffer in one large task, will hit the blocking send() once the socket
  // buffers are full and hang the service for a bit (until the consumer
  // catches up).
  static constexpr size_t kApproxBytesPerTask = 32768;
  bool has_more;
  std::vector<TracePacket> packets =
      ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);

  if (has_more) {
    // More data is left in the buffers: post a continuation. Both weak
    // pointers guard against the service or the consumer being destroyed
    // before the task runs.
    auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, weak_consumer, tsid] {
      if (!weak_this || !weak_consumer)
        return;
      weak_this->ReadBuffersIntoConsumer(tsid, weak_consumer.get());
    });
  }

  // Keep this as tail call, just in case the consumer re-enters.
  consumer->consumer_->OnTraceData(std::move(packets), has_more);
  return true;
}
2262 
// Drains all currently-available trace data for |tsid| into the session's
// output file, in kWriteIntoFileChunkSize increments. Closes the file (and
// disables tracing) when the max file size is reached or when the session is
// a one-shot write (write_period_ms == 0); otherwise re-posts itself for the
// next write period. Returns false if there is nothing to do for this
// session.
bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // This will be hit systematically from the PostDelayedTask. Avoid logging,
    // it would be just spam.
    return false;
  }

  // This can happen if the file is closed by a previous task because it reaches
  // |max_file_size_bytes|.
  if (!tracing_session->write_into_file)
    return false;

  if (IsWaitingForTrigger(tracing_session))
    return false;

  // ReadBuffers() can allocate memory internally, for filtering. By limiting
  // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
  // we limit the amount of memory used on each iteration.
  //
  // It would be tempting to split this into multiple tasks like in
  // ReadBuffersIntoConsumer, but that's not currently possible.
  // ReadBuffersIntoFile has to read the whole available data before returning,
  // to support the disable_immediately=true code paths.
  bool has_more = true;
  bool stop_writing_into_file = false;
  do {
    std::vector<TracePacket> packets =
        ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);

    stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
  } while (has_more && !stop_writing_into_file);

  if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
    // Ensure all data was written to the file before we close it.
    base::FlushFile(tracing_session->write_into_file.get());
    tracing_session->write_into_file.reset();
    tracing_session->write_period_ms = 0;
    if (tracing_session->state == TracingSession::STARTED)
      DisableTracing(tsid);
    return true;
  }

  // Periodic mode: schedule the next drain at the next write period boundary.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->ReadBuffersIntoFile(tsid);
      },
      tracing_session->delay_to_next_write_period_ms());
  return true;
}
2316 
IsWaitingForTrigger(TracingSession * tracing_session)2317 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2318   // Ignore the logic below for cloned tracing sessions. In this case we
2319   // actually want to read the (cloned) trace buffers even if no trigger was
2320   // hit.
2321   if (tracing_session->state == TracingSession::CLONED_READ_ONLY) {
2322     return false;
2323   }
2324 
2325   // When a tracing session is waiting for a trigger, it is considered empty. If
2326   // a tracing session finishes and moves into DISABLED without ever receiving a
2327   // trigger, the trace should never return any data. This includes the
2328   // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2329   // early and let the consumer know there is no data.
2330   if (!tracing_session->config.trigger_config().triggers().empty() &&
2331       tracing_session->received_triggers.empty()) {
2332     PERFETTO_DLOG(
2333         "ReadBuffers(): tracing session has not received a trigger yet.");
2334     return true;
2335   }
2336 
2337   // Traces with CLONE_SNAPSHOT triggers are a special case of the above. They
2338   // can be read only via a CloneSession() request. This is to keep the
2339   // behavior consistent with the STOP_TRACING+triggers case and avoid periodic
2340   // finalizations and uploads of the main CLONE_SNAPSHOT triggers.
2341   if (GetTriggerMode(tracing_session->config) ==
2342       TraceConfig::TriggerConfig::CLONE_SNAPSHOT) {
2343     PERFETTO_DLOG(
2344         "ReadBuffers(): skipping because the tracing session has "
2345         "CLONE_SNAPSHOT triggers defined");
2346     return true;
2347   }
2348 
2349   return false;
2350 }
2351 
// Reads trace data out of the session's buffers into a vector of
// TracePackets, stopping (approximately) once |threshold| bytes have been
// accumulated; |*has_more| is set to true iff the threshold was hit while
// data was still left, so the caller knows to call again. Also emits, in a
// carefully chosen order, the service-generated synthetic packets: clock
// snapshots, sync marker, trace config, triggers, uuid, system info,
// lifecycle events, stats and (for multi-machine sessions) remote clock sync.
// Each producer packet gets a trusted slice appended with fields producers
// cannot spoof (uid, pid, sequence id, machine id).
std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
    TracingSession* tracing_session,
    size_t threshold,
    bool* has_more) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(tracing_session);
  *has_more = false;

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  // Emit any pending clock snapshots first. Both the initial snapshot and the
  // ring buffer entries are moved out, so they are emitted at most once.
  if (!tracing_session->initial_clock_snapshot.empty()) {
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->initial_clock_snapshot),
                      &packets);
  }

  for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
    PERFETTO_DCHECK(!snapshot.empty());
    EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
  }
  tracing_session->clock_snapshot_ring_buffer.clear();

  if (tracing_session->should_emit_sync_marker) {
    EmitSyncMarker(&packets);
    tracing_session->should_emit_sync_marker = false;
  }

  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitTraceConfig(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  // Uuid and system info are emitted only on the first ReadBuffers() call for
  // this session.
  if (!tracing_session->did_emit_initial_packets) {
    EmitUuid(tracing_session, &packets);
    if (!tracing_session->config.builtin_data_sources().disable_system_info())
      EmitSystemInfo(&packets);
  }
  tracing_session->did_emit_initial_packets = true;

  // Note that in the proto comment, we guarantee that the tracing_started
  // lifecycle event will be emitted before any data packets so make sure to
  // keep this before reading the tracing buffers.
  if (!tracing_session->config.builtin_data_sources().disable_service_events())
    EmitLifecycleEvents(tracing_session, &packets);

  // In a multi-machine tracing session, emit clock synchronization messages for
  // remote machines.
  if (!relay_clients_.empty())
    MaybeEmitRemoteClockSync(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).

  // Add up size for packets added by the Maybe* calls above.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
  }

  bool did_hit_threshold = false;

  // Now drain the actual trace buffers, one buffer at a time, until either
  // all buffers are empty or the byte threshold is crossed.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;  // This buffer is drained.
      }
      packet.set_buffer_index_for_stats(static_cast<uint32_t>(buf_idx));
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.client_identity_trusted.has_uid());
      // Not checking sequence_properties.client_identity_trusted.has_pid():
      // it is false if the platform doesn't support it.

      PERFETTO_DCHECK(packet.size() > 0);
      // Reject packets that contain trusted fields or are otherwise malformed:
      // producers must not be able to forge trusted metadata.
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      const auto& client_identity_trusted =
          sequence_properties.client_identity_trusted;
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(client_identity_trusted.uid()));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              client_identity_trusted.machine_id(),
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (client_identity_trusted.has_pid()) {
        // Not supported on all platforms.
        trusted_packet->set_trusted_pid(
            static_cast<int32_t>(client_identity_trusted.pid()));
      }
      if (client_identity_trusted.has_non_default_machine_id()) {
        trusted_packet->set_machine_id(client_identity_trusted.machine_id());
      }
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      did_hit_threshold = packets_bytes >= threshold;
      packets.emplace_back(std::move(packet));
    }  // for(packets...)
  }  // for(buffers...)

  *has_more = did_hit_threshold;

  // Only emit the "read complete" lifetime event when there is no more trace
  // data available to read. These events are used as safe points to limit
  // sorting in trace processor: the code shouldn't emit the event unless the
  // buffers are empty.
  if (!*has_more && !tracing_session->config.builtin_data_sources()
                         .disable_service_events()) {
    // We don't bother snapshotting clocks here because we wouldn't be able to
    // emit it and we shouldn't have significant drift from the last snapshot in
    // any case.
    SnapshotLifecyleEvent(tracing_session,
                          protos::pbzero::TracingServiceEvent::
                              kReadTracingBuffersCompletedFieldNumber,
                          false /* snapshot_clocks */);
    EmitLifecycleEvents(tracing_session, &packets);
  }

  // Only emit the stats when there is no more trace data is available to read.
  // That way, any problems that occur while reading from the buffers are
  // reflected in the emitted stats. This is particularly important for use
  // cases where ReadBuffers is only ever called after the tracing session is
  // stopped.
  if (!*has_more && tracing_session->should_emit_stats) {
    EmitStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;
  }

  // Post-process the whole batch: filtering first, then compression.
  MaybeFilterPackets(tracing_session, &packets);

  MaybeCompressPackets(tracing_session, &packets);

  if (!*has_more) {
    // We've observed some extremely high memory usage by scudo after
    // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
    // now, but this code asks scudo to release memory just in case.
    base::MaybeReleaseAllocatorMemToOS();
  }

  return packets;
}
2524 
// Runs every packet in |packets| through the session's trace filter (if one
// is configured), replacing each entry in place with its filtered version.
// Packets that fail filtering become empty TracePackets, preserving the
// cardinality of the input. Also updates the session's filter stats
// (input/output bytes, errors, per-buffer discarded bytes, time taken).
void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
                                            std::vector<TracePacket>* packets) {
  // If the tracing session specified a filter, run all packets through the
  // filter and replace them with the filter results.
  // The process below mantains the cardinality of input packets. Even if an
  // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
  // makes debugging and reasoning about the trace stats easier.
  // This place swaps the contents of each |packets| entry in place.
  if (!tracing_session->trace_filter) {
    return;
  }
  protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
  // The filter root should be reset from protos.Trace to protos.TracePacket
  // by the earlier call to SetFilterRoot() in EnableTracing().
  PERFETTO_DCHECK(trace_filter.config().root_msg_index() != 0);
  std::vector<protozero::MessageFilter::InputSlice> filter_input;
  auto start = base::GetWallTimeNs();
  for (TracePacket& packet : *packets) {
    const auto& packet_slices = packet.slices();
    const size_t input_packet_size = packet.size();
    // Reuse |filter_input| across iterations to avoid re-allocating it.
    filter_input.clear();
    filter_input.resize(packet_slices.size());
    ++tracing_session->filter_input_packets;
    tracing_session->filter_input_bytes += input_packet_size;
    for (size_t i = 0; i < packet_slices.size(); ++i)
      filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
    auto filtered_packet = trace_filter.FilterMessageFragments(
        &filter_input[0], filter_input.size());

    // Replace the packet in-place with the filtered one (unless failed).
    // Save buffer_index_for_stats before resetting, as the reset clears it.
    std::optional<uint32_t> maybe_buffer_idx = packet.buffer_index_for_stats();
    packet = TracePacket();
    if (filtered_packet.error) {
      ++tracing_session->filter_errors;
      PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
                    tracing_session->filter_input_packets);
      continue;
    }
    tracing_session->filter_output_bytes += filtered_packet.size;
    if (maybe_buffer_idx.has_value()) {
      // Keep the per-buffer stats updated. Also propagate the
      // buffer_index_for_stats in the output packet to allow accounting by
      // other parts of the ReadBuffer pipeline.
      uint32_t buffer_idx = maybe_buffer_idx.value();
      packet.set_buffer_index_for_stats(buffer_idx);
      auto& vec = tracing_session->filter_bytes_discarded_per_buffer;
      if (static_cast<size_t>(buffer_idx) >= vec.size())
        vec.resize(buffer_idx + 1);
      PERFETTO_DCHECK(input_packet_size >= filtered_packet.size);
      size_t bytes_filtered_out = input_packet_size - filtered_packet.size;
      vec[buffer_idx] += bytes_filtered_out;
    }
    // Move the filtered bytes into the (now empty) packet, re-slicing them to
    // respect the max slice size.
    AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
                              filtered_packet.size, kMaxTracePacketSliceSize,
                              &packet);
  }
  auto end = base::GetWallTimeNs();
  tracing_session->filter_time_taken_ns +=
      static_cast<uint64_t>((end - start).count());
}
2585 
MaybeCompressPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2586 void TracingServiceImpl::MaybeCompressPackets(
2587     TracingSession* tracing_session,
2588     std::vector<TracePacket>* packets) {
2589   if (!tracing_session->compress_deflate) {
2590     return;
2591   }
2592 
2593   init_opts_.compressor_fn(packets);
2594 }
2595 
// Writes |packets| to the session's output file using writev(), prepending
// each packet with its proto preamble so the file is a valid root Trace
// message. Stops at the last whole packet that fits under
// |max_file_size_bytes|. Returns true when the caller should stop writing
// into the file (size limit reached or write error), false otherwise.
bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
                                       std::vector<TracePacket> packets) {
  if (!tracing_session->write_into_file) {
    return false;
  }
  // A zero max_file_size_bytes means "unlimited".
  const uint64_t max_size = tracing_session->max_file_size_bytes
                                ? tracing_session->max_file_size_bytes
                                : std::numeric_limits<size_t>::max();

  size_t total_slices = 0;
  for (const TracePacket& packet : packets) {
    total_slices += packet.slices().size();
  }
  // When writing into a file, the file should look like a root trace.proto
  // message. Each packet should be prepended with a proto preamble stating
  // its field id (within trace.proto) and size. Hence the addition below.
  const size_t max_iovecs = total_slices + packets.size();

  size_t num_iovecs = 0;
  bool stop_writing_into_file = false;
  std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
  // Tracks the iovec count at the end of the last packet that fully fits,
  // so a partially-added packet can be rolled back below.
  size_t num_iovecs_at_last_packet = 0;
  uint64_t bytes_about_to_be_written = 0;
  for (TracePacket& packet : packets) {
    std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
        packet.GetProtoPreamble();
    bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
    num_iovecs++;
    for (const Slice& slice : packet.slices()) {
      // writev() doesn't change the passed pointer. However, struct iovec
      // take a non-const ptr because it's the same struct used by readv().
      // Hence the const_cast here.
      char* start = static_cast<char*>(const_cast<void*>(slice.start));
      bytes_about_to_be_written += slice.size;
      iovecs[num_iovecs++] = {start, slice.size};
    }

    // If this packet would push the file past |max_size|, drop it (roll back
    // to the end of the previous packet) and stop: never write a truncated
    // packet.
    if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
        max_size) {
      stop_writing_into_file = true;
      num_iovecs = num_iovecs_at_last_packet;
      break;
    }

    num_iovecs_at_last_packet = num_iovecs;
  }
  PERFETTO_DCHECK(num_iovecs <= max_iovecs);
  int fd = *tracing_session->write_into_file;

  uint64_t total_wr_size = 0;

  // writev() can take at most IOV_MAX entries per call. Batch them.
  constexpr size_t kIOVMax = IOV_MAX;
  for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
    int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
    ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
    if (wr_size <= 0) {
      PERFETTO_PLOG("writev() failed");
      stop_writing_into_file = true;
      break;
    }
    total_wr_size += static_cast<size_t>(wr_size);
  }

  tracing_session->bytes_written_into_file += total_wr_size;

  PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
                (total_wr_size + 1023) / 1024, stop_writing_into_file);
  return stop_writing_into_file;
}
2666 
// Tears down the tracing session |tsid|: stops it immediately, releases its
// trace buffers, notifies producers, fails any pending clone operations and
// erases the session. On Android builds, also notifies Traceur when a
// non-cloned long trace ends.
void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    return;  // TODO(primiano): signal failure?
  }
  DisableTracing(tsid, /*disable_immediately=*/true);

  PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
  tracing_session->data_source_instances.clear();

  // Let every producer know the buffers are going away, before destroying
  // them below.
  for (auto& producer_entry : producers_) {
    ProducerEndpointImpl* producer = producer_entry.second;
    producer->OnFreeBuffers(tracing_session->buffers_index);
  }

  for (BufferID buffer_id : tracing_session->buffers_index) {
    buffer_ids_.Free(buffer_id);
    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    buffers_.erase(buffer_id);
  }
  // Capture what we need from the session before erasing it below.
  bool notify_traceur =
      tracing_session->config.notify_traceur() &&
      tracing_session->state != TracingSession::CLONED_READ_ONLY;
  bool is_long_trace =
      (tracing_session->config.write_into_file() &&
       tracing_session->config.file_write_period_ms() < kMillisPerDay);
  auto pending_clones = std::move(tracing_session->pending_clones);
  tracing_sessions_.erase(tsid);
  tracing_session = nullptr;  // The pointer is dangling after the erase above.
  UpdateMemoryGuardrail();

  // Any clone operation that was still pending on this session can no longer
  // complete: notify the requesting consumers of the failure.
  for (const auto& id_to_clone_op : pending_clones) {
    const PendingClone& clone_op = id_to_clone_op.second;
    if (clone_op.weak_consumer) {
      task_runner_->PostTask([weak_consumer = clone_op.weak_consumer] {
        if (weak_consumer) {
          weak_consumer->consumer_->OnSessionCloned(
              {false, "Original session ended", {}});
        }
      });
    }
  }

  PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
               tracing_sessions_.size());
#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (notify_traceur && is_long_trace) {
    PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
    if (!notify_fn || !notify_fn(/*session_stolen=*/false))
      PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
  }
#else
  base::ignore_result(notify_traceur);
  base::ignore_result(is_long_trace);
#endif
}
2727 
// Handles a data source registration coming from |producer_id|. Stores the
// descriptor and, for every tracing session that is CONFIGURED or STARTED and
// whose config names this data source, immediately sets up an instance (and
// starts it if the session is already STARTED).
void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  // Check that the producer doesn't register two data sources with the same ID.
  // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
  // field didn't exist.
  for (const auto& kv : data_sources_) {
    if (desc.id() && kv.second.producer_id == producer_id &&
        kv.second.descriptor.id() == desc.id()) {
      PERFETTO_ELOG(
          "Failed to register data source \"%s\". A data source with the same "
          "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
          desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
          producer_id);
      return;
    }
  }

  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  // |data_sources_| allows multiple entries per name (different producers can
  // register the same data source name).
  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  // If there are existing tracing sessions, we need to check if the new
  // data source is enabled by any of them.
  for (auto& iter : tracing_sessions_) {
    TracingSession& tracing_session = iter.second;
    if (tracing_session.state != TracingSession::STARTED &&
        tracing_session.state != TracingSession::CONFIGURED) {
      continue;
    }

    // Pick up the per-producer config (e.g. SMB sizing hints), if the
    // session's config carries one for this producer; otherwise a
    // default-constructed one is used.
    TraceConfig::ProducerConfig producer_config;
    for (const auto& config : tracing_session.config.producers()) {
      if (producer->name_ == config.producer_name()) {
        producer_config = config;
        break;
      }
    }
    for (const TraceConfig::DataSource& cfg_data_source :
         tracing_session.config.data_sources()) {
      if (cfg_data_source.config().name() != desc.name())
        continue;
      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
      if (ds_inst && tracing_session.state == TracingSession::STARTED)
        StartDataSourceInstance(producer, &tracing_session, ds_inst);
    }
  }  // for(iter : tracing_sessions_)
}
2790 
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2791 void TracingServiceImpl::UpdateDataSource(
2792     ProducerID producer_id,
2793     const DataSourceDescriptor& new_desc) {
2794   if (new_desc.id() == 0) {
2795     PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2796     return;
2797   }
2798 
2799   // If this producer has already registered a matching descriptor name and id,
2800   // just update the descriptor.
2801   RegisteredDataSource* data_source = nullptr;
2802   auto range = data_sources_.equal_range(new_desc.name());
2803   for (auto it = range.first; it != range.second; ++it) {
2804     if (it->second.producer_id == producer_id &&
2805         it->second.descriptor.id() == new_desc.id()) {
2806       data_source = &it->second;
2807       break;
2808     }
2809   }
2810 
2811   if (!data_source) {
2812     PERFETTO_ELOG(
2813         "UpdateDataSource() failed, could not find an existing data source "
2814         "with name=\"%s\" id=%" PRIu64,
2815         new_desc.name().c_str(), new_desc.id());
2816     return;
2817   }
2818 
2819   data_source->descriptor = new_desc;
2820 }
2821 
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2822 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2823                                                 TracingSession* tracing_session,
2824                                                 DataSourceInstance* instance,
2825                                                 bool disable_immediately) {
2826   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2827   if (instance->will_notify_on_stop && !disable_immediately) {
2828     instance->state = DataSourceInstance::STOPPING;
2829   } else {
2830     instance->state = DataSourceInstance::STOPPED;
2831   }
2832   if (tracing_session->consumer_maybe_null) {
2833     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2834         *producer, *instance);
2835   }
2836   producer->StopDataSource(ds_inst_id);
2837 }
2838 
// Removes the registration of data source |name| for |producer_id|, stopping
// and erasing any live instances of it across all tracing sessions first.
void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                              const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
                producer_id, name.c_str());
  PERFETTO_CHECK(producer_id);
  ProducerEndpointImpl* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  // First pass: tear down live instances of this data source in every session.
  for (auto& kv : tracing_sessions_) {
    auto& ds_instances = kv.second.data_source_instances;
    bool removed = false;
    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
      if (it->first == producer_id && it->second.data_source_name == name) {
        DataSourceInstanceID ds_inst_id = it->second.instance_id;
        if (it->second.state != DataSourceInstance::STOPPED) {
          if (it->second.state != DataSourceInstance::STOPPING) {
            StopDataSourceInstance(producer, &kv.second, &it->second,
                                   /* disable_immediately = */ false);
          }

          // Mark the instance as stopped immediately, since we are
          // unregistering it below.
          //
          //  The StopDataSourceInstance above might have set the state to
          //  STOPPING so this condition isn't an else.
          if (it->second.state == DataSourceInstance::STOPPING)
            NotifyDataSourceStopped(producer_id, ds_inst_id);
        }
        it = ds_instances.erase(it);
        removed = true;
      } else {
        ++it;
      }
    }  // for (data_source_instances)
    // Erasing an instance may satisfy the "all data sources started"
    // condition for a session still waiting on start acks.
    if (removed)
      MaybeNotifyAllDataSourcesStarted(&kv.second);
  }  // for (tracing_session)

  // Second pass: drop the registration entry itself.
  for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    if (it->second.producer_id == producer_id &&
        it->second.descriptor.name() == name) {
      data_sources_.erase(it);
      return;
    }
  }

  // Reaching here means no registration matched: a programming error.
  PERFETTO_DFATAL(
      "Tried to unregister a non-existent data source \"%s\" for "
      "producer %" PRIu16,
      name.c_str(), producer_id);
}
2890 
IsInitiatorPrivileged(const TracingSession & tracing_session)2891 bool TracingServiceImpl::IsInitiatorPrivileged(
2892     const TracingSession& tracing_session) {
2893 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2894   if (tracing_session.consumer_uid == 1066 /* AID_STATSD */ &&
2895       tracing_session.config.statsd_metadata().triggering_config_uid() !=
2896           2000 /* AID_SHELL */
2897       && tracing_session.config.statsd_metadata().triggering_config_uid() !=
2898              0 /* AID_ROOT */) {
2899     // StatsD can be triggered either by shell, root or an app that has DUMP and
2900     // USAGE_STATS permission. When triggered by shell or root, we do not want
2901     // to consider the trace a trusted system trace, as it was initiated by the
2902     // user. Otherwise, it has to come from an app with DUMP and
2903     // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
2904     // system.
2905     // Check for shell / root: https://bit.ly/3b7oZNi
2906     // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
2907     return true;
2908   }
2909   if (tracing_session.consumer_uid == 1000 /* AID_SYSTEM */) {
2910     // AID_SYSTEM is considered a privileged initiator so that system_server can
2911     // profile apps that are not profileable by shell. Other AID_SYSTEM
2912     // processes are not allowed by SELinux to connect to the consumer socket or
2913     // to exec perfetto.
2914     return true;
2915   }
2916 #else
2917   base::ignore_result(tracing_session);
2918 #endif
2919   return false;
2920 }
2921 
// Creates a DataSourceInstance for |cfg_data_source| within |tracing_session|,
// builds the per-instance DataSourceConfig (translating the session-relative
// target buffer into a global BufferID), allocates the producer's shared
// memory buffer if it does not exist yet, and sends the setup request to the
// producer. Returns nullptr (without side effects on the session) if the data
// source is filtered out for this producer, the target buffer index is out of
// range, or lockdown mode forbids enabling the producer.
TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
    const TraceConfig::DataSource& cfg_data_source,
    const TraceConfig::ProducerConfig& producer_config,
    const RegisteredDataSource& data_source,
    TracingSession* tracing_session) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
  PERFETTO_DCHECK(producer);
  // An existing producer that is not ftrace could have registered itself as
  // ftrace, we must not enable it in that case.
  if (lockdown_mode_ && producer->uid() != uid_) {
    PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
    return nullptr;
  }
  // TODO(primiano): Add tests for registration ordering (data sources vs
  // consumers).
  if (!NameMatchesFilter(producer->name_,
                         cfg_data_source.producer_name_filter(),
                         cfg_data_source.producer_name_regex_filter())) {
    PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                  cfg_data_source.config().name().c_str(),
                  producer->name_.c_str());
    return nullptr;
  }

  auto relative_buffer_id = cfg_data_source.config().target_buffer();
  if (relative_buffer_id >= tracing_session->num_buffers()) {
    PERFETTO_LOG(
        "The TraceConfig for DataSource %s specified a target_buffer out of "
        "bound (%d). Skipping it.",
        cfg_data_source.config().name().c_str(), relative_buffer_id);
    return nullptr;
  }

  // Create a copy of the DataSourceConfig specified in the trace config. This
  // will be passed to the producer after translating the |target_buffer| id.
  // The |target_buffer| parameter passed by the consumer in the trace config is
  // relative to the buffers declared in the same trace config. This has to be
  // translated to the global BufferID before passing it to the producers, which
  // don't know anything about tracing sessions and consumers.

  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
  auto insert_iter = tracing_session->data_source_instances.emplace(
      std::piecewise_construct,  //
      std::forward_as_tuple(producer->id_),
      std::forward_as_tuple(
          inst_id,
          cfg_data_source.config(),  //  Deliberate copy.
          data_source.descriptor.name(),
          data_source.descriptor.will_notify_on_start(),
          data_source.descriptor.will_notify_on_stop(),
          data_source.descriptor.handles_incremental_state_clear(),
          data_source.descriptor.no_flush()));
  DataSourceInstance* ds_instance = &insert_iter->second;

  // New data source instance starts out in CONFIGURED state.
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *ds_instance);
  }

  DataSourceConfig& ds_config = ds_instance->config;
  ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());

  // Rationale for `if (prefer) set_prefer(true)`, rather than `set(prefer)`:
  // ComputeStartupConfigHash() in tracing_muxer_impl.cc compares hashes of the
  // DataSourceConfig and expects to know (and clear) the fields generated by
  // the tracing service. Unconditionally adding a new field breaks backward
  // compatibility of startup tracing with older SDKs, because the serialization
  // also propagates unknown fields, breaking the hash matching check.
  if (tracing_session->config.prefer_suspend_clock_for_duration())
    ds_config.set_prefer_suspend_clock_for_duration(true);

  ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
  ds_config.set_enable_extra_guardrails(
      tracing_session->config.enable_extra_guardrails());
  if (IsInitiatorPrivileged(*tracing_session)) {
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
  } else {
    // Unset in case the consumer set it.
    // We need to be able to trust this field.
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
  }
  ds_config.set_tracing_session_id(tracing_session->id);
  BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
  PERFETTO_DCHECK(global_id);
  ds_config.set_target_buffer(global_id);

  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                ds_config.name().c_str(), global_id);
  if (!producer->shared_memory()) {
    // Determine the SMB page size. Must be an integer multiple of 4k.
    // As for the SMB size below, the decision tree is as follows:
    // 1. Give priority to what is defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of 4k.
    size_t page_size = producer_config.page_size_kb() * 1024;
    if (page_size == 0)
      page_size = producer->shmem_page_size_hint_bytes_;

    // Determine the SMB size. Must be an integer multiple of the SMB page size.
    // The decision tree is as follows:
    // 1. Give priority to what defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of the page size.
    size_t shm_size = producer_config.shm_size_kb() * 1024;
    if (shm_size == 0)
      shm_size = producer->shmem_size_hint_bytes_;

    auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
    if (valid_sizes != std::tie(shm_size, page_size)) {
      PERFETTO_DLOG(
          "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
          "back to shm_size %zu page_size %zu.",
          shm_size, page_size, std::get<0>(valid_sizes),
          std::get<1>(valid_sizes));
    }
    std::tie(shm_size, page_size) = valid_sizes;

    // TODO(primiano): right now Create() will suicide in case of OOM if the
    // mmap fails. We should instead gracefully fail the request and tell the
    // client to go away.
    PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
                  producer->name_.c_str());
    auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
    producer->SetupSharedMemory(std::move(shared_memory), page_size,
                                /*provided_by_producer=*/false);
  }
  producer->SetupDataSource(inst_id, ds_config);
  return ds_instance;
}
3055 
// Note: all the fields but the *_trusted ones are untrusted, as in, the
// Producer might be lying / returning garbage contents. |src| and |size| can
// be trusted in terms of being a valid pointer, but not the contents.
//
// Validates the commit (known producer, known buffer, producer allowed to
// write to that buffer, writer bound to that buffer) and, if all checks pass,
// copies the chunk into the target trace buffer. On any failure the chunk is
// dropped and |chunks_discarded_| is incremented.
void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
    ProducerID producer_id_trusted,
    const ClientIdentity& client_identity_trusted,
    WriterID writer_id,
    ChunkID chunk_id,
    BufferID buffer_id,
    uint16_t num_fragments,
    uint8_t chunk_flags,
    bool chunk_complete,
    const uint8_t* src,
    size_t size) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    chunks_discarded_++;
    return;
  }

  TraceBuffer* buf = GetBufferByID(buffer_id);
  if (!buf) {
    PERFETTO_DLOG("Could not find target buffer %" PRIu16
                  " for producer %" PRIu16,
                  buffer_id, producer_id_trusted);
    chunks_discarded_++;
    return;
  }

  // Verify that the producer is actually allowed to write into the target
  // buffer specified in the request. This prevents a malicious producer from
  // injecting data into a log buffer that belongs to a tracing session the
  // producer is not part of.
  if (!producer->is_allowed_target_buffer(buffer_id)) {
    PERFETTO_ELOG("Producer %" PRIu16
                  " tried to write into forbidden target buffer %" PRIu16,
                  producer_id_trusted, buffer_id);
    PERFETTO_DFATAL("Forbidden target buffer");
    chunks_discarded_++;
    return;
  }

  // If the writer was registered by the producer, it should only write into the
  // buffer it was registered with.
  std::optional<BufferID> associated_buffer =
      producer->buffer_id_for_writer(writer_id);
  if (associated_buffer && *associated_buffer != buffer_id) {
    PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
                  " was registered to write into target buffer %" PRIu16
                  ", but tried to write into buffer %" PRIu16,
                  writer_id, producer_id_trusted, *associated_buffer,
                  buffer_id);
    PERFETTO_DFATAL("Wrong target buffer");
    chunks_discarded_++;
    return;
  }

  // All checks passed: hand the chunk to the buffer. The chunk contents are
  // still treated as untrusted (see note above).
  buf->CopyChunkUntrusted(producer_id_trusted, client_identity_trusted,
                          writer_id, chunk_id, num_fragments, chunk_flags,
                          chunk_complete, src, size);
}
3120 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)3121 void TracingServiceImpl::ApplyChunkPatches(
3122     ProducerID producer_id_trusted,
3123     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
3124   PERFETTO_DCHECK_THREAD(thread_checker_);
3125 
3126   for (const auto& chunk : chunks_to_patch) {
3127     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
3128     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
3129     TraceBuffer* buf =
3130         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
3131     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
3132                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
3133     if (!writer_id || writer_id > kMaxWriterID || !buf) {
3134       // This can genuinely happen when the trace is stopped. The producers
3135       // might see the stop signal with some delay and try to keep sending
3136       // patches left soon after.
3137       PERFETTO_DLOG(
3138           "Received invalid chunks_to_patch request from Producer: %" PRIu16
3139           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
3140           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
3141       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3142       continue;
3143     }
3144 
3145     // Note, there's no need to validate that the producer is allowed to write
3146     // to the specified buffer ID (or that it's the correct buffer ID for a
3147     // registered TraceWriter). That's because TraceBuffer uses the producer ID
3148     // and writer ID to look up the chunk to patch. If the producer specifies an
3149     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
3150     // patches. Because the producer ID is trusted, there's also no way for a
3151     // malicious producer to patch another producer's data.
3152 
3153     // Speculate on the fact that there are going to be a limited amount of
3154     // patches per request, so we can allocate the |patches| array on the stack.
3155     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
3156     if (chunk.patches().size() > patches.size()) {
3157       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
3158                     patches.size());
3159       PERFETTO_DFATAL("Too many patches");
3160       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3161       continue;
3162     }
3163 
3164     size_t i = 0;
3165     for (const auto& patch : chunk.patches()) {
3166       const std::string& patch_data = patch.data();
3167       if (patch_data.size() != patches[i].data.size()) {
3168         PERFETTO_ELOG("Received patch from producer: %" PRIu16
3169                       " of unexpected size %zu",
3170                       producer_id_trusted, patch_data.size());
3171         patches_discarded_++;
3172         continue;
3173       }
3174       patches[i].offset_untrusted = patch.offset();
3175       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
3176       i++;
3177     }
3178     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
3179                                &patches[0], i, chunk.has_more_patches());
3180   }
3181 }
3182 
GetDetachedSession(uid_t uid,const std::string & key)3183 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
3184     uid_t uid,
3185     const std::string& key) {
3186   PERFETTO_DCHECK_THREAD(thread_checker_);
3187   for (auto& kv : tracing_sessions_) {
3188     TracingSession* session = &kv.second;
3189     if (session->consumer_uid == uid && session->detach_key == key) {
3190       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
3191       return session;
3192     }
3193   }
3194   return nullptr;
3195 }
3196 
GetTracingSession(TracingSessionID tsid)3197 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
3198     TracingSessionID tsid) {
3199   PERFETTO_DCHECK_THREAD(thread_checker_);
3200   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
3201   if (it == tracing_sessions_.end())
3202     return nullptr;
3203   return &it->second;
3204 }
3205 
3206 TracingServiceImpl::TracingSession*
FindTracingSessionWithMaxBugreportScore()3207 TracingServiceImpl::FindTracingSessionWithMaxBugreportScore() {
3208   TracingSession* max_session = nullptr;
3209   for (auto& session_id_and_session : tracing_sessions_) {
3210     auto& session = session_id_and_session.second;
3211     const int32_t score = session.config.bugreport_score();
3212     // Exclude sessions with 0 (or below) score. By default tracing sessions
3213     // should NOT be eligible to be attached to bugreports.
3214     if (score <= 0 || session.state != TracingSession::STARTED)
3215       continue;
3216 
3217     if (!max_session || score > max_session->config.bugreport_score())
3218       max_session = &session;
3219   }
3220   return max_session;
3221 }
3222 
GetNextProducerID()3223 ProducerID TracingServiceImpl::GetNextProducerID() {
3224   PERFETTO_DCHECK_THREAD(thread_checker_);
3225   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
3226   do {
3227     ++last_producer_id_;
3228   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
3229   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
3230   return last_producer_id_;
3231 }
3232 
GetBufferByID(BufferID buffer_id)3233 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
3234   auto buf_iter = buffers_.find(buffer_id);
3235   if (buf_iter == buffers_.end())
3236     return nullptr;
3237   return &*buf_iter->second;
3238 }
3239 
OnStartTriggersTimeout(TracingSessionID tsid)3240 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
3241   // Skip entirely the flush if the trace session doesn't exist anymore.
3242   // This is to prevent misleading error messages to be logged.
3243   //
3244   // if the trace has started from the trigger we rely on
3245   // the |stop_delay_ms| from the trigger so don't flush and
3246   // disable if we've moved beyond a CONFIGURED state
3247   auto* tracing_session_ptr = GetTracingSession(tsid);
3248   if (tracing_session_ptr &&
3249       tracing_session_ptr->state == TracingSession::CONFIGURED) {
3250     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
3251                   " since no triggers activated.",
3252                   tsid);
3253     // No data should be returned from ReadBuffers() regardless of if we
3254     // call FreeBuffers() or DisableTracing(). This is because in
3255     // STOP_TRACING we need this promise in either case, and using
3256     // DisableTracing() allows a graceful shutdown. Consumers can follow
3257     // their normal path and check the buffers through ReadBuffers() and
3258     // the code won't hang because the tracing session will still be
3259     // alive just disabled.
3260     DisableTracing(tsid);
3261   }
3262 }
3263 
UpdateMemoryGuardrail()3264 void TracingServiceImpl::UpdateMemoryGuardrail() {
3265 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
3266   uint64_t total_buffer_bytes = 0;
3267 
3268   // Sum up all the shared memory buffers.
3269   for (const auto& id_to_producer : producers_) {
3270     if (id_to_producer.second->shared_memory())
3271       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
3272   }
3273 
3274   // Sum up all the trace buffers.
3275   for (const auto& id_to_buffer : buffers_) {
3276     total_buffer_bytes += id_to_buffer.second->size();
3277   }
3278 
3279   // Sum up all the cloned traced buffers.
3280   for (const auto& id_to_ts : tracing_sessions_) {
3281     const TracingSession& ts = id_to_ts.second;
3282     for (const auto& id_to_clone_op : ts.pending_clones) {
3283       const PendingClone& clone_op = id_to_clone_op.second;
3284       for (const std::unique_ptr<TraceBuffer>& buf : clone_op.buffers) {
3285         if (buf) {
3286           total_buffer_bytes += buf->size();
3287         }
3288       }
3289     }
3290   }
3291 
3292   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
3293   // interval.
3294   uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
3295   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
3296 #endif
3297 }
3298 
PeriodicSnapshotTask(TracingSessionID tsid)3299 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
3300   auto* tracing_session = GetTracingSession(tsid);
3301   if (!tracing_session)
3302     return;
3303   if (tracing_session->state != TracingSession::STARTED)
3304     return;
3305   tracing_session->should_emit_sync_marker = true;
3306   tracing_session->should_emit_stats = true;
3307   MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3308 }
3309 
SnapshotLifecyleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)3310 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
3311                                                uint32_t field_id,
3312                                                bool snapshot_clocks) {
3313   // field_id should be an id of a field in TracingServiceEvent.
3314   auto& lifecycle_events = tracing_session->lifecycle_events;
3315   auto event_it =
3316       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3317                    [field_id](const TracingSession::LifecycleEvent& event) {
3318                      return event.field_id == field_id;
3319                    });
3320 
3321   TracingSession::LifecycleEvent* event;
3322   if (event_it == lifecycle_events.end()) {
3323     lifecycle_events.emplace_back(field_id);
3324     event = &lifecycle_events.back();
3325   } else {
3326     event = &*event_it;
3327   }
3328 
3329   // Snapshot the clocks before capturing the timestamp for the event so we can
3330   // use this snapshot to resolve the event timestamp if necessary.
3331   if (snapshot_clocks)
3332     MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3333 
3334   // Erase before emplacing to prevent a unncessary doubling of memory if
3335   // not needed.
3336   if (event->timestamps.size() >= event->max_size) {
3337     event->timestamps.erase_front(1 + event->timestamps.size() -
3338                                   event->max_size);
3339   }
3340   event->timestamps.emplace_back(base::GetBootTimeNs().count());
3341 }
3342 
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3343 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3344     TracingSession* tracing_session) {
3345   if (tracing_session->config.builtin_data_sources()
3346           .disable_clock_snapshotting()) {
3347     return;
3348   }
3349 
3350   // We are making an explicit copy of the latest snapshot (if it exists)
3351   // because SnapshotClocks reads this data and computes the drift based on its
3352   // content. If the clock drift is high enough, it will update the contents of
3353   // |snapshot| and return true. Otherwise, it will return false.
3354   TracingSession::ClockSnapshotData snapshot =
3355       tracing_session->clock_snapshot_ring_buffer.empty()
3356           ? TracingSession::ClockSnapshotData()
3357           : tracing_session->clock_snapshot_ring_buffer.back();
3358   bool did_update = SnapshotClocks(&snapshot);
3359   if (did_update) {
3360     // This means clocks drifted enough since last snapshot. See the comment
3361     // in SnapshotClocks.
3362     auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3363 
3364     // Erase before emplacing to prevent a unncessary doubling of memory if
3365     // not needed.
3366     static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3367     if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3368       snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3369                                    kClockSnapshotRingBufferSize);
3370     }
3371     snapshot_buffer->emplace_back(std::move(snapshot));
3372   }
3373 }
3374 
3375 // Returns true when the data in |snapshot_data| is updated with the new state
3376 // of the clocks and false otherwise.
SnapshotClocks(TracingSession::ClockSnapshotData * snapshot_data)3377 bool TracingServiceImpl::SnapshotClocks(
3378     TracingSession::ClockSnapshotData* snapshot_data) {
3379   // Minimum drift that justifies replacing a prior clock snapshot that hasn't
3380   // been emitted into the trace yet (see comment below).
3381   static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms
3382 
3383   TracingSession::ClockSnapshotData new_snapshot_data = CaptureClockSnapshots();
3384   // If we're about to update a session's latest clock snapshot that hasn't been
3385   // emitted into the trace yet, check whether the clocks have drifted enough to
3386   // warrant overriding the current snapshot values. The older snapshot would be
3387   // valid for a larger part of the currently buffered trace data because the
3388   // clock sync protocol in trace processor uses the latest clock <= timestamp
3389   // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
3390   // we try to keep it if we can.
3391   if (!snapshot_data->empty()) {
3392     PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
3393     PERFETTO_DCHECK((*snapshot_data)[0].clock_id ==
3394                     protos::gen::BUILTIN_CLOCK_BOOTTIME);
3395 
3396     bool update_snapshot = false;
3397     uint64_t old_boot_ns = (*snapshot_data)[0].timestamp;
3398     uint64_t new_boot_ns = new_snapshot_data[0].timestamp;
3399     int64_t boot_diff =
3400         static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
3401 
3402     for (size_t i = 1; i < snapshot_data->size(); i++) {
3403       uint64_t old_ns = (*snapshot_data)[i].timestamp;
3404       uint64_t new_ns = new_snapshot_data[i].timestamp;
3405 
3406       int64_t diff =
3407           static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
3408 
3409       // Compare the boottime delta against the delta of this clock.
3410       if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
3411         update_snapshot = true;
3412         break;
3413       }
3414     }
3415     if (!update_snapshot)
3416       return false;
3417     snapshot_data->clear();
3418   }
3419 
3420   *snapshot_data = std::move(new_snapshot_data);
3421   return true;
3422 }
3423 
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,std::vector<TracePacket> * packets)3424 void TracingServiceImpl::EmitClockSnapshot(
3425     TracingSession* tracing_session,
3426     TracingSession::ClockSnapshotData snapshot_data,
3427     std::vector<TracePacket>* packets) {
3428   PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3429                        .disable_clock_snapshotting());
3430 
3431   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3432   auto* snapshot = packet->set_clock_snapshot();
3433 
3434   protos::gen::BuiltinClock trace_clock =
3435       tracing_session->config.builtin_data_sources().primary_trace_clock();
3436   if (!trace_clock)
3437     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3438   snapshot->set_primary_trace_clock(
3439       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3440 
3441   for (auto& clock_id_and_ts : snapshot_data) {
3442     auto* c = snapshot->add_clocks();
3443     c->set_clock_id(clock_id_and_ts.clock_id);
3444     c->set_timestamp(clock_id_and_ts.timestamp);
3445   }
3446 
3447   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3448   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3449   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3450 }
3451 
// Appends the synchronization-marker packet to |packets|. The sync marks are
// used to tokenize large traces efficiently. See description in
// trace_packet.proto.
void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
  // The serialized marker packet never changes, so it is built once and
  // cached in |sync_marker_packet_| (size 0 means "not built yet").
  if (sync_marker_packet_size_ == 0) {
    // The marker ABI expects that the marker is written after the uid.
    // Protozero guarantees that fields are written in the same order of the
    // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
    protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
        &sync_marker_packet_[0], sizeof(sync_marker_packet_));
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

    // Keep this last.
    packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
    sync_marker_packet_size_ = packet.Finalize();
  }
  // The appended slice points into the cached storage; no copy is made.
  packets->emplace_back();
  packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
}
3471 
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3472 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3473                                    std::vector<TracePacket>* packets) {
3474   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3475   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3476   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3477   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3478   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3479 }
3480 
// Builds a TraceStats snapshot combining service-global counters (producers,
// data sources, sessions, buffers) with per-session counters (packets,
// flushes, filter stats) and per-buffer / per-writer statistics.
TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
  TraceStats trace_stats;
  // Service-wide counters, shared across all sessions.
  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
  trace_stats.set_producers_seen(last_producer_id_);
  trace_stats.set_data_sources_registered(
      static_cast<uint32_t>(data_sources_.size()));
  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
  trace_stats.set_tracing_sessions(
      static_cast<uint32_t>(tracing_sessions_.size()));
  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
  trace_stats.set_chunks_discarded(chunks_discarded_);
  trace_stats.set_patches_discarded(patches_discarded_);
  // Counters specific to |tracing_session|.
  trace_stats.set_invalid_packets(tracing_session->invalid_packets);
  trace_stats.set_flushes_requested(tracing_session->flushes_requested);
  trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
  trace_stats.set_flushes_failed(tracing_session->flushes_failed);
  trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);

  // Trace-filter stats are only meaningful when the session has a filter.
  if (tracing_session->trace_filter) {
    auto* filt_stats = trace_stats.mutable_filter_stats();
    filt_stats->set_input_packets(tracing_session->filter_input_packets);
    filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
    filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
    filt_stats->set_errors(tracing_session->filter_errors);
    filt_stats->set_time_taken_ns(tracing_session->filter_time_taken_ns);
    for (uint64_t value : tracing_session->filter_bytes_discarded_per_buffer)
      filt_stats->add_bytes_discarded_per_buffer(value);
  }

  // One BufferStats entry per buffer owned by this session.
  for (BufferID buf_id : tracing_session->buffers_index) {
    TraceBuffer* buf = GetBufferByID(buf_id);
    if (!buf) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    *trace_stats.add_buffer_stats() = buf->stats();
  }  // for (buf in session).

  if (!tracing_session->config.builtin_data_sources()
           .disable_chunk_usage_histograms()) {
    // Emit chunk usage stats broken down by sequence ID (i.e. by trace-writer).
    // Writer stats are updated by each TraceBuffer object at ReadBuffers time,
    // and there can be >1 buffer per session. A trace writer never writes to
    // more than one buffer (it's technically allowed but doesn't happen in the
    // current impl of the tracing SDK).

    bool has_written_bucket_definition = false;
    // Starts at -1 so the first ++buf_idx yields index 0.
    uint32_t buf_idx = static_cast<uint32_t>(-1);
    for (const BufferID buf_id : tracing_session->buffers_index) {
      ++buf_idx;
      const TraceBuffer* buf = GetBufferByID(buf_id);
      if (!buf)
        continue;
      for (auto it = buf->writer_stats().GetIterator(); it; ++it) {
        const auto& hist = it.value().used_chunk_hist;
        ProducerID p;
        WriterID w;
        GetProducerAndWriterID(it.key(), &p, &w);
        if (!has_written_bucket_definition) {
          // Serialize one-off the histogram bucket definition, which is the
          // same for all entries in the map.
          has_written_bucket_definition = true;
          // The -1 in the loop below is to skip the implicit overflow bucket.
          for (size_t i = 0; i < hist.num_buckets() - 1; ++i) {
            trace_stats.add_chunk_payload_histogram_def(hist.GetBucketThres(i));
          }
        }  // if(!has_written_bucket_definition)
        auto* wri_stats = trace_stats.add_writer_stats();
        wri_stats->set_sequence_id(
            tracing_session->GetPacketSequenceID(kDefaultMachineID, p, w));
        wri_stats->set_buffer(buf_idx);
        // Unlike the definition above, the counts include the overflow bucket.
        for (size_t i = 0; i < hist.num_buckets(); ++i) {
          wri_stats->add_chunk_payload_histogram_counts(hist.GetBucketCount(i));
          wri_stats->add_chunk_payload_histogram_sum(hist.GetBucketSum(i));
        }
      }  // for each sequence (writer).
    }  // for each buffer.
  }  // if (!disable_chunk_usage_histograms)

  return trace_stats;
}
3562 
EmitUuid(TracingSession * tracing_session,std::vector<TracePacket> * packets)3563 void TracingServiceImpl::EmitUuid(TracingSession* tracing_session,
3564                                   std::vector<TracePacket>* packets) {
3565   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3566   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3567   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3568   auto* uuid = packet->set_trace_uuid();
3569   uuid->set_lsb(tracing_session->trace_uuid.lsb());
3570   uuid->set_msb(tracing_session->trace_uuid.msb());
3571   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3572 }
3573 
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)3574 void TracingServiceImpl::MaybeEmitTraceConfig(
3575     TracingSession* tracing_session,
3576     std::vector<TracePacket>* packets) {
3577   if (tracing_session->did_emit_initial_packets)
3578     return;
3579   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3580   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3581   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3582   tracing_session->config.Serialize(packet->set_trace_config());
3583   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3584 }
3585 
EmitSystemInfo(std::vector<TracePacket> * packets)3586 void TracingServiceImpl::EmitSystemInfo(std::vector<TracePacket>* packets) {
3587   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3588   auto* info = packet->set_system_info();
3589   info->set_tracing_service_version(base::GetVersionString());
3590 
3591   std::optional<int32_t> tzoff = base::GetTimezoneOffsetMins();
3592   if (tzoff.has_value())
3593     info->set_timezone_off_mins(*tzoff);
3594 
3595 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
3596     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3597   struct utsname uname_info;
3598   if (uname(&uname_info) == 0) {
3599     auto* utsname_info = info->set_utsname();
3600     utsname_info->set_sysname(uname_info.sysname);
3601     utsname_info->set_version(uname_info.version);
3602     utsname_info->set_machine(uname_info.machine);
3603     utsname_info->set_release(uname_info.release);
3604   }
3605   info->set_page_size(static_cast<uint32_t>(sysconf(_SC_PAGESIZE)));
3606   info->set_num_cpus(static_cast<uint32_t>(sysconf(_SC_NPROCESSORS_CONF)));
3607 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
3608 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3609   std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
3610   if (!fingerprint_value.empty()) {
3611     info->set_android_build_fingerprint(fingerprint_value);
3612   } else {
3613     PERFETTO_ELOG("Unable to read ro.build.fingerprint");
3614   }
3615 
3616   std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
3617   std::optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
3618   if (sdk_value.has_value()) {
3619     info->set_android_sdk_version(*sdk_value);
3620   } else {
3621     PERFETTO_ELOG("Unable to read ro.build.version.sdk");
3622   }
3623 
3624   std::string soc_model_value = base::GetAndroidProp("ro.soc.model");
3625   if (!soc_model_value.empty()) {
3626     info->set_android_soc_model(soc_model_value);
3627   } else {
3628     PERFETTO_ELOG("Unable to read ro.soc.model");
3629   }
3630 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3631   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3632   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3633   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3634 }
3635 
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3636 void TracingServiceImpl::EmitLifecycleEvents(
3637     TracingSession* tracing_session,
3638     std::vector<TracePacket>* packets) {
3639   using TimestampedPacket =
3640       std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3641 
3642   std::vector<TimestampedPacket> timestamped_packets;
3643   for (auto& event : tracing_session->lifecycle_events) {
3644     for (int64_t ts : event.timestamps) {
3645       protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3646       packet->set_timestamp(static_cast<uint64_t>(ts));
3647       packet->set_trusted_uid(static_cast<int32_t>(uid_));
3648       packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3649 
3650       auto* service_event = packet->set_service_event();
3651       service_event->AppendVarInt(event.field_id, 1);
3652       timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3653     }
3654     event.timestamps.clear();
3655   }
3656 
3657   // We sort by timestamp here to ensure that the "sequence" of lifecycle
3658   // packets has monotonic timestamps like other sequences in the trace.
3659   // Note that these events could still be out of order with respect to other
3660   // events on the service packet sequence (e.g. trigger received packets).
3661   std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3662             [](const TimestampedPacket& a, const TimestampedPacket& b) {
3663               return a.first < b.first;
3664             });
3665 
3666   for (auto& pair : timestamped_packets)
3667     SerializeAndAppendPacket(packets, std::move(pair.second));
3668 }
3669 
MaybeEmitRemoteClockSync(TracingSession * tracing_session,std::vector<TracePacket> * packets)3670 void TracingServiceImpl::MaybeEmitRemoteClockSync(
3671     TracingSession* tracing_session,
3672     std::vector<TracePacket>* packets) {
3673   if (tracing_session->did_emit_remote_clock_sync_)
3674     return;
3675 
3676   std::unordered_set<MachineID> did_emit_machines;
3677   for (const auto& id_and_relay_client : relay_clients_) {
3678     const auto& relay_client = id_and_relay_client.second;
3679     auto machine_id = relay_client->machine_id();
3680     if (did_emit_machines.find(machine_id) != did_emit_machines.end())
3681       continue;  // Already emitted for the machine (e.g. multiple clients).
3682 
3683     auto& sync_clock_snapshots = relay_client->synced_clocks();
3684     if (sync_clock_snapshots.empty()) {
3685       PERFETTO_DLOG("Clock not synchronized for machine ID = %" PRIu32,
3686                     machine_id);
3687       continue;
3688     }
3689 
3690     // Don't emit twice for the same machine.
3691     did_emit_machines.insert(machine_id);
3692 
3693     protozero::HeapBuffered<protos::pbzero::TracePacket> sync_packet;
3694     sync_packet->set_machine_id(machine_id);
3695     sync_packet->set_trusted_uid(static_cast<int32_t>(uid_));
3696     auto* remote_clock_sync = sync_packet->set_remote_clock_sync();
3697     for (const auto& sync_exchange : relay_client->synced_clocks()) {
3698       auto* sync_exchange_msg = remote_clock_sync->add_synced_clocks();
3699 
3700       auto* client_snapshots = sync_exchange_msg->set_client_clocks();
3701       for (const auto& client_clock : sync_exchange.client_clocks) {
3702         auto* clock = client_snapshots->add_clocks();
3703         clock->set_clock_id(client_clock.clock_id);
3704         clock->set_timestamp(client_clock.timestamp);
3705       }
3706 
3707       auto* host_snapshots = sync_exchange_msg->set_host_clocks();
3708       for (const auto& host_clock : sync_exchange.host_clocks) {
3709         auto* clock = host_snapshots->add_clocks();
3710         clock->set_clock_id(host_clock.clock_id);
3711         clock->set_timestamp(host_clock.timestamp);
3712       }
3713     }
3714 
3715     SerializeAndAppendPacket(packets, sync_packet.SerializeAsArray());
3716   }
3717 
3718   tracing_session->did_emit_remote_clock_sync_ = true;
3719 }
3720 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)3721 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3722     TracingSession* tracing_session,
3723     std::vector<TracePacket>* packets) {
3724   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3725                   tracing_session->received_triggers.size());
3726   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3727        i < tracing_session->received_triggers.size(); ++i) {
3728     const auto& info = tracing_session->received_triggers[i];
3729     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3730     auto* trigger = packet->set_trigger();
3731     trigger->set_trigger_name(info.trigger_name);
3732     trigger->set_producer_name(info.producer_name);
3733     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3734 
3735     packet->set_timestamp(info.boot_time_ns);
3736     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3737     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3738     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3739     ++tracing_session->num_triggers_emitted_into_trace;
3740   }
3741 }
3742 
MaybeLogUploadEvent(const TraceConfig & cfg,const base::Uuid & uuid,PerfettoStatsdAtom atom,const std::string & trigger_name)3743 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3744                                              const base::Uuid& uuid,
3745                                              PerfettoStatsdAtom atom,
3746                                              const std::string& trigger_name) {
3747   if (!ShouldLogEvent(cfg))
3748     return;
3749 
3750   PERFETTO_DCHECK(uuid);  // The UUID must be set at this point.
3751   android_stats::MaybeLogUploadEvent(atom, uuid.lsb(), uuid.msb(),
3752                                      trigger_name);
3753 }
3754 
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3755 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3756                                               PerfettoTriggerAtom atom,
3757                                               const std::string& trigger_name) {
3758   if (!ShouldLogEvent(cfg))
3759     return;
3760   android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3761 }
3762 
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3763 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3764     int64_t now_ns,
3765     uint64_t trigger_name_hash) {
3766   PERFETTO_DCHECK(
3767       std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3768   size_t remove_count = 0;
3769   size_t trigger_count = 0;
3770   for (const TriggerHistory& h : trigger_history_) {
3771     if (h.timestamp_ns < now_ns - trigger_window_ns_) {
3772       remove_count++;
3773     } else if (h.name_hash == trigger_name_hash) {
3774       trigger_count++;
3775     }
3776   }
3777   trigger_history_.erase_front(remove_count);
3778   return trigger_count;
3779 }
3780 
// Starts cloning tracing session |tsid| on behalf of |consumer|: optionally
// clears buffers marked clear_before_clone, then issues flushes for two
// buffer groups (transfer_on_clone vs. the rest). The actual buffer cloning
// and session creation happen asynchronously in OnFlushDoneForClone() /
// FinishCloneSession() once the flushes complete.
// Returns an error if the session can't be found, the consumer isn't allowed
// to clone it, or a buffer re-allocation fails.
base::Status TracingServiceImpl::FlushAndCloneSession(
    ConsumerEndpointImpl* consumer,
    TracingSessionID tsid,
    bool skip_trace_filter,
    bool for_bugreport) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto clone_target = FlushFlags::CloneTarget::kUnknown;

  if (tsid == kBugreportSessionId) {
    // This branch is only here to support the legacy protocol where we could
    // clone only a single session using the magic ID kBugreportSessionId.
    // The newer perfetto --clone-all-for-bugreport first queries the existing
    // sessions and then issues individual clone requests specifying real
    // session IDs, setting args.{for_bugreport,skip_trace_filter}=true.
    PERFETTO_LOG("Looking for sessions for bugreport");
    TracingSession* session = FindTracingSessionWithMaxBugreportScore();
    if (!session) {
      return base::ErrStatus(
          "No tracing sessions eligible for bugreport found");
    }
    tsid = session->id;
    clone_target = FlushFlags::CloneTarget::kBugreport;
    skip_trace_filter = true;
    for_bugreport = true;
  } else if (for_bugreport) {
    clone_target = FlushFlags::CloneTarget::kBugreport;
  }

  TracingSession* session = GetTracingSession(tsid);
  if (!session) {
    return base::ErrStatus("Tracing session not found");
  }

  // Skip the UID check for sessions marked with a bugreport_score > 0.
  // Those sessions, by design, can be stolen by any other consumer for the
  // sake of creating snapshots for bugreports.
  if (!session->IsCloneAllowed(consumer->uid_)) {
    return PERFETTO_SVC_ERR("Not allowed to clone a session from another UID");
  }

  // If any of the buffers are marked as clear_before_clone, reset them before
  // issuing the Flush(kCloneReason).
  size_t buf_idx = 0;
  for (BufferID src_buf_id : session->buffers_index) {
    if (!session->config.buffers()[buf_idx++].clear_before_clone())
      continue;
    auto buf_iter = buffers_.find(src_buf_id);
    PERFETTO_CHECK(buf_iter != buffers_.end());
    std::unique_ptr<TraceBuffer>& buf = buf_iter->second;

    // No need to reset the buffer if nothing has been written into it yet.
    // This is the canonical case if producers behave nicely and don't timeout
    // the handling of writes during the flush.
    // This check avoids a useless re-mmap upon every Clone() if the buffer is
    // already empty (when used in combination with `transfer_on_clone`).
    if (!buf->has_data())
      continue;

    // Some leftover data was left in the buffer. Recreate it to empty it.
    const auto buf_policy = buf->overwrite_policy();
    const auto buf_size = buf->size();
    std::unique_ptr<TraceBuffer> old_buf = std::move(buf);
    buf = TraceBuffer::Create(buf_size, buf_policy);
    if (!buf) {
      // This is extremely rare but could happen on 32-bit. If the new buffer
      // allocation failed, put back the buffer where it was and fail the clone.
      // We cannot leave the original tracing session buffer-less as it would
      // cause crashes when data sources commit new data.
      buf = std::move(old_buf);
      return base::ErrStatus(
          "Buffer allocation failed while attempting to clone");
    }
  }

  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  auto weak_consumer = consumer->GetWeakPtr();

  // Each pending clone of this session is tracked under a unique id.
  const PendingCloneID clone_id = session->last_pending_clone_id_++;

  auto& clone_op = session->pending_clones[clone_id];
  clone_op.pending_flush_cnt = 0;
  clone_op.buffers =
      std::vector<std::unique_ptr<TraceBuffer>>(session->buffers_index.size());
  clone_op.weak_consumer = weak_consumer;
  clone_op.skip_trace_filter = skip_trace_filter;

  // Issue separate flush requests for separate buffer groups. The buffer marked
  // as transfer_on_clone will be flushed and cloned separately: even if they're
  // slower (like in the case of Winscope tracing), they will not delay the
  // snapshot of the other buffers.
  //
  // In the future we might want to split the buffer into more groups and maybe
  // allow this to be configurable.
  std::array<std::set<BufferID>, 2> bufs_groups;
  for (size_t i = 0; i < session->buffers_index.size(); i++) {
    if (session->config.buffers()[i].transfer_on_clone()) {
      bufs_groups[0].insert(session->buffers_index[i]);
    } else {
      bufs_groups[1].insert(session->buffers_index[i]);
    }
  }

  // One flush per group; OnFlushDoneForClone() decrements this count and
  // finalizes the clone when it reaches zero.
  clone_op.pending_flush_cnt = bufs_groups.size();
  for (const std::set<BufferID>& buf_group : bufs_groups) {
    FlushDataSourceInstances(
        session, 0,
        GetFlushableDataSourceInstancesForBuffers(session, buf_group),
        [tsid, clone_id, buf_group, weak_this](bool final_flush) {
          if (!weak_this)
            return;
          weak_this->OnFlushDoneForClone(tsid, clone_id, buf_group,
                                         final_flush);
        },
        FlushFlags(FlushFlags::Initiator::kTraced,
                   FlushFlags::Reason::kTraceClone, clone_target));
  }

  return base::OkStatus();
}
3900 
3901 std::map<ProducerID, std::vector<DataSourceInstanceID>>
GetFlushableDataSourceInstancesForBuffers(TracingSession * session,const std::set<BufferID> & bufs)3902 TracingServiceImpl::GetFlushableDataSourceInstancesForBuffers(
3903     TracingSession* session,
3904     const std::set<BufferID>& bufs) {
3905   std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
3906 
3907   for (const auto& [producer_id, ds_inst] : session->data_source_instances) {
3908     // TODO(ddiproietto): Consider if we should skip instances if ds_inst.state
3909     // != DataSourceInstance::STARTED
3910     if (ds_inst.no_flush) {
3911       continue;
3912     }
3913     if (!bufs.count(static_cast<BufferID>(ds_inst.config.target_buffer()))) {
3914       continue;
3915     }
3916     data_source_instances[producer_id].push_back(ds_inst.instance_id);
3917   }
3918 
3919   return data_source_instances;
3920 }
3921 
// Completion callback for one buffer-group flush issued by
// FlushAndCloneSession(). Clones the flushed buffers into the pending clone
// op; once the last pending flush completes, finalizes the cloned session
// and notifies the consumer of the outcome.
void TracingServiceImpl::OnFlushDoneForClone(TracingSessionID tsid,
                                             PendingCloneID clone_id,
                                             const std::set<BufferID>& buf_ids,
                                             bool final_flush_outcome) {
  TracingSession* src = GetTracingSession(tsid);
  // The session might be gone by the time we try to clone it.
  if (!src) {
    return;
  }

  auto it = src->pending_clones.find(clone_id);
  if (it == src->pending_clones.end()) {
    return;
  }
  auto& clone_op = it->second;

  // A failed flush of any group marks the whole clone as flush-failed.
  if (final_flush_outcome == false) {
    clone_op.flush_failed = true;
  }

  base::Status result;
  base::Uuid uuid;

  // First clone the flushed TraceBuffer(s). This can fail because of ENOMEM. If
  // it happens bail out early before creating any session.
  if (!DoCloneBuffers(src, buf_ids, &clone_op.buffers)) {
    result = PERFETTO_SVC_ERR("Buffer allocation failed");
  }

  if (result.ok()) {
    UpdateMemoryGuardrail();

    if (--clone_op.pending_flush_cnt != 0) {
      // Wait for more pending flushes.
      return;
    }

    PERFETTO_LOG("FlushAndCloneSession(%" PRIu64 ") started, success=%d", tsid,
                 final_flush_outcome);

    // The consumer may have disconnected while the flushes were in flight.
    if (clone_op.weak_consumer) {
      result = FinishCloneSession(
          &*clone_op.weak_consumer, tsid, std::move(clone_op.buffers),
          clone_op.skip_trace_filter, !clone_op.flush_failed, &uuid);
    }
  }  // if (result.ok())

  // Report either success (with the new session's UUID) or the error.
  if (clone_op.weak_consumer) {
    clone_op.weak_consumer->consumer_->OnSessionCloned(
        {result.ok(), result.message(), uuid});
  }

  src->pending_clones.erase(it);
  UpdateMemoryGuardrail();
}
3977 
DoCloneBuffers(TracingSession * src,const std::set<BufferID> & buf_ids,std::vector<std::unique_ptr<TraceBuffer>> * buf_snaps)3978 bool TracingServiceImpl::DoCloneBuffers(
3979     TracingSession* src,
3980     const std::set<BufferID>& buf_ids,
3981     std::vector<std::unique_ptr<TraceBuffer>>* buf_snaps) {
3982   PERFETTO_DCHECK(src->num_buffers() == src->config.buffers().size());
3983   buf_snaps->resize(src->buffers_index.size());
3984 
3985   for (size_t buf_idx = 0; buf_idx < src->buffers_index.size(); buf_idx++) {
3986     BufferID src_buf_id = src->buffers_index[buf_idx];
3987     if (buf_ids.count(src_buf_id) == 0)
3988       continue;
3989     auto buf_iter = buffers_.find(src_buf_id);
3990     PERFETTO_CHECK(buf_iter != buffers_.end());
3991     std::unique_ptr<TraceBuffer>& src_buf = buf_iter->second;
3992     std::unique_ptr<TraceBuffer> new_buf;
3993     if (src->config.buffers()[buf_idx].transfer_on_clone()) {
3994       const auto buf_policy = src_buf->overwrite_policy();
3995       const auto buf_size = src_buf->size();
3996       new_buf = std::move(src_buf);
3997       src_buf = TraceBuffer::Create(buf_size, buf_policy);
3998       if (!src_buf) {
3999         // If the allocation fails put the buffer back and let the code below
4000         // handle the failure gracefully.
4001         src_buf = std::move(new_buf);
4002       }
4003     } else {
4004       new_buf = src_buf->CloneReadOnly();
4005     }
4006     if (!new_buf.get()) {
4007       return false;
4008     }
4009     (*buf_snaps)[buf_idx] = std::move(new_buf);
4010   }
4011   return true;
4012 }
4013 
// Completes a CloneSession() operation: registers the buffer snapshots taken
// from session |src_tsid| as a brand new CLONED_READ_ONLY session owned by
// |consumer| and copies over the stats/metadata that the service emits into
// the trace file. On success writes the new session's UUID into |new_uuid|.
// |buf_snaps| must contain one (non-null) snapshot per buffer of the source
// session. |final_flush_outcome| records whether the pre-clone flush
// succeeded, for TraceStats.
base::Status TracingServiceImpl::FinishCloneSession(
    ConsumerEndpointImpl* consumer,
    TracingSessionID src_tsid,
    std::vector<std::unique_ptr<TraceBuffer>> buf_snaps,
    bool skip_trace_filter,
    bool final_flush_outcome,
    base::Uuid* new_uuid) {
  PERFETTO_DLOG("CloneSession(%" PRIu64
                ", skip_trace_filter=%d) started, consumer uid: %d",
                src_tsid, skip_trace_filter, static_cast<int>(consumer->uid_));

  TracingSession* src = GetTracingSession(src_tsid);

  // The session might be gone by the time we try to clone it.
  if (!src)
    return PERFETTO_SVC_ERR("session not found");

  if (consumer->tracing_session_id_) {
    return PERFETTO_SVC_ERR(
        "The consumer is already attached to another tracing session");
  }

  // Reserve one global buffer id per snapshotted buffer.
  std::vector<BufferID> buf_ids =
      buffer_ids_.AllocateMultiple(buf_snaps.size());
  if (buf_ids.size() != buf_snaps.size()) {
    return PERFETTO_SVC_ERR("Buffer id allocation failed");
  }

  // Every snapshot slot must have been filled by the clone-buffers step; a
  // null entry here would be a logic error, hence CHECK not a soft failure.
  PERFETTO_CHECK(std::none_of(
      buf_snaps.begin(), buf_snaps.end(),
      [](const std::unique_ptr<TraceBuffer>& buf) { return buf == nullptr; }));

  const TracingSessionID tsid = ++last_tracing_session_id_;
  TracingSession* cloned_session =
      &tracing_sessions_
           .emplace(
               std::piecewise_construct, std::forward_as_tuple(tsid),
               std::forward_as_tuple(tsid, consumer, src->config, task_runner_))
           .first->second;

  // Generate a new UUID for the cloned session, but preserve the LSB. In some
  // contexts the LSB is used to tie the trace back to the statsd subscription
  // that triggered it. See the corresponding code in perfetto_cmd.cc which
  // reads at triggering_subscription_id().
  const int64_t orig_uuid_lsb = src->trace_uuid.lsb();
  cloned_session->state = TracingSession::CLONED_READ_ONLY;
  cloned_session->trace_uuid = base::Uuidv4();
  cloned_session->trace_uuid.set_lsb(orig_uuid_lsb);
  *new_uuid = cloned_session->trace_uuid;

  // Register each snapshot buffer under its new global id and index it in the
  // cloned session.
  for (size_t i = 0; i < buf_snaps.size(); i++) {
    BufferID buf_global_id = buf_ids[i];
    std::unique_ptr<TraceBuffer>& buf = buf_snaps[i];
    // This is only needed for transfer_on_clone. Other buffers are already
    // marked as read-only by CloneReadOnly(). We cannot do this early because
    // in case of an allocation failure we will put std::move() the original
    // buffer back in its place and in that case should not be made read-only.
    buf->set_read_only();
    buffers_.emplace(buf_global_id, std::move(buf));
    cloned_session->buffers_index.emplace_back(buf_global_id);
  }
  UpdateMemoryGuardrail();

  // Copy over relevant state that we want to persist in the cloned session.
  // Mostly stats and metadata that is emitted in the trace file by the service.
  // Also clear the received trigger list in the main tracing session. A
  // CLONE_SNAPSHOT session can go in ring buffer mode for several hours and get
  // snapshotted several times. This causes two issues with `received_triggers`:
  // 1. Adding noise in the cloned trace emitting triggers that happened too
  //    far back (see b/290799105).
  // 2. Bloating memory (see b/290798988).
  cloned_session->should_emit_stats = true;
  cloned_session->received_triggers = std::move(src->received_triggers);
  src->received_triggers.clear();
  src->num_triggers_emitted_into_trace = 0;
  cloned_session->lifecycle_events =
      std::vector<TracingSession::LifecycleEvent>(src->lifecycle_events);
  cloned_session->initial_clock_snapshot = src->initial_clock_snapshot;
  cloned_session->clock_snapshot_ring_buffer = src->clock_snapshot_ring_buffer;
  cloned_session->invalid_packets = src->invalid_packets;
  cloned_session->flushes_requested = src->flushes_requested;
  cloned_session->flushes_succeeded = src->flushes_succeeded;
  cloned_session->flushes_failed = src->flushes_failed;
  cloned_session->compress_deflate = src->compress_deflate;
  if (src->trace_filter && !skip_trace_filter) {
    // Copy the trace filter, unless it's a clone-for-bugreport (b/317065412).
    cloned_session->trace_filter.reset(
        new protozero::MessageFilter(src->trace_filter->config()));
  }

  // Record a "tracing disabled" lifecycle event (with a clock snapshot) so the
  // cloned trace carries a well-defined end marker.
  SnapshotLifecyleEvent(
      cloned_session,
      protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
      true /* snapshot_clocks */);

  PERFETTO_DLOG("Consumer (uid:%d) cloned tracing session %" PRIu64
                " -> %" PRIu64,
                static_cast<int>(consumer->uid_), src_tsid, tsid);

  consumer->tracing_session_id_ = tsid;
  cloned_session->final_flush_outcome = final_flush_outcome
                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
                                            : TraceStats::FINAL_FLUSH_FAILED;
  return base::OkStatus();
}
4119 
IsCloneAllowed(uid_t clone_uid) const4120 bool TracingServiceImpl::TracingSession::IsCloneAllowed(uid_t clone_uid) const {
4121   if (clone_uid == 0)
4122     return true;  // Root is always allowed to clone everything.
4123   if (clone_uid == this->consumer_uid)
4124     return true;  // Allow cloning if the uids match.
4125 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
4126   // On Android allow shell to clone sessions marked as exported for bugreport.
4127   // Dumpstate (invoked by adb bugreport) invokes commands as shell.
4128   if (clone_uid == AID_SHELL && this->config.bugreport_score() > 0)
4129     return true;
4130 #endif
4131   return false;
4132 }
4133 
4134 ////////////////////////////////////////////////////////////////////////////////
4135 // TracingServiceImpl::ConsumerEndpointImpl implementation
4136 ////////////////////////////////////////////////////////////////////////////////
4137 
// Binds a Consumer (the client-side interface) to the service. The endpoint
// lives as long as the consumer connection; all fields are set here and the
// weak-ptr factory is initialized last so weak pointers see a fully
// constructed object.
TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Consumer* consumer,
    uid_t uid)
    : task_runner_(task_runner),
      service_(service),
      consumer_(consumer),
      uid_(uid),
      weak_ptr_factory_(this) {}
4148 
// Unregisters this endpoint from the service before notifying the consumer
// that the connection is gone.
TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  service_->DisconnectConsumer(this);
  consumer_->OnDisconnect();
}
4153 
NotifyOnTracingDisabled(const std::string & error)4154 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
4155     const std::string& error) {
4156   PERFETTO_DCHECK_THREAD(thread_checker_);
4157   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4158   task_runner_->PostTask([weak_this, error /* deliberate copy */] {
4159     if (weak_this)
4160       weak_this->consumer_->OnTracingDisabled(error);
4161   });
4162 }
4163 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)4164 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
4165     const TraceConfig& cfg,
4166     base::ScopedFile fd) {
4167   PERFETTO_DCHECK_THREAD(thread_checker_);
4168   auto status = service_->EnableTracing(this, cfg, std::move(fd));
4169   if (!status.ok())
4170     NotifyOnTracingDisabled(status.message());
4171 }
4172 
ChangeTraceConfig(const TraceConfig & cfg)4173 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
4174     const TraceConfig& cfg) {
4175   if (!tracing_session_id_) {
4176     PERFETTO_LOG(
4177         "Consumer called ChangeTraceConfig() but tracing was "
4178         "not active");
4179     return;
4180   }
4181   service_->ChangeTraceConfig(this, cfg);
4182 }
4183 
StartTracing()4184 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
4185   PERFETTO_DCHECK_THREAD(thread_checker_);
4186   if (!tracing_session_id_) {
4187     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
4188     return;
4189   }
4190   service_->StartTracing(tracing_session_id_);
4191 }
4192 
DisableTracing()4193 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
4194   PERFETTO_DCHECK_THREAD(thread_checker_);
4195   if (!tracing_session_id_) {
4196     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
4197     return;
4198   }
4199   service_->DisableTracing(tracing_session_id_);
4200 }
4201 
ReadBuffers()4202 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
4203   PERFETTO_DCHECK_THREAD(thread_checker_);
4204   if (!tracing_session_id_) {
4205     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
4206     consumer_->OnTraceData({}, /* has_more = */ false);
4207     return;
4208   }
4209   if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
4210     consumer_->OnTraceData({}, /* has_more = */ false);
4211   }
4212 }
4213 
FreeBuffers()4214 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
4215   PERFETTO_DCHECK_THREAD(thread_checker_);
4216   if (!tracing_session_id_) {
4217     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
4218     return;
4219   }
4220   service_->FreeBuffers(tracing_session_id_);
4221   tracing_session_id_ = 0;
4222 }
4223 
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)4224 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
4225                                                      FlushCallback callback,
4226                                                      FlushFlags flush_flags) {
4227   PERFETTO_DCHECK_THREAD(thread_checker_);
4228   if (!tracing_session_id_) {
4229     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
4230     return;
4231   }
4232   service_->Flush(tracing_session_id_, timeout_ms, callback, flush_flags);
4233 }
4234 
Detach(const std::string & key)4235 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
4236   PERFETTO_DCHECK_THREAD(thread_checker_);
4237   bool success = service_->DetachConsumer(this, key);
4238   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4239   task_runner_->PostTask([weak_this, success] {
4240     if (weak_this)
4241       weak_this->consumer_->OnDetach(success);
4242   });
4243 }
4244 
// Re-attaches this consumer to a previously detached session identified by
// |key|, replying asynchronously via OnAttach() with the session's config
// (or a default TraceConfig on failure).
void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  bool success = service_->AttachConsumer(this, key);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, success] {
    if (!weak_this)
      return;
    Consumer* consumer = weak_this->consumer_;
    // |tracing_session_id_| is re-read here: AttachConsumer() above updates it
    // on success.
    TracingSession* session =
        weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
    if (!session) {
      consumer->OnAttach(false, TraceConfig());
      return;
    }
    consumer->OnAttach(success, session->config);
  });
}
4262 
GetTraceStats()4263 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
4264   PERFETTO_DCHECK_THREAD(thread_checker_);
4265   bool success = false;
4266   TraceStats stats;
4267   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4268   if (session) {
4269     success = true;
4270     stats = service_->GetTraceStats(session);
4271   }
4272   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4273   task_runner_->PostTask([weak_this, success, stats] {
4274     if (weak_this)
4275       weak_this->consumer_->OnTraceStats(success, stats);
4276   });
4277 }
4278 
// Sets the mask of observable event types this consumer is interested in and
// immediately replays current state for subscriptions that would otherwise
// miss events that already happened.
void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
    uint32_t events_mask) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  observable_events_mask_ = events_mask;
  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
  if (!session)
    return;

  if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
    // Issue initial states.
    for (const auto& kv : session->data_source_instances) {
      ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
      PERFETTO_DCHECK(producer);
      OnDataSourceInstanceStateChange(*producer, kv.second);
    }
  }

  // If the ObserveEvents() call happens after data sources have acked already
  // notify immediately.
  if (observable_events_mask_ &
      ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
    service_->MaybeNotifyAllDataSourcesStarted(session);
  }
}
4303 
// Queues a data-source instance state-change event for this consumer, if it
// subscribed to TYPE_DATA_SOURCES_INSTANCES.
void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
    const ProducerEndpointImpl& producer,
    const DataSourceInstance& instance) {
  if (!(observable_events_mask_ &
        ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
    return;
  }

  // Only these three states are surfaced to consumers; intermediate states
  // are ignored.
  if (instance.state != DataSourceInstance::CONFIGURED &&
      instance.state != DataSourceInstance::STARTED &&
      instance.state != DataSourceInstance::STOPPED) {
    return;
  }

  auto* observable_events = AddObservableEvents();
  auto* change = observable_events->add_instance_state_changes();
  change->set_producer_name(producer.name_);
  change->set_data_source_name(instance.data_source_name);
  // Note: CONFIGURED (not yet started) is reported as STOPPED; the observable
  // state is binary (started / not started).
  if (instance.state == DataSourceInstance::STARTED) {
    change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
  } else {
    change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
  }
}
4328 
OnAllDataSourcesStarted()4329 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
4330   if (!(observable_events_mask_ &
4331         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
4332     return;
4333   }
4334   auto* observable_events = AddObservableEvents();
4335   observable_events->set_all_data_sources_started(true);
4336 }
4337 
NotifyCloneSnapshotTrigger(const std::string & trigger_name)4338 void TracingServiceImpl::ConsumerEndpointImpl::NotifyCloneSnapshotTrigger(
4339     const std::string& trigger_name) {
4340   if (!(observable_events_mask_ & ObservableEvents::TYPE_CLONE_TRIGGER_HIT)) {
4341     return;
4342   }
4343   auto* observable_events = AddObservableEvents();
4344   auto* clone_trig = observable_events->mutable_clone_trigger_hit();
4345   clone_trig->set_tracing_session_id(static_cast<int64_t>(tracing_session_id_));
4346   clone_trig->set_trigger_name(trigger_name);
4347 }
4348 
// Returns the pending ObservableEvents batch for this consumer, creating it
// (and scheduling its delivery) on first use. Events appended to the returned
// object before the posted task runs are coalesced into a single
// OnObservableEvents() call.
ObservableEvents*
TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!observable_events_) {
    observable_events_.reset(new ObservableEvents());
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this] {
      if (!weak_this)
        return;

      // Move into a temporary to allow reentrancy in OnObservableEvents.
      auto observable_events = std::move(weak_this->observable_events_);
      weak_this->consumer_->OnObservableEvents(*observable_events);
    });
  }
  return observable_events_.get();
}
4366 
// Builds a TracingServiceState snapshot (service version, session counts,
// connected producers, registered data sources and per-session summaries) and
// delivers it synchronously through |callback|. With args.sessions_only set,
// the producer and data-source listings are skipped. Only sessions the
// calling uid is allowed to clone are listed.
void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
    QueryServiceStateArgs args,
    QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceState svc_state;

  const auto& sessions = service_->tracing_sessions_;
  svc_state.set_tracing_service_version(base::GetVersionString());
  svc_state.set_num_sessions(static_cast<int>(sessions.size()));

  int num_started = 0;
  for (const auto& kv : sessions)
    num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
  svc_state.set_num_sessions_started(num_started);

  // Producer listing (skipped when sessions_only is requested).
  for (const auto& kv : service_->producers_) {
    if (args.sessions_only)
      break;
    auto* producer = svc_state.add_producers();
    producer->set_id(static_cast<int>(kv.first));
    producer->set_name(kv.second->name_);
    producer->set_sdk_version(kv.second->sdk_version_);
    producer->set_uid(static_cast<int32_t>(kv.second->uid()));
    producer->set_pid(static_cast<int32_t>(kv.second->pid()));
  }

  // Data-source listing (skipped when sessions_only is requested).
  for (const auto& kv : service_->data_sources_) {
    if (args.sessions_only)
      break;
    const auto& registered_data_source = kv.second;
    auto* data_source = svc_state.add_data_sources();
    *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
    data_source->set_producer_id(
        static_cast<int>(registered_data_source.producer_id));
  }

  svc_state.set_supports_tracing_sessions(true);
  for (const auto& kv : service_->tracing_sessions_) {
    const TracingSession& s = kv.second;
    // Hide sessions the caller could not clone anyway.
    if (!s.IsCloneAllowed(uid_))
      continue;
    auto* session = svc_state.add_tracing_sessions();
    session->set_id(s.id);
    session->set_consumer_uid(static_cast<int>(s.consumer_uid));
    session->set_duration_ms(s.config.duration_ms());
    session->set_num_data_sources(
        static_cast<uint32_t>(s.data_source_instances.size()));
    session->set_unique_session_name(s.config.unique_session_name());
    if (s.config.has_bugreport_score())
      session->set_bugreport_score(s.config.bugreport_score());
    if (s.config.has_bugreport_filename())
      session->set_bugreport_filename(s.config.bugreport_filename());
    // Surface the session's wall-clock start time, if a REALTIME snapshot was
    // taken at start.
    for (const auto& snap_kv : s.initial_clock_snapshot) {
      if (snap_kv.clock_id == protos::pbzero::BUILTIN_CLOCK_REALTIME)
        session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.timestamp));
    }
    for (const auto& buf : s.config.buffers())
      session->add_buffer_size_kb(buf.size_kb());

    switch (s.state) {
      case TracingSession::State::DISABLED:
        session->set_state("DISABLED");
        break;
      case TracingSession::State::CONFIGURED:
        session->set_state("CONFIGURED");
        break;
      case TracingSession::State::STARTED:
        session->set_is_started(true);
        session->set_state("STARTED");
        break;
      case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
        session->set_state("STOP_WAIT");
        break;
      case TracingSession::State::CLONED_READ_ONLY:
        session->set_state("CLONED_READ_ONLY");
        break;
    }
  }
  callback(/*success=*/true, svc_state);
}
4447 
// Reports the capability set of this service build to the consumer,
// synchronously through |callback|.
void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
    QueryCapabilitiesCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceCapabilities caps;
  caps.set_has_query_capabilities(true);
  caps.set_has_trace_config_output_path(true);
  caps.set_has_clone_session(true);
  caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
  caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
  caps.add_observable_events(ObservableEvents::TYPE_CLONE_TRIGGER_HIT);
  // Compile-time reminder: adding a new ObservableEvents type requires adding
  // it to the list above.
  static_assert(
      ObservableEvents::Type_MAX == ObservableEvents::TYPE_CLONE_TRIGGER_HIT,
      "");
  callback(caps);
}
4463 
// Deprecated entry point: always fails, pointing callers at the CloneSession
// replacement.
void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
    SaveTraceForBugreportCallback consumer_callback) {
  consumer_callback(false,
                    "SaveTraceForBugreport is deprecated. Use "
                    "CloneSession(kBugreportSessionId) instead.");
}
4470 
// Kicks off cloning of session |tsid| for this consumer. On success the
// result arrives later via OnSessionCloned() (after the async flush); only a
// synchronous setup failure is reported immediately.
void TracingServiceImpl::ConsumerEndpointImpl::CloneSession(
    TracingSessionID tsid,
    CloneSessionArgs args) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // FlushAndCloneSession will call OnSessionCloned after the async flush.
  base::Status result = service_->FlushAndCloneSession(
      this, tsid, args.skip_trace_filter, args.for_bugreport);

  if (!result.ok()) {
    consumer_->OnSessionCloned({false, result.message(), {}});
  }
}
4483 
4484 ////////////////////////////////////////////////////////////////////////////////
4485 // TracingServiceImpl::ProducerEndpointImpl implementation
4486 ////////////////////////////////////////////////////////////////////////////////
4487 
// Binds a Producer (the client-side interface) to the service. |in_process|
// selects the in-process shared-memory arbiter path; |smb_scraping_enabled|
// controls whether the service may scrape this producer's SMB. The weak-ptr
// factory is initialized last so weak pointers see a fully constructed object.
TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    const ClientIdentity& client_identity,
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    const std::string& producer_name,
    const std::string& sdk_version,
    bool in_process,
    bool smb_scraping_enabled)
    : id_(id),
      client_identity_(client_identity),
      service_(service),
      task_runner_(task_runner),
      producer_(producer),
      name_(producer_name),
      sdk_version_(sdk_version),
      in_process_(in_process),
      smb_scraping_enabled_(smb_scraping_enabled),
      weak_ptr_factory_(this) {}
4508 
// Unregisters this producer from the service before notifying it that the
// connection is gone.
TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  service_->DisconnectProducer(id_);
  producer_->OnDisconnect();
}
4513 
// Explicit disconnect is intentionally unsupported: producers disconnect by
// destroying their endpoint (see the destructor above).
void TracingServiceImpl::ProducerEndpointImpl::Disconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // Disconnection is only supported via destroying the ProducerEndpoint.
  PERFETTO_FATAL("Not supported");
}
4519 
// Registers a data source advertised by this producer with the service.
void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->RegisterDataSource(id_, desc);
}
4525 
// Updates the descriptor of a data source previously registered by this
// producer.
void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UpdateDataSource(id_, desc);
}
4531 
// Unregisters one of this producer's data sources by name.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UnregisterDataSource(id_, name);
}
4537 
// Records (or updates) the target buffer for one of this producer's trace
// writers in the local writer map.
void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
    uint32_t writer_id,
    uint32_t target_buffer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  writers_[static_cast<WriterID>(writer_id)] =
      static_cast<BufferID>(target_buffer);
}
4545 
// Removes a trace writer from the local writer map.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
    uint32_t writer_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  writers_.erase(static_cast<WriterID>(writer_id));
}
4551 
// Handles a (fully untrusted) CommitData request from this producer: moves
// completed SMB chunks into the central trace buffers, applies chunk patches
// and acks pending flushes. Malformed entries are skipped, never fatal, since
// a producer may be buggy or malicious.
void TracingServiceImpl::ProducerEndpointImpl::CommitData(
    const CommitDataRequest& req_untrusted,
    CommitDataCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
    PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
                               EncodeCommitDataRequest(id_, req_untrusted));
  }

  if (!shared_memory_) {
    PERFETTO_DLOG(
        "Attempted to commit data before the shared memory was allocated.");
    return;
  }
  PERFETTO_DCHECK(shmem_abi_.is_valid());
  for (const auto& entry : req_untrusted.chunks_to_move()) {
    const uint32_t page_idx = entry.page();
    if (page_idx >= shmem_abi_.num_pages())
      continue;  // A buggy or malicious producer.

    SharedMemoryABI::Chunk chunk;
    bool commit_data_over_ipc = entry.has_data();
    if (PERFETTO_UNLIKELY(commit_data_over_ipc)) {
      // Chunk data is passed over the wire. Create a chunk using the serialized
      // protobuf message.
      const std::string& data = entry.data();
      if (data.size() > SharedMemoryABI::Chunk::kMaxSize) {
        PERFETTO_DFATAL("IPC data commit too large: %zu", data.size());
        continue;  // A malicious or buggy producer
      }
      // |data| is not altered, but we need to const_cast because Chunk data
      // members are non-const.
      chunk = SharedMemoryABI::MakeChunkFromSerializedData(
          reinterpret_cast<uint8_t*>(const_cast<char*>(data.data())),
          static_cast<uint16_t>(entry.data().size()),
          static_cast<uint8_t>(entry.chunk()));
    } else
      chunk = shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
    if (!chunk.is_valid()) {
      PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
                    entry.page(), entry.chunk());
      continue;
    }

    // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
    // the ABI contract expects the producer to not touch the chunk anymore
    // (until the service marks that as free). This is why all the reads below
    // are just memory_order_relaxed. Also, the code here assumes that all this
    // data can be malicious and just gives up if anything is malformed.
    BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
    const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
    WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
    ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
    auto packets = chunk_header.packets.load(std::memory_order_relaxed);
    uint16_t num_fragments = packets.count;
    uint8_t chunk_flags = packets.flags;

    service_->CopyProducerPageIntoLogBuffer(
        id_, client_identity_, writer_id, chunk_id, buffer_id, num_fragments,
        chunk_flags,
        /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());

    if (!commit_data_over_ipc) {
      // This one has release-store semantics.
      shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
    }
  }  // for(chunks_to_move)

  service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());

  if (req_untrusted.flush_request_id()) {
    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
  }

  // Keep this invocation last. ProducerIPCService::CommitData() relies on this
  // callback being invoked within the same callstack and not posted. If this
  // changes, the code there needs to be changed accordingly.
  if (callback)
    callback();
}
4633 
// Installs the shared memory buffer for this producer (exactly once) and
// initializes the SMB ABI view over it. For in-process producers, also
// creates the in-process SharedMemoryArbiter. Notifies the producer via
// OnTracingSetup() and refreshes the service memory guardrail.
void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
    std::unique_ptr<SharedMemory> shared_memory,
    size_t page_size_bytes,
    bool provided_by_producer) {
  PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
  PERFETTO_DCHECK(page_size_bytes % 1024 == 0);

  shared_memory_ = std::move(shared_memory);
  shared_buffer_page_size_kb_ = page_size_bytes / 1024;
  is_shmem_provided_by_producer_ = provided_by_producer;

  shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                        shared_memory_->size(),
                        shared_buffer_page_size_kb() * 1024,
                        SharedMemoryABI::ShmemMode::kDefault);
  if (in_process_) {
    inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
        shared_memory_->start(), shared_memory_->size(),
        SharedMemoryABI::ShmemMode::kDefault,
        shared_buffer_page_size_kb_ * 1024, this, task_runner_));
    inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
  }

  OnTracingSetup();
  service_->UpdateMemoryGuardrail();
}
4660 
// Returns the producer's shared memory buffer, or nullptr if not set up yet.
SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return shared_memory_.get();
}
4665 
// Returns the SMB page size in KB (set by SetupSharedMemory()).
size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
    const {
  return shared_buffer_page_size_kb_;
}
4670 
// Forwards trigger activations from this producer to the service.
void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
    const std::vector<std::string>& triggers) {
  service_->ActivateTriggers(id_, triggers);
}
4675 
StopDataSource(DataSourceInstanceID ds_inst_id)4676 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
4677     DataSourceInstanceID ds_inst_id) {
4678   // TODO(primiano): When we'll support tearing down the SMB, at this point we
4679   // should send the Producer a TearDownTracing if all its data sources have
4680   // been disabled (see b/77532839 and aosp/655179 PS1).
4681   PERFETTO_DCHECK_THREAD(thread_checker_);
4682   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4683   task_runner_->PostTask([weak_this, ds_inst_id] {
4684     if (weak_this)
4685       weak_this->producer_->StopDataSource(ds_inst_id);
4686   });
4687 }
4688 
// Returns the in-process shared memory arbiter. Fatal if called for a
// producer that was not created with in_process=true or before tracing set up
// the shared memory.
SharedMemoryArbiter*
TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
  if (!inproc_shmem_arbiter_) {
    PERFETTO_FATAL(
        "The in-process SharedMemoryArbiter can only be used when "
        "CreateProducer has been called with in_process=true and after tracing "
        "has started.");
  }

  PERFETTO_DCHECK(in_process_);
  return inproc_shmem_arbiter_.get();
}
4701 
// Whether the SMB was supplied by the producer (vs allocated by the service).
bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
    const {
  return is_shmem_provided_by_producer_;
}
4706 
// Can be called on any thread.
// Creates a TraceWriter targeting |buf_id| via the in-process arbiter (which
// must exist, see MaybeSharedMemoryArbiter()).
std::unique_ptr<TraceWriter>
TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
    BufferID buf_id,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
                                                       buffer_exhausted_policy);
}
4716 
// Forwards a flush-complete ack from an in-process producer to its arbiter.
void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
    FlushRequestID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
}
4723 
OnTracingSetup()4724 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4725   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4726   task_runner_->PostTask([weak_this] {
4727     if (weak_this)
4728       weak_this->producer_->OnTracingSetup();
4729   });
4730 }
4731 
// Asynchronously asks the producer to flush the listed data source instances;
// the producer acks via NotifyFlushComplete(flush_request_id).
void TracingServiceImpl::ProducerEndpointImpl::Flush(
    FlushRequestID flush_request_id,
    const std::vector<DataSourceInstanceID>& data_sources,
    FlushFlags flush_flags) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  // |data_sources| is captured by copy: the reference parameter may dangle by
  // the time the posted task runs.
  task_runner_->PostTask(
      [weak_this, flush_request_id, data_sources, flush_flags] {
        if (weak_this) {
          weak_this->producer_->Flush(flush_request_id, data_sources.data(),
                                      data_sources.size(), flush_flags);
        }
      });
}
4746 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4747 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4748     DataSourceInstanceID ds_id,
4749     const DataSourceConfig& config) {
4750   PERFETTO_DCHECK_THREAD(thread_checker_);
4751   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4752   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4753   task_runner_->PostTask([weak_this, ds_id, config] {
4754     if (weak_this)
4755       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
4756   });
4757 }
4758 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4759 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4760     DataSourceInstanceID ds_id,
4761     const DataSourceConfig& config) {
4762   PERFETTO_DCHECK_THREAD(thread_checker_);
4763   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4764   task_runner_->PostTask([weak_this, ds_id, config] {
4765     if (weak_this)
4766       weak_this->producer_->StartDataSource(ds_id, std::move(config));
4767   });
4768 }
4769 
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)4770 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
4771     DataSourceInstanceID data_source_id) {
4772   PERFETTO_DCHECK_THREAD(thread_checker_);
4773   service_->NotifyDataSourceStarted(id_, data_source_id);
4774 }
4775 
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)4776 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
4777     DataSourceInstanceID data_source_id) {
4778   PERFETTO_DCHECK_THREAD(thread_checker_);
4779   service_->NotifyDataSourceStopped(id_, data_source_id);
4780 }
4781 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)4782 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4783     const std::vector<BufferID>& target_buffers) {
4784   if (allowed_target_buffers_.empty())
4785     return;
4786   for (BufferID buffer : target_buffers)
4787     allowed_target_buffers_.erase(buffer);
4788 }
4789 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)4790 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4791     const std::vector<DataSourceInstanceID>& data_sources) {
4792   PERFETTO_DCHECK_THREAD(thread_checker_);
4793   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4794   task_runner_->PostTask([weak_this, data_sources] {
4795     if (weak_this) {
4796       base::StringView producer_name(weak_this->name_);
4797       weak_this->producer_->ClearIncrementalState(data_sources.data(),
4798                                                   data_sources.size());
4799     }
4800   });
4801 }
4802 
Sync(std::function<void ()> callback)4803 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4804     std::function<void()> callback) {
4805   task_runner_->PostTask(callback);
4806 }
4807 
4808 ////////////////////////////////////////////////////////////////////////////////
4809 // TracingServiceImpl::TracingSession implementation
4810 ////////////////////////////////////////////////////////////////////////////////
4811 
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config,base::TaskRunner * task_runner)4812 TracingServiceImpl::TracingSession::TracingSession(
4813     TracingSessionID session_id,
4814     ConsumerEndpointImpl* consumer,
4815     const TraceConfig& new_config,
4816     base::TaskRunner* task_runner)
4817     : id(session_id),
4818       consumer_maybe_null(consumer),
4819       consumer_uid(consumer->uid_),
4820       config(new_config),
4821       snapshot_periodic_task(task_runner),
4822       timed_stop_task(task_runner) {
4823   // all_data_sources_flushed is special because we store up to 64 events of
4824   // this type. Other events will go through the default case in
4825   // SnapshotLifecycleEvent() where they will be given a max history of 1.
4826   lifecycle_events.emplace_back(
4827       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
4828       64 /* max_size */);
4829 }
4830 
4831 ////////////////////////////////////////////////////////////////////////////////
4832 // TracingServiceImpl::RelayEndpointImpl implementation
4833 ////////////////////////////////////////////////////////////////////////////////
RelayEndpointImpl(RelayClientID relay_client_id,TracingServiceImpl * service)4834 TracingServiceImpl::RelayEndpointImpl::RelayEndpointImpl(
4835     RelayClientID relay_client_id,
4836     TracingServiceImpl* service)
4837     : relay_client_id_(relay_client_id), service_(service) {}
4838 TracingServiceImpl::RelayEndpointImpl::~RelayEndpointImpl() = default;
4839 
SyncClocks(SyncMode sync_mode,ClockSnapshotVector client_clocks,ClockSnapshotVector host_clocks)4840 void TracingServiceImpl::RelayEndpointImpl::SyncClocks(
4841     SyncMode sync_mode,
4842     ClockSnapshotVector client_clocks,
4843     ClockSnapshotVector host_clocks) {
4844   // We keep only the most recent 5 clock sync snapshots.
4845   static constexpr size_t kNumSyncClocks = 5;
4846   if (synced_clocks_.size() >= kNumSyncClocks)
4847     synced_clocks_.pop_front();
4848 
4849   synced_clocks_.emplace_back(sync_mode, std::move(client_clocks),
4850                               std::move(host_clocks));
4851 }
4852 
Disconnect()4853 void TracingServiceImpl::RelayEndpointImpl::Disconnect() {
4854   service_->DisconnectRelayClient(relay_client_id_);
4855 }
4856 
4857 }  // namespace perfetto
4858