• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/tracing_service_impl.h"
18 
19 #include <errno.h>
20 #include <limits.h>
21 #include <string.h>
22 
23 #include <cinttypes>
24 #include <optional>
25 #include <regex>
26 #include <unordered_set>
27 #include "perfetto/base/time.h"
28 
29 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
30     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
31 #include <sys/uio.h>
32 #include <sys/utsname.h>
33 #include <unistd.h>
34 #endif
35 
36 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
37     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
38 #include "src/android_internal/lazy_library_loader.h"    // nogncheck
39 #include "src/android_internal/tracing_service_proxy.h"  // nogncheck
40 #endif
41 
42 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
43     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
44     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
45 #define PERFETTO_HAS_CHMOD
46 #include <sys/stat.h>
47 #endif
48 
49 #include <algorithm>
50 
51 #include "perfetto/base/build_config.h"
52 #include "perfetto/base/status.h"
53 #include "perfetto/base/task_runner.h"
54 #include "perfetto/ext/base/android_utils.h"
55 #include "perfetto/ext/base/file_utils.h"
56 #include "perfetto/ext/base/metatrace.h"
57 #include "perfetto/ext/base/string_utils.h"
58 #include "perfetto/ext/base/temp_file.h"
59 #include "perfetto/ext/base/utils.h"
60 #include "perfetto/ext/base/uuid.h"
61 #include "perfetto/ext/base/version.h"
62 #include "perfetto/ext/base/watchdog.h"
63 #include "perfetto/ext/tracing/core/basic_types.h"
64 #include "perfetto/ext/tracing/core/consumer.h"
65 #include "perfetto/ext/tracing/core/observable_events.h"
66 #include "perfetto/ext/tracing/core/producer.h"
67 #include "perfetto/ext/tracing/core/shared_memory.h"
68 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
69 #include "perfetto/ext/tracing/core/trace_packet.h"
70 #include "perfetto/ext/tracing/core/trace_writer.h"
71 #include "perfetto/protozero/scattered_heap_buffer.h"
72 #include "perfetto/protozero/static_buffer.h"
73 #include "perfetto/tracing/core/data_source_descriptor.h"
74 #include "perfetto/tracing/core/tracing_service_capabilities.h"
75 #include "perfetto/tracing/core/tracing_service_state.h"
76 #include "src/android_stats/statsd_logging_helper.h"
77 #include "src/protozero/filtering/message_filter.h"
78 #include "src/protozero/filtering/string_filter.h"
79 #include "src/tracing/core/packet_stream_validator.h"
80 #include "src/tracing/core/shared_memory_arbiter_impl.h"
81 #include "src/tracing/core/trace_buffer.h"
82 
83 #include "protos/perfetto/common/builtin_clock.gen.h"
84 #include "protos/perfetto/common/builtin_clock.pbzero.h"
85 #include "protos/perfetto/common/trace_stats.pbzero.h"
86 #include "protos/perfetto/config/trace_config.pbzero.h"
87 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
88 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
89 #include "protos/perfetto/trace/system_info.pbzero.h"
90 #include "protos/perfetto/trace/trace_packet.pbzero.h"
91 #include "protos/perfetto/trace/trace_uuid.pbzero.h"
92 #include "protos/perfetto/trace/trigger.pbzero.h"
93 
94 // General note: this class must assume that Producers are malicious and will
95 // try to crash / exploit this class. We can trust pointers because they come
96 // from the IPC layer, but we should never assume that that the producer calls
97 // come in the right order or their arguments are sane / within bounds.
98 
99 // This is a macro because we want the call-site line number for the ELOG.
100 #define PERFETTO_SVC_ERR(...) \
101   (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
102 
103 namespace perfetto {
104 
105 namespace {
106 constexpr int kMaxBuffersPerConsumer = 128;
107 constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
108 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
109 constexpr int kMaxConcurrentTracingSessions = 15;
110 constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
111 constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
112 constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;
113 
114 constexpr uint32_t kMillisPerHour = 3600000;
115 constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
116 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
117 
118 // These apply only if enable_extra_guardrails is true.
119 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
120 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
121 
122 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
// Fallback definition of the POSIX iovec struct for platforms (Windows,
// NaCl) that do not provide <sys/uio.h>. Field names intentionally match
// POSIX so the writev() shim below is a drop-in for the real syscall.
struct iovec {
  void* iov_base;  // Address of the buffer to write.
  size_t iov_len;  // Number of bytes to write from |iov_base|.
};
127 
128 // Simple implementation of writev. Note that this does not give the atomicity
129 // guarantees of a real writev, but we don't depend on these (we aren't writing
130 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)131 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
132   ssize_t total_size = 0;
133   for (int i = 0; i < iovcnt; ++i) {
134     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
135     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
136       return -1;
137     total_size += current_size;
138   }
139   return total_size;
140 }
141 
142 #define IOV_MAX 1024  // Linux compatible limit.
143 
144 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
145         // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
146 
147 // Partially encodes a CommitDataRequest in an int32 for the purposes of
148 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
149 // (which is technically 16 bits wide).
150 //
151 // Format (by bit range):
152 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
153 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)154 static int32_t EncodeCommitDataRequest(ProducerID producer_id,
155                                        const CommitDataRequest& req_untrusted) {
156   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
157   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
158   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
159 
160   uint32_t mask = (1 << 10) - 1;
161   uint32_t acc = 0;
162   acc |= has_flush_id << 30;
163   acc |= (cpatch & mask) << 20;
164   acc |= (cmov & mask) << 10;
165   acc |= (producer_id & mask);
166   return static_cast<int32_t>(acc);
167 }
168 
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)169 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
170                               std::vector<uint8_t> packet) {
171   Slice slice = Slice::Allocate(packet.size());
172   memcpy(slice.own_data(), packet.data(), packet.size());
173   packets->emplace_back();
174   packets->back().AddSlice(std::move(slice));
175 }
176 
EnsureValidShmSizes(size_t shm_size,size_t page_size)177 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
178     size_t shm_size,
179     size_t page_size) {
180   // Theoretically the max page size supported by the ABI is 64KB.
181   // However, the current implementation of TraceBuffer (the non-shared
182   // userspace buffer where the service copies data) supports at most
183   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
184   // but then causes the data to be discarded when copying it into
185   // TraceBuffer.
186   constexpr size_t kMaxPageSize = 32 * 1024;
187   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
188 
189   if (page_size == 0)
190     page_size = TracingServiceImpl::kDefaultShmPageSize;
191   if (shm_size == 0)
192     shm_size = TracingServiceImpl::kDefaultShmSize;
193 
194   page_size = std::min<size_t>(page_size, kMaxPageSize);
195   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
196 
197   // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
198   // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
199   // matter here, because the tracing page size is just a logical partitioning
200   // and does not have any dependencies on kernel mm syscalls (read: it's fine
201   // to have trace page sizes of 4K on a system where the kernel page size is
202   // 16K).
203   bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
204   page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
205 
206   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
207   size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
208   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
209 
210   if (!page_size_is_valid || shm_size < page_size ||
211       shm_size % page_size != 0) {
212     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
213                            TracingServiceImpl::kDefaultShmPageSize);
214   }
215   return std::make_tuple(shm_size, page_size);
216 }
217 
// Returns true if |name| passes the filter. When both filter lists are empty
// every name is accepted; otherwise |name| must either appear verbatim in
// |name_filter| or fully match one of the POSIX-extended regexes in
// |name_regex_filter|.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  if (name_filter.empty() && name_regex_filter.empty())
    return true;
  // Both checks run unconditionally (no short-circuit) so that an invalid
  // regex always surfaces, matching the original evaluation order.
  const bool exact_match =
      std::find(name_filter.begin(), name_filter.end(), name) !=
      name_filter.end();
  const bool regex_match =
      std::any_of(name_regex_filter.begin(), name_regex_filter.end(),
                  [&name](const std::string& pattern) {
                    return std::regex_match(
                        name, std::regex(pattern, std::regex::extended));
                  });
  return exact_match || regex_match;
}
234 
235 // Used when TraceConfig.write_into_file == true and output_path is not empty.
// Creates the output trace file at |path| and returns its descriptor, or an
// invalid ScopedFile on failure. With |overwrite| an existing file is
// truncated; otherwise creation fails if the file already exists.
base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
    PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
  // This is NOT trying to preserve any security property, SELinux does that.
  // It just improves the actionability of the error when people try to save the
  // trace in a location that is not SELinux-allowed (a generic "permission
  // denied" vs "don't put it here, put it there").
  // These are the only SELinux approved dir for trace files that are created
  // directly by traced.
  static const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
  if (!base::StartsWith(path, kTraceDirBasePath)) {
    PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
                  path.c_str(), kTraceDirBasePath);
    return base::ScopedFile();
  }
#endif
  // O_CREAT | O_EXCL will fail if the file exists already.
  const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
  auto fd = base::OpenFile(path, flags, 0600);
  if (fd) {
#if defined(PERFETTO_HAS_CHMOD)
    // Passing 0644 directly above won't work because of umask.
    PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
#endif
  } else {
    PERFETTO_PLOG("Failed to create %s", path.c_str());
  }
  return fd;
}
265 
ShouldLogEvent(const TraceConfig & cfg)266 bool ShouldLogEvent(const TraceConfig& cfg) {
267   switch (cfg.statsd_logging()) {
268     case TraceConfig::STATSD_LOGGING_ENABLED:
269       return true;
270     case TraceConfig::STATSD_LOGGING_DISABLED:
271       return false;
272     case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
273       break;
274   }
275   // For backward compatibility with older versions of perfetto_cmd.
276   return cfg.enable_extra_guardrails();
277 }
278 
279 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
280 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)281 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
282                                size_t size,
283                                size_t max_slice_size,
284                                perfetto::TracePacket* packet) {
285   if (size <= max_slice_size) {
286     packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
287     return;
288   }
289   uint8_t* src_ptr = data.get();
290   for (size_t size_left = size; size_left > 0;) {
291     const size_t slice_size = std::min(size_left, max_slice_size);
292 
293     Slice slice = Slice::Allocate(slice_size);
294     memcpy(slice.own_data(), src_ptr, slice_size);
295     packet->AddSlice(std::move(slice));
296 
297     src_ptr += slice_size;
298     size_left -= slice_size;
299   }
300 }
301 
302 using TraceFilter = protos::gen::TraceConfig::TraceFilter;
ConvertPolicy(TraceFilter::StringFilterPolicy policy)303 std::optional<protozero::StringFilter::Policy> ConvertPolicy(
304     TraceFilter::StringFilterPolicy policy) {
305   switch (policy) {
306     case TraceFilter::SFP_UNSPECIFIED:
307       return std::nullopt;
308     case TraceFilter::SFP_MATCH_REDACT_GROUPS:
309       return protozero::StringFilter::Policy::kMatchRedactGroups;
310     case TraceFilter::SFP_ATRACE_MATCH_REDACT_GROUPS:
311       return protozero::StringFilter::Policy::kAtraceMatchRedactGroups;
312     case TraceFilter::SFP_MATCH_BREAK:
313       return protozero::StringFilter::Policy::kMatchBreak;
314     case TraceFilter::SFP_ATRACE_MATCH_BREAK:
315       return protozero::StringFilter::Policy::kAtraceMatchBreak;
316   }
317   return std::nullopt;
318 }
319 
320 }  // namespace
321 
322 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)323 std::unique_ptr<TracingService> TracingService::CreateInstance(
324     std::unique_ptr<SharedMemory::Factory> shm_factory,
325     base::TaskRunner* task_runner,
326     InitOpts init_opts) {
327   return std::unique_ptr<TracingService>(
328       new TracingServiceImpl(std::move(shm_factory), task_runner, init_opts));
329 }
330 
// Constructs the service. |task_runner| must outlive this object (held as a
// raw pointer). The RNG used for trigger-probability sampling is seeded from
// the current wall-clock time.
TracingServiceImpl::TracingServiceImpl(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner,
    InitOpts init_opts)
    : task_runner_(task_runner),
      init_opts_(init_opts),
      shm_factory_(std::move(shm_factory)),
      uid_(base::GetCurrentUserId()),
      buffer_ids_(kMaxTraceBufferID),
      trigger_probability_rand_(
          static_cast<uint32_t>(base::GetWallTimeNs().count())),
      weak_ptr_factory_(this) {
  PERFETTO_DCHECK(task_runner_);
}
345 
TracingServiceImpl::~TracingServiceImpl() {
  // No explicit producer teardown happens here yet.
  // TODO(fmayer): handle teardown of all Producer.
}
349 
// Registers a new producer connection and returns its endpoint, or nullptr
// if the connection is rejected (lockdown-mode UID mismatch, or too many
// producers). If the producer supplies its own shared memory buffer (|shm|),
// it is adopted only when its sizes validate; otherwise the service falls
// back to creating the SMB itself later.
std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
                                    uid_t uid,
                                    pid_t pid,
                                    const std::string& producer_name,
                                    size_t shared_memory_size_hint_bytes,
                                    bool in_process,
                                    ProducerSMBScrapingMode smb_scraping_mode,
                                    size_t shared_memory_page_size_hint_bytes,
                                    std::unique_ptr<SharedMemory> shm,
                                    const std::string& sdk_version) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // In lockdown mode only producers running as the same user as the service
  // are allowed to connect.
  if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
    PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
                  static_cast<unsigned long>(uid));
    return nullptr;
  }

  if (producers_.size() >= kMaxProducerID) {
    PERFETTO_DFATAL("Too many producers.");
    return nullptr;
  }
  const ProducerID id = GetNextProducerID();
  PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
                static_cast<int>(uid));
  // The per-producer scraping mode, when specified, overrides the
  // service-wide default.
  bool smb_scraping_enabled = smb_scraping_enabled_;
  switch (smb_scraping_mode) {
    case ProducerSMBScrapingMode::kDefault:
      break;
    case ProducerSMBScrapingMode::kEnabled:
      smb_scraping_enabled = true;
      break;
    case ProducerSMBScrapingMode::kDisabled:
      smb_scraping_enabled = false;
      break;
  }

  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
      id, uid, pid, this, task_runner_, producer, producer_name, sdk_version,
      in_process, smb_scraping_enabled));
  auto it_and_inserted = producers_.emplace(id, endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
  endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;

  // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
  // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->producer_->OnConnect();
  });

  if (shm) {
    // The producer supplied an SMB. This is used only by Chrome; in the most
    // common cases the SMB is created by the service and passed via
    // OnTracingSetup(). Verify that it is correctly sized before we attempt to
    // use it. The transport layer has to verify the integrity of the SMB (e.g.
    // ensure that the producer can't resize if after the fact).
    size_t shm_size, page_size;
    std::tie(shm_size, page_size) =
        EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
    // Adopt the SMB only if the sanitized sizes match the requested ones
    // exactly; any correction means the producer's SMB is unusable as-is.
    if (shm_size == shm->size() &&
        page_size == endpoint->shmem_page_size_hint_bytes_) {
      PERFETTO_DLOG(
          "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
          shm_size / 1024, endpoint->name_.c_str());
      endpoint->SetupSharedMemory(std::move(shm), page_size,
                                  /*provided_by_producer=*/true);
    } else {
      PERFETTO_LOG(
          "Discarding incorrectly sized producer-provided SMB for producer "
          "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
          "B total, %zu B page size; suggested corrected sizes: %zu B total, "
          "%zu B page size",
          endpoint->name_.c_str(), shm->size(),
          endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
      shm.reset();
    }
  }

  return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}
434 
// Tears down the state for a disconnecting producer: scrapes its remaining
// SMB chunks, unregisters its data sources and removes it from the registry.
void TracingServiceImpl::DisconnectProducer(ProducerID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
  PERFETTO_DCHECK(producers_.count(id));

  // Scrape remaining chunks for this producer to ensure we don't lose data.
  if (auto* producer = GetProducer(id)) {
    for (auto& session_id_and_session : tracing_sessions_)
      ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
  }

  // Unregister every data source owned by this producer. The next iterator
  // is saved up-front because UnregisterDataSource() erases the current
  // entry, which would otherwise invalidate |it|.
  for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    auto next = it;
    next++;
    if (it->second.producer_id == id)
      UnregisterDataSource(id, it->second.descriptor.name());
    it = next;
  }

  producers_.erase(id);
  UpdateMemoryGuardrail();
}
457 
GetProducer(ProducerID id) const458 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
459     ProducerID id) const {
460   PERFETTO_DCHECK_THREAD(thread_checker_);
461   auto it = producers_.find(id);
462   if (it == producers_.end())
463     return nullptr;
464   return it->second;
465 }
466 
467 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)468 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
469   PERFETTO_DCHECK_THREAD(thread_checker_);
470   PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
471                 reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
472   std::unique_ptr<ConsumerEndpointImpl> endpoint(
473       new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
474   auto it_and_inserted = consumers_.emplace(endpoint.get());
475   PERFETTO_DCHECK(it_and_inserted.second);
476   // Consumer might go away before we're able to send the connect notification,
477   // if that is the case just bail out.
478   auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
479   task_runner_->PostTask([weak_ptr] {
480     if (weak_ptr)
481       weak_ptr->consumer_->OnConnect();
482   });
483   return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
484 }
485 
// Tears down a disconnecting consumer: frees the buffers of its tracing
// session (if any, which also disables tracing) and removes it from the
// registry, asserting that no session still references it afterwards.
void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
  PERFETTO_DCHECK(consumers_.count(consumer));

  // TODO(primiano) : Check that this is safe (what happens if there are
  // ReadBuffers() calls posted in the meantime? They need to become noop).
  if (consumer->tracing_session_id_)
    FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
  consumers_.erase(consumer);

  // At this point no more pointers to |consumer| should be around.
  PERFETTO_DCHECK(!std::any_of(
      tracing_sessions_.begin(), tracing_sessions_.end(),
      [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
        return kv.second.consumer_maybe_null == consumer;
      }));
}
504 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)505 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
506                                         const std::string& key) {
507   PERFETTO_DCHECK_THREAD(thread_checker_);
508   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
509   PERFETTO_DCHECK(consumers_.count(consumer));
510 
511   TracingSessionID tsid = consumer->tracing_session_id_;
512   TracingSession* tracing_session;
513   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
514     return false;
515 
516   if (GetDetachedSession(consumer->uid_, key)) {
517     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
518                   key.c_str());
519     return false;
520   }
521 
522   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
523   tracing_session->consumer_maybe_null = nullptr;
524   tracing_session->detach_key = key;
525   consumer->tracing_session_id_ = 0;
526   return true;
527 }
528 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)529 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
530                                         const std::string& key) {
531   PERFETTO_DCHECK_THREAD(thread_checker_);
532   PERFETTO_DLOG("Consumer %p attaching to session %s",
533                 reinterpret_cast<void*>(consumer), key.c_str());
534   PERFETTO_DCHECK(consumers_.count(consumer));
535 
536   if (consumer->tracing_session_id_) {
537     PERFETTO_ELOG(
538         "Cannot reattach consumer to session %s"
539         " while it already attached tracing session ID %" PRIu64,
540         key.c_str(), consumer->tracing_session_id_);
541     return false;
542   }
543 
544   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
545   if (!tracing_session) {
546     PERFETTO_ELOG(
547         "Failed to attach consumer, session '%s' not found for uid %d",
548         key.c_str(), static_cast<int>(consumer->uid_));
549     return false;
550   }
551 
552   consumer->tracing_session_id_ = tracing_session->id;
553   tracing_session->consumer_maybe_null = consumer;
554   tracing_session->detach_key.clear();
555   return true;
556 }
557 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)558 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
559                                                const TraceConfig& cfg,
560                                                base::ScopedFile fd) {
561   PERFETTO_DCHECK_THREAD(thread_checker_);
562 
563   // If the producer is specifying a UUID, respect that (at least for the first
564   // snapshot). Otherwise generate a new UUID.
565   base::Uuid uuid(cfg.trace_uuid_lsb(), cfg.trace_uuid_msb());
566   if (!uuid)
567     uuid = base::Uuidv4();
568 
569   PERFETTO_DLOG("Enabling tracing for consumer %p, UUID: %s",
570                 reinterpret_cast<void*>(consumer),
571                 uuid.ToPrettyString().c_str());
572   MaybeLogUploadEvent(cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracing);
573   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
574     lockdown_mode_ = true;
575   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
576     lockdown_mode_ = false;
577 
578   // Scope |tracing_session| to this block to prevent accidental use of a null
579   // pointer later in this function.
580   {
581     TracingSession* tracing_session =
582         GetTracingSession(consumer->tracing_session_id_);
583     if (tracing_session) {
584       MaybeLogUploadEvent(
585           cfg, uuid,
586           PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
587       return PERFETTO_SVC_ERR(
588           "A Consumer is trying to EnableTracing() but another tracing "
589           "session is already active (forgot a call to FreeBuffers() ?)");
590     }
591   }
592 
593   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
594                                        ? kGuardrailsMaxTracingDurationMillis
595                                        : kMaxTracingDurationMillis;
596   if (cfg.duration_ms() > max_duration_ms) {
597     MaybeLogUploadEvent(cfg, uuid,
598                         PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
599     return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
600                             "ms  > %" PRIu32 " ms)",
601                             cfg.duration_ms(), max_duration_ms);
602   }
603 
604   const bool has_trigger_config =
605       GetTriggerMode(cfg) != TraceConfig::TriggerConfig::UNSPECIFIED;
606   if (has_trigger_config &&
607       (cfg.trigger_config().trigger_timeout_ms() == 0 ||
608        cfg.trigger_config().trigger_timeout_ms() > max_duration_ms)) {
609     MaybeLogUploadEvent(
610         cfg, uuid,
611         PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
612     return PERFETTO_SVC_ERR(
613         "Traces with START_TRACING triggers must provide a positive "
614         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
615         cfg.trigger_config().trigger_timeout_ms());
616   }
617 
618   // This check has been introduced in May 2023 after finding b/274931668.
619   if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
620       TraceConfig::TriggerConfig::TriggerMode_MAX) {
621     MaybeLogUploadEvent(
622         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
623     return PERFETTO_SVC_ERR(
624         "The trace config specified an invalid trigger_mode");
625   }
626 
627   if (cfg.trigger_config().use_clone_snapshot_if_available() &&
628       cfg.trigger_config().trigger_mode() !=
629           TraceConfig::TriggerConfig::STOP_TRACING) {
630     MaybeLogUploadEvent(
631         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
632     return PERFETTO_SVC_ERR(
633         "trigger_mode must be STOP_TRACING when "
634         "use_clone_snapshot_if_available=true");
635   }
636 
637   if (has_trigger_config && cfg.duration_ms() != 0) {
638     MaybeLogUploadEvent(
639         cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
640     return PERFETTO_SVC_ERR(
641         "duration_ms was set, this must not be set for traces with triggers.");
642   }
643 
644   if ((GetTriggerMode(cfg) == TraceConfig::TriggerConfig::STOP_TRACING ||
645        GetTriggerMode(cfg) == TraceConfig::TriggerConfig::CLONE_SNAPSHOT) &&
646       cfg.write_into_file()) {
647     // We don't support this usecase because there are subtle assumptions which
648     // break around TracingServiceEvents and windowed sorting (i.e. if we don't
649     // drain the events in ReadBuffersIntoFile because we are waiting for
650     // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
651     // emitting them wildy out of order breaking windowed sorting in trace
652     // processor).
653     MaybeLogUploadEvent(
654         cfg, uuid,
655         PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
656     return PERFETTO_SVC_ERR(
657         "Specifying trigger mode STOP_TRACING/CLONE_SNAPSHOT and "
658         "write_into_file together is unsupported");
659   }
660 
661   std::unordered_set<std::string> triggers;
662   for (const auto& trigger : cfg.trigger_config().triggers()) {
663     if (!triggers.insert(trigger.name()).second) {
664       MaybeLogUploadEvent(
665           cfg, uuid,
666           PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
667       return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
668                               trigger.name().c_str());
669     }
670   }
671 
672   if (cfg.enable_extra_guardrails()) {
673     if (cfg.deferred_start()) {
674       MaybeLogUploadEvent(
675           cfg, uuid,
676           PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
677       return PERFETTO_SVC_ERR(
678           "deferred_start=true is not supported in unsupervised traces");
679     }
680     uint64_t buf_size_sum = 0;
681     for (const auto& buf : cfg.buffers()) {
682       if (buf.size_kb() % 4 != 0) {
683         MaybeLogUploadEvent(
684             cfg, uuid,
685             PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
686         return PERFETTO_SVC_ERR(
687             "buffers.size_kb must be a multiple of 4, got %" PRIu32,
688             buf.size_kb());
689       }
690       buf_size_sum += buf.size_kb();
691     }
692 
693     uint32_t max_tracing_buffer_size_kb =
694         std::max(kGuardrailsMaxTracingBufferSizeKb,
695                  cfg.guardrail_overrides().max_tracing_buffer_size_kb());
696     if (buf_size_sum > max_tracing_buffer_size_kb) {
697       MaybeLogUploadEvent(
698           cfg, uuid,
699           PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
700       return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
701                               "kB  > %" PRIu32 " kB)",
702                               buf_size_sum, max_tracing_buffer_size_kb);
703     }
704   }
705 
706   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
707     MaybeLogUploadEvent(cfg, uuid,
708                         PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
709     return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
710                             cfg.buffers_size());
711   }
712   // Check that the config specifies all buffers for its data sources. This
713   // is also checked in SetupDataSource, but it is simpler to return a proper
714   // error to the consumer from here (and there will be less state to undo).
715   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
716     size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
717     size_t target_buffer = cfg_data_source.config().target_buffer();
718     if (target_buffer >= num_buffers) {
719       MaybeLogUploadEvent(
720           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
721       return PERFETTO_SVC_ERR(
722           "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
723           "%zu)",
724           cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
725     }
726   }
727 
728   if (!cfg.unique_session_name().empty()) {
729     const std::string& name = cfg.unique_session_name();
730     for (auto& kv : tracing_sessions_) {
731       if (kv.second.state == TracingSession::CLONED_READ_ONLY)
732         continue;  // Don't consider cloned sessions in uniqueness checks.
733       if (kv.second.config.unique_session_name() == name) {
734         MaybeLogUploadEvent(
735             cfg, uuid,
736             PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
737         static const char fmt[] =
738             "A trace with this unique session name (%s) already exists";
739         // This happens frequently, don't make it an "E"LOG.
740         PERFETTO_LOG(fmt, name.c_str());
741         return base::ErrStatus(fmt, name.c_str());
742       }
743     }
744   }
745 
746   if (cfg.enable_extra_guardrails()) {
747     // unique_session_name can be empty
748     const std::string& name = cfg.unique_session_name();
749     int64_t now_s = base::GetBootTimeS().count();
750 
751     // Remove any entries where the time limit has passed so this map doesn't
752     // grow indefinitely:
753     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
754     for (auto it = sessions.cbegin(); it != sessions.cend();) {
755       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
756         it = sessions.erase(it);
757       } else {
758         ++it;
759       }
760     }
761 
762     int64_t& previous_s = session_to_last_trace_s_[name];
763     if (previous_s == 0) {
764       previous_s = now_s;
765     } else {
766       MaybeLogUploadEvent(
767           cfg, uuid,
768           PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
769       return PERFETTO_SVC_ERR(
770           "A trace with unique session name \"%s\" began less than %" PRId64
771           "s ago (%" PRId64 "s)",
772           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
773     }
774   }
775 
776   const int sessions_for_uid = static_cast<int>(std::count_if(
777       tracing_sessions_.begin(), tracing_sessions_.end(),
778       [consumer](const decltype(tracing_sessions_)::value_type& s) {
779         return s.second.consumer_uid == consumer->uid_;
780       }));
781 
782   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
783   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
784     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
785   }
786   if (sessions_for_uid >= per_uid_limit) {
787     MaybeLogUploadEvent(
788         cfg, uuid,
789         PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
790     return PERFETTO_SVC_ERR(
791         "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
792         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
793   }
794 
795   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
796   // in a state where it stalls by design by having more TraceWriterImpl
797   // instances than free pages in the buffer. This is really a bug in
798   // trace_probes and the way it handles stalls in the shmem buffer.
799   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
800     MaybeLogUploadEvent(
801         cfg, uuid,
802         PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
803     return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
804                             tracing_sessions_.size());
805   }
806 
807   // If the trace config provides a filter bytecode, setup the filter now.
808   // If the filter loading fails, abort the tracing session rather than running
809   // unfiltered.
810   std::unique_ptr<protozero::MessageFilter> trace_filter;
811   if (cfg.has_trace_filter()) {
812     const auto& filt = cfg.trace_filter();
813     trace_filter.reset(new protozero::MessageFilter());
814 
815     protozero::StringFilter& string_filter = trace_filter->string_filter();
816     for (const auto& rule : filt.string_filter_chain().rules()) {
817       auto opt_policy = ConvertPolicy(rule.policy());
818       if (!opt_policy.has_value()) {
819         MaybeLogUploadEvent(
820             cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
821         return PERFETTO_SVC_ERR(
822             "Trace filter has invalid string filtering rules, aborting");
823       }
824       string_filter.AddRule(*opt_policy, rule.regex_pattern(),
825                             rule.atrace_payload_starts_with());
826     }
827 
828     const std::string& bytecode_v1 = filt.bytecode();
829     const std::string& bytecode_v2 = filt.bytecode_v2();
830     const std::string& bytecode =
831         bytecode_v2.empty() ? bytecode_v1 : bytecode_v2;
832     if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
833       MaybeLogUploadEvent(
834           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
835       return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
836     }
837 
838     // The filter is created using perfetto.protos.Trace as root message
839     // (because that makes it possible to play around with the `proto_filter`
840     // tool on actual traces). Here in the service, however, we deal with
841     // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
842     // The IPC client (or the write_into_filte logic in here) are responsible
843     // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
844     // the preamble is not there at ReadBuffer time. Hence we change the root of
845     // the filtering to start at the Trace.packet level.
846     uint32_t packet_field_id = TracePacket::kPacketFieldNumber;
847     if (!trace_filter->SetFilterRoot(&packet_field_id, 1)) {
848       MaybeLogUploadEvent(
849           cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
850       return PERFETTO_SVC_ERR("Failed to set filter root.");
851     }
852   }
853 
854   const TracingSessionID tsid = ++last_tracing_session_id_;
855   TracingSession* tracing_session =
856       &tracing_sessions_
857            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
858                     std::forward_as_tuple(tsid, consumer, cfg, task_runner_))
859            .first->second;
860 
861   tracing_session->trace_uuid = uuid;
862 
863   if (trace_filter)
864     tracing_session->trace_filter = std::move(trace_filter);
865 
866   if (cfg.write_into_file()) {
867     if (!fd ^ !cfg.output_path().empty()) {
868       MaybeLogUploadEvent(
869           tracing_session->config, uuid,
870           PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
871       tracing_sessions_.erase(tsid);
872       return PERFETTO_SVC_ERR(
873           "When write_into_file==true either a FD needs to be passed or "
874           "output_path must be populated (but not both)");
875     }
876     if (!cfg.output_path().empty()) {
877       fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
878       if (!fd) {
879         MaybeLogUploadEvent(
880             tracing_session->config, uuid,
881             PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
882         tracing_sessions_.erase(tsid);
883         return PERFETTO_SVC_ERR("Failed to create the trace file %s",
884                                 cfg.output_path().c_str());
885       }
886     }
887     tracing_session->write_into_file = std::move(fd);
888     uint32_t write_period_ms = cfg.file_write_period_ms();
889     if (write_period_ms == 0)
890       write_period_ms = kDefaultWriteIntoFilePeriodMs;
891     if (write_period_ms < min_write_period_ms_)
892       write_period_ms = min_write_period_ms_;
893     tracing_session->write_period_ms = write_period_ms;
894     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
895     tracing_session->bytes_written_into_file = 0;
896   }
897 
898   if (!cfg.compress_from_cli() &&
899       cfg.compression_type() == TraceConfig::COMPRESSION_TYPE_DEFLATE) {
900     if (init_opts_.compressor_fn) {
901       tracing_session->compress_deflate = true;
902     } else {
903       PERFETTO_LOG(
904           "COMPRESSION_TYPE_DEFLATE is not supported in the current build "
905           "configuration. Skipping compression");
906     }
907   }
908 
909   // Initialize the log buffers.
910   bool did_allocate_all_buffers = true;
911   bool invalid_buffer_config = false;
912 
913   // Allocate the trace buffers. Also create a map to translate a consumer
914   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
915   // corresponding BufferID, which is a global ID namespace for the service and
916   // all producers.
917   size_t total_buf_size_kb = 0;
918   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
919   tracing_session->buffers_index.reserve(num_buffers);
920   for (size_t i = 0; i < num_buffers; i++) {
921     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
922     BufferID global_id = buffer_ids_.Allocate();
923     if (!global_id) {
924       did_allocate_all_buffers = false;  // We ran out of IDs.
925       break;
926     }
927     tracing_session->buffers_index.push_back(global_id);
928     // TraceBuffer size is limited to 32-bit.
929     const uint32_t buf_size_kb = buffer_cfg.size_kb();
930     const uint64_t buf_size_bytes = buf_size_kb * static_cast<uint64_t>(1024);
931     const size_t buf_size = static_cast<size_t>(buf_size_bytes);
932     if (buf_size_bytes == 0 ||
933         buf_size_bytes > std::numeric_limits<uint32_t>::max() ||
934         buf_size != buf_size_bytes) {
935       invalid_buffer_config = true;
936       did_allocate_all_buffers = false;
937       break;
938     }
939     total_buf_size_kb += buf_size_kb;
940     TraceBuffer::OverwritePolicy policy =
941         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
942             ? TraceBuffer::kDiscard
943             : TraceBuffer::kOverwrite;
944     auto it_and_inserted =
945         buffers_.emplace(global_id, TraceBuffer::Create(buf_size, policy));
946     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
947     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
948     if (!trace_buffer) {
949       did_allocate_all_buffers = false;
950       break;
951     }
952   }
953 
954   // This can happen if either:
955   // - All the kMaxTraceBufferID slots are taken.
956   // - OOM, or, more realistically, we exhausted virtual memory.
957   // - The buffer size in the config is invalid.
958   // In any case, free all the previously allocated buffers and abort.
959   if (!did_allocate_all_buffers) {
960     for (BufferID global_id : tracing_session->buffers_index) {
961       buffer_ids_.Free(global_id);
962       buffers_.erase(global_id);
963     }
964     MaybeLogUploadEvent(tracing_session->config, uuid,
965                         PerfettoStatsdAtom::kTracedEnableTracingOom);
966     tracing_sessions_.erase(tsid);
967     if (invalid_buffer_config) {
968       return PERFETTO_SVC_ERR(
969           "Failed to allocate tracing buffers: Invalid buffer sizes");
970     }
971     return PERFETTO_SVC_ERR(
972         "Failed to allocate tracing buffers: OOM or too many buffers");
973   }
974 
975   UpdateMemoryGuardrail();
976 
977   consumer->tracing_session_id_ = tsid;
978 
979   // Setup the data sources on the producers without starting them.
980   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
981     // Scan all the registered data sources with a matching name.
982     auto range = data_sources_.equal_range(cfg_data_source.config().name());
983     for (auto it = range.first; it != range.second; it++) {
984       TraceConfig::ProducerConfig producer_config;
985       for (auto& config : cfg.producers()) {
986         if (GetProducer(it->second.producer_id)->name_ ==
987             config.producer_name()) {
988           producer_config = config;
989           break;
990         }
991       }
992       SetupDataSource(cfg_data_source, producer_config, it->second,
993                       tracing_session);
994     }
995   }
996 
997   bool has_start_trigger = false;
998   auto weak_this = weak_ptr_factory_.GetWeakPtr();
999   switch (GetTriggerMode(cfg)) {
1000     case TraceConfig::TriggerConfig::UNSPECIFIED:
1001       // no triggers are specified so this isn't a trace that is using triggers.
1002       PERFETTO_DCHECK(!has_trigger_config);
1003       break;
1004     case TraceConfig::TriggerConfig::START_TRACING:
1005       // For traces which use START_TRACE triggers we need to ensure that the
1006       // tracing session will be cleaned up when it times out.
1007       has_start_trigger = true;
1008       task_runner_->PostDelayedTask(
1009           [weak_this, tsid]() {
1010             if (weak_this)
1011               weak_this->OnStartTriggersTimeout(tsid);
1012           },
1013           cfg.trigger_config().trigger_timeout_ms());
1014       break;
1015     case TraceConfig::TriggerConfig::STOP_TRACING:
1016     case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1017       // Update the tracing_session's duration_ms to ensure that if no trigger
1018       // is received the session will end and be cleaned up equal to the
1019       // timeout.
1020       //
1021       // TODO(nuskos): Refactor this so that rather then modifying the config we
1022       // have a field we look at on the tracing_session.
1023       tracing_session->config.set_duration_ms(
1024           cfg.trigger_config().trigger_timeout_ms());
1025       break;
1026 
1027       // The case of unknown modes (coming from future versions of the service)
1028       // is handled few lines above (search for TriggerMode_MAX).
1029   }
1030 
1031   tracing_session->state = TracingSession::CONFIGURED;
1032   PERFETTO_LOG(
1033       "Configured tracing session %" PRIu64
1034       ", #sources:%zu, duration:%d ms%s, #buffers:%d, total "
1035       "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
1036       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
1037       tracing_session->config.prefer_suspend_clock_for_duration()
1038           ? " (suspend_clock)"
1039           : "",
1040       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
1041       static_cast<unsigned int>(consumer->uid_),
1042       cfg.unique_session_name().c_str());
1043 
1044   // Start the data sources, unless this is a case of early setup + fast
1045   // triggering, either through TraceConfig.deferred_start or
1046   // TraceConfig.trigger_config(). If both are specified which ever one occurs
1047   // first will initiate the trace.
1048   if (!cfg.deferred_start() && !has_start_trigger)
1049     return StartTracing(tsid);
1050 
1051   return base::OkStatus();
1052 }
1053 
// Applies an updated TraceConfig to an existing tracing session owned by
// |consumer|. Only the per-data-source producer_name_filter and
// producer_name_regex_filter lists are actually honored; any other difference
// between |updated_cfg| and the session's current config is logged and
// ignored. Newly-matching producers get their data sources set up (and
// started, if the session is already running).
void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
                                           const TraceConfig& updated_cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  PERFETTO_DCHECK(tracing_session);

  // A config change is only meaningful while the session is configured or
  // actively tracing; in any other state bail out.
  if ((tracing_session->state != TracingSession::STARTED) &&
      (tracing_session->state != TracingSession::CONFIGURED)) {
    PERFETTO_ELOG(
        "ChangeTraceConfig() was called for a tracing session which isn't "
        "running.");
    return;
  }

  // We only support updating producer_name_{,regex}_filter (and pass-through
  // configs) for now; null out any changeable fields and make sure the rest are
  // identical.
  TraceConfig new_config_copy(updated_cfg);
  for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  TraceConfig current_config_copy(tracing_session->config);
  for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
    ds_cfg.clear_producer_name_filter();
    ds_cfg.clear_producer_name_regex_filter();
  }

  // With the mutable filter fields blanked out on both copies, any remaining
  // difference is an unsupported change. Note this only warns: the filter
  // updates below are still applied regardless.
  if (new_config_copy != current_config_copy) {
    PERFETTO_LOG(
        "ChangeTraceConfig() was called with a config containing unsupported "
        "changes; only adding to the producer_name_{,regex}_filter is "
        "currently supported and will have an effect.");
  }

  // For each data source in the *session's* config, pull in the filters from
  // the matching entry (by data source name) of the new config.
  for (TraceConfig::DataSource& cfg_data_source :
       *tracing_session->config.mutable_data_sources()) {
    // Find the updated producer_filter in the new config.
    std::vector<std::string> new_producer_name_filter;
    std::vector<std::string> new_producer_name_regex_filter;
    bool found_data_source = false;
    for (const auto& it : updated_cfg.data_sources()) {
      if (cfg_data_source.config().name() == it.config().name()) {
        new_producer_name_filter = it.producer_name_filter();
        new_producer_name_regex_filter = it.producer_name_regex_filter();
        found_data_source = true;
        break;
      }
    }

    // Bail out if data source not present in the new config.
    if (!found_data_source) {
      PERFETTO_ELOG(
          "ChangeTraceConfig() called without a current data source also "
          "present in the new config: %s",
          cfg_data_source.config().name().c_str());
      continue;
    }

    // TODO(oysteine): Just replacing the filter means that if
    // there are any filter entries which were present in the original config,
    // but removed from the config passed to ChangeTraceConfig, any matching
    // producers will keep producing but newly added producers after this
    // point will never start.
    *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
    *cfg_data_source.mutable_producer_name_regex_filter() =
        new_producer_name_regex_filter;

    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
      PERFETTO_DCHECK(producer);

      // Check if the producer name of this data source is present
      // in the name filters. We currently only support new filters, not
      // removing old ones.
      if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
                             new_producer_name_regex_filter)) {
        continue;
      }

      // Skip data sources that this session has already set up for this
      // producer: only newly-matching (producer, data source) pairs below.
      bool already_setup = false;
      auto& ds_instances = tracing_session->data_source_instances;
      for (auto instance_it = ds_instances.begin();
           instance_it != ds_instances.end(); ++instance_it) {
        if (instance_it->first == it->second.producer_id &&
            instance_it->second.data_source_name ==
                cfg_data_source.config().name()) {
          already_setup = true;
          break;
        }
      }

      if (already_setup)
        continue;

      // If it wasn't previously setup, set it up now.
      // (The per-producer config is optional).
      TraceConfig::ProducerConfig producer_config;
      for (auto& config : tracing_session->config.producers()) {
        if (producer->name_ == config.producer_name()) {
          producer_config = config;
          break;
        }
      }

      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, it->second, tracing_session);

      // Start immediately only if the session is already running; for a
      // CONFIGURED session, StartTracing() will start it later.
      if (ds_inst && tracing_session->state == TracingSession::STARTED)
        StartDataSourceInstance(producer, tracing_session, ds_inst);
    }
  }
}
1171 
StartTracing(TracingSessionID tsid)1172 base::Status TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1173   PERFETTO_DCHECK_THREAD(thread_checker_);
1174 
1175   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1176   TracingSession* tracing_session = GetTracingSession(tsid);
1177   if (!tracing_session) {
1178     return PERFETTO_SVC_ERR(
1179         "StartTracing() failed, invalid session ID %" PRIu64, tsid);
1180   }
1181 
1182   MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1183                       PerfettoStatsdAtom::kTracedStartTracing);
1184 
1185   if (tracing_session->state != TracingSession::CONFIGURED) {
1186     MaybeLogUploadEvent(
1187         tracing_session->config, tracing_session->trace_uuid,
1188         PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1189     return PERFETTO_SVC_ERR("StartTracing() failed, invalid session state: %d",
1190                             tracing_session->state);
1191   }
1192 
1193   tracing_session->state = TracingSession::STARTED;
1194 
1195   // We store the start of trace snapshot separately as it's important to make
1196   // sure we can interpret all the data in the trace and storing it in the ring
1197   // buffer means it could be overwritten by a later snapshot.
1198   if (!tracing_session->config.builtin_data_sources()
1199            .disable_clock_snapshotting()) {
1200     SnapshotClocks(&tracing_session->initial_clock_snapshot);
1201   }
1202 
1203   // We don't snapshot the clocks here because we just did this above.
1204   SnapshotLifecyleEvent(
1205       tracing_session,
1206       protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1207       false /* snapshot_clocks */);
1208 
1209   // Periodically snapshot clocks, stats, sync markers while the trace is
1210   // active. The snapshots are emitted on the future ReadBuffers() calls, which
1211   // means that:
1212   //  (a) If we're streaming to a file (or to a consumer) while tracing, we
1213   //      write snapshots periodically into the trace.
1214   //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1215   //      snapshot into the trace. For clock snapshots, we keep track of the
1216   //      snapshot recorded at the beginning of the session
1217   //      (initial_clock_snapshot above), as well as the most recent sampled
1218   //      snapshots that showed significant new drift between different clocks.
1219   //      The latter clock snapshots are sampled periodically and at lifecycle
1220   //      events.
1221   base::PeriodicTask::Args snapshot_task_args;
1222   snapshot_task_args.start_first_task_immediately = true;
1223   snapshot_task_args.use_suspend_aware_timer =
1224       tracing_session->config.builtin_data_sources()
1225           .prefer_suspend_clock_for_snapshot();
1226   snapshot_task_args.task = [weak_this, tsid] {
1227     if (weak_this)
1228       weak_this->PeriodicSnapshotTask(tsid);
1229   };
1230   snapshot_task_args.period_ms =
1231       tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1232   if (!snapshot_task_args.period_ms)
1233     snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1234   tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1235 
1236   // Trigger delayed task if the trace is time limited.
1237   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1238   if (trace_duration_ms > 0) {
1239     auto stop_task =
1240         std::bind(&TracingServiceImpl::StopOnDurationMsExpiry, weak_this, tsid);
1241     if (tracing_session->config.prefer_suspend_clock_for_duration()) {
1242       base::PeriodicTask::Args stop_args;
1243       stop_args.use_suspend_aware_timer = true;
1244       stop_args.period_ms = trace_duration_ms;
1245       stop_args.one_shot = true;
1246       stop_args.task = std::move(stop_task);
1247       tracing_session->timed_stop_task.Start(stop_args);
1248     } else {
1249       task_runner_->PostDelayedTask(std::move(stop_task), trace_duration_ms);
1250     }
1251   }  // if (trace_duration_ms > 0).
1252 
1253   // Start the periodic drain tasks if we should to save the trace into a file.
1254   if (tracing_session->config.write_into_file()) {
1255     task_runner_->PostDelayedTask(
1256         [weak_this, tsid] {
1257           if (weak_this)
1258             weak_this->ReadBuffersIntoFile(tsid);
1259         },
1260         tracing_session->delay_to_next_write_period_ms());
1261   }
1262 
1263   // Start the periodic flush tasks if the config specified a flush period.
1264   if (tracing_session->config.flush_period_ms())
1265     PeriodicFlushTask(tsid, /*post_next_only=*/true);
1266 
1267   // Start the periodic incremental state clear tasks if the config specified a
1268   // period.
1269   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1270     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1271   }
1272 
1273   for (auto& kv : tracing_session->data_source_instances) {
1274     ProducerID producer_id = kv.first;
1275     DataSourceInstance& data_source = kv.second;
1276     ProducerEndpointImpl* producer = GetProducer(producer_id);
1277     if (!producer) {
1278       PERFETTO_DFATAL("Producer does not exist.");
1279       continue;
1280     }
1281     StartDataSourceInstance(producer, tracing_session, &data_source);
1282   }
1283 
1284   MaybeNotifyAllDataSourcesStarted(tracing_session);
1285   return base::OkStatus();
1286 }
1287 
1288 // static
StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl> weak_this,TracingSessionID tsid)1289 void TracingServiceImpl::StopOnDurationMsExpiry(
1290     base::WeakPtr<TracingServiceImpl> weak_this,
1291     TracingSessionID tsid) {
1292   // Skip entirely the flush if the trace session doesn't exist anymore.
1293   // This is to prevent misleading error messages to be logged.
1294   if (!weak_this)
1295     return;
1296   auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
1297   if (!tracing_session_ptr)
1298     return;
1299   // If this trace was using STOP_TRACING triggers and we've seen
1300   // one, then the trigger overrides the normal timeout. In this
1301   // case we just return and let the other task clean up this trace.
1302   if (GetTriggerMode(tracing_session_ptr->config) ==
1303           TraceConfig::TriggerConfig::STOP_TRACING &&
1304       !tracing_session_ptr->received_triggers.empty())
1305     return;
1306   // In all other cases (START_TRACING or no triggers) we flush
1307   // after |trace_duration_ms| unconditionally.
1308   weak_this->FlushAndDisableTracing(tsid);
1309 }
1310 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1311 void TracingServiceImpl::StartDataSourceInstance(
1312     ProducerEndpointImpl* producer,
1313     TracingSession* tracing_session,
1314     TracingServiceImpl::DataSourceInstance* instance) {
1315   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1316   if (instance->will_notify_on_start) {
1317     instance->state = DataSourceInstance::STARTING;
1318   } else {
1319     instance->state = DataSourceInstance::STARTED;
1320   }
1321   if (tracing_session->consumer_maybe_null) {
1322     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1323         *producer, *instance);
1324   }
1325   producer->StartDataSource(instance->instance_id, instance->config);
1326 
1327   // If all data sources are started, notify the consumer.
1328   if (instance->state == DataSourceInstance::STARTED)
1329     MaybeNotifyAllDataSourcesStarted(tracing_session);
1330 }
1331 
1332 // DisableTracing just stops the data sources but doesn't free up any buffer.
1333 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1334 // and then drain the buffers. The actual teardown of the TracingSession happens
1335 // in FreeBuffers().
void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
                                        bool disable_immediately) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // Can happen if the consumer calls this before EnableTracing() or after
    // FreeBuffers().
    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedDisableTracing);

  // Dispatch on the session's lifecycle state. Only CONFIGURED and STARTED
  // fall through to the stop sequence below; every other state early-returns.
  switch (tracing_session->state) {
    // Spurious call to DisableTracing() while already disabled, nothing to do.
    case TracingSession::DISABLED:
      PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
      return;

    case TracingSession::CLONED_READ_ONLY:
      PERFETTO_DLOG("DisableTracing() cannot be called on a cloned session");
      return;

    // This is either:
    // A) The case of a graceful DisableTracing() call followed by a call to
    //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    //    to forcefully transition in the disabled state without waiting for the
    //    outstanding acks because the buffers are going to be destroyed soon.
    // B) A spurious call, iff |disable_immediately| == false, in which case
    //    there is nothing to do.
    case TracingSession::DISABLING_WAITING_STOP_ACKS:
      PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
      if (disable_immediately)
        DisableTracingNotifyConsumerAndFlushFile(tracing_session);
      return;

    // Continues below.
    case TracingSession::CONFIGURED:
      // If the session didn't even start there is no need to orchestrate a
      // graceful stop of data sources.
      disable_immediately = true;
      break;

    // This is the nominal case, continues below.
    case TracingSession::STARTED:
      break;
  }

  // Ask every data source instance in the session to stop. Depending on
  // |disable_immediately| this either initiates the graceful stop handshake
  // or force-stops the instance.
  for (auto& data_source_inst : tracing_session->data_source_instances) {
    const ProducerID producer_id = data_source_inst.first;
    DataSourceInstance& instance = data_source_inst.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                    instance.state == DataSourceInstance::STARTING ||
                    instance.state == DataSourceInstance::STARTED);
    StopDataSourceInstance(producer, tracing_session, &instance,
                           disable_immediately);
  }

  // If the periodic task is running, we can stop the periodic snapshot timer
  // here instead of waiting until FreeBuffers to prevent useless snapshots
  // which won't be read.
  tracing_session->snapshot_periodic_task.Reset();

  // Either this request is flagged with |disable_immediately| or there are no
  // data sources that are requesting a final handshake. In both cases just mark
  // the session as disabled immediately, notify the consumer and flush the
  // trace file (if used).
  if (tracing_session->AllDataSourceInstancesStopped())
    return DisableTracingNotifyConsumerAndFlushFile(tracing_session);

  tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  // Watchdog: if some data source never acks the stop, OnDisableTracingTimeout
  // force-disables the session after the configured stop timeout.
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->OnDisableTracingTimeout(tsid);
      },
      tracing_session->data_source_stop_timeout_ms());

  // Deliberately NOT removing the session from |tracing_session_|, it's still
  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}
1421 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1422 void TracingServiceImpl::NotifyDataSourceStarted(
1423     ProducerID producer_id,
1424     DataSourceInstanceID instance_id) {
1425   PERFETTO_DCHECK_THREAD(thread_checker_);
1426   for (auto& kv : tracing_sessions_) {
1427     TracingSession& tracing_session = kv.second;
1428     DataSourceInstance* instance =
1429         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1430 
1431     if (!instance)
1432       continue;
1433 
1434     // If the tracing session was already stopped, ignore this notification.
1435     if (tracing_session.state != TracingSession::STARTED)
1436       continue;
1437 
1438     if (instance->state != DataSourceInstance::STARTING) {
1439       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1440                     instance->state);
1441       continue;
1442     }
1443 
1444     instance->state = DataSourceInstance::STARTED;
1445 
1446     ProducerEndpointImpl* producer = GetProducer(producer_id);
1447     PERFETTO_DCHECK(producer);
1448     if (tracing_session.consumer_maybe_null) {
1449       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1450           *producer, *instance);
1451     }
1452 
1453     // If all data sources are started, notify the consumer.
1454     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1455   }  // for (tracing_session)
1456 }
1457 
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1458 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1459     TracingSession* tracing_session) {
1460   if (!tracing_session->consumer_maybe_null)
1461     return;
1462 
1463   if (!tracing_session->AllDataSourceInstancesStarted())
1464     return;
1465 
1466   // In some rare cases, we can get in this state more than once. Consider the
1467   // following scenario: 3 data sources are registered -> trace starts ->
1468   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1469   // Imagine now that a 4th data source registers while the trace is ongoing.
1470   // This would hit the AllDataSourceInstancesStarted() condition again.
1471   // In this case, however, we don't want to re-notify the consumer again.
1472   // That would be unexpected (even if, perhaps, technically correct) and
1473   // trigger bugs in the consumer.
1474   if (tracing_session->did_notify_all_data_source_started)
1475     return;
1476 
1477   PERFETTO_DLOG("All data sources started");
1478 
1479   SnapshotLifecyleEvent(
1480       tracing_session,
1481       protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1482       true /* snapshot_clocks */);
1483 
1484   tracing_session->did_notify_all_data_source_started = true;
1485   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1486 }
1487 
// Handles the "stopped" ack from a producer for one of its data source
// instances. Walks all sessions since the (producer_id, instance_id) pair may
// belong to any of them. When the last instance of a session that is in
// DISABLING_WAITING_STOP_ACKS acks, the session is finalized.
void TracingServiceImpl::NotifyDataSourceStopped(
    ProducerID producer_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (auto& kv : tracing_sessions_) {
    TracingSession& tracing_session = kv.second;
    DataSourceInstance* instance =
        tracing_session.GetDataSourceInstance(producer_id, instance_id);

    // This session doesn't own the instance; keep looking.
    if (!instance)
      continue;

    if (instance->state != DataSourceInstance::STOPPING) {
      PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
                    instance->state);
      continue;
    }

    instance->state = DataSourceInstance::STOPPED;

    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    if (tracing_session.consumer_maybe_null) {
      tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
          *producer, *instance);
    }

    // Only finalize once every instance of the session has acked its stop.
    if (!tracing_session.AllDataSourceInstancesStopped())
      continue;

    // Sessions in other states (e.g. already force-disabled) must not be
    // finalized again.
    if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
      continue;

    // All data sources acked the termination.
    DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
  }  // for (tracing_session)
}
1525 
// Invoked when |producer_id| sends one or more trigger names. For each
// trigger name, every active session whose config references that trigger is
// considered; depending on the configured trigger mode this starts tracing,
// schedules a stop+flush, or notifies a clone-snapshot request. Triggers are
// subject to probabilistic skipping and per-24h rate limiting.
void TracingServiceImpl::ActivateTriggers(
    ProducerID producer_id,
    const std::vector<std::string>& triggers) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);

  int64_t now_ns = base::GetBootTimeNs().count();
  for (const auto& trigger_name : triggers) {
    PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
                  trigger_name.c_str());
    base::Hasher hash;
    hash.Update(trigger_name.c_str(), trigger_name.size());
    // Details of the (last) session that matched this trigger, kept only for
    // the final PERFETTO_LOG below.
    std::string triggered_session_name;
    base::Uuid triggered_session_uuid;
    TracingSessionID triggered_session_id = 0;
    auto trigger_mode = TraceConfig::TriggerConfig::UNSPECIFIED;

    // Rate limiting is tracked per trigger-name hash, counting prior
    // activations still inside the window (see trigger_history_).
    uint64_t trigger_name_hash = hash.digest();
    size_t count_in_window =
        PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);

    bool trigger_matched = false;
    bool trigger_activated = false;
    for (auto& id_and_tracing_session : tracing_sessions_) {
      auto& tracing_session = id_and_tracing_session.second;
      TracingSessionID tsid = id_and_tracing_session.first;
      auto iter = std::find_if(
          tracing_session.config.trigger_config().triggers().begin(),
          tracing_session.config.trigger_config().triggers().end(),
          [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
            return trigger.name() == trigger_name;
          });
      if (iter == tracing_session.config.trigger_config().triggers().end())
        continue;
      // Cloned sessions are read-only snapshots; they cannot be triggered.
      if (tracing_session.state == TracingSession::CLONED_READ_ONLY)
        continue;

      // If this trigger requires a certain producer to have sent it
      // (non-empty producer_name()) ensure the producer who sent this trigger
      // matches.
      if (!iter->producer_name_regex().empty() &&
          !std::regex_match(
              producer->name_,
              std::regex(iter->producer_name_regex(), std::regex::extended))) {
        continue;
      }

      // Use a random number between 0 and 1 to check if we should allow this
      // trigger through or not.
      double trigger_rnd =
          trigger_rnd_override_for_testing_ > 0
              ? trigger_rnd_override_for_testing_
              : trigger_probability_dist_(trigger_probability_rand_);
      PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
      if (trigger_rnd < iter->skip_probability()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitProbability,
                             trigger_name);
        continue;
      }

      // If we already triggered more times than the limit, silently ignore
      // this trigger.
      if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitMaxPer24h,
                             trigger_name);
        continue;
      }
      trigger_matched = true;
      triggered_session_id = tracing_session.id;
      triggered_session_name = tracing_session.config.unique_session_name();
      triggered_session_uuid.set_lsb_msb(tracing_session.trace_uuid.lsb(),
                                         tracing_session.trace_uuid.msb());
      trigger_mode = GetTriggerMode(tracing_session.config);

      // Snapshot this *before* appending the new trigger: it is used below to
      // suppress duplicate STOP_TRACING handling.
      const bool triggers_already_received =
          !tracing_session.received_triggers.empty();
      tracing_session.received_triggers.push_back(
          {static_cast<uint64_t>(now_ns), iter->name(), producer->name_,
           producer->uid_});
      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      switch (trigger_mode) {
        case TraceConfig::TriggerConfig::START_TRACING:
          // If the session has already been triggered and moved past
          // CONFIGURED then we don't need to repeat StartTracing. This would
          // work fine (StartTracing would return false) but would add error
          // logs.
          if (tracing_session.state != TracingSession::CONFIGURED)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStartTracing, iter->name());

          // We override the trace duration to be the trigger's requested
          // value, this ensures that the trace will end after this amount
          // of time has passed.
          tracing_session.config.set_duration_ms(iter->stop_delay_ms());
          StartTracing(tsid);
          break;
        case TraceConfig::TriggerConfig::STOP_TRACING:
          // Only stop the trace once to avoid confusing log messages. I.E.
          // when we've already hit the first trigger we've already Posted the
          // task to FlushAndDisable. So all future triggers will just break
          // out.
          if (triggers_already_received)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStopTracing, iter->name());

          // Now that we've seen a trigger we need to stop, flush, and disable
          // this session after the configured |stop_delay_ms|.
          task_runner_->PostDelayedTask(
              [weak_this, tsid] {
                // Skip entirely the flush if the trace session doesn't exist
                // anymore. This is to prevent misleading error messages to be
                // logged.
                if (weak_this && weak_this->GetTracingSession(tsid))
                  weak_this->FlushAndDisableTracing(tsid);
              },
              // If this trigger is zero this will immediately executable and
              // will happen shortly.
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerCloneSnapshot, iter->name());
          // The actual cloning is driven by the consumer; the service only
          // notifies it that the snapshot trigger fired.
          task_runner_->PostDelayedTask(
              [weak_this, tsid] {
                if (!weak_this)
                  return;
                auto* tsess = weak_this->GetTracingSession(tsid);
                if (!tsess || !tsess->consumer_maybe_null)
                  return;
                tsess->consumer_maybe_null->NotifyCloneSnapshotTrigger();
              },
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::UNSPECIFIED:
          PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
          break;
      }
    }  // for (.. : tracing_sessions_)

    if (trigger_matched) {
      trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
    }

    if (trigger_activated) {
      // Log only the trigger that actually caused a trace stop/start, don't log
      // the follow-up ones, even if they matched.
      PERFETTO_LOG(
          "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
          "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
          trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
          triggered_session_uuid.ToPrettyString().c_str(),
          triggered_session_id);
    }
  }  // for (trigger_name : triggers)
}
1696 
1697 // Always invoked TraceConfig.data_source_stop_timeout_ms (by default
1698 // kDataSourceStopTimeoutMs) after DisableTracing(). In nominal conditions all
1699 // data sources should have acked the stop and this will early out.
OnDisableTracingTimeout(TracingSessionID tsid)1700 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1701   PERFETTO_DCHECK_THREAD(thread_checker_);
1702   TracingSession* tracing_session = GetTracingSession(tsid);
1703   if (!tracing_session ||
1704       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1705     return;  // Tracing session was successfully disabled.
1706   }
1707 
1708   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1709                 tsid);
1710   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1711   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1712 }
1713 
// Final step of disabling a session: force-stops any instance that hasn't
// acked yet, scrapes leftover SMB chunks, records the tracing-disabled
// lifecycle event, flushes the output file (if any) and notifies the consumer.
void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
    TracingSession* tracing_session) {
  PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
  // Force every remaining instance into STOPPED, notifying the consumer of
  // each state change.
  for (auto& inst_kv : tracing_session->data_source_instances) {
    if (inst_kv.second.state == DataSourceInstance::STOPPED)
      continue;
    inst_kv.second.state = DataSourceInstance::STOPPED;
    ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
    PERFETTO_DCHECK(producer);
    if (tracing_session->consumer_maybe_null) {
      tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
          *producer, inst_kv.second);
    }
  }
  tracing_session->state = TracingSession::DISABLED;

  // Scrape any remaining chunks that weren't flushed by the producers.
  for (auto& producer_id_and_producer : producers_)
    ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);

  SnapshotLifecyleEvent(
      tracing_session,
      protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
      true /* snapshot_clocks */);

  // For write_into_file sessions, drain everything to the file now; setting
  // write_period_ms to 0 stops any further periodic writes.
  if (tracing_session->write_into_file) {
    tracing_session->write_period_ms = 0;
    ReadBuffersIntoFile(tracing_session->id);
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedNotifyTracingDisabled);

  if (tracing_session->consumer_maybe_null)
    tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
}
1750 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback)1751 void TracingServiceImpl::Flush(TracingSessionID tsid,
1752                                uint32_t timeout_ms,
1753                                ConsumerEndpoint::FlushCallback callback) {
1754   PERFETTO_DCHECK_THREAD(thread_checker_);
1755   TracingSession* tracing_session = GetTracingSession(tsid);
1756   if (!tracing_session) {
1757     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1758     return;
1759   }
1760 
1761   if (!timeout_ms)
1762     timeout_ms = tracing_session->flush_timeout_ms();
1763 
1764   if (tracing_session->pending_flushes.size() > 1000) {
1765     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1766                   tracing_session->pending_flushes.size());
1767     callback(false);
1768     return;
1769   }
1770 
1771   if (tracing_session->state != TracingSession::STARTED) {
1772     PERFETTO_ELOG("Flush() called, but tracing has not been started");
1773     callback(false);
1774     return;
1775   }
1776 
1777   ++tracing_session->flushes_requested;
1778   FlushRequestID flush_request_id = ++last_flush_request_id_;
1779   PendingFlush& pending_flush =
1780       tracing_session->pending_flushes
1781           .emplace_hint(tracing_session->pending_flushes.end(),
1782                         flush_request_id, PendingFlush(std::move(callback)))
1783           ->second;
1784 
1785   // Send a flush request to each producer involved in the tracing session. In
1786   // order to issue a flush request we have to build a map of all data source
1787   // instance ids enabled for each producer.
1788   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1789   for (const auto& data_source_inst : tracing_session->data_source_instances) {
1790     const ProducerID producer_id = data_source_inst.first;
1791     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1792     flush_map[producer_id].push_back(ds_inst_id);
1793   }
1794 
1795   for (const auto& kv : flush_map) {
1796     ProducerID producer_id = kv.first;
1797     ProducerEndpointImpl* producer = GetProducer(producer_id);
1798     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1799     producer->Flush(flush_request_id, data_sources);
1800     pending_flush.producers.insert(producer_id);
1801   }
1802 
1803   // If there are no producers to flush (realistically this happens only in
1804   // some tests) fire OnFlushTimeout() straight away, without waiting.
1805   if (flush_map.empty())
1806     timeout_ms = 0;
1807 
1808   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1809   task_runner_->PostDelayedTask(
1810       [weak_this, tsid, flush_request_id] {
1811         if (weak_this)
1812           weak_this->OnFlushTimeout(tsid, flush_request_id);
1813       },
1814       timeout_ms);
1815 }
1816 
// Invoked when |producer_id| acks flush request |flush_request_id|. Flush ids
// are monotonic, so an ack for N is treated as also acking every pending
// flush <= N from that producer (upper_bound below).
void TracingServiceImpl::NotifyFlushDoneForProducer(
    ProducerID producer_id,
    FlushRequestID flush_request_id) {
  for (auto& kv : tracing_sessions_) {
    // Remove all pending flushes <= |flush_request_id| for |producer_id|.
    auto& pending_flushes = kv.second.pending_flushes;
    auto end_it = pending_flushes.upper_bound(flush_request_id);
    for (auto it = pending_flushes.begin(); it != end_it;) {
      PendingFlush& pending_flush = it->second;
      pending_flush.producers.erase(producer_id);
      if (pending_flush.producers.empty()) {
        // Last outstanding producer for this flush: the flush succeeded.
        // Completion is posted, not run inline — NOTE(review): presumably to
        // avoid reentrancy while iterating |pending_flushes|; confirm.
        auto weak_this = weak_ptr_factory_.GetWeakPtr();
        TracingSessionID tsid = kv.first;
        auto callback = std::move(pending_flush.callback);
        task_runner_->PostTask([weak_this, tsid, callback]() {
          if (weak_this) {
            weak_this->CompleteFlush(tsid, std::move(callback),
                                     /*success=*/true);
          }
        });
        it = pending_flushes.erase(it);
      } else {
        it++;
      }
    }  // for (pending_flushes)
  }    // for (tracing_session)
}
1844 
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1845 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1846                                         FlushRequestID flush_request_id) {
1847   TracingSession* tracing_session = GetTracingSession(tsid);
1848   if (!tracing_session)
1849     return;
1850   auto it = tracing_session->pending_flushes.find(flush_request_id);
1851   if (it == tracing_session->pending_flushes.end())
1852     return;  // Nominal case: flush was completed and acked on time.
1853 
1854   // If there were no producers to flush, consider it a success.
1855   bool success = it->second.producers.empty();
1856   auto callback = std::move(it->second.callback);
1857   tracing_session->pending_flushes.erase(it);
1858   CompleteFlush(tsid, std::move(callback), success);
1859 }
1860 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1861 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1862                                        ConsumerEndpoint::FlushCallback callback,
1863                                        bool success) {
1864   TracingSession* tracing_session = GetTracingSession(tsid);
1865   if (!tracing_session) {
1866     callback(false);
1867     return;
1868   }
1869   // Producers may not have been able to flush all their data, even if they
1870   // indicated flush completion. If possible, also collect uncommitted chunks
1871   // to make sure we have everything they wrote so far.
1872   for (auto& producer_id_and_producer : producers_) {
1873     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1874   }
1875   SnapshotLifecyleEvent(
1876       tracing_session,
1877       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
1878       true /* snapshot_clocks */);
1879 
1880   tracing_session->flushes_succeeded += success ? 1 : 0;
1881   tracing_session->flushes_failed += success ? 0 : 1;
1882   callback(success);
1883 }
1884 
// Copies ("scrapes") uncommitted chunks out of |producer|'s shared memory
// buffer into the central log buffers of |tracing_session|. Invoked on flush
// and on session teardown to capture data the producer wrote but did not
// commit yet. No-op if the producer opted out of scraping, has no writers,
// or does not participate in the session.
void TracingServiceImpl::ScrapeSharedMemoryBuffers(
    TracingSession* tracing_session,
    ProducerEndpointImpl* producer) {
  if (!producer->smb_scraping_enabled_)
    return;

  // Can't copy chunks if we don't know about any trace writers.
  if (producer->writers_.empty())
    return;

  // Performance optimization: On flush or session disconnect, this method is
  // called for each producer. If the producer doesn't participate in the
  // session, there's no need to scrape its chunks right now. We can tell if a
  // producer participates in the session by checking if the producer is allowed
  // to write into the session's log buffers.
  const auto& session_buffers = tracing_session->buffers_index;
  bool producer_in_session =
      std::any_of(session_buffers.begin(), session_buffers.end(),
                  [producer](BufferID buffer_id) {
                    return producer->allowed_target_buffers_.count(buffer_id);
                  });
  if (!producer_in_session)
    return;

  PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);

  // Find and copy any uncommitted chunks from the SMB.
  //
  // In nominal conditions, the page layout of the used SMB pages should never
  // change because the service is the only one who is supposed to modify used
  // pages (to make them free again).
  //
  // However, the code here needs to deal with the case of a malicious producer
  // altering the SMB in unpredictable ways. Thankfully the SMB size is
  // immutable, so a chunk will always point to some valid memory, even if the
  // producer alters the intended layout and chunk header concurrently.
  // Ultimately a malicious producer altering the SMB's chunk layout while we
  // are iterating in this function is not any different from the case of a
  // malicious producer asking to commit a chunk made of random data, which is
  // something this class has to deal with regardless.
  //
  // The only legitimate mutations that can happen from sane producers,
  // concurrently to this function, are:
  //   A. free pages being partitioned,
  //   B. free chunks being migrated to kChunkBeingWritten,
  //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.

  SharedMemoryABI* abi = &producer->shmem_abi_;
  // num_pages() is immutable after the SMB is initialized and cannot be changed
  // even by a producer even if malicious.
  for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
    uint32_t layout = abi->GetPageLayout(page_idx);

    uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
    // Skip empty pages.
    if (used_chunks == 0)
      continue;

    // Scrape the chunks that are currently used. These should be either in
    // state kChunkBeingWritten or kChunkComplete.
    // Iterate the bitmap by shifting it right one bit per chunk index.
    for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
      if (!(used_chunks & 1))
        continue;

      SharedMemoryABI::ChunkState state =
          SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
      PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
                      state == SharedMemoryABI::kChunkComplete);
      bool chunk_complete = state == SharedMemoryABI::kChunkComplete;

      SharedMemoryABI::Chunk chunk =
          abi->GetChunkUnchecked(page_idx, layout, chunk_idx);

      uint16_t packet_count;
      uint8_t flags;
      // GetPacketCountAndFlags has acquire_load semantics.
      std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();

      // It only makes sense to copy an incomplete chunk if there's at least
      // one full packet available. (The producer may not have completed the
      // last packet in it yet, so we need at least 2.)
      if (!chunk_complete && packet_count < 2)
        continue;

      // At this point, it is safe to access the remaining header fields of
      // the chunk. Even if the chunk was only just transferred from
      // kChunkFree into kChunkBeingWritten state, the header should be
      // written completely once the packet count increased above 1 (it was
      // reset to 0 by the service when the chunk was freed).

      WriterID writer_id = chunk.writer_id();
      std::optional<BufferID> target_buffer_id =
          producer->buffer_id_for_writer(writer_id);

      // We can only scrape this chunk if we know which log buffer to copy it
      // into.
      if (!target_buffer_id)
        continue;

      // Skip chunks that don't belong to the requested tracing session.
      bool target_buffer_belongs_to_session =
          std::find(session_buffers.begin(), session_buffers.end(),
                    *target_buffer_id) != session_buffers.end();
      if (!target_buffer_belongs_to_session)
        continue;

      uint32_t chunk_id =
          chunk.header()->chunk_id.load(std::memory_order_relaxed);

      CopyProducerPageIntoLogBuffer(
          producer->id_, producer->uid_, producer->pid_, writer_id, chunk_id,
          *target_buffer_id, packet_count, flags, chunk_complete,
          chunk.payload_begin(), chunk.payload_size());
    }
  }
}
2001 
FlushAndDisableTracing(TracingSessionID tsid)2002 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
2003   PERFETTO_DCHECK_THREAD(thread_checker_);
2004   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
2005   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2006   Flush(tsid, 0, [weak_this, tsid](bool success) {
2007     // This was a DLOG up to Jun 2021 (v16, Android S).
2008     PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d", tsid,
2009                  success);
2010     if (!weak_this)
2011       return;
2012     TracingSession* session = weak_this->GetTracingSession(tsid);
2013     session->final_flush_outcome = success ? TraceStats::FINAL_FLUSH_SUCCEEDED
2014                                            : TraceStats::FINAL_FLUSH_FAILED;
2015     if (session->consumer_maybe_null) {
2016       // If the consumer is still attached, just disable the session but give it
2017       // a chance to read the contents.
2018       weak_this->DisableTracing(tsid);
2019     } else {
2020       // If the consumer detached, destroy the session. If the consumer did
2021       // start the session in long-tracing mode, the service will have saved
2022       // the contents to the passed file. If not, the contents will be
2023       // destroyed.
2024       weak_this->FreeBuffers(tsid);
2025     }
2026   });
2027 }
2028 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)2029 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
2030                                            bool post_next_only) {
2031   PERFETTO_DCHECK_THREAD(thread_checker_);
2032   TracingSession* tracing_session = GetTracingSession(tsid);
2033   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2034     return;
2035 
2036   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
2037   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2038   task_runner_->PostDelayedTask(
2039       [weak_this, tsid] {
2040         if (weak_this)
2041           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
2042       },
2043       flush_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2044                                               flush_period_ms));
2045 
2046   if (post_next_only)
2047     return;
2048 
2049   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
2050   Flush(tsid, 0, [](bool success) {
2051     if (!success)
2052       PERFETTO_ELOG("Periodic flush timed out");
2053   });
2054 }
2055 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)2056 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
2057     TracingSessionID tsid,
2058     bool post_next_only) {
2059   PERFETTO_DCHECK_THREAD(thread_checker_);
2060   TracingSession* tracing_session = GetTracingSession(tsid);
2061   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2062     return;
2063 
2064   uint32_t clear_period_ms =
2065       tracing_session->config.incremental_state_config().clear_period_ms();
2066   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2067   task_runner_->PostDelayedTask(
2068       [weak_this, tsid] {
2069         if (weak_this)
2070           weak_this->PeriodicClearIncrementalStateTask(
2071               tsid, /*post_next_only=*/false);
2072       },
2073       clear_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2074                                               clear_period_ms));
2075 
2076   if (post_next_only)
2077     return;
2078 
2079   PERFETTO_DLOG(
2080       "Performing periodic incremental state clear for trace session %" PRIu64,
2081       tsid);
2082 
2083   // Queue the IPCs to producers with active data sources that opted in.
2084   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
2085   for (const auto& kv : tracing_session->data_source_instances) {
2086     ProducerID producer_id = kv.first;
2087     const DataSourceInstance& data_source = kv.second;
2088     if (data_source.handles_incremental_state_clear) {
2089       clear_map[producer_id].push_back(data_source.instance_id);
2090     }
2091   }
2092 
2093   for (const auto& kv : clear_map) {
2094     ProducerID producer_id = kv.first;
2095     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
2096     ProducerEndpointImpl* producer = GetProducer(producer_id);
2097     if (!producer) {
2098       PERFETTO_DFATAL("Producer does not exist.");
2099       continue;
2100     }
2101     producer->ClearIncrementalState(data_sources);
2102   }
2103 }
2104 
// Reads a bounded chunk of the trace buffers of session |tsid| and pushes it
// to |consumer| via OnTraceData(). If more data remains, re-posts itself so
// the drain is split across several tasks. Returns false if the read is not
// allowed: no such session, a write_into_file session, or the session is
// still waiting for a trigger.
bool TracingServiceImpl::ReadBuffersIntoConsumer(
    TracingSessionID tsid,
    ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK(consumer);
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG(
        "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
    return false;
  }

  if (tracing_session->write_into_file) {
    // If the consumer enabled tracing and asked to save the contents into the
    // passed file makes little sense to also try to read the buffers over IPC,
    // as that would just steal data from the periodic draining task.
    PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
    return false;
  }

  if (IsWaitingForTrigger(tracing_session))
    return false;

  // This is a rough threshold to determine how much to read from the buffer in
  // each task. This is to avoid executing a single huge sending task for too
  // long and risk to hit the watchdog. This is *not* an upper bound: we just
  // stop accumulating new packets and PostTask *after* we cross this threshold.
  // This constant essentially balances the PostTask and IPC overhead vs the
  // responsiveness of the service. An extremely small value will cause one IPC
  // and one PostTask for each slice but will keep the service extremely
  // responsive. An extremely large value will batch the send for the full
  // buffer in one large task, will hit the blocking send() once the socket
  // buffers are full and hang the service for a bit (until the consumer
  // catches up).
  static constexpr size_t kApproxBytesPerTask = 32768;
  bool has_more;
  std::vector<TracePacket> packets =
      ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);

  if (has_more) {
    // Post the continuation before delivering the current chunk, guarded by
    // weak pointers to both the service and the consumer so a teardown in
    // between cancels the follow-up read.
    auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, weak_consumer, tsid] {
      if (!weak_this || !weak_consumer)
        return;
      weak_this->ReadBuffersIntoConsumer(tsid, weak_consumer.get());
    });
  }

  // Keep this as tail call, just in case the consumer re-enters.
  consumer->consumer_->OnTraceData(std::move(packets), has_more);
  return true;
}
2158 
// Drains the trace buffers of session |tsid| into the session's
// |write_into_file| fd. Unlike ReadBuffersIntoConsumer(), this reads *all*
// currently available data in one go, then either closes the file (max size
// reached or one-shot write) or re-posts itself after |write_period_ms|.
// Returns true if data was written (or the file was finalized), false if the
// session is gone, has no file, or is still waiting for a trigger.
bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // This will be hit systematically from the PostDelayedTask. Avoid logging,
    // it would be just spam.
    return false;
  }

  // This can happen if the file is closed by a previous task because it reaches
  // |max_file_size_bytes|.
  if (!tracing_session->write_into_file)
    return false;

  if (IsWaitingForTrigger(tracing_session))
    return false;

  // ReadBuffers() can allocate memory internally, for filtering. By limiting
  // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
  // we limit the amount of memory used on each iteration.
  //
  // It would be tempting to split this into multiple tasks like in
  // ReadBuffersIntoConsumer, but that's not currently possible.
  // ReadBuffersIntoFile has to read the whole available data before returning,
  // to support the disable_immediately=true code paths.
  bool has_more = true;
  bool stop_writing_into_file = false;
  do {
    std::vector<TracePacket> packets =
        ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);

    stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
  } while (has_more && !stop_writing_into_file);

  if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
    // Ensure all data was written to the file before we close it.
    base::FlushFile(tracing_session->write_into_file.get());
    tracing_session->write_into_file.reset();
    tracing_session->write_period_ms = 0;
    // If the file filled up while the session was still tracing, stop the
    // session too: there is nowhere left to put its data.
    if (tracing_session->state == TracingSession::STARTED)
      DisableTracing(tsid);
    return true;
  }

  // Periodic mode: schedule the next drain at the next write-period boundary.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->ReadBuffersIntoFile(tsid);
      },
      tracing_session->delay_to_next_write_period_ms());
  return true;
}
2212 
IsWaitingForTrigger(TracingSession * tracing_session)2213 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2214   // Ignore the logic below for cloned tracing sessions. In this case we
2215   // actually want to read the (cloned) trace buffers even if no trigger was
2216   // hit.
2217   if (tracing_session->state == TracingSession::CLONED_READ_ONLY) {
2218     return false;
2219   }
2220 
2221   // When a tracing session is waiting for a trigger, it is considered empty. If
2222   // a tracing session finishes and moves into DISABLED without ever receiving a
2223   // trigger, the trace should never return any data. This includes the
2224   // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2225   // early and let the consumer know there is no data.
2226   if (!tracing_session->config.trigger_config().triggers().empty() &&
2227       tracing_session->received_triggers.empty()) {
2228     PERFETTO_DLOG(
2229         "ReadBuffers(): tracing session has not received a trigger yet.");
2230     return true;
2231   }
2232 
2233   // Traces with CLONE_SNAPSHOT triggers are a special case of the above. They
2234   // can be read only via a CloneSession() request. This is to keep the
2235   // behavior consistent with the STOP_TRACING+triggers case and avoid periodic
2236   // finalizations and uploads of the main CLONE_SNAPSHOT triggers.
2237   if (GetTriggerMode(tracing_session->config) ==
2238       TraceConfig::TriggerConfig::CLONE_SNAPSHOT) {
2239     PERFETTO_DLOG(
2240         "ReadBuffers(): skipping because the tracing session has "
2241         "CLONE_SNAPSHOT triggers defined");
2242     return true;
2243   }
2244 
2245   return false;
2246 }
2247 
// Reads packets from the session's trace buffers until roughly |threshold|
// bytes have been accumulated (the threshold is a soft limit: the packet that
// crosses it is still included). Synthetic packets (clock snapshots, sync
// marker, config, system info, lifecycle events, stats) are emitted around
// the buffer data according to the ordering guarantees documented below.
// |*has_more| is set to true iff the threshold was hit before the buffers
// were exhausted.
std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
    TracingSession* tracing_session,
    size_t threshold,
    bool* has_more) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(tracing_session);
  *has_more = false;

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  // Emit any pending clock snapshots first, then drain the snapshot ring
  // buffer. Both are moved out, so they are emitted at most once.
  if (!tracing_session->initial_clock_snapshot.empty()) {
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->initial_clock_snapshot),
                      &packets);
  }

  for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
    PERFETTO_DCHECK(!snapshot.empty());
    EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
  }
  tracing_session->clock_snapshot_ring_buffer.clear();

  if (tracing_session->should_emit_sync_marker) {
    EmitSyncMarker(&packets);
    tracing_session->should_emit_sync_marker = false;
  }

  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitUuidAndTraceConfig(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  if (!tracing_session->config.builtin_data_sources().disable_system_info())
    MaybeEmitSystemInfo(tracing_session, &packets);

  // Note that in the proto comment, we guarantee that the tracing_started
  // lifecycle event will be emitted before any data packets so make sure to
  // keep this before reading the tracing buffers.
  if (!tracing_session->config.builtin_data_sources().disable_service_events())
    EmitLifecycleEvents(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).

  // Add up size for packets added by the Maybe* calls above.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
  }

  bool did_hit_threshold = false;

  // Drain the session's buffers in order, stopping as soon as the soft byte
  // threshold is crossed.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;
      }
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
      // Not checking sequence_properties.producer_pid_trusted: it is
      // base::kInvalidPid if the platform doesn't support it.

      PERFETTO_DCHECK(packet.size() > 0);
      // Reject packets that try to spoof trusted fields; count them so the
      // drop shows up in the stats.
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(sequence_properties.producer_uid_trusted));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (sequence_properties.producer_pid_trusted != base::kInvalidPid) {
        // Not supported on all platforms.
        trusted_packet->set_trusted_pid(
            static_cast<int32_t>(sequence_properties.producer_pid_trusted));
      }
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      did_hit_threshold = packets_bytes >= threshold;
      packets.emplace_back(std::move(packet));
    }  // for(packets...)
  }    // for(buffers...)

  *has_more = did_hit_threshold;

  // Only emit the "read complete" lifetime event when there is no more trace
  // data available to read. These events are used as safe points to limit
  // sorting in trace processor: the code shouldn't emit the event unless the
  // buffers are empty.
  if (!*has_more && !tracing_session->config.builtin_data_sources()
                         .disable_service_events()) {
    // We don't bother snapshotting clocks here because we wouldn't be able to
    // emit it and we shouldn't have significant drift from the last snapshot in
    // any case.
    SnapshotLifecyleEvent(tracing_session,
                          protos::pbzero::TracingServiceEvent::
                              kReadTracingBuffersCompletedFieldNumber,
                          false /* snapshot_clocks */);
    EmitLifecycleEvents(tracing_session, &packets);
  }

  // Only emit the stats when there is no more trace data is available to read.
  // That way, any problems that occur while reading from the buffers are
  // reflected in the emitted stats. This is particularly important for use
  // cases where ReadBuffers is only ever called after the tracing session is
  // stopped.
  if (!*has_more && tracing_session->should_emit_stats) {
    EmitStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;
  }

  MaybeFilterPackets(tracing_session, &packets);

  MaybeCompressPackets(tracing_session, &packets);

  if (!*has_more) {
    // We've observed some extremely high memory usage by scudo after
    // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
    // now, but this code asks scudo to release memory just in case.
    base::MaybeReleaseAllocatorMemToOS();
  }

  return packets;
}
2404 
MaybeFilterPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2405 void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
2406                                             std::vector<TracePacket>* packets) {
2407   // If the tracing session specified a filter, run all packets through the
2408   // filter and replace them with the filter results.
2409   // The process below mantains the cardinality of input packets. Even if an
2410   // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
2411   // makes debugging and reasoning about the trace stats easier.
2412   // This place swaps the contents of each |packets| entry in place.
2413   if (!tracing_session->trace_filter) {
2414     return;
2415   }
2416   protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
2417   // The filter root shoud be reset from protos.Trace to protos.TracePacket
2418   // by the earlier call to SetFilterRoot() in EnableTracing().
2419   PERFETTO_DCHECK(trace_filter.root_msg_index() != 0);
2420   std::vector<protozero::MessageFilter::InputSlice> filter_input;
2421   auto start = base::GetWallTimeNs();
2422   for (TracePacket& packet : *packets) {
2423     const auto& packet_slices = packet.slices();
2424     filter_input.clear();
2425     filter_input.resize(packet_slices.size());
2426     ++tracing_session->filter_input_packets;
2427     tracing_session->filter_input_bytes += packet.size();
2428     for (size_t i = 0; i < packet_slices.size(); ++i)
2429       filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
2430     auto filtered_packet = trace_filter.FilterMessageFragments(
2431         &filter_input[0], filter_input.size());
2432 
2433     // Replace the packet in-place with the filtered one (unless failed).
2434     packet = TracePacket();
2435     if (filtered_packet.error) {
2436       ++tracing_session->filter_errors;
2437       PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
2438                     tracing_session->filter_input_packets);
2439       continue;
2440     }
2441     tracing_session->filter_output_bytes += filtered_packet.size;
2442     AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
2443                               filtered_packet.size, kMaxTracePacketSliceSize,
2444                               &packet);
2445   }
2446   auto end = base::GetWallTimeNs();
2447   tracing_session->filter_time_taken_ns +=
2448       static_cast<uint64_t>((end - start).count());
2449 }
2450 
MaybeCompressPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2451 void TracingServiceImpl::MaybeCompressPackets(
2452     TracingSession* tracing_session,
2453     std::vector<TracePacket>* packets) {
2454   if (!tracing_session->compress_deflate) {
2455     return;
2456   }
2457 
2458   init_opts_.compressor_fn(packets);
2459 }
2460 
// Appends |packets| to the session's output file using writev(), prepending
// each packet with its proto preamble so the file is a valid root Trace
// message. Returns true when writing must stop: either the configured
// |max_file_size_bytes| would be exceeded, or a write failed. Returns false
// when writing can continue (or there is no file at all).
bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
                                       std::vector<TracePacket> packets) {
  if (!tracing_session->write_into_file) {
    return false;
  }
  // A max_file_size_bytes of 0 means "unlimited".
  const uint64_t max_size = tracing_session->max_file_size_bytes
                                ? tracing_session->max_file_size_bytes
                                : std::numeric_limits<size_t>::max();

  size_t total_slices = 0;
  for (const TracePacket& packet : packets) {
    total_slices += packet.slices().size();
  }
  // When writing into a file, the file should look like a root trace.proto
  // message. Each packet should be prepended with a proto preamble stating
  // its field id (within trace.proto) and size. Hence the addition below.
  const size_t max_iovecs = total_slices + packets.size();

  size_t num_iovecs = 0;
  bool stop_writing_into_file = false;
  std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
  // Tracks the iovec count at the last *complete* packet boundary, so that a
  // packet that would push the file over |max_size| can be rolled back whole
  // (the file never ends with a truncated packet).
  size_t num_iovecs_at_last_packet = 0;
  uint64_t bytes_about_to_be_written = 0;
  for (TracePacket& packet : packets) {
    std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
        packet.GetProtoPreamble();
    bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
    num_iovecs++;
    for (const Slice& slice : packet.slices()) {
      // writev() doesn't change the passed pointer. However, struct iovec
      // take a non-const ptr because it's the same struct used by readv().
      // Hence the const_cast here.
      char* start = static_cast<char*>(const_cast<void*>(slice.start));
      bytes_about_to_be_written += slice.size;
      iovecs[num_iovecs++] = {start, slice.size};
    }

    if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
        max_size) {
      stop_writing_into_file = true;
      num_iovecs = num_iovecs_at_last_packet;
      break;
    }

    num_iovecs_at_last_packet = num_iovecs;
  }
  PERFETTO_DCHECK(num_iovecs <= max_iovecs);
  int fd = *tracing_session->write_into_file;

  uint64_t total_wr_size = 0;

  // writev() can take at most IOV_MAX entries per call. Batch them.
  constexpr size_t kIOVMax = IOV_MAX;
  for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
    int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
    ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
    if (wr_size <= 0) {
      PERFETTO_PLOG("writev() failed");
      stop_writing_into_file = true;
      break;
    }
    total_wr_size += static_cast<size_t>(wr_size);
  }

  tracing_session->bytes_written_into_file += total_wr_size;

  PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
                (total_wr_size + 1023) / 1024, stop_writing_into_file);
  return stop_writing_into_file;
}
2531 
// Tears down tracing session |tsid|: stops it immediately, releases its
// trace buffers back to the service and erases the session. On Android
// builds this also notifies Traceur when the session was a long trace it is
// expected to pick up.
void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    return;  // TODO(primiano): signal failure?
  }
  // Stop synchronously: after this, all data source instances must be STOPPED.
  DisableTracing(tsid, /*disable_immediately=*/true);

  PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
  tracing_session->data_source_instances.clear();

  // Tell every producer that the session's buffers are going away, then
  // release the buffer ids and destroy the buffers themselves.
  for (auto& producer_entry : producers_) {
    ProducerEndpointImpl* producer = producer_entry.second;
    producer->OnFreeBuffers(tracing_session->buffers_index);
  }

  for (BufferID buffer_id : tracing_session->buffers_index) {
    buffer_ids_.Free(buffer_id);
    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    buffers_.erase(buffer_id);
  }
  // Capture what we need for the Traceur notification *before* erasing the
  // session, which invalidates |tracing_session|.
  bool notify_traceur =
      tracing_session->config.notify_traceur() &&
      tracing_session->state != TracingSession::CLONED_READ_ONLY;
  bool is_long_trace =
      (tracing_session->config.write_into_file() &&
       tracing_session->config.file_write_period_ms() < kMillisPerDay);
  tracing_sessions_.erase(tsid);
  tracing_session = nullptr;  // Dangling after the erase above.
  UpdateMemoryGuardrail();

  PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
               tracing_sessions_.size());
#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (notify_traceur && is_long_trace) {
    PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
    if (!notify_fn || !notify_fn(/*session_stolen=*/false))
      PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
  }
#else
  base::ignore_result(notify_traceur);
  base::ignore_result(is_long_trace);
#endif
}
2579 
// Registers data source |desc| for |producer_id|. If any existing tracing
// session's config references the new data source, it is set up against that
// session immediately — and also started if the session is already STARTED —
// so producers that connect late still join in-flight sessions.
void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  // Check that the producer doesn't register two data sources with the same ID.
  // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
  // field didn't exist.
  for (const auto& kv : data_sources_) {
    if (desc.id() && kv.second.producer_id == producer_id &&
        kv.second.descriptor.id() == desc.id()) {
      PERFETTO_ELOG(
          "Failed to register data source \"%s\". A data source with the same "
          "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
          desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
          producer_id);
      return;
    }
  }

  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  // If there are existing tracing sessions, we need to check if the new
  // data source is enabled by any of them.
  for (auto& iter : tracing_sessions_) {
    TracingSession& tracing_session = iter.second;
    if (tracing_session.state != TracingSession::STARTED &&
        tracing_session.state != TracingSession::CONFIGURED) {
      continue;
    }

    // Pick the per-producer config matching this producer's name, if the
    // session config has one; otherwise a default-constructed config is used.
    TraceConfig::ProducerConfig producer_config;
    for (auto& config : tracing_session.config.producers()) {
      if (producer->name_ == config.producer_name()) {
        producer_config = config;
        break;
      }
    }
    for (const TraceConfig::DataSource& cfg_data_source :
         tracing_session.config.data_sources()) {
      if (cfg_data_source.config().name() != desc.name())
        continue;
      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
      // SetupDataSource() can return nullptr (e.g. producer filtered out).
      if (ds_inst && tracing_session.state == TracingSession::STARTED)
        StartDataSourceInstance(producer, &tracing_session, ds_inst);
    }
  }  // for(iter : tracing_sessions_)
}
2642 
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2643 void TracingServiceImpl::UpdateDataSource(
2644     ProducerID producer_id,
2645     const DataSourceDescriptor& new_desc) {
2646   if (new_desc.id() == 0) {
2647     PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2648     return;
2649   }
2650 
2651   // If this producer has already registered a matching descriptor name and id,
2652   // just update the descriptor.
2653   RegisteredDataSource* data_source = nullptr;
2654   auto range = data_sources_.equal_range(new_desc.name());
2655   for (auto it = range.first; it != range.second; ++it) {
2656     if (it->second.producer_id == producer_id &&
2657         it->second.descriptor.id() == new_desc.id()) {
2658       data_source = &it->second;
2659       break;
2660     }
2661   }
2662 
2663   if (!data_source) {
2664     PERFETTO_ELOG(
2665         "UpdateDataSource() failed, could not find an existing data source "
2666         "with name=\"%s\" id=%" PRIu64,
2667         new_desc.name().c_str(), new_desc.id());
2668     return;
2669   }
2670 
2671   data_source->descriptor = new_desc;
2672 }
2673 
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2674 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2675                                                 TracingSession* tracing_session,
2676                                                 DataSourceInstance* instance,
2677                                                 bool disable_immediately) {
2678   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2679   if (instance->will_notify_on_stop && !disable_immediately) {
2680     instance->state = DataSourceInstance::STOPPING;
2681   } else {
2682     instance->state = DataSourceInstance::STOPPED;
2683   }
2684   if (tracing_session->consumer_maybe_null) {
2685     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2686         *producer, *instance);
2687   }
2688   producer->StopDataSource(ds_inst_id);
2689 }
2690 
// Removes data source |name| for |producer_id|: stops and erases any live
// instances of it across all tracing sessions, then drops it from the
// registry. Instances are force-marked STOPPED (with the stop notification
// delivered synchronously) because the data source ceases to exist here.
void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                              const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
                producer_id, name.c_str());
  PERFETTO_CHECK(producer_id);
  ProducerEndpointImpl* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  for (auto& kv : tracing_sessions_) {
    auto& ds_instances = kv.second.data_source_instances;
    bool removed = false;
    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
      if (it->first == producer_id && it->second.data_source_name == name) {
        DataSourceInstanceID ds_inst_id = it->second.instance_id;
        if (it->second.state != DataSourceInstance::STOPPED) {
          if (it->second.state != DataSourceInstance::STOPPING) {
            StopDataSourceInstance(producer, &kv.second, &it->second,
                                   /* disable_immediately = */ false);
          }

          // Mark the instance as stopped immediately, since we are
          // unregistering it below.
          //
          //  The StopDataSourceInstance above might have set the state to
          //  STOPPING so this condition isn't an else.
          if (it->second.state == DataSourceInstance::STOPPING)
            NotifyDataSourceStopped(producer_id, ds_inst_id);
        }
        it = ds_instances.erase(it);
        removed = true;
      } else {
        ++it;
      }
    }  // for (data_source_instances)
    // Erasing instances may have completed the session's "all data sources
    // started" condition; re-evaluate it.
    if (removed)
      MaybeNotifyAllDataSourcesStarted(&kv.second);
  }  // for (tracing_session)

  for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    if (it->second.producer_id == producer_id &&
        it->second.descriptor.name() == name) {
      data_sources_.erase(it);
      return;
    }
  }

  PERFETTO_DFATAL(
      "Tried to unregister a non-existent data source \"%s\" for "
      "producer %" PRIu16,
      name.c_str(), producer_id);
}
2742 
// Creates a DataSourceInstance for |data_source| inside |tracing_session|,
// fills in the service-generated fields of its DataSourceConfig (duration,
// session id, translated target buffer, session initiator, ...), sets up the
// producer's shared memory buffer if it doesn't have one yet, and sends the
// final config to the producer. Returns nullptr when the data source must
// not be enabled (lockdown mode, producer-name filter mismatch, or an
// out-of-bounds target buffer in the config).
TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
    const TraceConfig::DataSource& cfg_data_source,
    const TraceConfig::ProducerConfig& producer_config,
    const RegisteredDataSource& data_source,
    TracingSession* tracing_session) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
  PERFETTO_DCHECK(producer);
  // An existing producer that is not ftrace could have registered itself as
  // ftrace, we must not enable it in that case.
  if (lockdown_mode_ && producer->uid_ != uid_) {
    PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
    return nullptr;
  }
  // TODO(primiano): Add tests for registration ordering (data sources vs
  // consumers).
  if (!NameMatchesFilter(producer->name_,
                         cfg_data_source.producer_name_filter(),
                         cfg_data_source.producer_name_regex_filter())) {
    PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
                  cfg_data_source.config().name().c_str(),
                  producer->name_.c_str());
    return nullptr;
  }

  auto relative_buffer_id = cfg_data_source.config().target_buffer();
  if (relative_buffer_id >= tracing_session->num_buffers()) {
    PERFETTO_LOG(
        "The TraceConfig for DataSource %s specified a target_buffer out of "
        "bound (%d). Skipping it.",
        cfg_data_source.config().name().c_str(), relative_buffer_id);
    return nullptr;
  }

  // Create a copy of the DataSourceConfig specified in the trace config. This
  // will be passed to the producer after translating the |target_buffer| id.
  // The |target_buffer| parameter passed by the consumer in the trace config is
  // relative to the buffers declared in the same trace config. This has to be
  // translated to the global BufferID before passing it to the producers, which
  // don't know anything about tracing sessions and consumers.

  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
  auto insert_iter = tracing_session->data_source_instances.emplace(
      std::piecewise_construct,  //
      std::forward_as_tuple(producer->id_),
      std::forward_as_tuple(
          inst_id,
          cfg_data_source.config(),  //  Deliberate copy.
          data_source.descriptor.name(),
          data_source.descriptor.will_notify_on_start(),
          data_source.descriptor.will_notify_on_stop(),
          data_source.descriptor.handles_incremental_state_clear()));
  DataSourceInstance* ds_instance = &insert_iter->second;

  // New data source instance starts out in CONFIGURED state.
  if (tracing_session->consumer_maybe_null) {
    tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
        *producer, *ds_instance);
  }

  DataSourceConfig& ds_config = ds_instance->config;
  ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());

  // Rationale for `if (prefer) set_prefer(true)`, rather than `set(prefer)`:
  // ComputeStartupConfigHash() in tracing_muxer_impl.cc compares hashes of the
  // DataSourceConfig and expects to know (and clear) the fields generated by
  // the tracing service. Unconditionally adding a new field breaks backward
  // compatibility of startup tracing with older SDKs, because the serialization
  // also propagates unknown fields, breaking the hash matching check.
  if (tracing_session->config.prefer_suspend_clock_for_duration())
    ds_config.set_prefer_suspend_clock_for_duration(true);

  ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
  ds_config.set_enable_extra_guardrails(
      tracing_session->config.enable_extra_guardrails());
  if (tracing_session->consumer_uid == 1066 /* AID_STATSD */ &&
      tracing_session->config.statsd_metadata().triggering_config_uid() !=
          2000 /* AID_SHELL */
      && tracing_session->config.statsd_metadata().triggering_config_uid() !=
             0 /* AID_ROOT */) {
    // StatsD can be triggered either by shell, root or an app that has DUMP and
    // USAGE_STATS permission. When triggered by shell or root, we do not want
    // to consider the trace a trusted system trace, as it was initiated by the
    // user. Otherwise, it has to come from an app with DUMP and
    // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
    // system.
    // Check for shell / root: https://bit.ly/3b7oZNi
    // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
  } else {
    // Unset in case the consumer set it.
    // We need to be able to trust this field.
    ds_config.set_session_initiator(
        DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
  }
  ds_config.set_tracing_session_id(tracing_session->id);
  BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
  PERFETTO_DCHECK(global_id);
  ds_config.set_target_buffer(global_id);

  PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
                ds_config.name().c_str(), global_id);
  if (!producer->shared_memory()) {
    // Determine the SMB page size. Must be an integer multiple of 4k.
    // As for the SMB size below, the decision tree is as follows:
    // 1. Give priority to what is defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of 4k.
    size_t page_size = producer_config.page_size_kb() * 1024;
    if (page_size == 0)
      page_size = producer->shmem_page_size_hint_bytes_;

    // Determine the SMB size. Must be an integer multiple of the SMB page size.
    // The decision tree is as follows:
    // 1. Give priority to what defined in the trace config.
    // 2. If unset give priority to the hint passed by the producer.
    // 3. Keep within bounds and ensure it's a multiple of the page size.
    size_t shm_size = producer_config.shm_size_kb() * 1024;
    if (shm_size == 0)
      shm_size = producer->shmem_size_hint_bytes_;

    auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
    if (valid_sizes != std::tie(shm_size, page_size)) {
      PERFETTO_DLOG(
          "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
          "back to shm_size %zu page_size %zu.",
          shm_size, page_size, std::get<0>(valid_sizes),
          std::get<1>(valid_sizes));
    }
    std::tie(shm_size, page_size) = valid_sizes;

    // TODO(primiano): right now Create() will suicide in case of OOM if the
    // mmap fails. We should instead gracefully fail the request and tell the
    // client to go away.
    PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
                  producer->name_.c_str());
    auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
    producer->SetupSharedMemory(std::move(shared_memory), page_size,
                                /*provided_by_producer=*/false);
  }
  producer->SetupDataSource(inst_id, ds_config);
  return ds_instance;
}
2887 
// Note: all the fields except the *_trusted ones are untrusted, as in, the
// Producer might be lying / returning garbage contents. |src| and |size| can
// be trusted in terms of being a valid pointer, but not the contents.
void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
    ProducerID producer_id_trusted,
    uid_t producer_uid_trusted,
    pid_t producer_pid_trusted,
    WriterID writer_id,
    ChunkID chunk_id,
    BufferID buffer_id,
    uint16_t num_fragments,
    uint8_t chunk_flags,
    bool chunk_complete,
    const uint8_t* src,
    size_t size) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    chunks_discarded_++;
    return;
  }

  // The target buffer may legitimately be gone (e.g. its session was freed).
  TraceBuffer* buf = GetBufferByID(buffer_id);
  if (!buf) {
    PERFETTO_DLOG("Could not find target buffer %" PRIu16
                  " for producer %" PRIu16,
                  buffer_id, producer_id_trusted);
    chunks_discarded_++;
    return;
  }

  // Verify that the producer is actually allowed to write into the target
  // buffer specified in the request. This prevents a malicious producer from
  // injecting data into a log buffer that belongs to a tracing session the
  // producer is not part of.
  if (!producer->is_allowed_target_buffer(buffer_id)) {
    PERFETTO_ELOG("Producer %" PRIu16
                  " tried to write into forbidden target buffer %" PRIu16,
                  producer_id_trusted, buffer_id);
    PERFETTO_DFATAL("Forbidden target buffer");
    chunks_discarded_++;
    return;
  }

  // If the writer was registered by the producer, it should only write into the
  // buffer it was registered with.
  std::optional<BufferID> associated_buffer =
      producer->buffer_id_for_writer(writer_id);
  if (associated_buffer && *associated_buffer != buffer_id) {
    PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
                  " was registered to write into target buffer %" PRIu16
                  ", but tried to write into buffer %" PRIu16,
                  writer_id, producer_id_trusted, *associated_buffer,
                  buffer_id);
    PERFETTO_DFATAL("Wrong target buffer");
    chunks_discarded_++;
    return;
  }

  // All checks passed: hand the chunk over. The buffer still treats the
  // payload itself as untrusted (hence CopyChunkUntrusted).
  buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted,
                          producer_pid_trusted, writer_id, chunk_id,
                          num_fragments, chunk_flags, chunk_complete, src,
                          size);
}
2954 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)2955 void TracingServiceImpl::ApplyChunkPatches(
2956     ProducerID producer_id_trusted,
2957     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
2958   PERFETTO_DCHECK_THREAD(thread_checker_);
2959 
2960   for (const auto& chunk : chunks_to_patch) {
2961     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
2962     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
2963     TraceBuffer* buf =
2964         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
2965     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
2966                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
2967     if (!writer_id || writer_id > kMaxWriterID || !buf) {
2968       // This can genuinely happen when the trace is stopped. The producers
2969       // might see the stop signal with some delay and try to keep sending
2970       // patches left soon after.
2971       PERFETTO_DLOG(
2972           "Received invalid chunks_to_patch request from Producer: %" PRIu16
2973           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
2974           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
2975       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2976       continue;
2977     }
2978 
2979     // Note, there's no need to validate that the producer is allowed to write
2980     // to the specified buffer ID (or that it's the correct buffer ID for a
2981     // registered TraceWriter). That's because TraceBuffer uses the producer ID
2982     // and writer ID to look up the chunk to patch. If the producer specifies an
2983     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
2984     // patches. Because the producer ID is trusted, there's also no way for a
2985     // malicious producer to patch another producer's data.
2986 
2987     // Speculate on the fact that there are going to be a limited amount of
2988     // patches per request, so we can allocate the |patches| array on the stack.
2989     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
2990     if (chunk.patches().size() > patches.size()) {
2991       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
2992                     patches.size());
2993       PERFETTO_DFATAL("Too many patches");
2994       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2995       continue;
2996     }
2997 
2998     size_t i = 0;
2999     for (const auto& patch : chunk.patches()) {
3000       const std::string& patch_data = patch.data();
3001       if (patch_data.size() != patches[i].data.size()) {
3002         PERFETTO_ELOG("Received patch from producer: %" PRIu16
3003                       " of unexpected size %zu",
3004                       producer_id_trusted, patch_data.size());
3005         patches_discarded_++;
3006         continue;
3007       }
3008       patches[i].offset_untrusted = patch.offset();
3009       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
3010       i++;
3011     }
3012     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
3013                                &patches[0], i, chunk.has_more_patches());
3014   }
3015 }
3016 
GetDetachedSession(uid_t uid,const std::string & key)3017 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
3018     uid_t uid,
3019     const std::string& key) {
3020   PERFETTO_DCHECK_THREAD(thread_checker_);
3021   for (auto& kv : tracing_sessions_) {
3022     TracingSession* session = &kv.second;
3023     if (session->consumer_uid == uid && session->detach_key == key) {
3024       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
3025       return session;
3026     }
3027   }
3028   return nullptr;
3029 }
3030 
GetTracingSession(TracingSessionID tsid)3031 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
3032     TracingSessionID tsid) {
3033   PERFETTO_DCHECK_THREAD(thread_checker_);
3034   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
3035   if (it == tracing_sessions_.end())
3036     return nullptr;
3037   return &it->second;
3038 }
3039 
3040 TracingServiceImpl::TracingSession*
FindTracingSessionWithMaxBugreportScore()3041 TracingServiceImpl::FindTracingSessionWithMaxBugreportScore() {
3042   TracingSession* max_session = nullptr;
3043   for (auto& session_id_and_session : tracing_sessions_) {
3044     auto& session = session_id_and_session.second;
3045     const int32_t score = session.config.bugreport_score();
3046     // Exclude sessions with 0 (or below) score. By default tracing sessions
3047     // should NOT be eligible to be attached to bugreports.
3048     if (score <= 0 || session.state != TracingSession::STARTED)
3049       continue;
3050 
3051     if (!max_session || score > max_session->config.bugreport_score())
3052       max_session = &session;
3053   }
3054   return max_session;
3055 }
3056 
GetNextProducerID()3057 ProducerID TracingServiceImpl::GetNextProducerID() {
3058   PERFETTO_DCHECK_THREAD(thread_checker_);
3059   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
3060   do {
3061     ++last_producer_id_;
3062   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
3063   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
3064   return last_producer_id_;
3065 }
3066 
GetBufferByID(BufferID buffer_id)3067 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
3068   auto buf_iter = buffers_.find(buffer_id);
3069   if (buf_iter == buffers_.end())
3070     return nullptr;
3071   return &*buf_iter->second;
3072 }
3073 
OnStartTriggersTimeout(TracingSessionID tsid)3074 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
3075   // Skip entirely the flush if the trace session doesn't exist anymore.
3076   // This is to prevent misleading error messages to be logged.
3077   //
3078   // if the trace has started from the trigger we rely on
3079   // the |stop_delay_ms| from the trigger so don't flush and
3080   // disable if we've moved beyond a CONFIGURED state
3081   auto* tracing_session_ptr = GetTracingSession(tsid);
3082   if (tracing_session_ptr &&
3083       tracing_session_ptr->state == TracingSession::CONFIGURED) {
3084     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
3085                   " since no triggers activated.",
3086                   tsid);
3087     // No data should be returned from ReadBuffers() regardless of if we
3088     // call FreeBuffers() or DisableTracing(). This is because in
3089     // STOP_TRACING we need this promise in either case, and using
3090     // DisableTracing() allows a graceful shutdown. Consumers can follow
3091     // their normal path and check the buffers through ReadBuffers() and
3092     // the code won't hang because the tracing session will still be
3093     // alive just disabled.
3094     DisableTracing(tsid);
3095   }
3096 }
3097 
UpdateMemoryGuardrail()3098 void TracingServiceImpl::UpdateMemoryGuardrail() {
3099 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
3100   uint64_t total_buffer_bytes = 0;
3101 
3102   // Sum up all the shared memory buffers.
3103   for (const auto& id_to_producer : producers_) {
3104     if (id_to_producer.second->shared_memory())
3105       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
3106   }
3107 
3108   // Sum up all the trace buffers.
3109   for (const auto& id_to_buffer : buffers_) {
3110     total_buffer_bytes += id_to_buffer.second->size();
3111   }
3112 
3113   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
3114   // interval.
3115   uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
3116   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
3117 #endif
3118 }
3119 
PeriodicSnapshotTask(TracingSessionID tsid)3120 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
3121   auto* tracing_session = GetTracingSession(tsid);
3122   if (!tracing_session)
3123     return;
3124   if (tracing_session->state != TracingSession::STARTED)
3125     return;
3126   tracing_session->should_emit_sync_marker = true;
3127   tracing_session->should_emit_stats = true;
3128   MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3129 }
3130 
SnapshotLifecyleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)3131 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
3132                                                uint32_t field_id,
3133                                                bool snapshot_clocks) {
3134   // field_id should be an id of a field in TracingServiceEvent.
3135   auto& lifecycle_events = tracing_session->lifecycle_events;
3136   auto event_it =
3137       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3138                    [field_id](const TracingSession::LifecycleEvent& event) {
3139                      return event.field_id == field_id;
3140                    });
3141 
3142   TracingSession::LifecycleEvent* event;
3143   if (event_it == lifecycle_events.end()) {
3144     lifecycle_events.emplace_back(field_id);
3145     event = &lifecycle_events.back();
3146   } else {
3147     event = &*event_it;
3148   }
3149 
3150   // Snapshot the clocks before capturing the timestamp for the event so we can
3151   // use this snapshot to resolve the event timestamp if necessary.
3152   if (snapshot_clocks)
3153     MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3154 
3155   // Erase before emplacing to prevent a unncessary doubling of memory if
3156   // not needed.
3157   if (event->timestamps.size() >= event->max_size) {
3158     event->timestamps.erase_front(1 + event->timestamps.size() -
3159                                   event->max_size);
3160   }
3161   event->timestamps.emplace_back(base::GetBootTimeNs().count());
3162 }
3163 
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3164 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3165     TracingSession* tracing_session) {
3166   if (tracing_session->config.builtin_data_sources()
3167           .disable_clock_snapshotting()) {
3168     return;
3169   }
3170 
3171   // We are making an explicit copy of the latest snapshot (if it exists)
3172   // because SnapshotClocks reads this data and computes the drift based on its
3173   // content. If the clock drift is high enough, it will update the contents of
3174   // |snapshot| and return true. Otherwise, it will return false.
3175   TracingSession::ClockSnapshotData snapshot =
3176       tracing_session->clock_snapshot_ring_buffer.empty()
3177           ? TracingSession::ClockSnapshotData()
3178           : tracing_session->clock_snapshot_ring_buffer.back();
3179   bool did_update = SnapshotClocks(&snapshot);
3180   if (did_update) {
3181     // This means clocks drifted enough since last snapshot. See the comment
3182     // in SnapshotClocks.
3183     auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3184 
3185     // Erase before emplacing to prevent a unncessary doubling of memory if
3186     // not needed.
3187     static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3188     if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3189       snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3190                                    kClockSnapshotRingBufferSize);
3191     }
3192     snapshot_buffer->emplace_back(std::move(snapshot));
3193   }
3194 }
3195 
// Returns true when the data in |snapshot_data| is updated with the new state
// of the clocks and false otherwise.
bool TracingServiceImpl::SnapshotClocks(
    TracingSession::ClockSnapshotData* snapshot_data) {
  // Minimum drift that justifies replacing a prior clock snapshot that hasn't
  // been emitted into the trace yet (see comment below).
  static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms

  TracingSession::ClockSnapshotData new_snapshot_data;

#if !PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&   \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  // Table of POSIX clocks to sample, pairing each clockid_t with the
  // corresponding BuiltinClock enum value emitted into the trace. BOOTTIME
  // must stay first: the drift check below relies on entry 0 being BOOTTIME.
  struct {
    clockid_t id;
    protos::pbzero::BuiltinClock type;
    struct timespec ts;
  } clocks[] = {
      {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
      {CLOCK_REALTIME_COARSE,
       protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
       {0, 0}},
      {CLOCK_MONOTONIC_COARSE,
       protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
       {0, 0}},
      {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
      {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
      {CLOCK_MONOTONIC_RAW,
       protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
       {0, 0}},
  };
  // First snapshot all the clocks as atomically as we can.
  for (auto& clock : clocks) {
    if (clock_gettime(clock.id, &clock.ts) == -1)
      PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
  }
  for (auto& clock : clocks) {
    new_snapshot_data.push_back(std::make_pair(
        static_cast<uint32_t>(clock.type),
        static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
  }
#else  // OS_APPLE || OS_WIN || OS_NACL
  auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
  // The default trace clock is boot time, so we always need to emit a path to
  // it. However since we don't actually have a boot time source on these
  // platforms, pretend that wall time equals boot time.
  new_snapshot_data.push_back(
      std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
  new_snapshot_data.push_back(
      std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
#endif

  // If we're about to update a session's latest clock snapshot that hasn't been
  // emitted into the trace yet, check whether the clocks have drifted enough to
  // warrant overriding the current snapshot values. The older snapshot would be
  // valid for a larger part of the currently buffered trace data because the
  // clock sync protocol in trace processor uses the latest clock <= timestamp
  // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
  // we try to keep it if we can.
  if (!snapshot_data->empty()) {
    PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
    // Entry 0 is always BOOTTIME (see the tables above).
    PERFETTO_DCHECK((*snapshot_data)[0].first ==
                    protos::gen::BUILTIN_CLOCK_BOOTTIME);

    bool update_snapshot = false;
    uint64_t old_boot_ns = (*snapshot_data)[0].second;
    uint64_t new_boot_ns = new_snapshot_data[0].second;
    int64_t boot_diff =
        static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);

    for (size_t i = 1; i < snapshot_data->size(); i++) {
      uint64_t old_ns = (*snapshot_data)[i].second;
      uint64_t new_ns = new_snapshot_data[i].second;

      int64_t diff =
          static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);

      // Compare the boottime delta against the delta of this clock: a clock
      // has drifted if it advanced by a different amount than BOOTTIME did.
      if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
        update_snapshot = true;
        break;
      }
    }
    if (!update_snapshot)
      return false;
    snapshot_data->clear();
  }

  *snapshot_data = std::move(new_snapshot_data);
  return true;
}
3287 
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,std::vector<TracePacket> * packets)3288 void TracingServiceImpl::EmitClockSnapshot(
3289     TracingSession* tracing_session,
3290     TracingSession::ClockSnapshotData snapshot_data,
3291     std::vector<TracePacket>* packets) {
3292   PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3293                        .disable_clock_snapshotting());
3294 
3295   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3296   auto* snapshot = packet->set_clock_snapshot();
3297 
3298   protos::gen::BuiltinClock trace_clock =
3299       tracing_session->config.builtin_data_sources().primary_trace_clock();
3300   if (!trace_clock)
3301     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3302   snapshot->set_primary_trace_clock(
3303       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3304 
3305   for (auto& clock_id_and_ts : snapshot_data) {
3306     auto* c = snapshot->add_clocks();
3307     c->set_clock_id(clock_id_and_ts.first);
3308     c->set_timestamp(clock_id_and_ts.second);
3309   }
3310 
3311   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3312   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3313   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3314 }
3315 
// Appends the synchronization-marker packet to |packets|. The serialized
// bytes are computed once and cached in |sync_marker_packet_| for reuse.
void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
  // The sync marks are used to tokenize large traces efficiently.
  // See description in trace_packet.proto.
  if (sync_marker_packet_size_ == 0) {
    // The marker ABI expects that the marker is written after the uid.
    // Protozero guarantees that fields are written in the same order of the
    // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
    protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
        &sync_marker_packet_[0], sizeof(sync_marker_packet_));
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

    // Keep this last.
    packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
    sync_marker_packet_size_ = packet.Finalize();
  }
  // Reference the cached bytes via a slice on the appended packet.
  packets->emplace_back();
  packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
}
3335 
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3336 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3337                                    std::vector<TracePacket>* packets) {
3338   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3339   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3340   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3341   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3342   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3343 }
3344 
// Builds a TraceStats snapshot that combines service-wide counters (producers,
// data sources, buffers, discarded chunks/patches) with per-session counters
// (flushes, invalid packets, optional filter stats), per-buffer stats and the
// per-writer chunk-usage histograms.
TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
  TraceStats trace_stats;
  // Service-wide counters, shared across all sessions.
  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
  trace_stats.set_producers_seen(last_producer_id_);
  trace_stats.set_data_sources_registered(
      static_cast<uint32_t>(data_sources_.size()));
  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
  trace_stats.set_tracing_sessions(
      static_cast<uint32_t>(tracing_sessions_.size()));
  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
  trace_stats.set_chunks_discarded(chunks_discarded_);
  trace_stats.set_patches_discarded(patches_discarded_);
  // Counters accumulated on this specific session.
  trace_stats.set_invalid_packets(tracing_session->invalid_packets);
  trace_stats.set_flushes_requested(tracing_session->flushes_requested);
  trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
  trace_stats.set_flushes_failed(tracing_session->flushes_failed);
  trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);

  // Filter stats are only meaningful when the session has a trace filter.
  if (tracing_session->trace_filter) {
    auto* filt_stats = trace_stats.mutable_filter_stats();
    filt_stats->set_input_packets(tracing_session->filter_input_packets);
    filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
    filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
    filt_stats->set_errors(tracing_session->filter_errors);
    filt_stats->set_time_taken_ns(tracing_session->filter_time_taken_ns);
  }

  // One BufferStats entry per buffer owned by the session.
  for (BufferID buf_id : tracing_session->buffers_index) {
    TraceBuffer* buf = GetBufferByID(buf_id);
    if (!buf) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    *trace_stats.add_buffer_stats() = buf->stats();
  }  // for (buf in session).

  if (!tracing_session->config.builtin_data_sources()
           .disable_chunk_usage_histograms()) {
    // Emit chunk usage stats broken down by sequence ID (i.e. by trace-writer).
    // Writer stats are updated by each TraceBuffer object at ReadBuffers time,
    // and there can be >1 buffer per session. However, we want to report only
    // one histogram per writer. A trace writer never writes to more than one
    // buffer (it's technically allowed but doesn't happen in the current impl
    // of the tracing SDK). Per-buffer breakdowns would be completely useless.
    TraceBuffer::WriterStatsMap merged_stats;

    // First merge all the per-buffer histograms into one-per-writer.
    for (const BufferID buf_id : tracing_session->buffers_index) {
      const TraceBuffer* buf = GetBufferByID(buf_id);
      if (!buf)
        continue;
      for (auto it = buf->writer_stats().GetIterator(); it; ++it) {
        // Insert() returns the existing entry if the writer was seen already.
        auto& hist = merged_stats.Insert(it.key(), {}).first->used_chunk_hist;
        hist.Merge(it.value().used_chunk_hist);
      }
    }

    // Serialize the merged per-writer histogram into the stats proto.
    bool has_written_bucket_definition = false;
    for (auto it = merged_stats.GetIterator(); it; ++it) {
      const auto& hist = it.value().used_chunk_hist;
      ProducerID p;
      WriterID w;
      GetProducerAndWriterID(it.key(), &p, &w);
      if (!has_written_bucket_definition) {
        // Serialize one-off the histogram bucket definition, which is the same
        // for all entries in the map.
        has_written_bucket_definition = true;
        // The -1 in the for loop below is to skip the implicit overflow bucket.
        for (size_t i = 0; i < hist.num_buckets() - 1; ++i) {
          trace_stats.add_chunk_payload_histogram_def(hist.GetBucketThres(i));
        }
      }
      auto* wri_stats = trace_stats.add_writer_stats();
      wri_stats->set_sequence_id(tracing_session->GetPacketSequenceID(p, w));
      for (size_t i = 0; i < hist.num_buckets(); ++i) {
        wri_stats->add_chunk_payload_histogram_counts(hist.GetBucketCount(i));
        wri_stats->add_chunk_payload_histogram_sum(hist.GetBucketSum(i));
      }
    }  // for (writer in merged_stats.GetIterator())
  }    // if (!disable_chunk_usage_histograms)

  return trace_stats;
}
3429 
MaybeEmitUuidAndTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)3430 void TracingServiceImpl::MaybeEmitUuidAndTraceConfig(
3431     TracingSession* tracing_session,
3432     std::vector<TracePacket>* packets) {
3433   if (tracing_session->did_emit_config)
3434     return;
3435   tracing_session->did_emit_config = true;
3436 
3437   {
3438     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3439     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3440     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3441     auto* uuid = packet->set_trace_uuid();
3442     uuid->set_lsb(tracing_session->trace_uuid.lsb());
3443     uuid->set_msb(tracing_session->trace_uuid.msb());
3444     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3445   }
3446 
3447   {
3448     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3449     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3450     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3451     tracing_session->config.Serialize(packet->set_trace_config());
3452     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3453   }
3454 }
3455 
// Emits, once per tracing session, a SystemInfo packet describing the host:
// tracing service version, uname() fields on POSIX and, on Android, the build
// fingerprint, SDK level, clock tick rate (HZ) and page size.
void TracingServiceImpl::MaybeEmitSystemInfo(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  if (tracing_session->did_emit_system_info)
    return;
  tracing_session->did_emit_system_info = true;
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  auto* info = packet->set_system_info();
  info->set_tracing_service_version(base::GetVersionString());
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  struct utsname uname_info;
  if (uname(&uname_info) == 0) {
    auto* utsname_info = info->set_utsname();
    utsname_info->set_sysname(uname_info.sysname);
    utsname_info->set_version(uname_info.version);
    utsname_info->set_machine(uname_info.machine);
    utsname_info->set_release(uname_info.release);
  }
#endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // Failure to read these properties is logged but does not abort the packet:
  // the remaining fields are still emitted.
  std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
  if (!fingerprint_value.empty()) {
    info->set_android_build_fingerprint(fingerprint_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.fingerprint");
  }

  std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
  std::optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
  if (sdk_value.has_value()) {
    info->set_android_sdk_version(*sdk_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.version.sdk");
  }
  // Kernel tick rate and page size, needed to interpret /proc-based counters.
  info->set_hz(sysconf(_SC_CLK_TCK));
  info->set_page_size(static_cast<uint32_t>(sysconf(_SC_PAGESIZE)));
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
3498 
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3499 void TracingServiceImpl::EmitLifecycleEvents(
3500     TracingSession* tracing_session,
3501     std::vector<TracePacket>* packets) {
3502   using TimestampedPacket =
3503       std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3504 
3505   std::vector<TimestampedPacket> timestamped_packets;
3506   for (auto& event : tracing_session->lifecycle_events) {
3507     for (int64_t ts : event.timestamps) {
3508       protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3509       packet->set_timestamp(static_cast<uint64_t>(ts));
3510       packet->set_trusted_uid(static_cast<int32_t>(uid_));
3511       packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3512 
3513       auto* service_event = packet->set_service_event();
3514       service_event->AppendVarInt(event.field_id, 1);
3515       timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3516     }
3517     event.timestamps.clear();
3518   }
3519 
3520   // We sort by timestamp here to ensure that the "sequence" of lifecycle
3521   // packets has monotonic timestamps like other sequences in the trace.
3522   // Note that these events could still be out of order with respect to other
3523   // events on the service packet sequence (e.g. trigger received packets).
3524   std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3525             [](const TimestampedPacket& a, const TimestampedPacket& b) {
3526               return a.first < b.first;
3527             });
3528 
3529   for (const auto& pair : timestamped_packets)
3530     SerializeAndAppendPacket(packets, std::move(pair.second));
3531 }
3532 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)3533 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3534     TracingSession* tracing_session,
3535     std::vector<TracePacket>* packets) {
3536   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3537                   tracing_session->received_triggers.size());
3538   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3539        i < tracing_session->received_triggers.size(); ++i) {
3540     const auto& info = tracing_session->received_triggers[i];
3541     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3542     auto* trigger = packet->set_trigger();
3543     trigger->set_trigger_name(info.trigger_name);
3544     trigger->set_producer_name(info.producer_name);
3545     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3546 
3547     packet->set_timestamp(info.boot_time_ns);
3548     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3549     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3550     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3551     ++tracing_session->num_triggers_emitted_into_trace;
3552   }
3553 }
3554 
MaybeLogUploadEvent(const TraceConfig & cfg,const base::Uuid & uuid,PerfettoStatsdAtom atom,const std::string & trigger_name)3555 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3556                                              const base::Uuid& uuid,
3557                                              PerfettoStatsdAtom atom,
3558                                              const std::string& trigger_name) {
3559   if (!ShouldLogEvent(cfg))
3560     return;
3561 
3562   PERFETTO_DCHECK(uuid);  // The UUID must be set at this point.
3563   android_stats::MaybeLogUploadEvent(atom, uuid.lsb(), uuid.msb(),
3564                                      trigger_name);
3565 }
3566 
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3567 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3568                                               PerfettoTriggerAtom atom,
3569                                               const std::string& trigger_name) {
3570   if (!ShouldLogEvent(cfg))
3571     return;
3572   android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3573 }
3574 
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3575 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3576     int64_t now_ns,
3577     uint64_t trigger_name_hash) {
3578   PERFETTO_DCHECK(
3579       std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3580   size_t remove_count = 0;
3581   size_t trigger_count = 0;
3582   for (const TriggerHistory& h : trigger_history_) {
3583     if (h.timestamp_ns < now_ns - trigger_window_ns_) {
3584       remove_count++;
3585     } else if (h.name_hash == trigger_name_hash) {
3586       trigger_count++;
3587     }
3588   }
3589   trigger_history_.erase_front(remove_count);
3590   return trigger_count;
3591 }
3592 
// Flushes the target session and then clones it into a new read-only session
// owned by |consumer|, reporting the outcome via Consumer::OnSessionCloned().
// |tsid| may be kBugreportSessionId, in which case the eligible session with
// the highest bugreport score is cloned instead.
void TracingServiceImpl::FlushAndCloneSession(ConsumerEndpointImpl* consumer,
                                              TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (tsid == kBugreportSessionId) {
    TracingSession* session = FindTracingSessionWithMaxBugreportScore();
    if (!session) {
      consumer->consumer_->OnSessionCloned(
          {false, "No tracing sessions eligible for bugreport found", {}});
      return;
    }
    tsid = session->id;
  }

  // The clone happens asynchronously after the flush completes. Both the
  // service and the consumer endpoint may be gone by then, hence the weak
  // pointers.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  auto weak_consumer = consumer->GetWeakPtr();
  Flush(tsid, 0, [weak_this, tsid, weak_consumer](bool final_flush_outcome) {
    PERFETTO_LOG("FlushAndCloneSession(%" PRIu64 ") started, success=%d", tsid,
                 final_flush_outcome);
    if (!weak_this || !weak_consumer)
      return;
    base::Uuid uuid;
    // The flush outcome is recorded in the cloned session's stats even when
    // the flush failed; cloning proceeds regardless.
    base::Status result = weak_this->DoCloneSession(&*weak_consumer, tsid,
                                                    final_flush_outcome, &uuid);
    weak_consumer->consumer_->OnSessionCloned(
        {result.ok(), result.message(), uuid});
  });
}
3621 
// Creates a read-only copy of session |src_tsid| for |consumer|: clones all
// trace buffers, copies stats/metadata, and attaches the consumer to the new
// session. On success writes the new session's UUID into |new_uuid|. Returns
// an error (creating nothing) if the source session is missing, the consumer
// is already attached elsewhere, the UID check fails, or buffer cloning fails
// (e.g. ENOMEM).
base::Status TracingServiceImpl::DoCloneSession(ConsumerEndpointImpl* consumer,
                                                TracingSessionID src_tsid,
                                                bool final_flush_outcome,
                                                base::Uuid* new_uuid) {
  PERFETTO_DLOG("CloneSession(%" PRIu64 ") started, consumer uid: %d", src_tsid,
                static_cast<int>(consumer->uid_));

  TracingSession* src = GetTracingSession(src_tsid);

  // The session might be gone by the time we try to clone it.
  if (!src)
    return PERFETTO_SVC_ERR("session not found");

  if (consumer->tracing_session_id_) {
    return PERFETTO_SVC_ERR(
        "The consumer is already attached to another tracing session");
  }

  // Skip the UID check for sessions marked with a bugreport_score > 0.
  // Those sessions, by design, can be stolen by any other consumer for the
  // sake of creating snapshots for bugreports.
  if (src->config.bugreport_score() <= 0 &&
      src->consumer_uid != consumer->uid_ && consumer->uid_ != 0) {
    return PERFETTO_SVC_ERR("Not allowed to clone a session from another UID");
  }

  // First clone all TraceBuffer(s). This can fail because of ENOMEM. If it
  // happens bail out early before creating any session.
  std::vector<std::pair<BufferID, std::unique_ptr<TraceBuffer>>> buf_snaps;
  buf_snaps.reserve(src->num_buffers());
  bool buf_clone_failed = false;
  for (BufferID src_buf_id : src->buffers_index) {
    TraceBuffer* src_buf = GetBufferByID(src_buf_id);
    std::unique_ptr<TraceBuffer> buf_snap = src_buf->CloneReadOnly();
    BufferID buf_global_id = buffer_ids_.Allocate();
    // Either the clone or the ID allocation (ID 0 is invalid) can fail;
    // record the failure but keep looping so all allocated IDs are tracked.
    buf_clone_failed |= !buf_snap.get() || !buf_global_id;
    buf_snaps.emplace_back(buf_global_id, std::move(buf_snap));
  }

  // Free up allocated IDs in case of failure. No need to free the TraceBuffers,
  // as they are still owned by the temporary |buf_snaps|.
  if (buf_clone_failed) {
    for (auto& kv : buf_snaps) {
      if (kv.first)
        buffer_ids_.Free(kv.first);
    }
    return PERFETTO_SVC_ERR("Buffer allocation failed");
  }

  const TracingSessionID tsid = ++last_tracing_session_id_;
  TracingSession* cloned_session =
      &tracing_sessions_
           .emplace(
               std::piecewise_construct, std::forward_as_tuple(tsid),
               std::forward_as_tuple(tsid, consumer, src->config, task_runner_))
           .first->second;

  // Generate a new UUID for the cloned session, but preserve the LSB. In some
  // contexts the LSB is used to tie the trace back to the statsd subscription
  // that triggered it. See the corresponding code in perfetto_cmd.cc which
  // reads at triggering_subscription_id().
  const int64_t orig_uuid_lsb = src->trace_uuid.lsb();
  cloned_session->state = TracingSession::CLONED_READ_ONLY;
  cloned_session->trace_uuid = base::Uuidv4();
  cloned_session->trace_uuid.set_lsb(orig_uuid_lsb);
  *new_uuid = cloned_session->trace_uuid;

  // Hand ownership of the cloned buffers to the service and index them in the
  // new session.
  for (auto& kv : buf_snaps) {
    BufferID buf_global_id = kv.first;
    std::unique_ptr<TraceBuffer>& buf = kv.second;
    buffers_.emplace(buf_global_id, std::move(buf));
    cloned_session->buffers_index.emplace_back(buf_global_id);
  }
  UpdateMemoryGuardrail();

  // Copy over relevant state that we want to persist in the cloned session.
  // Mostly stats and metadata that is emitted in the trace file by the service.
  // Also clear the received trigger list in the main tracing session. A
  // CLONE_SNAPSHOT session can go in ring buffer mode for several hours and get
  // snapshotted several times. This causes two issues with `received_triggers`:
  // 1. Adding noise in the cloned trace emitting triggers that happened too
  //    far back (see b/290799105).
  // 2. Bloating memory (see b/290798988).
  cloned_session->should_emit_stats = true;
  cloned_session->received_triggers = std::move(src->received_triggers);
  src->received_triggers.clear();
  src->num_triggers_emitted_into_trace = 0;
  cloned_session->lifecycle_events =
      std::vector<TracingSession::LifecycleEvent>(src->lifecycle_events);
  cloned_session->initial_clock_snapshot = src->initial_clock_snapshot;
  cloned_session->clock_snapshot_ring_buffer = src->clock_snapshot_ring_buffer;
  cloned_session->invalid_packets = src->invalid_packets;
  cloned_session->flushes_requested = src->flushes_requested;
  cloned_session->flushes_succeeded = src->flushes_succeeded;
  cloned_session->flushes_failed = src->flushes_failed;
  cloned_session->compress_deflate = src->compress_deflate;
  if (src->trace_filter) {
    // Copy the trace filter.
    cloned_session->trace_filter.reset(
        new protozero::MessageFilter(*src->trace_filter));
  }

  // Record in the cloned session's lifecycle that tracing is (logically)
  // disabled, with a fresh clock snapshot.
  SnapshotLifecyleEvent(
      cloned_session,
      protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
      true /* snapshot_clocks */);

  PERFETTO_DLOG("Consumer (uid:%d) cloned tracing session %" PRIu64
                " -> %" PRIu64,
                static_cast<int>(consumer->uid_), src_tsid, tsid);

  consumer->tracing_session_id_ = tsid;
  cloned_session->final_flush_outcome = final_flush_outcome
                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
                                            : TraceStats::FINAL_FLUSH_FAILED;
  return base::OkStatus();
}
3739 
3740 ////////////////////////////////////////////////////////////////////////////////
3741 // TracingServiceImpl::ConsumerEndpointImpl implementation
3742 ////////////////////////////////////////////////////////////////////////////////
3743 
// Binds a connected Consumer to the service. The endpoint relays calls into
// |service| and posts responses back to |consumer| on |task_runner|.
TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Consumer* consumer,
    uid_t uid)
    : task_runner_(task_runner),
      service_(service),
      consumer_(consumer),
      uid_(uid),
      weak_ptr_factory_(this) {}
3754 
// Unregisters this endpoint from the service first, then tells the consumer
// the connection is gone.
TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  service_->DisconnectConsumer(this);
  consumer_->OnDisconnect();
}
3759 
NotifyOnTracingDisabled(const std::string & error)3760 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
3761     const std::string& error) {
3762   PERFETTO_DCHECK_THREAD(thread_checker_);
3763   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3764   task_runner_->PostTask([weak_this, error /* deliberate copy */] {
3765     if (weak_this)
3766       weak_this->consumer_->OnTracingDisabled(error);
3767   });
3768 }
3769 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)3770 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
3771     const TraceConfig& cfg,
3772     base::ScopedFile fd) {
3773   PERFETTO_DCHECK_THREAD(thread_checker_);
3774   auto status = service_->EnableTracing(this, cfg, std::move(fd));
3775   if (!status.ok())
3776     NotifyOnTracingDisabled(status.message());
3777 }
3778 
ChangeTraceConfig(const TraceConfig & cfg)3779 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
3780     const TraceConfig& cfg) {
3781   if (!tracing_session_id_) {
3782     PERFETTO_LOG(
3783         "Consumer called ChangeTraceConfig() but tracing was "
3784         "not active");
3785     return;
3786   }
3787   service_->ChangeTraceConfig(this, cfg);
3788 }
3789 
StartTracing()3790 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
3791   PERFETTO_DCHECK_THREAD(thread_checker_);
3792   if (!tracing_session_id_) {
3793     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
3794     return;
3795   }
3796   service_->StartTracing(tracing_session_id_);
3797 }
3798 
DisableTracing()3799 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
3800   PERFETTO_DCHECK_THREAD(thread_checker_);
3801   if (!tracing_session_id_) {
3802     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
3803     return;
3804   }
3805   service_->DisableTracing(tracing_session_id_);
3806 }
3807 
ReadBuffers()3808 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
3809   PERFETTO_DCHECK_THREAD(thread_checker_);
3810   if (!tracing_session_id_) {
3811     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
3812     consumer_->OnTraceData({}, /* has_more = */ false);
3813     return;
3814   }
3815   if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
3816     consumer_->OnTraceData({}, /* has_more = */ false);
3817   }
3818 }
3819 
FreeBuffers()3820 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
3821   PERFETTO_DCHECK_THREAD(thread_checker_);
3822   if (!tracing_session_id_) {
3823     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
3824     return;
3825   }
3826   service_->FreeBuffers(tracing_session_id_);
3827   tracing_session_id_ = 0;
3828 }
3829 
Flush(uint32_t timeout_ms,FlushCallback callback)3830 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
3831                                                      FlushCallback callback) {
3832   PERFETTO_DCHECK_THREAD(thread_checker_);
3833   if (!tracing_session_id_) {
3834     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
3835     return;
3836   }
3837   service_->Flush(tracing_session_id_, timeout_ms, callback);
3838 }
3839 
Detach(const std::string & key)3840 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
3841   PERFETTO_DCHECK_THREAD(thread_checker_);
3842   bool success = service_->DetachConsumer(this, key);
3843   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3844   task_runner_->PostTask([weak_this, success] {
3845     if (weak_this)
3846       weak_this->consumer_->OnDetach(success);
3847   });
3848 }
3849 
Attach(const std::string & key)3850 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
3851   PERFETTO_DCHECK_THREAD(thread_checker_);
3852   bool success = service_->AttachConsumer(this, key);
3853   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3854   task_runner_->PostTask([weak_this, success] {
3855     if (!weak_this)
3856       return;
3857     Consumer* consumer = weak_this->consumer_;
3858     TracingSession* session =
3859         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
3860     if (!session) {
3861       consumer->OnAttach(false, TraceConfig());
3862       return;
3863     }
3864     consumer->OnAttach(success, session->config);
3865   });
3866 }
3867 
GetTraceStats()3868 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
3869   PERFETTO_DCHECK_THREAD(thread_checker_);
3870   bool success = false;
3871   TraceStats stats;
3872   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3873   if (session) {
3874     success = true;
3875     stats = service_->GetTraceStats(session);
3876   }
3877   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3878   task_runner_->PostTask([weak_this, success, stats] {
3879     if (weak_this)
3880       weak_this->consumer_->OnTraceStats(success, stats);
3881   });
3882 }
3883 
ObserveEvents(uint32_t events_mask)3884 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
3885     uint32_t events_mask) {
3886   PERFETTO_DCHECK_THREAD(thread_checker_);
3887   observable_events_mask_ = events_mask;
3888   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3889   if (!session)
3890     return;
3891 
3892   if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
3893     // Issue initial states.
3894     for (const auto& kv : session->data_source_instances) {
3895       ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
3896       PERFETTO_DCHECK(producer);
3897       OnDataSourceInstanceStateChange(*producer, kv.second);
3898     }
3899   }
3900 
3901   // If the ObserveEvents() call happens after data sources have acked already
3902   // notify immediately.
3903   if (observable_events_mask_ &
3904       ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
3905     service_->MaybeNotifyAllDataSourcesStarted(session);
3906   }
3907 }
3908 
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)3909 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
3910     const ProducerEndpointImpl& producer,
3911     const DataSourceInstance& instance) {
3912   if (!(observable_events_mask_ &
3913         ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
3914     return;
3915   }
3916 
3917   if (instance.state != DataSourceInstance::CONFIGURED &&
3918       instance.state != DataSourceInstance::STARTED &&
3919       instance.state != DataSourceInstance::STOPPED) {
3920     return;
3921   }
3922 
3923   auto* observable_events = AddObservableEvents();
3924   auto* change = observable_events->add_instance_state_changes();
3925   change->set_producer_name(producer.name_);
3926   change->set_data_source_name(instance.data_source_name);
3927   if (instance.state == DataSourceInstance::STARTED) {
3928     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
3929   } else {
3930     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
3931   }
3932 }
3933 
OnAllDataSourcesStarted()3934 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
3935   if (!(observable_events_mask_ &
3936         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
3937     return;
3938   }
3939   auto* observable_events = AddObservableEvents();
3940   observable_events->set_all_data_sources_started(true);
3941 }
3942 
NotifyCloneSnapshotTrigger()3943 void TracingServiceImpl::ConsumerEndpointImpl::NotifyCloneSnapshotTrigger() {
3944   if (!(observable_events_mask_ & ObservableEvents::TYPE_CLONE_TRIGGER_HIT)) {
3945     return;
3946   }
3947   auto* observable_events = AddObservableEvents();
3948   auto* clone_trig = observable_events->mutable_clone_trigger_hit();
3949   clone_trig->set_tracing_session_id(static_cast<int64_t>(tracing_session_id_));
3950 }
3951 
// Returns the pending ObservableEvents bundle, lazily creating it and
// scheduling a task to deliver it. Events added before the task runs are
// coalesced into a single OnObservableEvents() callback.
ObservableEvents*
TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!observable_events_) {
    observable_events_.reset(new ObservableEvents());
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this] {
      if (!weak_this)
        return;

      // Move into a temporary to allow reentrancy in OnObservableEvents.
      auto observable_events = std::move(weak_this->observable_events_);
      weak_this->consumer_->OnObservableEvents(*observable_events);
    });
  }
  return observable_events_.get();
}
3969 
// Replies (synchronously, via |callback|) with a snapshot of the service
// state: connected producers, registered data sources and the tracing
// sessions visible to the caller (all of them for root, otherwise only the
// caller's own).
void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
    QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceState svc_state;

  const auto& sessions = service_->tracing_sessions_;
  svc_state.set_tracing_service_version(base::GetVersionString());
  svc_state.set_num_sessions(static_cast<int>(sessions.size()));

  int num_started = 0;
  for (const auto& kv : sessions)
    num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
  svc_state.set_num_sessions_started(static_cast<int>(num_started));

  // One entry per connected producer.
  for (const auto& kv : service_->producers_) {
    auto* producer = svc_state.add_producers();
    producer->set_id(static_cast<int>(kv.first));
    producer->set_name(kv.second->name_);
    producer->set_sdk_version(kv.second->sdk_version_);
    producer->set_uid(static_cast<int32_t>(kv.second->uid()));
    producer->set_pid(static_cast<int32_t>(kv.second->pid()));
  }

  // One entry per registered data source (with its descriptor).
  for (const auto& kv : service_->data_sources_) {
    const auto& registered_data_source = kv.second;
    auto* data_source = svc_state.add_data_sources();
    *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
    data_source->set_producer_id(
        static_cast<int>(registered_data_source.producer_id));
  }

  svc_state.set_supports_tracing_sessions(true);
  for (const auto& kv : service_->tracing_sessions_) {
    const TracingSession& s = kv.second;
    // List only tracing sessions for the calling UID (or everything for root).
    if (uid_ != 0 && uid_ != s.consumer_uid)
      continue;
    auto* session = svc_state.add_tracing_sessions();
    session->set_id(s.id);
    session->set_consumer_uid(static_cast<int>(s.consumer_uid));
    session->set_duration_ms(s.config.duration_ms());
    session->set_num_data_sources(
        static_cast<uint32_t>(s.data_source_instances.size()));
    session->set_unique_session_name(s.config.unique_session_name());
    // Report the session start time in CLOCK_REALTIME, if it was snapshotted.
    for (const auto& snap_kv : s.initial_clock_snapshot) {
      if (snap_kv.first == protos::pbzero::BUILTIN_CLOCK_REALTIME)
        session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.second));
    }
    for (const auto& buf : s.config.buffers())
      session->add_buffer_size_kb(buf.size_kb());

    switch (s.state) {
      case TracingSession::State::DISABLED:
        session->set_state("DISABLED");
        break;
      case TracingSession::State::CONFIGURED:
        session->set_state("CONFIGURED");
        break;
      case TracingSession::State::STARTED:
        session->set_state("STARTED");
        break;
      case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
        session->set_state("STOP_WAIT");
        break;
      case TracingSession::State::CLONED_READ_ONLY:
        session->set_state("CLONED_READ_ONLY");
        break;
    }
  }
  callback(/*success=*/true, svc_state);
}
4041 
QueryCapabilities(QueryCapabilitiesCallback callback)4042 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
4043     QueryCapabilitiesCallback callback) {
4044   PERFETTO_DCHECK_THREAD(thread_checker_);
4045   TracingServiceCapabilities caps;
4046   caps.set_has_query_capabilities(true);
4047   caps.set_has_trace_config_output_path(true);
4048   caps.set_has_clone_session(true);
4049   caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
4050   caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
4051   caps.add_observable_events(ObservableEvents::TYPE_CLONE_TRIGGER_HIT);
4052   static_assert(
4053       ObservableEvents::Type_MAX == ObservableEvents::TYPE_CLONE_TRIGGER_HIT,
4054       "");
4055   callback(caps);
4056 }
4057 
SaveTraceForBugreport(SaveTraceForBugreportCallback consumer_callback)4058 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
4059     SaveTraceForBugreportCallback consumer_callback) {
4060   consumer_callback(false,
4061                     "SaveTraceForBugreport is deprecated. Use "
4062                     "CloneSession(kBugreportSessionId) instead.");
4063 }
4064 
// Asks the service to create a read-only copy of tracing session |tsid|.
// The outcome is reported asynchronously via OnSessionCloned (see below).
void TracingServiceImpl::ConsumerEndpointImpl::CloneSession(
    TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // FlushAndCloneSession will call OnSessionCloned after the async flush.
  service_->FlushAndCloneSession(this, tsid);
}
4071 
4072 ////////////////////////////////////////////////////////////////////////////////
4073 // TracingServiceImpl::ProducerEndpointImpl implementation
4074 ////////////////////////////////////////////////////////////////////////////////
4075 
// Service-side endpoint for one connected producer.
// |id| is the service-assigned ProducerID; |uid| and |pid| are the peer's
// credentials. |in_process| marks producers living in the service's own
// process: for those, an in-process SharedMemoryArbiter is created later in
// SetupSharedMemory(). |smb_scraping_enabled| is only stored here; it is
// consumed elsewhere in the service (not visible in this part of the file).
TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    uid_t uid,
    pid_t pid,
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    const std::string& producer_name,
    const std::string& sdk_version,
    bool in_process,
    bool smb_scraping_enabled)
    : id_(id),
      uid_(uid),
      pid_(pid),
      service_(service),
      task_runner_(task_runner),
      producer_(producer),
      name_(producer_name),
      sdk_version_(sdk_version),
      in_process_(in_process),
      smb_scraping_enabled_(smb_scraping_enabled),
      weak_ptr_factory_(this) {}
4098 
// Unregisters this producer from the service, then notifies the Producer
// implementation that the connection is gone.
TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  service_->DisconnectProducer(id_);
  producer_->OnDisconnect();
}
4103 
// Intentionally unimplemented: callers must destroy the endpoint instead.
void TracingServiceImpl::ProducerEndpointImpl::Disconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // Disconnection is only supported via destroying the ProducerEndpoint.
  PERFETTO_FATAL("Not supported");
}
4109 
// Forwards the producer's data source registration to the central service,
// keyed by this endpoint's ProducerID.
void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->RegisterDataSource(id_, desc);
}
4115 
// Forwards an update of an already-registered data source's descriptor to the
// central service.
void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UpdateDataSource(id_, desc);
}
4121 
// Forwards the removal of the data source named |name| to the central service.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UnregisterDataSource(id_, name);
}
4127 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)4128 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
4129     uint32_t writer_id,
4130     uint32_t target_buffer) {
4131   PERFETTO_DCHECK_THREAD(thread_checker_);
4132   writers_[static_cast<WriterID>(writer_id)] =
4133       static_cast<BufferID>(target_buffer);
4134 }
4135 
// Drops the writer -> buffer mapping for a TraceWriter that the producer has
// destroyed.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
    uint32_t writer_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  writers_.erase(static_cast<WriterID>(writer_id));
}
4141 
// Applies a producer's commit request: moves each completed chunk listed in
// |req_untrusted| from the producer's shared memory buffer into its target
// trace buffer, applies out-of-band chunk patches and acks a pending flush,
// if any. Everything in |req_untrusted| can come from a compromised producer
// process, so indices are bounds-checked and malformed entries are skipped.
void TracingServiceImpl::ProducerEndpointImpl::CommitData(
    const CommitDataRequest& req_untrusted,
    CommitDataCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
    PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
                               EncodeCommitDataRequest(id_, req_untrusted));
  }

  if (!shared_memory_) {
    PERFETTO_DLOG(
        "Attempted to commit data before the shared memory was allocated.");
    return;
  }
  PERFETTO_DCHECK(shmem_abi_.is_valid());
  for (const auto& entry : req_untrusted.chunks_to_move()) {
    const uint32_t page_idx = entry.page();
    if (page_idx >= shmem_abi_.num_pages())
      continue;  // A buggy or malicious producer.

    // Fails (returns an invalid chunk) if the chunk is not in the
    // kChunkComplete state, in which case the entry is skipped.
    SharedMemoryABI::Chunk chunk =
        shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
    if (!chunk.is_valid()) {
      PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
                    entry.page(), entry.chunk());
      continue;
    }

    // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
    // the ABI contract expects the producer to not touch the chunk anymore
    // (until the service marks that as free). This is why all the reads below
    // are just memory_order_relaxed. Also, the code here assumes that all this
    // data can be malicious and just gives up if anything is malformed.
    BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
    const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
    WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
    ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
    auto packets = chunk_header.packets.load(std::memory_order_relaxed);
    uint16_t num_fragments = packets.count;
    uint8_t chunk_flags = packets.flags;

    service_->CopyProducerPageIntoLogBuffer(
        id_, uid_, pid_, writer_id, chunk_id, buffer_id, num_fragments,
        chunk_flags,
        /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());

    // This one has release-store semantics: it hands the chunk back to the
    // producer for reuse.
    shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
  }  // for(chunks_to_move)

  service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());

  if (req_untrusted.flush_request_id()) {
    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
  }

  // Keep this invocation last. ProducerIPCService::CommitData() relies on this
  // callback being invoked within the same callstack and not posted. If this
  // changes, the code there needs to be changed accordingly.
  if (callback)
    callback();
}
4205 
// Installs the shared memory buffer for this producer and initializes the
// SharedMemoryABI view over it. Must be called at most once (DCHECKed below).
// |page_size_bytes| must be a multiple of 1 KB. |provided_by_producer| is
// true when the SMB was allocated by the producer and adopted by the service.
// For in-process producers this also creates a SharedMemoryArbiterImpl that
// operates directly on the same memory region.
void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
    std::unique_ptr<SharedMemory> shared_memory,
    size_t page_size_bytes,
    bool provided_by_producer) {
  PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
  PERFETTO_DCHECK(page_size_bytes % 1024 == 0);

  shared_memory_ = std::move(shared_memory);
  shared_buffer_page_size_kb_ = page_size_bytes / 1024;
  is_shmem_provided_by_producer_ = provided_by_producer;

  shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                        shared_memory_->size(),
                        shared_buffer_page_size_kb() * 1024);
  if (in_process_) {
    inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
        shared_memory_->start(), shared_memory_->size(),
        shared_buffer_page_size_kb_ * 1024, this, task_runner_));
    inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
  }

  OnTracingSetup();
  service_->UpdateMemoryGuardrail();
}
4230 
// Returns the SMB installed by SetupSharedMemory(), or nullptr before that.
SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return shared_memory_.get();
}
4235 
// Returns the SMB page size in KB, as set by SetupSharedMemory().
size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
    const {
  return shared_buffer_page_size_kb_;
}
4240 
// Forwards trigger names raised by this producer to the central service,
// keyed by this endpoint's ProducerID.
void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
    const std::vector<std::string>& triggers) {
  service_->ActivateTriggers(id_, triggers);
}
4245 
StopDataSource(DataSourceInstanceID ds_inst_id)4246 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
4247     DataSourceInstanceID ds_inst_id) {
4248   // TODO(primiano): When we'll support tearing down the SMB, at this point we
4249   // should send the Producer a TearDownTracing if all its data sources have
4250   // been disabled (see b/77532839 and aosp/655179 PS1).
4251   PERFETTO_DCHECK_THREAD(thread_checker_);
4252   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4253   task_runner_->PostTask([weak_this, ds_inst_id] {
4254     if (weak_this)
4255       weak_this->producer_->StopDataSource(ds_inst_id);
4256   });
4257 }
4258 
// Returns the service-side arbiter for in-process producers. Only in-process
// producers have one; it is created in SetupSharedMemory() once tracing has
// started. Crashes (FATAL) if called in any other configuration.
SharedMemoryArbiter*
TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
  if (!inproc_shmem_arbiter_) {
    PERFETTO_FATAL(
        "The in-process SharedMemoryArbiter can only be used when "
        "CreateProducer has been called with in_process=true and after tracing "
        "has started.");
  }

  PERFETTO_DCHECK(in_process_);
  return inproc_shmem_arbiter_.get();
}
4271 
// True if the SMB was allocated by the producer and adopted by the service
// (see the |provided_by_producer| argument of SetupSharedMemory()).
bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
    const {
  return is_shmem_provided_by_producer_;
}
4276 
4277 // Can be called on any thread.
4278 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id,BufferExhaustedPolicy buffer_exhausted_policy)4279 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
4280     BufferID buf_id,
4281     BufferExhaustedPolicy buffer_exhausted_policy) {
4282   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4283   return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
4284                                                        buffer_exhausted_policy);
4285 }
4286 
// Forwards a flush-complete notification for request |id| to the in-process
// SharedMemoryArbiter (which FATALs if this producer is not in-process).
void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
    FlushRequestID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
}
4293 
OnTracingSetup()4294 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4295   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4296   task_runner_->PostTask([weak_this] {
4297     if (weak_this)
4298       weak_this->producer_->OnTracingSetup();
4299   });
4300 }
4301 
// Forwards a flush request to the producer on its own task runner, asking it
// to flush the listed data source instances for request |flush_request_id|.
void TracingServiceImpl::ProducerEndpointImpl::Flush(
    FlushRequestID flush_request_id,
    const std::vector<DataSourceInstanceID>& data_sources) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  // |data_sources| is captured by copy because the task runs asynchronously.
  task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
    if (weak_this) {
      weak_this->producer_->Flush(flush_request_id, data_sources.data(),
                                  data_sources.size());
    }
  });
}
4314 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4315 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4316     DataSourceInstanceID ds_id,
4317     const DataSourceConfig& config) {
4318   PERFETTO_DCHECK_THREAD(thread_checker_);
4319   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4320   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4321   task_runner_->PostTask([weak_this, ds_id, config] {
4322     if (weak_this)
4323       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
4324   });
4325 }
4326 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4327 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4328     DataSourceInstanceID ds_id,
4329     const DataSourceConfig& config) {
4330   PERFETTO_DCHECK_THREAD(thread_checker_);
4331   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4332   task_runner_->PostTask([weak_this, ds_id, config] {
4333     if (weak_this)
4334       weak_this->producer_->StartDataSource(ds_id, std::move(config));
4335   });
4336 }
4337 
// Producer ack: data source instance |data_source_id| has started. Forwarded
// to the central service keyed by this endpoint's ProducerID.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStarted(id_, data_source_id);
}
4343 
// Producer ack: data source instance |data_source_id| has stopped. Forwarded
// to the central service keyed by this endpoint's ProducerID.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStopped(id_, data_source_id);
}
4349 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)4350 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4351     const std::vector<BufferID>& target_buffers) {
4352   if (allowed_target_buffers_.empty())
4353     return;
4354   for (BufferID buffer : target_buffers)
4355     allowed_target_buffers_.erase(buffer);
4356 }
4357 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)4358 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4359     const std::vector<DataSourceInstanceID>& data_sources) {
4360   PERFETTO_DCHECK_THREAD(thread_checker_);
4361   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4362   task_runner_->PostTask([weak_this, data_sources] {
4363     if (weak_this) {
4364       base::StringView producer_name(weak_this->name_);
4365       weak_this->producer_->ClearIncrementalState(data_sources.data(),
4366                                                   data_sources.size());
4367     }
4368   });
4369 }
4370 
Sync(std::function<void ()> callback)4371 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4372     std::function<void()> callback) {
4373   task_runner_->PostTask(callback);
4374 }
4375 
4376 ////////////////////////////////////////////////////////////////////////////////
4377 // TracingServiceImpl::TracingSession implementation
4378 ////////////////////////////////////////////////////////////////////////////////
4379 
// Creates a tracing session owned by |consumer| with the given |new_config|.
// The session inherits the consumer's uid and uses |task_runner| for its
// periodic-snapshot and timed-stop tasks.
TracingServiceImpl::TracingSession::TracingSession(
    TracingSessionID session_id,
    ConsumerEndpointImpl* consumer,
    const TraceConfig& new_config,
    base::TaskRunner* task_runner)
    : id(session_id),
      consumer_maybe_null(consumer),
      consumer_uid(consumer->uid_),
      config(new_config),
      snapshot_periodic_task(task_runner),
      timed_stop_task(task_runner) {
  // all_data_sources_flushed is special because we store up to 64 events of
  // this type. Other events will go through the default case in
  // SnapshotLifecycleEvent() where they will be given a max history of 1.
  lifecycle_events.emplace_back(
      protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
      64 /* max_size */);
}
4398 
4399 }  // namespace perfetto
4400