1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/tracing_service_impl.h"
18 
19 #include "perfetto/base/build_config.h"
20 #include "perfetto/tracing/core/forward_decls.h"
21 
22 #include <errno.h>
23 #include <limits.h>
24 #include <string.h>
25 
26 #include <cinttypes>
27 #include <regex>
28 #include <unordered_set>
29 
30 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
31     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
32 #include <sys/uio.h>
33 #include <sys/utsname.h>
34 #include <unistd.h>
35 #endif
36 
37 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
38     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
39 #include "src/android_internal/lazy_library_loader.h"    // nogncheck
40 #include "src/android_internal/tracing_service_proxy.h"  // nogncheck
41 #endif
42 
43 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
44     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
45     PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
46 #define PERFETTO_HAS_CHMOD
47 #include <sys/stat.h>
48 #endif
49 
50 #include <algorithm>
51 
52 #include "perfetto/base/build_config.h"
53 #include "perfetto/base/status.h"
54 #include "perfetto/base/task_runner.h"
55 #include "perfetto/ext/base/android_utils.h"
56 #include "perfetto/ext/base/file_utils.h"
57 #include "perfetto/ext/base/metatrace.h"
58 #include "perfetto/ext/base/string_utils.h"
59 #include "perfetto/ext/base/temp_file.h"
60 #include "perfetto/ext/base/utils.h"
61 #include "perfetto/ext/base/uuid.h"
62 #include "perfetto/ext/base/version.h"
63 #include "perfetto/ext/base/watchdog.h"
64 #include "perfetto/ext/tracing/core/basic_types.h"
65 #include "perfetto/ext/tracing/core/consumer.h"
66 #include "perfetto/ext/tracing/core/observable_events.h"
67 #include "perfetto/ext/tracing/core/producer.h"
68 #include "perfetto/ext/tracing/core/shared_memory.h"
69 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
70 #include "perfetto/ext/tracing/core/trace_packet.h"
71 #include "perfetto/ext/tracing/core/trace_writer.h"
72 #include "perfetto/protozero/scattered_heap_buffer.h"
73 #include "perfetto/protozero/static_buffer.h"
74 #include "perfetto/tracing/core/data_source_descriptor.h"
75 #include "perfetto/tracing/core/tracing_service_capabilities.h"
76 #include "perfetto/tracing/core/tracing_service_state.h"
77 #include "src/android_stats/statsd_logging_helper.h"
78 #include "src/protozero/filtering/message_filter.h"
79 #include "src/tracing/core/packet_stream_validator.h"
80 #include "src/tracing/core/shared_memory_arbiter_impl.h"
81 #include "src/tracing/core/trace_buffer.h"
82 
83 #include "protos/perfetto/common/builtin_clock.gen.h"
84 #include "protos/perfetto/common/builtin_clock.pbzero.h"
85 #include "protos/perfetto/common/trace_stats.pbzero.h"
86 #include "protos/perfetto/config/trace_config.pbzero.h"
87 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
88 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
89 #include "protos/perfetto/trace/system_info.pbzero.h"
90 #include "protos/perfetto/trace/trace_packet.pbzero.h"
91 #include "protos/perfetto/trace/trigger.pbzero.h"
92 
93 // General note: this class must assume that Producers are malicious and will
94 // try to crash / exploit this class. We can trust pointers because they come
95 // from the IPC layer, but we should never assume that the producer calls
96 // come in the right order or that their arguments are sane / within bounds.
97 
98 // This is a macro because we want the call-site line number for the ELOG.
99 #define PERFETTO_SVC_ERR(...) \
100   (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
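// Example usage (illustrative, mirroring real call sites later in this file):
//   return PERFETTO_SVC_ERR("Too many buffers configured (%d)", num_buffers);
// This logs the formatted message at the call site and returns a matching
// base::ErrStatus to the caller.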
101 
102 namespace perfetto {
103 
104 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
105     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
106 // These are the only SELinux-approved directories for trace files that are
107 // created directly by traced.
108 const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
109 const char* kAndroidProductionBugreportTracePath =
110     "/data/misc/perfetto-traces/bugreport/systrace.pftrace";
111 #endif
112 
113 namespace {
114 constexpr int kMaxBuffersPerConsumer = 128;
115 constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
116 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
117 constexpr int kMaxConcurrentTracingSessions = 15;
118 constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
119 constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
120 constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;
121 
122 constexpr uint32_t kMillisPerHour = 3600000;
123 constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
124 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
125 
126 // These apply only if enable_extra_guardrails is true.
127 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
128 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
129 
130 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
131 struct iovec {
132   void* iov_base;  // Address
133   size_t iov_len;  // Block size
134 };
135 
136 // Simple implementation of writev. Note that this does not give the atomicity
137 // guarantees of a real writev, but we don't depend on these (we aren't writing
138 // to the same file from another thread).
139 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
140   ssize_t total_size = 0;
141   for (int i = 0; i < iovcnt; ++i) {
142     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
143     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
144       return -1;
145     total_size += current_size;
146   }
147   return total_size;
148 }
149 
150 #define IOV_MAX 1024  // Linux compatible limit.
151 
152 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
153         // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
154 
155 // Partially encodes a CommitDataRequest in an int32 for the purposes of
156 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
157 // (which is technically 16 bits wide).
158 //
159 // Format (by bit range):
160 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
161 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
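// Worked example (illustrative): producer_id=42, 3 chunks to move, 1 chunk to
// patch and a flush id present encode to:
//   (1u << 30) | (1u << 20) | (3u << 10) | 42u == 0x40100C2A.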
162 static int32_t EncodeCommitDataRequest(ProducerID producer_id,
163                                        const CommitDataRequest& req_untrusted) {
164   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
165   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
166   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
167 
168   uint32_t mask = (1 << 10) - 1;
169   uint32_t acc = 0;
170   acc |= has_flush_id << 30;
171   acc |= (cpatch & mask) << 20;
172   acc |= (cmov & mask) << 10;
173   acc |= (producer_id & mask);
174   return static_cast<int32_t>(acc);
175 }
176 
177 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
178                               std::vector<uint8_t> packet) {
179   Slice slice = Slice::Allocate(packet.size());
180   memcpy(slice.own_data(), packet.data(), packet.size());
181   packets->emplace_back();
182   packets->back().AddSlice(std::move(slice));
183 }
184 
185 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
186     size_t shm_size,
187     size_t page_size) {
188   // Theoretically the max page size supported by the ABI is 64KB.
189   // However, the current implementation of TraceBuffer (the non-shared
190   // userspace buffer where the service copies data) supports at most
191   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
192   // but then causes the data to be discarded when copying it into
193   // TraceBuffer.
194   constexpr size_t kMaxPageSize = 32 * 1024;
195   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
196 
197   if (page_size == 0)
198     page_size = TracingServiceImpl::kDefaultShmPageSize;
199   if (shm_size == 0)
200     shm_size = TracingServiceImpl::kDefaultShmSize;
201 
202   page_size = std::min<size_t>(page_size, kMaxPageSize);
203   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
204 
205   // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
206   // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
207   // matter here, because the tracing page size is just a logical partitioning
208   // and does not have any dependencies on kernel mm syscalls (read: it's fine
209   // to have trace page sizes of 4K on a system where the kernel page size is
210   // 16K).
211   bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
212   page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
213 
214   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
215   size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
216   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
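  // For illustration (assuming SharedMemoryABI::kMinPageSize is 4 KB): a 16 KB
  // page is 4 min-pages (a power of two) and passes the checks above, whereas
  // a 12 KB page is 3 min-pages and falls through to the defaults below.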
217 
218   if (!page_size_is_valid || shm_size < page_size ||
219       shm_size % page_size != 0) {
220     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
221                            TracingServiceImpl::kDefaultShmPageSize);
222   }
223   return std::make_tuple(shm_size, page_size);
224 }
225 
226 bool NameMatchesFilter(const std::string& name,
227                        const std::vector<std::string>& name_filter,
228                        const std::vector<std::string>& name_regex_filter) {
229   bool filter_is_set = !name_filter.empty() || !name_regex_filter.empty();
230   if (!filter_is_set)
231     return true;
232   bool filter_matches = std::find(name_filter.begin(), name_filter.end(),
233                                   name) != name_filter.end();
234   bool filter_regex_matches =
235       std::find_if(name_regex_filter.begin(), name_regex_filter.end(),
236                    [&](const std::string& regex) {
237                      return std::regex_match(
238                          name, std::regex(regex, std::regex::extended));
239                    }) != name_regex_filter.end();
240   return filter_matches || filter_regex_matches;
241 }
242 
243 // Used when:
244 // 1. TraceConfig.write_into_file == true and output_path is not empty.
245 // 2. Calling SaveTraceForBugreport(), from perfetto --save-for-bugreport.
246 base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
247 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
248     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
249   // This is NOT trying to preserve any security property, SELinux does that.
250   // It just improves the actionability of the error when people try to save the
251   // trace in a location that is not SELinux-allowed (a generic "permission
252   // denied" vs "don't put it here, put it there").
253   if (!base::StartsWith(path, kTraceDirBasePath)) {
254     PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
255                   path.c_str(), kTraceDirBasePath);
256     return base::ScopedFile();
257   }
258 #endif
259   // O_CREAT | O_EXCL will fail if the file exists already.
260   const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
261   auto fd = base::OpenFile(path, flags, 0600);
262   if (fd) {
263 #if defined(PERFETTO_HAS_CHMOD)
264     // Passing 0644 directly above won't work because of umask.
265     PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
266 #endif
267   } else {
268     PERFETTO_PLOG("Failed to create %s", path.c_str());
269   }
270   return fd;
271 }
272 
273 std::string GetBugreportTmpPath() {
274   return GetBugreportPath() + ".tmp";
275 }
276 
277 bool ShouldLogEvent(const TraceConfig& cfg) {
278   switch (cfg.statsd_logging()) {
279     case TraceConfig::STATSD_LOGGING_ENABLED:
280       return true;
281     case TraceConfig::STATSD_LOGGING_DISABLED:
282       return false;
283     case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
284       // For backward compatibility with older versions of perfetto_cmd.
285       return cfg.enable_extra_guardrails();
286   }
287   PERFETTO_FATAL("For GCC");
288 }
289 
290 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
291 // slices no larger than `max_slice_size`.
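// For example, size=10 with max_slice_size=4 yields three slices of 4, 4 and
// 2 bytes respectively.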
292 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
293                                size_t size,
294                                size_t max_slice_size,
295                                perfetto::TracePacket* packet) {
296   if (size <= max_slice_size) {
297     packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
298     return;
299   }
300   uint8_t* src_ptr = data.get();
301   for (size_t size_left = size; size_left > 0;) {
302     const size_t slice_size = std::min(size_left, max_slice_size);
303 
304     Slice slice = Slice::Allocate(slice_size);
305     memcpy(slice.own_data(), src_ptr, slice_size);
306     packet->AddSlice(std::move(slice));
307 
308     src_ptr += slice_size;
309     size_left -= slice_size;
310   }
311 }
312 
313 }  // namespace
314 
315 // These constants instead are defined in the header because they are used by tests.
316 constexpr size_t TracingServiceImpl::kDefaultShmSize;
317 constexpr size_t TracingServiceImpl::kDefaultShmPageSize;
318 
319 constexpr size_t TracingServiceImpl::kMaxShmSize;
320 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
321 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
322 
323 std::string GetBugreportPath() {
324 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
325     PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
326   return kAndroidProductionBugreportTracePath;
327 #else
328   // Only for tests; SaveTraceForBugreport is not used on other OSes.
329   return base::GetSysTempDir() + "/bugreport.pftrace";
330 #endif
331 }
332 
333 // static
334 std::unique_ptr<TracingService> TracingService::CreateInstance(
335     std::unique_ptr<SharedMemory::Factory> shm_factory,
336     base::TaskRunner* task_runner) {
337   return std::unique_ptr<TracingService>(
338       new TracingServiceImpl(std::move(shm_factory), task_runner));
339 }
340 
341 TracingServiceImpl::TracingServiceImpl(
342     std::unique_ptr<SharedMemory::Factory> shm_factory,
343     base::TaskRunner* task_runner)
344     : task_runner_(task_runner),
345       shm_factory_(std::move(shm_factory)),
346       uid_(base::GetCurrentUserId()),
347       buffer_ids_(kMaxTraceBufferID),
348       trigger_probability_rand_(
349           static_cast<uint32_t>(base::GetWallTimeNs().count())),
350       weak_ptr_factory_(this) {
351   PERFETTO_DCHECK(task_runner_);
352 }
353 
354 TracingServiceImpl::~TracingServiceImpl() {
355   // TODO(fmayer): handle teardown of all Producer.
356 }
357 
358 std::unique_ptr<TracingService::ProducerEndpoint>
359 TracingServiceImpl::ConnectProducer(Producer* producer,
360                                     uid_t uid,
361                                     pid_t pid,
362                                     const std::string& producer_name,
363                                     size_t shared_memory_size_hint_bytes,
364                                     bool in_process,
365                                     ProducerSMBScrapingMode smb_scraping_mode,
366                                     size_t shared_memory_page_size_hint_bytes,
367                                     std::unique_ptr<SharedMemory> shm,
368                                     const std::string& sdk_version) {
369   PERFETTO_DCHECK_THREAD(thread_checker_);
370 
371   if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
372     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %lu",
373                   static_cast<unsigned long>(uid));
374     return nullptr;
375   }
376 
377   if (producers_.size() >= kMaxProducerID) {
378     PERFETTO_DFATAL("Too many producers.");
379     return nullptr;
380   }
381   const ProducerID id = GetNextProducerID();
382   PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
383                 static_cast<int>(uid));
384   bool smb_scraping_enabled = smb_scraping_enabled_;
385   switch (smb_scraping_mode) {
386     case ProducerSMBScrapingMode::kDefault:
387       break;
388     case ProducerSMBScrapingMode::kEnabled:
389       smb_scraping_enabled = true;
390       break;
391     case ProducerSMBScrapingMode::kDisabled:
392       smb_scraping_enabled = false;
393       break;
394   }
395 
396   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
397       id, uid, pid, this, task_runner_, producer, producer_name, sdk_version,
398       in_process, smb_scraping_enabled));
399   auto it_and_inserted = producers_.emplace(id, endpoint.get());
400   PERFETTO_DCHECK(it_and_inserted.second);
401   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
402   endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
403 
404   // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
405   // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
406   auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
407   task_runner_->PostTask([weak_ptr] {
408     if (weak_ptr)
409       weak_ptr->producer_->OnConnect();
410   });
411 
412   if (shm) {
413     // The producer supplied an SMB. This is used only by Chrome; in the most
414     // common cases the SMB is created by the service and passed via
415     // OnTracingSetup(). Verify that it is correctly sized before we attempt to
416     // use it. The transport layer has to verify the integrity of the SMB (e.g.
417     // ensure that the producer can't resize it after the fact).
418     size_t shm_size, page_size;
419     std::tie(shm_size, page_size) =
420         EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
421     if (shm_size == shm->size() &&
422         page_size == endpoint->shmem_page_size_hint_bytes_) {
423       PERFETTO_DLOG(
424           "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
425           shm_size / 1024, endpoint->name_.c_str());
426       endpoint->SetupSharedMemory(std::move(shm), page_size,
427                                   /*provided_by_producer=*/true);
428     } else {
429       PERFETTO_LOG(
430           "Discarding incorrectly sized producer-provided SMB for producer "
431           "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
432           "B total, %zu B page size; suggested corrected sizes: %zu B total, "
433           "%zu B page size",
434           endpoint->name_.c_str(), shm->size(),
435           endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
436       shm.reset();
437     }
438   }
439 
440   return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
441 }
442 
443 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
444   PERFETTO_DCHECK_THREAD(thread_checker_);
445   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
446   PERFETTO_DCHECK(producers_.count(id));
447 
448   // Scrape remaining chunks for this producer to ensure we don't lose data.
449   if (auto* producer = GetProducer(id)) {
450     for (auto& session_id_and_session : tracing_sessions_)
451       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
452   }
453 
454   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
455     auto next = it;
456     next++;
457     if (it->second.producer_id == id)
458       UnregisterDataSource(id, it->second.descriptor.name());
459     it = next;
460   }
461 
462   producers_.erase(id);
463   UpdateMemoryGuardrail();
464 }
465 
466 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
467     ProducerID id) const {
468   PERFETTO_DCHECK_THREAD(thread_checker_);
469   auto it = producers_.find(id);
470   if (it == producers_.end())
471     return nullptr;
472   return it->second;
473 }
474 
475 std::unique_ptr<TracingService::ConsumerEndpoint>
476 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
477   PERFETTO_DCHECK_THREAD(thread_checker_);
478   PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
479                 reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
480   std::unique_ptr<ConsumerEndpointImpl> endpoint(
481       new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
482   auto it_and_inserted = consumers_.emplace(endpoint.get());
483   PERFETTO_DCHECK(it_and_inserted.second);
484   // The consumer might go away before we're able to send the connect
485   // notification; if that is the case, just bail out.
486   auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
487   task_runner_->PostTask([weak_ptr] {
488     if (weak_ptr)
489       weak_ptr->consumer_->OnConnect();
490   });
491   return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
492 }
493 
494 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
495   PERFETTO_DCHECK_THREAD(thread_checker_);
496   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
497   PERFETTO_DCHECK(consumers_.count(consumer));
498 
499   // TODO(primiano) : Check that this is safe (what happens if there are
500   // ReadBuffers() calls posted in the meantime? They need to become noop).
501   if (consumer->tracing_session_id_)
502     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
503   consumers_.erase(consumer);
504 
505   // At this point no more pointers to |consumer| should be around.
506   PERFETTO_DCHECK(!std::any_of(
507       tracing_sessions_.begin(), tracing_sessions_.end(),
508       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
509         return kv.second.consumer_maybe_null == consumer;
510       }));
511 }
512 
513 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
514                                         const std::string& key) {
515   PERFETTO_DCHECK_THREAD(thread_checker_);
516   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
517   PERFETTO_DCHECK(consumers_.count(consumer));
518 
519   TracingSessionID tsid = consumer->tracing_session_id_;
520   TracingSession* tracing_session;
521   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
522     return false;
523 
524   if (GetDetachedSession(consumer->uid_, key)) {
525     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
526                   key.c_str());
527     return false;
528   }
529 
530   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
531   tracing_session->consumer_maybe_null = nullptr;
532   tracing_session->detach_key = key;
533   consumer->tracing_session_id_ = 0;
534   return true;
535 }
536 
537 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
538                                         const std::string& key) {
539   PERFETTO_DCHECK_THREAD(thread_checker_);
540   PERFETTO_DLOG("Consumer %p attaching to session %s",
541                 reinterpret_cast<void*>(consumer), key.c_str());
542   PERFETTO_DCHECK(consumers_.count(consumer));
543 
544   if (consumer->tracing_session_id_) {
545     PERFETTO_ELOG(
546         "Cannot reattach consumer to session %s"
547         " while it is already attached to tracing session ID %" PRIu64,
548         key.c_str(), consumer->tracing_session_id_);
549     return false;
550   }
551 
552   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
553   if (!tracing_session) {
554     PERFETTO_ELOG(
555         "Failed to attach consumer, session '%s' not found for uid %d",
556         key.c_str(), static_cast<int>(consumer->uid_));
557     return false;
558   }
559 
560   consumer->tracing_session_id_ = tracing_session->id;
561   tracing_session->consumer_maybe_null = consumer;
562   tracing_session->detach_key.clear();
563   return true;
564 }
565 
566 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
567                                                const TraceConfig& cfg,
568                                                base::ScopedFile fd) {
569   PERFETTO_DCHECK_THREAD(thread_checker_);
570   PERFETTO_DLOG("Enabling tracing for consumer %p",
571                 reinterpret_cast<void*>(consumer));
572   MaybeLogUploadEvent(cfg, PerfettoStatsdAtom::kTracedEnableTracing);
573   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
574     lockdown_mode_ = true;
575   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
576     lockdown_mode_ = false;
577 
578   // Scope |tracing_session| to this block to prevent accidental use of a null
579   // pointer later in this function.
580   {
581     TracingSession* tracing_session =
582         GetTracingSession(consumer->tracing_session_id_);
583     if (tracing_session) {
584       MaybeLogUploadEvent(
585           cfg, PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
586       return PERFETTO_SVC_ERR(
587           "A Consumer is trying to EnableTracing() but another tracing "
588           "session is already active (forgot a call to FreeBuffers() ?)");
589     }
590   }
591 
592   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
593                                        ? kGuardrailsMaxTracingDurationMillis
594                                        : kMaxTracingDurationMillis;
595   if (cfg.duration_ms() > max_duration_ms) {
596     MaybeLogUploadEvent(cfg,
597                         PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
598     return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
599                             "ms  > %" PRIu32 " ms)",
600                             cfg.duration_ms(), max_duration_ms);
601   }
602 
603   const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
604                                   TraceConfig::TriggerConfig::UNSPECIFIED;
605   if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
606                              cfg.trigger_config().trigger_timeout_ms() >
607                                  kGuardrailsMaxTracingDurationMillis)) {
608     MaybeLogUploadEvent(
609         cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
610     return PERFETTO_SVC_ERR(
611         "Traces with START_TRACING triggers must provide a positive "
612         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
613         cfg.trigger_config().trigger_timeout_ms());
614   }
615 
616   // This check has been introduced in May 2023 after finding b/274931668.
617   if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
618       TraceConfig::TriggerConfig::TriggerMode_MAX) {
619     MaybeLogUploadEvent(
620         cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
621     return PERFETTO_SVC_ERR(
622         "The trace config specified an invalid trigger_mode");
623   }
624 
625   if (has_trigger_config && cfg.duration_ms() != 0) {
626     MaybeLogUploadEvent(
627         cfg, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
628     return PERFETTO_SVC_ERR(
629         "duration_ms was set, this must not be set for traces with triggers.");
630   }
631 
632   if (cfg.trigger_config().trigger_mode() ==
633           TraceConfig::TriggerConfig::STOP_TRACING &&
634       cfg.write_into_file()) {
635     // We don't support this usecase because there are subtle assumptions which
636     // break around TracingServiceEvents and windowed sorting (i.e. if we don't
637     // drain the events in ReadBuffersIntoFile because we are waiting for
638     // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
639     // emitting them wildly out of order, breaking windowed sorting in trace
640     // processor).
641     MaybeLogUploadEvent(
642         cfg, PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
643     return PERFETTO_SVC_ERR(
644         "Specifying trigger mode STOP_TRACING and write_into_file together is "
645         "unsupported");
646   }
647 
648   std::unordered_set<std::string> triggers;
649   for (const auto& trigger : cfg.trigger_config().triggers()) {
650     if (!triggers.insert(trigger.name()).second) {
651       MaybeLogUploadEvent(
652           cfg, PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
653       return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
654                               trigger.name().c_str());
655     }
656   }
657 
658   if (cfg.enable_extra_guardrails()) {
659     if (cfg.deferred_start()) {
660       MaybeLogUploadEvent(
661           cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
662       return PERFETTO_SVC_ERR(
663           "deferred_start=true is not supported in unsupervised traces");
664     }
665     uint64_t buf_size_sum = 0;
666     for (const auto& buf : cfg.buffers()) {
667       if (buf.size_kb() % 4 != 0) {
668         MaybeLogUploadEvent(
669             cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
670         return PERFETTO_SVC_ERR(
671             "buffers.size_kb must be a multiple of 4, got %" PRIu32,
672             buf.size_kb());
673       }
674       buf_size_sum += buf.size_kb();
675     }
676 
677     uint32_t max_tracing_buffer_size_kb =
678         std::max(kGuardrailsMaxTracingBufferSizeKb,
679                  cfg.guardrail_overrides().max_tracing_buffer_size_kb());
680     if (buf_size_sum > max_tracing_buffer_size_kb) {
681       MaybeLogUploadEvent(
682           cfg, PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
683       return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
684                               "kB  > %" PRIu32 " kB)",
685                               buf_size_sum, max_tracing_buffer_size_kb);
686     }
687   }
688 
689   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
690     MaybeLogUploadEvent(cfg,
691                         PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
692     return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
693                             cfg.buffers_size());
694   }
695   // Check that the config specifies all buffers for its data sources. This
696   // is also checked in SetupDataSource, but it is simpler to return a proper
697   // error to the consumer from here (and there will be less state to undo).
698   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
699     size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
700     size_t target_buffer = cfg_data_source.config().target_buffer();
701     if (target_buffer >= num_buffers) {
702       MaybeLogUploadEvent(
703           cfg, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
704       return PERFETTO_SVC_ERR(
705           "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
706           "%zu)",
707           cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
708     }
709   }
710 
711   if (!cfg.unique_session_name().empty()) {
712     const std::string& name = cfg.unique_session_name();
713     for (auto& kv : tracing_sessions_) {
714       if (kv.second.config.unique_session_name() == name) {
715         MaybeLogUploadEvent(
716             cfg, PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
717         static const char fmt[] =
718             "A trace with this unique session name (%s) already exists";
719         // This happens frequently, don't make it an "E"LOG.
720         PERFETTO_LOG(fmt, name.c_str());
721         return base::ErrStatus(fmt, name.c_str());
722       }
723     }
724   }
725 
726   if (cfg.enable_extra_guardrails()) {
727     // unique_session_name can be empty
728     const std::string& name = cfg.unique_session_name();
729     int64_t now_s = base::GetBootTimeS().count();
730 
731     // Remove any entries where the time limit has passed so this map doesn't
732     // grow indefinitely:
733     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
734     for (auto it = sessions.cbegin(); it != sessions.cend();) {
735       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
736         it = sessions.erase(it);
737       } else {
738         ++it;
739       }
740     }
741 
742     int64_t& previous_s = session_to_last_trace_s_[name];
743     if (previous_s == 0) {
744       previous_s = now_s;
745     } else {
746       MaybeLogUploadEvent(
747           cfg, PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
748       return PERFETTO_SVC_ERR(
749           "A trace with unique session name \"%s\" began less than %" PRId64
750           "s ago (%" PRId64 "s)",
751           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
752     }
753   }
754 
755   const int sessions_for_uid = static_cast<int>(std::count_if(
756       tracing_sessions_.begin(), tracing_sessions_.end(),
757       [consumer](const decltype(tracing_sessions_)::value_type& s) {
758         return s.second.consumer_uid == consumer->uid_;
759       }));
760 
761   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
762   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
763     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
764   }
765   if (sessions_for_uid >= per_uid_limit) {
766     MaybeLogUploadEvent(
767         cfg, PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
768     return PERFETTO_SVC_ERR(
769         "Too many concurrent tracing sessions (%d) for uid %d, limit is %d",
770         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
771   }
772 
773   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
774   // in a state where it stalls by design by having more TraceWriterImpl
775   // instances than free pages in the buffer. This is really a bug in
776   // trace_probes and the way it handles stalls in the shmem buffer.
777   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
778     MaybeLogUploadEvent(
779         cfg, PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
780     return PERFETTO_SVC_ERR("Too many concurrent tracing sessions (%zu)",
781                             tracing_sessions_.size());
782   }
783 
784   // If the trace config provides a filter bytecode, setup the filter now.
785   // If the filter loading fails, abort the tracing session rather than running
786   // unfiltered.
787   std::unique_ptr<protozero::MessageFilter> trace_filter;
788   if (cfg.has_trace_filter()) {
789     const auto& filt = cfg.trace_filter();
790     const std::string& bytecode = filt.bytecode();
791     trace_filter.reset(new protozero::MessageFilter());
792     if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
793       MaybeLogUploadEvent(
794           cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
795       return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
796     }
797     // The filter is created using perfetto.protos.Trace as root message
798     // (because that makes it possible to play around with the `proto_filter`
799     // tool on actual traces). Here in the service, however, we deal with
800     // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
801     // The IPC client (or the write_into_file logic in here) is responsible
802     // for prepending the packet preamble (see GetProtoPreamble() calls), but
803     // the preamble is not there at ReadBuffer time. Hence we change the root of
804     // the filtering to start at the Trace.packet level.
805     uint32_t packet_field_id = TracePacket::kPacketFieldNumber;
806     if (!trace_filter->SetFilterRoot(&packet_field_id, 1)) {
807       MaybeLogUploadEvent(
808           cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
809       return PERFETTO_SVC_ERR("Failed to set filter root.");
810     }
811   }
812 
813   const TracingSessionID tsid = ++last_tracing_session_id_;
814   TracingSession* tracing_session =
815       &tracing_sessions_
816            .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
817                     std::forward_as_tuple(tsid, consumer, cfg, task_runner_))
818            .first->second;
819 
820   if (trace_filter)
821     tracing_session->trace_filter = std::move(trace_filter);
822 
823   if (cfg.write_into_file()) {
824     if (!fd ^ !cfg.output_path().empty()) {
825       tracing_sessions_.erase(tsid);
826       MaybeLogUploadEvent(
827           tracing_session->config,
828           PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
829       return PERFETTO_SVC_ERR(
830           "When write_into_file==true either a FD needs to be passed or "
831           "output_path must be populated (but not both)");
832     }
833     if (!cfg.output_path().empty()) {
834       fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
835       if (!fd) {
836         MaybeLogUploadEvent(
837             tracing_session->config,
838             PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
839         tracing_sessions_.erase(tsid);
840         return PERFETTO_SVC_ERR("Failed to create the trace file %s",
841                                 cfg.output_path().c_str());
842       }
843     }
844     tracing_session->write_into_file = std::move(fd);
845     uint32_t write_period_ms = cfg.file_write_period_ms();
846     if (write_period_ms == 0)
847       write_period_ms = kDefaultWriteIntoFilePeriodMs;
848     if (write_period_ms < min_write_period_ms_)
849       write_period_ms = min_write_period_ms_;
850     tracing_session->write_period_ms = write_period_ms;
851     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
852     tracing_session->bytes_written_into_file = 0;
853   }
854 
855   // Initialize the log buffers.
856   bool did_allocate_all_buffers = true;
857 
858   // Allocate the trace buffers. Also create a map to translate a consumer
859   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
860   // corresponding BufferID, which is a global ID namespace for the service and
861   // all producers.
862   size_t total_buf_size_kb = 0;
863   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
864   tracing_session->buffers_index.reserve(num_buffers);
865   for (size_t i = 0; i < num_buffers; i++) {
866     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
867     BufferID global_id = buffer_ids_.Allocate();
868     if (!global_id) {
869       did_allocate_all_buffers = false;  // We ran out of IDs.
870       break;
871     }
872     tracing_session->buffers_index.push_back(global_id);
873     const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
874     total_buf_size_kb += buffer_cfg.size_kb();
875     TraceBuffer::OverwritePolicy policy =
876         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
877             ? TraceBuffer::kDiscard
878             : TraceBuffer::kOverwrite;
879     auto it_and_inserted = buffers_.emplace(
880         global_id, TraceBuffer::Create(buf_size_bytes, policy));
881     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
882     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
883     if (!trace_buffer) {
884       did_allocate_all_buffers = false;
885       break;
886     }
887   }
888 
889   UpdateMemoryGuardrail();
890 
891   // This can happen if either:
892   // - All the kMaxTraceBufferID slots are taken.
893   // - OOM, or, more realistically, we exhausted virtual memory.
894   // In any case, free all the previously allocated buffers and abort.
895   // TODO(fmayer): add a test to cover this case, this is quite subtle.
896   if (!did_allocate_all_buffers) {
897     for (BufferID global_id : tracing_session->buffers_index) {
898       buffer_ids_.Free(global_id);
899       buffers_.erase(global_id);
900     }
901     tracing_sessions_.erase(tsid);
902     MaybeLogUploadEvent(tracing_session->config,
903                         PerfettoStatsdAtom::kTracedEnableTracingOom);
904     return PERFETTO_SVC_ERR(
905         "Failed to allocate tracing buffers: OOM or too many buffers");
906   }
907 
908   consumer->tracing_session_id_ = tsid;
909 
910   // Setup the data sources on the producers without starting them.
911   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
912     // Scan all the registered data sources with a matching name.
913     auto range = data_sources_.equal_range(cfg_data_source.config().name());
914     for (auto it = range.first; it != range.second; it++) {
915       TraceConfig::ProducerConfig producer_config;
916       for (auto& config : cfg.producers()) {
917         if (GetProducer(it->second.producer_id)->name_ ==
918             config.producer_name()) {
919           producer_config = config;
920           break;
921         }
922       }
923       SetupDataSource(cfg_data_source, producer_config, it->second,
924                       tracing_session);
925     }
926   }
927 
928   bool has_start_trigger = false;
929   auto weak_this = weak_ptr_factory_.GetWeakPtr();
930   switch (cfg.trigger_config().trigger_mode()) {
931     case TraceConfig::TriggerConfig::UNSPECIFIED:
932       // no triggers are specified so this isn't a trace that is using triggers.
933       PERFETTO_DCHECK(!has_trigger_config);
934       break;
935     case TraceConfig::TriggerConfig::START_TRACING:
936       // For traces which use START_TRACING triggers we need to ensure that the
937       // tracing session will be cleaned up when it times out.
938       has_start_trigger = true;
939       task_runner_->PostDelayedTask(
940           [weak_this, tsid]() {
941             if (weak_this)
942               weak_this->OnStartTriggersTimeout(tsid);
943           },
944           cfg.trigger_config().trigger_timeout_ms());
945       break;
946     case TraceConfig::TriggerConfig::STOP_TRACING:
947       // Update the tracing_session's duration_ms to ensure that if no trigger
948       // is received the session will end and be cleaned up when the trigger
949       // timeout elapses.
950       //
951       // TODO(nuskos): Refactor this so that rather than modifying the config we
952       // have a field we look at on the tracing_session.
953       tracing_session->config.set_duration_ms(
954           cfg.trigger_config().trigger_timeout_ms());
955       break;
956 
957       // The case of unknown modes (coming from future versions of the service)
958       // is handled a few lines above (search for TriggerMode_MAX).
959   }
960 
961   tracing_session->state = TracingSession::CONFIGURED;
962   PERFETTO_LOG(
963       "Configured tracing session %" PRIu64
964       ", #sources:%zu, duration:%d ms, #buffers:%d, total "
965       "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
966       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
967       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
968       static_cast<unsigned int>(consumer->uid_),
969       cfg.unique_session_name().c_str());
970 
971   // Start the data sources, unless this is a case of early setup + fast
972   // triggering, either through TraceConfig.deferred_start or
973   // TraceConfig.trigger_config(). If both are specified, whichever one occurs
974   // first will initiate the trace.
975   if (!cfg.deferred_start() && !has_start_trigger)
976     return StartTracing(tsid);
977 
978   return base::OkStatus();
979 }
980 
981 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
982                                            const TraceConfig& updated_cfg) {
983   PERFETTO_DCHECK_THREAD(thread_checker_);
984   TracingSession* tracing_session =
985       GetTracingSession(consumer->tracing_session_id_);
986   PERFETTO_DCHECK(tracing_session);
987 
988   if ((tracing_session->state != TracingSession::STARTED) &&
989       (tracing_session->state != TracingSession::CONFIGURED)) {
990     PERFETTO_ELOG(
991         "ChangeTraceConfig() was called for a tracing session which isn't "
992         "running.");
993     return;
994   }
995 
996   // We only support updating producer_name_{,regex}_filter (and pass-through
997   // configs) for now; null out any changeable fields and make sure the rest are
998   // identical.
999   TraceConfig new_config_copy(updated_cfg);
1000   for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
1001     ds_cfg.clear_producer_name_filter();
1002     ds_cfg.clear_producer_name_regex_filter();
1003   }
1004 
1005   TraceConfig current_config_copy(tracing_session->config);
1006   for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
1007     ds_cfg.clear_producer_name_filter();
1008     ds_cfg.clear_producer_name_regex_filter();
1009   }
1010 
1011   if (new_config_copy != current_config_copy) {
1012     PERFETTO_LOG(
1013         "ChangeTraceConfig() was called with a config containing unsupported "
1014         "changes; only adding to the producer_name_{,regex}_filter is "
1015         "currently supported and will have an effect.");
1016   }
1017 
1018   for (TraceConfig::DataSource& cfg_data_source :
1019        *tracing_session->config.mutable_data_sources()) {
1020     // Find the updated producer_filter in the new config.
1021     std::vector<std::string> new_producer_name_filter;
1022     std::vector<std::string> new_producer_name_regex_filter;
1023     bool found_data_source = false;
1024     for (const auto& it : updated_cfg.data_sources()) {
1025       if (cfg_data_source.config().name() == it.config().name()) {
1026         new_producer_name_filter = it.producer_name_filter();
1027         new_producer_name_regex_filter = it.producer_name_regex_filter();
1028         found_data_source = true;
1029         break;
1030       }
1031     }
1032 
1033     // Bail out if data source not present in the new config.
1034     if (!found_data_source) {
1035       PERFETTO_ELOG(
1036           "ChangeTraceConfig() called without a current data source also "
1037           "present in the new config: %s",
1038           cfg_data_source.config().name().c_str());
1039       continue;
1040     }
1041 
1042     // TODO(oysteine): Just replacing the filter means that if
1043     // there are any filter entries which were present in the original config,
1044     // but removed from the config passed to ChangeTraceConfig, any matching
1045     // producers will keep producing but newly added producers after this
1046     // point will never start.
1047     *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
1048     *cfg_data_source.mutable_producer_name_regex_filter() =
1049         new_producer_name_regex_filter;
1050 
1051     // Scan all the registered data sources with a matching name.
1052     auto range = data_sources_.equal_range(cfg_data_source.config().name());
1053     for (auto it = range.first; it != range.second; it++) {
1054       ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
1055       PERFETTO_DCHECK(producer);
1056 
1057       // Check if the producer name of this data source is present
1058       // in the name filters. We currently only support new filters, not
1059       // removing old ones.
1060       if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
1061                              new_producer_name_regex_filter)) {
1062         continue;
1063       }
1064 
1065       bool already_setup = false;
1066       auto& ds_instances = tracing_session->data_source_instances;
1067       for (auto instance_it = ds_instances.begin();
1068            instance_it != ds_instances.end(); ++instance_it) {
1069         if (instance_it->first == it->second.producer_id &&
1070             instance_it->second.data_source_name ==
1071                 cfg_data_source.config().name()) {
1072           already_setup = true;
1073           break;
1074         }
1075       }
1076 
1077       if (already_setup)
1078         continue;
1079 
1080       // If it wasn't previously setup, set it up now.
1081       // (The per-producer config is optional).
1082       TraceConfig::ProducerConfig producer_config;
1083       for (auto& config : tracing_session->config.producers()) {
1084         if (producer->name_ == config.producer_name()) {
1085           producer_config = config;
1086           break;
1087         }
1088       }
1089 
1090       DataSourceInstance* ds_inst = SetupDataSource(
1091           cfg_data_source, producer_config, it->second, tracing_session);
1092 
1093       if (ds_inst && tracing_session->state == TracingSession::STARTED)
1094         StartDataSourceInstance(producer, tracing_session, ds_inst);
1095     }
1096   }
1097 }
1098 
1099 base::Status TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1100   PERFETTO_DCHECK_THREAD(thread_checker_);
1101 
1102   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1103   TracingSession* tracing_session = GetTracingSession(tsid);
1104   if (!tracing_session) {
1105     return PERFETTO_SVC_ERR(
1106         "StartTracing() failed, invalid session ID %" PRIu64, tsid);
1107   }
1108 
1109   MaybeLogUploadEvent(tracing_session->config,
1110                       PerfettoStatsdAtom::kTracedStartTracing);
1111 
1112   if (tracing_session->state != TracingSession::CONFIGURED) {
1113     MaybeLogUploadEvent(
1114         tracing_session->config,
1115         PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1116     return PERFETTO_SVC_ERR("StartTracing() failed, invalid session state: %d",
1117                             tracing_session->state);
1118   }
1119 
1120   tracing_session->state = TracingSession::STARTED;
1121 
1122   // We store the start of trace snapshot separately as it's important to make
1123   // sure we can interpret all the data in the trace; storing it in the ring
1124   // buffer means it could be overwritten by a later snapshot.
1125   if (!tracing_session->config.builtin_data_sources()
1126            .disable_clock_snapshotting()) {
1127     SnapshotClocks(&tracing_session->initial_clock_snapshot);
1128   }
1129 
1130   // We don't snapshot the clocks here because we just did this above.
1131   SnapshotLifecyleEvent(
1132       tracing_session,
1133       protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1134       false /* snapshot_clocks */);
1135 
1136   // Periodically snapshot clocks, stats, sync markers while the trace is
1137   // active. The snapshots are emitted on the future ReadBuffers() calls, which
1138   // means that:
1139   //  (a) If we're streaming to a file (or to a consumer) while tracing, we
1140   //      write snapshots periodically into the trace.
1141   //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1142   //      snapshot into the trace. For clock snapshots, we keep track of the
1143   //      snapshot recorded at the beginning of the session
1144   //      (initial_clock_snapshot above), as well as the most recent sampled
1145   //      snapshots that showed significant new drift between different clocks.
1146   //      The latter clock snapshots are sampled periodically and at lifecycle
1147   //      events.
1148   base::PeriodicTask::Args snapshot_task_args;
1149   snapshot_task_args.start_first_task_immediately = true;
1150   snapshot_task_args.use_suspend_aware_timer =
1151       tracing_session->config.builtin_data_sources()
1152           .prefer_suspend_clock_for_snapshot();
1153   snapshot_task_args.task = [weak_this, tsid] {
1154     if (weak_this)
1155       weak_this->PeriodicSnapshotTask(tsid);
1156   };
1157   snapshot_task_args.period_ms =
1158       tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1159   if (!snapshot_task_args.period_ms)
1160     snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1161   tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1162 
1163   // Trigger delayed task if the trace is time limited.
1164   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1165   if (trace_duration_ms > 0) {
1166     task_runner_->PostDelayedTask(
1167         [weak_this, tsid] {
1168           // Skip the flush entirely if the trace session doesn't exist anymore.
1169           // This prevents misleading error messages from being logged.
1170           if (!weak_this)
1171             return;
1172           auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
1173           if (!tracing_session_ptr)
1174             return;
1175           // If this trace was using STOP_TRACING triggers and we've seen
1176           // one, then the trigger overrides the normal timeout. In this
1177           // case we just return and let the other task clean up this trace.
1178           if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
1179                   TraceConfig::TriggerConfig::STOP_TRACING &&
1180               !tracing_session_ptr->received_triggers.empty())
1181             return;
1182           // In all other cases (START_TRACING or no triggers) we flush
1183           // after |trace_duration_ms| unconditionally.
1184           weak_this->FlushAndDisableTracing(tsid);
1185         },
1186         trace_duration_ms);
1187   }
1188 
1189   // Start the periodic drain tasks if we need to save the trace into a file.
1190   if (tracing_session->config.write_into_file()) {
1191     task_runner_->PostDelayedTask(
1192         [weak_this, tsid] {
1193           if (weak_this)
1194             weak_this->ReadBuffersIntoFile(tsid);
1195         },
1196         tracing_session->delay_to_next_write_period_ms());
1197   }
1198 
1199   // Start the periodic flush tasks if the config specified a flush period.
1200   if (tracing_session->config.flush_period_ms())
1201     PeriodicFlushTask(tsid, /*post_next_only=*/true);
1202 
1203   // Start the periodic incremental state clear tasks if the config specified a
1204   // period.
1205   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1206     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1207   }
1208 
1209   for (auto& kv : tracing_session->data_source_instances) {
1210     ProducerID producer_id = kv.first;
1211     DataSourceInstance& data_source = kv.second;
1212     ProducerEndpointImpl* producer = GetProducer(producer_id);
1213     if (!producer) {
1214       PERFETTO_DFATAL("Producer does not exist.");
1215       continue;
1216     }
1217     StartDataSourceInstance(producer, tracing_session, &data_source);
1218   }
1219 
1220   MaybeNotifyAllDataSourcesStarted(tracing_session);
1221   return base::OkStatus();
1222 }
1223 
1224 void TracingServiceImpl::StartDataSourceInstance(
1225     ProducerEndpointImpl* producer,
1226     TracingSession* tracing_session,
1227     TracingServiceImpl::DataSourceInstance* instance) {
1228   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1229   if (instance->will_notify_on_start) {
1230     instance->state = DataSourceInstance::STARTING;
1231   } else {
1232     instance->state = DataSourceInstance::STARTED;
1233   }
1234   if (tracing_session->consumer_maybe_null) {
1235     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1236         *producer, *instance);
1237   }
1238   producer->StartDataSource(instance->instance_id, instance->config);
1239 
1240   // If all data sources are started, notify the consumer.
1241   if (instance->state == DataSourceInstance::STARTED)
1242     MaybeNotifyAllDataSourcesStarted(tracing_session);
1243 }
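// Illustrative annotation (not upstream code): the per-instance start
// handshake implemented above is, roughly:
//
//   CONFIGURED -> STARTING -> STARTED   if will_notify_on_start is set
//                                       (the producer acks via
//                                       NotifyDataSourceStarted())
//   CONFIGURED -> STARTED               otherwise
//
// MaybeNotifyAllDataSourcesStarted() then invokes OnAllDataSourcesStarted()
// on the consumer once every instance in the session has reached STARTED.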
1244 
1245 // DisableTracing just stops the data sources but doesn't free up any buffer.
1246 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1247 // and then drain the buffers. The actual teardown of the TracingSession happens
1248 // in FreeBuffers().
1249 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
1250                                         bool disable_immediately) {
1251   PERFETTO_DCHECK_THREAD(thread_checker_);
1252   TracingSession* tracing_session = GetTracingSession(tsid);
1253   if (!tracing_session) {
1254     // Can happen if the consumer calls this before EnableTracing() or after
1255     // FreeBuffers().
1256     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
1257     return;
1258   }
1259 
1260   MaybeLogUploadEvent(tracing_session->config,
1261                       PerfettoStatsdAtom::kTracedDisableTracing);
1262 
1263   switch (tracing_session->state) {
1264     // Spurious call to DisableTracing() while already disabled, nothing to do.
1265     case TracingSession::DISABLED:
1266       PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1267       return;
1268 
1269     // This is either:
1270     // A) The case of a graceful DisableTracing() call followed by a call to
1271     //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
1272     //    to forcefully transition in the disabled state without waiting for the
1273     //    outstanding acks because the buffers are going to be destroyed soon.
1274     // B) A spurious call, iff |disable_immediately| == false, in which case
1275     //    there is nothing to do.
1276     case TracingSession::DISABLING_WAITING_STOP_ACKS:
1277       PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1278       if (disable_immediately)
1279         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1280       return;
1281 
1282     // Continues below.
1283     case TracingSession::CONFIGURED:
1284       // If the session didn't even start there is no need to orchestrate a
1285       // graceful stop of data sources.
1286       disable_immediately = true;
1287       break;
1288 
1289     // This is the nominal case, continues below.
1290     case TracingSession::STARTED:
1291       break;
1292   }
1293 
1294   for (auto& data_source_inst : tracing_session->data_source_instances) {
1295     const ProducerID producer_id = data_source_inst.first;
1296     DataSourceInstance& instance = data_source_inst.second;
1297     ProducerEndpointImpl* producer = GetProducer(producer_id);
1298     PERFETTO_DCHECK(producer);
1299     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
1300                     instance.state == DataSourceInstance::STARTING ||
1301                     instance.state == DataSourceInstance::STARTED);
1302     StopDataSourceInstance(producer, tracing_session, &instance,
1303                            disable_immediately);
1304   }
1305 
1306   // If the periodic task is running, we can stop the periodic snapshot timer
1307   // here instead of waiting until FreeBuffers to prevent useless snapshots
1308   // which won't be read.
1309   tracing_session->snapshot_periodic_task.Reset();
1310 
1311   // Either this request is flagged with |disable_immediately| or there are no
1312   // data sources that are requesting a final handshake. In both cases just mark
1313   // the session as disabled immediately, notify the consumer and flush the
1314   // trace file (if used).
1315   if (tracing_session->AllDataSourceInstancesStopped())
1316     return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1317 
1318   tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
1319   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1320   task_runner_->PostDelayedTask(
1321       [weak_this, tsid] {
1322         if (weak_this)
1323           weak_this->OnDisableTracingTimeout(tsid);
1324       },
1325       tracing_session->data_source_stop_timeout_ms());
1326 
1327   // Deliberately NOT removing the session from |tracing_sessions_|, it's still
1328   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
1329 }
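// Illustrative annotation (not upstream code): the graceful stop sequence is,
// roughly:
//
//   DisableTracing()
//     -> StopDataSourceInstance() for every data source in the session
//     -> wait for NotifyDataSourceStopped() acks, bounded by
//        data_source_stop_timeout_ms() via OnDisableTracingTimeout()
//     -> DisableTracingNotifyConsumerAndFlushFile()
//     -> the consumer reads the buffers, then FreeBuffers() tears the
//        session down.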
1330 
1331 void TracingServiceImpl::NotifyDataSourceStarted(
1332     ProducerID producer_id,
1333     DataSourceInstanceID instance_id) {
1334   PERFETTO_DCHECK_THREAD(thread_checker_);
1335   for (auto& kv : tracing_sessions_) {
1336     TracingSession& tracing_session = kv.second;
1337     DataSourceInstance* instance =
1338         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1339 
1340     if (!instance)
1341       continue;
1342 
1343     // If the tracing session was already stopped, ignore this notification.
1344     if (tracing_session.state != TracingSession::STARTED)
1345       continue;
1346 
1347     if (instance->state != DataSourceInstance::STARTING) {
1348       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1349                     instance->state);
1350       continue;
1351     }
1352 
1353     instance->state = DataSourceInstance::STARTED;
1354 
1355     ProducerEndpointImpl* producer = GetProducer(producer_id);
1356     PERFETTO_DCHECK(producer);
1357     if (tracing_session.consumer_maybe_null) {
1358       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1359           *producer, *instance);
1360     }
1361 
1362     // If all data sources are started, notify the consumer.
1363     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1364   }  // for (tracing_session)
1365 }
1366 
1367 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1368     TracingSession* tracing_session) {
1369   if (!tracing_session->consumer_maybe_null)
1370     return;
1371 
1372   if (!tracing_session->AllDataSourceInstancesStarted())
1373     return;
1374 
1375   // In some rare cases, we can get in this state more than once. Consider the
1376   // following scenario: 3 data sources are registered -> trace starts ->
1377   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1378   // Imagine now that a 4th data source registers while the trace is ongoing.
1379   // This would hit the AllDataSourceInstancesStarted() condition again.
1380   // In this case, however, we don't want to re-notify the consumer again.
1381   // That would be unexpected (even if, perhaps, technically correct) and
1382   // trigger bugs in the consumer.
1383   if (tracing_session->did_notify_all_data_source_started)
1384     return;
1385 
1386   PERFETTO_DLOG("All data sources started");
1387 
1388   SnapshotLifecyleEvent(
1389       tracing_session,
1390       protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1391       true /* snapshot_clocks */);
1392 
1393   tracing_session->did_notify_all_data_source_started = true;
1394   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1395 }
1396 
1397 void TracingServiceImpl::NotifyDataSourceStopped(
1398     ProducerID producer_id,
1399     DataSourceInstanceID instance_id) {
1400   PERFETTO_DCHECK_THREAD(thread_checker_);
1401   for (auto& kv : tracing_sessions_) {
1402     TracingSession& tracing_session = kv.second;
1403     DataSourceInstance* instance =
1404         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1405 
1406     if (!instance)
1407       continue;
1408 
1409     if (instance->state != DataSourceInstance::STOPPING) {
1410       PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1411                     instance->state);
1412       continue;
1413     }
1414 
1415     instance->state = DataSourceInstance::STOPPED;
1416 
1417     ProducerEndpointImpl* producer = GetProducer(producer_id);
1418     PERFETTO_DCHECK(producer);
1419     if (tracing_session.consumer_maybe_null) {
1420       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1421           *producer, *instance);
1422     }
1423 
1424     if (!tracing_session.AllDataSourceInstancesStopped())
1425       continue;
1426 
1427     if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1428       continue;
1429 
1430     // All data sources acked the termination.
1431     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1432   }  // for (tracing_session)
1433 }
1434 
1435 void TracingServiceImpl::ActivateTriggers(
1436     ProducerID producer_id,
1437     const std::vector<std::string>& triggers) {
1438   PERFETTO_DCHECK_THREAD(thread_checker_);
1439   auto* producer = GetProducer(producer_id);
1440   PERFETTO_DCHECK(producer);
1441 
1442   int64_t now_ns = base::GetBootTimeNs().count();
1443   for (const auto& trigger_name : triggers) {
1444     PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
1445                   trigger_name.c_str());
1446     base::Hash hash;
1447     hash.Update(trigger_name.c_str(), trigger_name.size());
1448     std::string triggered_session_name;
1449     base::Uuid triggered_session_uuid;
1450     TracingSessionID triggered_session_id = 0;
1451     int trigger_mode = 0;
1452 
1453     uint64_t trigger_name_hash = hash.digest();
1454     size_t count_in_window =
1455         PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);
1456 
1457     bool trigger_matched = false;
1458     bool trigger_activated = false;
1459     for (auto& id_and_tracing_session : tracing_sessions_) {
1460       auto& tracing_session = id_and_tracing_session.second;
1461       TracingSessionID tsid = id_and_tracing_session.first;
1462       auto iter = std::find_if(
1463           tracing_session.config.trigger_config().triggers().begin(),
1464           tracing_session.config.trigger_config().triggers().end(),
1465           [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
1466             return trigger.name() == trigger_name;
1467           });
1468       if (iter == tracing_session.config.trigger_config().triggers().end()) {
1469         continue;
1470       }
1471 
1472       // If this trigger requires a certain producer to have sent it
1473       // (non-empty producer_name_regex()), ensure the producer who sent this
1474       // trigger matches.
1475       if (!iter->producer_name_regex().empty() &&
1476           !std::regex_match(
1477               producer->name_,
1478               std::regex(iter->producer_name_regex(), std::regex::extended))) {
1479         continue;
1480       }
1481 
1482       // Use a random number between 0 and 1 to check if we should allow this
1483       // trigger through or not.
1484       double trigger_rnd =
1485           trigger_rnd_override_for_testing_ > 0
1486               ? trigger_rnd_override_for_testing_
1487               : trigger_probability_dist_(trigger_probability_rand_);
1488       PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
1489       if (trigger_rnd < iter->skip_probability()) {
1490         MaybeLogTriggerEvent(tracing_session.config,
1491                              PerfettoTriggerAtom::kTracedLimitProbability,
1492                              trigger_name);
1493         continue;
1494       }
1495 
1496       // If we already triggered more times than the limit, silently ignore
1497       // this trigger.
1498       if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
1499         MaybeLogTriggerEvent(tracing_session.config,
1500                              PerfettoTriggerAtom::kTracedLimitMaxPer24h,
1501                              trigger_name);
1502         continue;
1503       }
1504       trigger_matched = true;
1505       triggered_session_id = tracing_session.id;
1506       triggered_session_name = tracing_session.config.unique_session_name();
1507       triggered_session_uuid.set_lsb_msb(
1508           tracing_session.config.trace_uuid_lsb(),
1509           tracing_session.config.trace_uuid_msb());
1510       trigger_mode = static_cast<int>(
1511           tracing_session.config.trigger_config().trigger_mode());
1512 
1513       const bool triggers_already_received =
1514           !tracing_session.received_triggers.empty();
1515       tracing_session.received_triggers.push_back(
1516           {static_cast<uint64_t>(now_ns), iter->name(), producer->name_,
1517            producer->uid_});
1518       auto weak_this = weak_ptr_factory_.GetWeakPtr();
1519       switch (tracing_session.config.trigger_config().trigger_mode()) {
1520         case TraceConfig::TriggerConfig::START_TRACING:
1521           // If the session has already been triggered and moved past
1522           // CONFIGURED then we don't need to repeat StartTracing. This would
1523           // work fine (StartTracing would return false) but would add error
1524           // logs.
1525           if (tracing_session.state != TracingSession::CONFIGURED)
1526             break;
1527 
1528           trigger_activated = true;
1529           MaybeLogUploadEvent(tracing_session.config,
1530                               PerfettoStatsdAtom::kTracedTriggerStartTracing,
1531                               iter->name());
1532 
1533           // We override the trace duration to be the trigger's requested
1534           // value, this ensures that the trace will end after this amount
1535           // of time has passed.
1536           tracing_session.config.set_duration_ms(iter->stop_delay_ms());
1537           StartTracing(tsid);
1538           break;
1539         case TraceConfig::TriggerConfig::STOP_TRACING:
1540           // Only stop the trace once to avoid confusing log messages, i.e.
1541           // once we've hit the first trigger we've already posted the task
1542           // to FlushAndDisableTracing(), so all future triggers just break
1543           // out.
1544           if (triggers_already_received)
1545             break;
1546 
1547           trigger_activated = true;
1548           MaybeLogUploadEvent(tracing_session.config,
1549                               PerfettoStatsdAtom::kTracedTriggerStopTracing,
1550                               iter->name());
1551 
1552           // Now that we've seen a trigger we need to stop, flush, and disable
1553           // this session after the configured |stop_delay_ms|.
1554           task_runner_->PostDelayedTask(
1555               [weak_this, tsid] {
1556                 // Skip the flush entirely if the trace session doesn't exist
1557                 // anymore. This prevents misleading error messages from being
1558                 // logged.
1559                 if (weak_this && weak_this->GetTracingSession(tsid))
1560                   weak_this->FlushAndDisableTracing(tsid);
1561               },
1562               // If this delay is zero the task will be immediately runnable
1563               // and will execute shortly.
1564               iter->stop_delay_ms());
1565           break;
1566         case TraceConfig::TriggerConfig::UNSPECIFIED:
1567           PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
1568           break;
1569       }
1570     }  // for (.. : tracing_sessions_)
1571 
1572     if (trigger_matched) {
1573       trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
1574     }
1575 
1576     if (trigger_activated) {
1577       // Log only the trigger that actually caused a trace stop/start, don't log
1578       // the follow-up ones, even if they matched.
1579       PERFETTO_LOG(
1580           "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
1581           "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
1582           trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
1583           triggered_session_uuid.ToPrettyString().c_str(),
1584           triggered_session_id);
1585     }
1586   }  // for (trigger_name : triggers)
1587 }
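// Illustrative annotation (not upstream code): the trigger_config consulted
// above might look like this in textproto form (values made up):
//
//   trigger_config {
//     trigger_mode: STOP_TRACING
//     triggers {
//       name: "my_trigger"
//       producer_name_regex: "com\\.example\\..*"
//       stop_delay_ms: 500
//       max_per_24_h: 10
//       skip_probability: 0.5
//     }
//   }
//
// A producer then activates it by passing {"my_trigger"} to the
// ActivateTriggers() endpoint handled above.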
1588 
1589 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
1590 // conditions all data sources should have acked the stop and this will early
1591 // out.
1592 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1593   PERFETTO_DCHECK_THREAD(thread_checker_);
1594   TracingSession* tracing_session = GetTracingSession(tsid);
1595   if (!tracing_session ||
1596       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1597     return;  // Tracing session was successfully disabled.
1598   }
1599 
1600   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1601                 tsid);
1602   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1603   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1604 }
1605 
1606 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1607     TracingSession* tracing_session) {
1608   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1609   for (auto& inst_kv : tracing_session->data_source_instances) {
1610     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1611       continue;
1612     inst_kv.second.state = DataSourceInstance::STOPPED;
1613     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1614     PERFETTO_DCHECK(producer);
1615     if (tracing_session->consumer_maybe_null) {
1616       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1617           *producer, inst_kv.second);
1618     }
1619   }
1620   tracing_session->state = TracingSession::DISABLED;
1621 
1622   // Scrape any remaining chunks that weren't flushed by the producers.
1623   for (auto& producer_id_and_producer : producers_)
1624     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1625 
1626   SnapshotLifecyleEvent(
1627       tracing_session,
1628       protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1629       true /* snapshot_clocks */);
1630 
1631   if (tracing_session->write_into_file) {
1632     tracing_session->write_period_ms = 0;
1633     ReadBuffersIntoFile(tracing_session->id);
1634   }
1635 
1636   if (tracing_session->on_disable_callback_for_bugreport) {
1637     std::move(tracing_session->on_disable_callback_for_bugreport)();
1638     tracing_session->on_disable_callback_for_bugreport = nullptr;
1639   }
1640 
1641   MaybeLogUploadEvent(tracing_session->config,
1642                       PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1643 
1644   if (tracing_session->consumer_maybe_null)
1645     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1646 }
1647 
1648 void TracingServiceImpl::Flush(TracingSessionID tsid,
1649                                uint32_t timeout_ms,
1650                                ConsumerEndpoint::FlushCallback callback) {
1651   PERFETTO_DCHECK_THREAD(thread_checker_);
1652   TracingSession* tracing_session = GetTracingSession(tsid);
1653   if (!tracing_session) {
1654     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1655     return;
1656   }
1657 
1658   if (!timeout_ms)
1659     timeout_ms = tracing_session->flush_timeout_ms();
1660 
1661   if (tracing_session->pending_flushes.size() > 1000) {
1662     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1663                   tracing_session->pending_flushes.size());
1664     callback(false);
1665     return;
1666   }
1667 
1668   if (tracing_session->state != TracingSession::STARTED) {
1669     PERFETTO_ELOG("Flush() called, but tracing has not been started");
1670     callback(false);
1671     return;
1672   }
1673 
1674   ++tracing_session->flushes_requested;
1675   FlushRequestID flush_request_id = ++last_flush_request_id_;
1676   PendingFlush& pending_flush =
1677       tracing_session->pending_flushes
1678           .emplace_hint(tracing_session->pending_flushes.end(),
1679                         flush_request_id, PendingFlush(std::move(callback)))
1680           ->second;
1681 
1682   // Send a flush request to each producer involved in the tracing session. In
1683   // order to issue a flush request we have to build a map of all data source
1684   // instance ids enabled for each producer.
1685   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1686   for (const auto& data_source_inst : tracing_session->data_source_instances) {
1687     const ProducerID producer_id = data_source_inst.first;
1688     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1689     flush_map[producer_id].push_back(ds_inst_id);
1690   }
1691 
1692   for (const auto& kv : flush_map) {
1693     ProducerID producer_id = kv.first;
1694     ProducerEndpointImpl* producer = GetProducer(producer_id);
1695     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1696     producer->Flush(flush_request_id, data_sources);
1697     pending_flush.producers.insert(producer_id);
1698   }
1699 
1700   // If there are no producers to flush (realistically this happens only in
1701   // some tests) fire OnFlushTimeout() straight away, without waiting.
1702   if (flush_map.empty())
1703     timeout_ms = 0;
1704 
1705   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1706   task_runner_->PostDelayedTask(
1707       [weak_this, tsid, flush_request_id] {
1708         if (weak_this)
1709           weak_this->OnFlushTimeout(tsid, flush_request_id);
1710       },
1711       timeout_ms);
1712 }
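// Illustrative annotation (not upstream code): a flush round trip is,
// roughly:
//
//   Flush() -> producer->Flush(flush_request_id, data_sources) for each
//   producer in the session -> each producer commits its chunks and acks ->
//   NotifyFlushDoneForProducer() removes the producer from the pending set ->
//   when the set becomes empty, CompleteFlush(success=true). If the set is
//   still non-empty after |timeout_ms|, OnFlushTimeout() completes the flush
//   and reports success only if no producer was actually pending.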
1713 
1714 void TracingServiceImpl::NotifyFlushDoneForProducer(
1715     ProducerID producer_id,
1716     FlushRequestID flush_request_id) {
1717   for (auto& kv : tracing_sessions_) {
1718     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1719     auto& pending_flushes = kv.second.pending_flushes;
1720     auto end_it = pending_flushes.upper_bound(flush_request_id);
1721     for (auto it = pending_flushes.begin(); it != end_it;) {
1722       PendingFlush& pending_flush = it->second;
1723       pending_flush.producers.erase(producer_id);
1724       if (pending_flush.producers.empty()) {
1725         auto weak_this = weak_ptr_factory_.GetWeakPtr();
1726         TracingSessionID tsid = kv.first;
1727         auto callback = std::move(pending_flush.callback);
1728         task_runner_->PostTask([weak_this, tsid, callback]() {
1729           if (weak_this) {
1730             weak_this->CompleteFlush(tsid, std::move(callback),
1731                                      /*success=*/true);
1732           }
1733         });
1734         it = pending_flushes.erase(it);
1735       } else {
1736         it++;
1737       }
1738     }  // for (pending_flushes)
1739   }    // for (tracing_session)
1740 }
1741 
1742 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1743                                         FlushRequestID flush_request_id) {
1744   TracingSession* tracing_session = GetTracingSession(tsid);
1745   if (!tracing_session)
1746     return;
1747   auto it = tracing_session->pending_flushes.find(flush_request_id);
1748   if (it == tracing_session->pending_flushes.end())
1749     return;  // Nominal case: flush was completed and acked on time.
1750 
1751   // If there were no producers to flush, consider it a success.
1752   bool success = it->second.producers.empty();
1753   auto callback = std::move(it->second.callback);
1754   tracing_session->pending_flushes.erase(it);
1755   CompleteFlush(tsid, std::move(callback), success);
1756 }
1757 
1758 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1759                                        ConsumerEndpoint::FlushCallback callback,
1760                                        bool success) {
1761   TracingSession* tracing_session = GetTracingSession(tsid);
1762   if (!tracing_session) {
1763     callback(false);
1764     return;
1765   }
1766   // Producers may not have been able to flush all their data, even if they
1767   // indicated flush completion. If possible, also collect uncommitted chunks
1768   // to make sure we have everything they wrote so far.
1769   for (auto& producer_id_and_producer : producers_) {
1770     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1771   }
1772   SnapshotLifecyleEvent(
1773       tracing_session,
1774       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
1775       true /* snapshot_clocks */);
1776 
1777   tracing_session->flushes_succeeded += success ? 1 : 0;
1778   tracing_session->flushes_failed += success ? 0 : 1;
1779   callback(success);
1780 }
1781 
1782 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1783     TracingSession* tracing_session,
1784     ProducerEndpointImpl* producer) {
1785   if (!producer->smb_scraping_enabled_)
1786     return;
1787 
1788   // Can't copy chunks if we don't know about any trace writers.
1789   if (producer->writers_.empty())
1790     return;
1791 
1792   // Performance optimization: On flush or session disconnect, this method is
1793   // called for each producer. If the producer doesn't participate in the
1794   // session, there's no need to scrape its chunks right now. We can tell if a
1795   // producer participates in the session by checking if the producer is allowed
1796   // to write into the session's log buffers.
1797   const auto& session_buffers = tracing_session->buffers_index;
1798   bool producer_in_session =
1799       std::any_of(session_buffers.begin(), session_buffers.end(),
1800                   [producer](BufferID buffer_id) {
1801                     return producer->allowed_target_buffers_.count(buffer_id);
1802                   });
1803   if (!producer_in_session)
1804     return;
1805 
1806   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1807 
1808   // Find and copy any uncommitted chunks from the SMB.
1809   //
1810   // In nominal conditions, the page layout of the used SMB pages should never
1811   // change because the service is the only one who is supposed to modify used
1812   // pages (to make them free again).
1813   //
1814   // However, the code here needs to deal with the case of a malicious producer
1815   // altering the SMB in unpredictable ways. Thankfully the SMB size is
1816   // immutable, so a chunk will always point to some valid memory, even if the
1817   // producer alters the intended layout and chunk header concurrently.
1818   // Ultimately a malicious producer altering the SMB's chunk layout while we
1819   // are iterating in this function is not any different from the case of a
1820   // malicious producer asking to commit a chunk made of random data, which is
1821   // something this class has to deal with regardless.
1822   //
1823   // The only legitimate mutations that can happen from sane producers,
1824   // concurrently to this function, are:
1825   //   A. free pages being partitioned,
1826   //   B. free chunks being migrated to kChunkBeingWritten,
1827   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
1828 
1829   SharedMemoryABI* abi = &producer->shmem_abi_;
1830   // num_pages() is immutable after the SMB is initialized and cannot be changed
1831   // by the producer, even a malicious one.
1832   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1833     uint32_t layout = abi->GetPageLayout(page_idx);
1834 
1835     uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
1836     // Skip empty pages.
1837     if (used_chunks == 0)
1838       continue;
1839 
1840     // Scrape the chunks that are currently used. These should be either in
1841     // state kChunkBeingWritten or kChunkComplete.
1842     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1843       if (!(used_chunks & 1))
1844         continue;
1845 
1846       SharedMemoryABI::ChunkState state =
1847           SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1848       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1849                       state == SharedMemoryABI::kChunkComplete);
1850       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1851 
1852       SharedMemoryABI::Chunk chunk =
1853           abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1854 
1855       uint16_t packet_count;
1856       uint8_t flags;
1857       // GetPacketCountAndFlags has acquire_load semantics.
1858       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1859 
1860       // It only makes sense to copy an incomplete chunk if there's at least
1861       // one full packet available. (The producer may not have completed the
1862       // last packet in it yet, so we need at least 2.)
1863       if (!chunk_complete && packet_count < 2)
1864         continue;
1865 
1866       // At this point, it is safe to access the remaining header fields of
1867       // the chunk. Even if the chunk was only just transferred from
1868       // kChunkFree into kChunkBeingWritten state, the header should be
1869       // written completely once the packet count increased above 1 (it was
1870       // reset to 0 by the service when the chunk was freed).
1871 
1872       WriterID writer_id = chunk.writer_id();
1873       base::Optional<BufferID> target_buffer_id =
1874           producer->buffer_id_for_writer(writer_id);
1875 
1876       // We can only scrape this chunk if we know which log buffer to copy it
1877       // into.
1878       if (!target_buffer_id)
1879         continue;
1880 
1881       // Skip chunks that don't belong to the requested tracing session.
1882       bool target_buffer_belongs_to_session =
1883           std::find(session_buffers.begin(), session_buffers.end(),
1884                     *target_buffer_id) != session_buffers.end();
1885       if (!target_buffer_belongs_to_session)
1886         continue;
1887 
1888       uint32_t chunk_id =
1889           chunk.header()->chunk_id.load(std::memory_order_relaxed);
1890 
1891       CopyProducerPageIntoLogBuffer(
1892           producer->id_, producer->uid_, producer->pid_, writer_id, chunk_id,
1893           *target_buffer_id, packet_count, flags, chunk_complete,
1894           chunk.payload_begin(), chunk.payload_size());
1895     }
1896   }
1897 }
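// Illustrative annotation (not upstream code): summarizing the loop above, a
// chunk is scraped only if all of the following hold: its page is marked as
// used, its state is kChunkBeingWritten or kChunkComplete, an incomplete
// chunk carries at least two packets (the last one may still be mid-write),
// the writer maps to a known target buffer, and that buffer belongs to the
// session being scraped.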
1898 
1899 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1900   PERFETTO_DCHECK_THREAD(thread_checker_);
1901   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1902   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1903   Flush(tsid, 0, [weak_this, tsid](bool success) {
1904     // This was a DLOG up to Jun 2021 (v16, Android S).
1905     PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d", tsid,
1906                  success);
1907     if (!weak_this)
1908       return;
1909     TracingSession* session = weak_this->GetTracingSession(tsid);
1910     session->final_flush_outcome = success ? TraceStats::FINAL_FLUSH_SUCCEEDED
1911                                            : TraceStats::FINAL_FLUSH_FAILED;
1912     if (session->consumer_maybe_null) {
1913       // If the consumer is still attached, just disable the session but give it
1914       // a chance to read the contents.
1915       weak_this->DisableTracing(tsid);
1916     } else {
1917       // If the consumer detached, destroy the session. If the consumer did
1918       // start the session in long-tracing mode, the service will have saved
1919       // the contents to the passed file. If not, the contents will be
1920       // destroyed.
1921       weak_this->FreeBuffers(tsid);
1922     }
1923   });
1924 }
1925 
1926 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1927                                            bool post_next_only) {
1928   PERFETTO_DCHECK_THREAD(thread_checker_);
1929   TracingSession* tracing_session = GetTracingSession(tsid);
1930   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1931     return;
1932 
1933   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1934   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1935   task_runner_->PostDelayedTask(
1936       [weak_this, tsid] {
1937         if (weak_this)
1938           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1939       },
1940       flush_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
1941                                               flush_period_ms));
1942 
1943   if (post_next_only)
1944     return;
1945 
1946   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1947   Flush(tsid, 0, [](bool success) {
1948     if (!success)
1949       PERFETTO_ELOG("Periodic flush timed out");
1950   });
1951 }
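// Illustrative annotation (not upstream code): the delay computed above is
//   flush_period_ms - (wall_time_ms % flush_period_ms)
// which keeps flushes aligned to wall-clock multiples of the period instead
// of letting them drift by the time each flush takes to run. E.g. with a
// 10000 ms period and wall_time_ms % 10000 == 3200, the next flush is posted
// 6800 ms from now.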
1952 
1953 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1954     TracingSessionID tsid,
1955     bool post_next_only) {
1956   PERFETTO_DCHECK_THREAD(thread_checker_);
1957   TracingSession* tracing_session = GetTracingSession(tsid);
1958   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1959     return;
1960 
1961   uint32_t clear_period_ms =
1962       tracing_session->config.incremental_state_config().clear_period_ms();
1963   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1964   task_runner_->PostDelayedTask(
1965       [weak_this, tsid] {
1966         if (weak_this)
1967           weak_this->PeriodicClearIncrementalStateTask(
1968               tsid, /*post_next_only=*/false);
1969       },
1970       clear_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
1971                                               clear_period_ms));
1972 
1973   if (post_next_only)
1974     return;
1975 
1976   PERFETTO_DLOG(
1977       "Performing periodic incremental state clear for trace session %" PRIu64,
1978       tsid);
1979 
1980   // Queue the IPCs to producers with active data sources that opted in.
1981   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1982   for (const auto& kv : tracing_session->data_source_instances) {
1983     ProducerID producer_id = kv.first;
1984     const DataSourceInstance& data_source = kv.second;
1985     if (data_source.handles_incremental_state_clear) {
1986       clear_map[producer_id].push_back(data_source.instance_id);
1987     }
1988   }
1989 
1990   for (const auto& kv : clear_map) {
1991     ProducerID producer_id = kv.first;
1992     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1993     ProducerEndpointImpl* producer = GetProducer(producer_id);
1994     if (!producer) {
1995       PERFETTO_DFATAL("Producer does not exist.");
1996       continue;
1997     }
1998     producer->ClearIncrementalState(data_sources);
1999   }
2000 }
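// Illustrative annotation (not upstream code): the clear period consulted
// above comes from the trace config, e.g. (value made up):
//
//   incremental_state_config { clear_period_ms: 5000 }
//
// Only data sources that advertised handles_incremental_state_clear in their
// DataSourceDescriptor receive the ClearIncrementalState() request.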
2001 
2002 bool TracingServiceImpl::ReadBuffersIntoConsumer(
2003     TracingSessionID tsid,
2004     ConsumerEndpointImpl* consumer) {
2005   PERFETTO_DCHECK(consumer);
2006   PERFETTO_DCHECK_THREAD(thread_checker_);
2007   TracingSession* tracing_session = GetTracingSession(tsid);
2008   if (!tracing_session) {
2009     PERFETTO_DLOG(
2010         "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
2011     return false;
2012   }
2013 
2014   if (tracing_session->write_into_file) {
2015     // If the consumer enabled tracing and asked to save the contents into the
2016     // passed file, it makes little sense to also try to read the buffers over
2017     // IPC, as that would just steal data from the periodic draining task.
2018     PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
2019     return false;
2020   }
2021 
2022   // If a bugreport request happened and the trace was stolen for that, give
2023   // an empty trace with a clear signal to the consumer. This deals only with
2024   // the case of readback-from-IPC. A similar code-path deals with the
2025   // write_into_file case in MaybeSaveTraceForBugreport().
2026   if (tracing_session->seized_for_bugreport) {
2027     std::vector<TracePacket> packets;
2028     if (!tracing_session->config.builtin_data_sources()
2029              .disable_service_events()) {
2030       EmitSeizedForBugreportLifecycleEvent(&packets);
2031     }
2032     EmitLifecycleEvents(tracing_session, &packets);
2033     consumer->consumer_->OnTraceData(std::move(packets), /*has_more=*/false);
2034     return true;
2035   }
2036 
2037   if (IsWaitingForTrigger(tracing_session))
2038     return false;
2039 
2040   // This is a rough threshold to determine how much to read from the buffer in
2041   // each task. This is to avoid executing a single huge sending task for too
2042   // long and risking a watchdog hit. This is *not* an upper bound: we just
2043   // stop accumulating new packets and PostTask *after* we cross this threshold.
2044   // This constant essentially balances the PostTask and IPC overhead vs the
2045   // responsiveness of the service. An extremely small value will cause one IPC
2046   // and one PostTask for each slice but will keep the service extremely
2047   // responsive. An extremely large value will batch the send for the full
2048   // buffer in one large task, will hit the blocking send() once the socket
2049   // buffers are full and hang the service for a bit (until the consumer
2050   // catches up).
2051   static constexpr size_t kApproxBytesPerTask = 32768;
2052   bool has_more;
2053   std::vector<TracePacket> packets =
2054       ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);
2055 
2056   if (has_more) {
2057     auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
2058     auto weak_this = weak_ptr_factory_.GetWeakPtr();
2059     task_runner_->PostTask([weak_this, weak_consumer, tsid] {
2060       if (!weak_this || !weak_consumer)
2061         return;
2062       weak_this->ReadBuffersIntoConsumer(tsid, weak_consumer.get());
2063     });
2064   }
2065 
2066   // Keep this as tail call, just in case the consumer re-enters.
2067   consumer->consumer_->OnTraceData(std::move(packets), has_more);
2068   return true;
2069 }
2070 
2071 bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
2072   PERFETTO_DCHECK_THREAD(thread_checker_);
2073   TracingSession* tracing_session = GetTracingSession(tsid);
2074   if (!tracing_session) {
2075     // This will be hit systematically from the PostDelayedTask. Avoid logging,
2076     // it would be just spam.
2077     return false;
2078   }
2079 
2080   // This can happen if the file is closed by a previous task because it reaches
2081   // |max_file_size_bytes|.
2082   if (!tracing_session->write_into_file)
2083     return false;
2084 
2085   if (!tracing_session->seized_for_bugreport &&
2086       IsWaitingForTrigger(tracing_session))
2087     return false;
2088 
2089   // ReadBuffers() can allocate memory internally, for filtering. By limiting
2090   // the data that ReadBuffers() reads to kWriteIntoFileChunkSize per iteration,
2091   // we limit the amount of memory used on each iteration.
2092   //
2093   // It would be tempting to split this into multiple tasks like in
2094   // ReadBuffersIntoConsumer, but that's not currently possible.
2095   // ReadBuffersIntoFile has to read the whole available data before returning,
2096   // to support the disable_immediately=true code paths.
2097   bool has_more = true;
2098   bool stop_writing_into_file = false;
2099   do {
2100     std::vector<TracePacket> packets =
2101         ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);
2102 
2103     stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
2104   } while (has_more && !stop_writing_into_file);
2105 
2106   if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
2107     // Ensure all data was written to the file before we close it.
2108     base::FlushFile(tracing_session->write_into_file.get());
2109     tracing_session->write_into_file.reset();
2110     tracing_session->write_period_ms = 0;
2111     if (tracing_session->state == TracingSession::STARTED)
2112       DisableTracing(tsid);
2113     return true;
2114   }
2115 
2116   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2117   task_runner_->PostDelayedTask(
2118       [weak_this, tsid] {
2119         if (weak_this)
2120           weak_this->ReadBuffersIntoFile(tsid);
2121       },
2122       tracing_session->delay_to_next_write_period_ms());
2123   return true;
2124 }
2125 
2126 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2127   // When a tracing session is waiting for a trigger, it is considered empty. If
2128   // a tracing session finishes and moves into DISABLED without ever receiving a
2129   // trigger, the trace should never return any data. This includes the
2130   // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2131   // early and let the consumer know there is no data.
2132   if (!tracing_session->config.trigger_config().triggers().empty() &&
2133       tracing_session->received_triggers.empty()) {
2134     PERFETTO_DLOG(
2135         "ReadBuffers(): tracing session has not received a trigger yet.");
2136     return true;
2137   }
2138   return false;
2139 }
2140 
2141 std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
2142     TracingSession* tracing_session,
2143     size_t threshold,
2144     bool* has_more) {
2145   PERFETTO_DCHECK_THREAD(thread_checker_);
2146   PERFETTO_DCHECK(tracing_session);
2147   *has_more = false;
2148 
2149   std::vector<TracePacket> packets;
2150   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
2151 
2152   if (!tracing_session->initial_clock_snapshot.empty()) {
2153     EmitClockSnapshot(tracing_session,
2154                       std::move(tracing_session->initial_clock_snapshot),
2155                       &packets);
2156   }
2157 
2158   for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
2159     PERFETTO_DCHECK(!snapshot.empty());
2160     EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
2161   }
2162   tracing_session->clock_snapshot_ring_buffer.clear();
2163 
2164   if (tracing_session->should_emit_sync_marker) {
2165     EmitSyncMarker(&packets);
2166     tracing_session->should_emit_sync_marker = false;
2167   }
2168 
2169   if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
2170     MaybeEmitTraceConfig(tracing_session, &packets);
2171     MaybeEmitReceivedTriggers(tracing_session, &packets);
2172   }
2173   if (!tracing_session->config.builtin_data_sources().disable_system_info())
2174     MaybeEmitSystemInfo(tracing_session, &packets);
2175 
2176   // Note that in the proto comment, we guarantee that the tracing_started
2177   // lifecycle event will be emitted before any data packets so make sure to
2178   // keep this before reading the tracing buffers.
2179   if (!tracing_session->config.builtin_data_sources().disable_service_events())
2180     EmitLifecycleEvents(tracing_session, &packets);
2181 
2182   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
2183 
2184   // Add up size for packets added by the Maybe* calls above.
2185   for (const TracePacket& packet : packets) {
2186     packets_bytes += packet.size();
2187   }
2188 
2189   bool did_hit_threshold = false;
2190 
2191   for (size_t buf_idx = 0;
2192        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
2193        buf_idx++) {
2194     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
2195     if (tbuf_iter == buffers_.end()) {
2196       PERFETTO_DFATAL("Buffer not found.");
2197       continue;
2198     }
2199     TraceBuffer& tbuf = *tbuf_iter->second;
2200     tbuf.BeginRead();
2201     while (!did_hit_threshold) {
2202       TracePacket packet;
2203       TraceBuffer::PacketSequenceProperties sequence_properties{};
2204       bool previous_packet_dropped;
2205       if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
2206                                     &previous_packet_dropped)) {
2207         break;
2208       }
2209       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
2210       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
2211       PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
2212       // Not checking sequence_properties.producer_pid_trusted: it is
2213       // base::kInvalidPid if the platform doesn't support it.
2214 
2215       PERFETTO_DCHECK(packet.size() > 0);
2216       if (!PacketStreamValidator::Validate(packet.slices())) {
2217         tracing_session->invalid_packets++;
2218         PERFETTO_DLOG("Dropping invalid packet");
2219         continue;
2220       }
2221 
2222       // Append a slice with the trusted field data. This can't be spoofed
2223       // because above we validated that the existing slices don't contain any
2224       // trusted fields. For added safety we append instead of prepending
2225       // because according to protobuf semantics, if the same field is
2226       // encountered multiple times the last instance takes priority. Note that
2227       // truncated packets are also rejected, so the producer can't give us a
2228       // partial packet (e.g., a truncated string) which only becomes valid when
2229       // the trusted data is appended here.
2230       Slice slice = Slice::Allocate(32);
2231       protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
2232           slice.own_data(), slice.size);
2233       trusted_packet->set_trusted_uid(
2234           static_cast<int32_t>(sequence_properties.producer_uid_trusted));
2235       trusted_packet->set_trusted_packet_sequence_id(
2236           tracing_session->GetPacketSequenceID(
2237               sequence_properties.producer_id_trusted,
2238               sequence_properties.writer_id));
2239       if (sequence_properties.producer_pid_trusted != base::kInvalidPid) {
2240         // Not supported on all platforms.
2241         trusted_packet->set_trusted_pid(
2242             static_cast<int32_t>(sequence_properties.producer_pid_trusted));
2243       }
2244       if (previous_packet_dropped)
2245         trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
2246       slice.size = trusted_packet.Finalize();
2247       packet.AddSlice(std::move(slice));
2248 
2249       // Append the packet (inclusive of the trusted uid) to |packets|.
2250       packets_bytes += packet.size();
2251       did_hit_threshold = packets_bytes >= threshold;
2252       packets.emplace_back(std::move(packet));
2253     }  // for(packets...)
2254   }    // for(buffers...)
2255 
2256   *has_more = did_hit_threshold;
2257 
2258   // Only emit the "read complete" lifecycle event when there is no more trace
2259   // data available to read. These events are used as safe points to limit
2260   // sorting in trace processor: the code shouldn't emit the event unless the
2261   // buffers are empty.
2262   if (!*has_more && !tracing_session->config.builtin_data_sources()
2263                          .disable_service_events()) {
2264     // We don't bother snapshotting clocks here because we wouldn't be able to
2265     // emit it and we shouldn't have significant drift from the last snapshot in
2266     // any case.
2267     SnapshotLifecyleEvent(tracing_session,
2268                           protos::pbzero::TracingServiceEvent::
2269                               kReadTracingBuffersCompletedFieldNumber,
2270                           false /* snapshot_clocks */);
2271     EmitLifecycleEvents(tracing_session, &packets);
2272   }
2273 
2274   // Only emit the stats when there is no more trace data available to read.
2275   // That way, any problems that occur while reading from the buffers are
2276   // reflected in the emitted stats. This is particularly important for use
2277   // cases where ReadBuffers is only ever called after the tracing session is
2278   // stopped.
2279   if (!*has_more && tracing_session->should_emit_stats) {
2280     EmitStats(tracing_session, &packets);
2281     tracing_session->should_emit_stats = false;
2282   }
2283 
2284   MaybeFilterPackets(tracing_session, &packets);
2285 
2286   if (!*has_more) {
2287     // We've observed some extremely high memory usage by scudo after
2288     // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
2289     // now, but this code asks scudo to release memory just in case.
2290     base::MaybeReleaseAllocatorMemToOS();
2291   }
2292 
2293   return packets;
2294 }
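// Illustrative annotation (not upstream code): each ReadBuffers() call emits,
// in order, any pending clock snapshots, an optional sync marker, the trace
// config and received triggers, system info and lifecycle events (unless
// disabled), then up to |threshold| bytes of buffer data. Once the buffers
// are drained (|has_more| == false) it appends the read-complete lifecycle
// event and the trace stats, and finally runs every packet through the
// optional trace filter.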
2295 
2296 void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
2297                                             std::vector<TracePacket>* packets) {
2298   // If the tracing session specified a filter, run all packets through the
2299   // filter and replace them with the filter results.
2300   // The process below maintains the cardinality of input packets. Even if an
2301   // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
2302   // makes debugging and reasoning about the trace stats easier.
2303   // The code below swaps the contents of each |packets| entry in place.
2304   if (!tracing_session->trace_filter) {
2305     return;
2306   }
2307   protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
2308   // The filter root should be reset from protos.Trace to protos.TracePacket
2309   // by the earlier call to SetFilterRoot() in EnableTracing().
2310   PERFETTO_DCHECK(trace_filter.root_msg_index() != 0);
2311   std::vector<protozero::MessageFilter::InputSlice> filter_input;
2312   for (TracePacket& packet : *packets) {
2313     const auto& packet_slices = packet.slices();
2314     filter_input.clear();
2315     filter_input.resize(packet_slices.size());
2316     ++tracing_session->filter_input_packets;
2317     tracing_session->filter_input_bytes += packet.size();
2318     for (size_t i = 0; i < packet_slices.size(); ++i)
2319       filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
2320     auto filtered_packet = trace_filter.FilterMessageFragments(
2321         &filter_input[0], filter_input.size());
2322 
2323     // Replace the packet in-place with the filtered one (unless failed).
2324     packet = TracePacket();
2325     if (filtered_packet.error) {
2326       ++tracing_session->filter_errors;
2327       PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
2328                     tracing_session->filter_input_packets);
2329       continue;
2330     }
2331     tracing_session->filter_output_bytes += filtered_packet.size;
2332     AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
2333                               filtered_packet.size, kMaxTracePacketSliceSize,
2334                               &packet);
2335   }
2336 }
2337 
2338 bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
2339                                        std::vector<TracePacket> packets) {
2340   if (!tracing_session->write_into_file) {
2341     return false;
2342   }
2343   const uint64_t max_size = tracing_session->max_file_size_bytes
2344                                 ? tracing_session->max_file_size_bytes
2345                                 : std::numeric_limits<size_t>::max();
2346 
2347   size_t total_slices = 0;
2348   for (const TracePacket& packet : packets) {
2349     total_slices += packet.slices().size();
2350   }
2351   // When writing into a file, the file should look like a root trace.proto
2352   // message. Each packet should be prepended with a proto preamble stating
2353   // its field id (within trace.proto) and size. Hence the addition below.
2354   const size_t max_iovecs = total_slices + packets.size();
2355 
2356   size_t num_iovecs = 0;
2357   bool stop_writing_into_file = false;
2358   std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
2359   size_t num_iovecs_at_last_packet = 0;
2360   uint64_t bytes_about_to_be_written = 0;
2361   for (TracePacket& packet : packets) {
2362     std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
2363         packet.GetProtoPreamble();
2364     bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
2365     num_iovecs++;
2366     for (const Slice& slice : packet.slices()) {
2367       // writev() doesn't change the passed pointer. However, struct iovec
2368       // takes a non-const ptr because it's the same struct used by readv().
2369       // Hence the const_cast here.
2370       char* start = static_cast<char*>(const_cast<void*>(slice.start));
2371       bytes_about_to_be_written += slice.size;
2372       iovecs[num_iovecs++] = {start, slice.size};
2373     }
2374 
2375     if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
2376         max_size) {
2377       stop_writing_into_file = true;
2378       num_iovecs = num_iovecs_at_last_packet;
2379       break;
2380     }
2381 
2382     num_iovecs_at_last_packet = num_iovecs;
2383   }
2384   PERFETTO_DCHECK(num_iovecs <= max_iovecs);
2385   int fd = *tracing_session->write_into_file;
2386 
2387   uint64_t total_wr_size = 0;
2388 
2389   // writev() can take at most IOV_MAX entries per call. Batch them.
2390   constexpr size_t kIOVMax = IOV_MAX;
2391   for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
2392     int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
2393     ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
2394     if (wr_size <= 0) {
2395       PERFETTO_PLOG("writev() failed");
2396       stop_writing_into_file = true;
2397       break;
2398     }
2399     total_wr_size += static_cast<size_t>(wr_size);
2400   }
2401 
2402   tracing_session->bytes_written_into_file += total_wr_size;
2403 
2404   PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
2405                 (total_wr_size + 1023) / 1024, stop_writing_into_file);
2406   return stop_writing_into_file;
2407 }
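// Illustrative annotation (not upstream code): the output file is a raw,
// length-delimited perfetto.protos.Trace message. Assuming the standard
// trace.proto layout, the preamble emitted by GetProtoPreamble() for each
// packet is the varint tag for Trace.packet (field 1, wire type 2) followed
// by the varint-encoded packet size, so concatenating preamble + slices for
// every packet yields a valid trace file.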
2408 
2409 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
2410   PERFETTO_DCHECK_THREAD(thread_checker_);
2411   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
2412   TracingSession* tracing_session = GetTracingSession(tsid);
2413   if (!tracing_session) {
2414     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
2415     return;  // TODO(primiano): signal failure?
2416   }
2417   DisableTracing(tsid, /*disable_immediately=*/true);
2418 
2419   PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
2420   tracing_session->data_source_instances.clear();
2421 
2422   for (auto& producer_entry : producers_) {
2423     ProducerEndpointImpl* producer = producer_entry.second;
2424     producer->OnFreeBuffers(tracing_session->buffers_index);
2425   }
2426 
2427   for (BufferID buffer_id : tracing_session->buffers_index) {
2428     buffer_ids_.Free(buffer_id);
2429     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
2430     buffers_.erase(buffer_id);
2431   }
2432   bool notify_traceur = tracing_session->config.notify_traceur();
2433   bool is_long_trace =
2434       (tracing_session->config.write_into_file() &&
2435        tracing_session->config.file_write_period_ms() < kMillisPerDay);
2436   bool seized_for_bugreport = tracing_session->seized_for_bugreport;
2437   tracing_sessions_.erase(tsid);
2438   tracing_session = nullptr;
2439   UpdateMemoryGuardrail();
2440 
2441   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
2442                tracing_sessions_.size());
2443 
2444 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
2445     PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2446   if (notify_traceur && (seized_for_bugreport || is_long_trace)) {
2447     PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
2448     if (!notify_fn || !notify_fn(seized_for_bugreport))
2449       PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
2450   }
2451 #else
2452   base::ignore_result(notify_traceur);
2453   base::ignore_result(is_long_trace);
2454   base::ignore_result(seized_for_bugreport);
2455 #endif
2456 }
2457 
2458 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
2459                                             const DataSourceDescriptor& desc) {
2460   PERFETTO_DCHECK_THREAD(thread_checker_);
2461   if (desc.name().empty()) {
2462     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2463     return;
2464   }
2465 
2466   ProducerEndpointImpl* producer = GetProducer(producer_id);
2467   if (!producer) {
2468     PERFETTO_DFATAL("Producer not found.");
2469     return;
2470   }
2471 
2472   // Check that the producer doesn't register two data sources with the same ID.
2473   // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
2474   // field didn't exist.
2475   for (const auto& kv : data_sources_) {
2476     if (desc.id() && kv.second.producer_id == producer_id &&
2477         kv.second.descriptor.id() == desc.id()) {
2478       PERFETTO_ELOG(
2479           "Failed to register data source \"%s\". A data source with the same "
2480           "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
2481           desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
2482           producer_id);
2483       return;
2484     }
2485   }
2486 
2487   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
2488                 producer_id, desc.name().c_str());
2489 
2490   auto reg_ds = data_sources_.emplace(desc.name(),
2491                                       RegisteredDataSource{producer_id, desc});
2492 
2493   // If there are existing tracing sessions, we need to check if the new
2494   // data source is enabled by any of them.
2495   for (auto& iter : tracing_sessions_) {
2496     TracingSession& tracing_session = iter.second;
2497     if (tracing_session.state != TracingSession::STARTED &&
2498         tracing_session.state != TracingSession::CONFIGURED) {
2499       continue;
2500     }
2501 
2502     TraceConfig::ProducerConfig producer_config;
2503     for (auto& config : tracing_session.config.producers()) {
2504       if (producer->name_ == config.producer_name()) {
2505         producer_config = config;
2506         break;
2507       }
2508     }
2509     for (const TraceConfig::DataSource& cfg_data_source :
2510          tracing_session.config.data_sources()) {
2511       if (cfg_data_source.config().name() != desc.name())
2512         continue;
2513       DataSourceInstance* ds_inst = SetupDataSource(
2514           cfg_data_source, producer_config, reg_ds->second, &tracing_session);
2515       if (ds_inst && tracing_session.state == TracingSession::STARTED)
2516         StartDataSourceInstance(producer, &tracing_session, ds_inst);
2517     }
2518   }  // for(iter : tracing_sessions_)
2519 }
2520 
2521 void TracingServiceImpl::UpdateDataSource(
2522     ProducerID producer_id,
2523     const DataSourceDescriptor& new_desc) {
2524   if (new_desc.id() == 0) {
2525     PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2526     return;
2527   }
2528 
2529   // If this producer has already registered a matching descriptor name and id,
2530   // just update the descriptor.
2531   RegisteredDataSource* data_source = nullptr;
2532   auto range = data_sources_.equal_range(new_desc.name());
2533   for (auto it = range.first; it != range.second; ++it) {
2534     if (it->second.producer_id == producer_id &&
2535         it->second.descriptor.id() == new_desc.id()) {
2536       data_source = &it->second;
2537       break;
2538     }
2539   }
2540 
2541   if (!data_source) {
2542     PERFETTO_ELOG(
2543         "UpdateDataSource() failed, could not find an existing data source "
2544         "with name=\"%s\" id=%" PRIu64,
2545         new_desc.name().c_str(), new_desc.id());
2546     return;
2547   }
2548 
2549   data_source->descriptor = new_desc;
2550 }
2551 
2552 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2553                                                 TracingSession* tracing_session,
2554                                                 DataSourceInstance* instance,
2555                                                 bool disable_immediately) {
2556   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2557   if (instance->will_notify_on_stop && !disable_immediately) {
2558     instance->state = DataSourceInstance::STOPPING;
2559   } else {
2560     instance->state = DataSourceInstance::STOPPED;
2561   }
2562   if (tracing_session->consumer_maybe_null) {
2563     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2564         *producer, *instance);
2565   }
2566   producer->StopDataSource(ds_inst_id);
2567 }
2568 
2569 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
2570                                               const std::string& name) {
2571   PERFETTO_DCHECK_THREAD(thread_checker_);
2572   PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
2573                 producer_id, name.c_str());
2574   PERFETTO_CHECK(producer_id);
2575   ProducerEndpointImpl* producer = GetProducer(producer_id);
2576   PERFETTO_DCHECK(producer);
2577   for (auto& kv : tracing_sessions_) {
2578     auto& ds_instances = kv.second.data_source_instances;
2579     bool removed = false;
2580     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
2581       if (it->first == producer_id && it->second.data_source_name == name) {
2582         DataSourceInstanceID ds_inst_id = it->second.instance_id;
2583         if (it->second.state != DataSourceInstance::STOPPED) {
2584           if (it->second.state != DataSourceInstance::STOPPING) {
2585             StopDataSourceInstance(producer, &kv.second, &it->second,
2586                                    /* disable_immediately = */ false);
2587           }
2588 
2589           // Mark the instance as stopped immediately, since we are
2590           // unregistering it below.
2591           //
2592           //  The StopDataSourceInstance above might have set the state to
2593           //  STOPPING so this condition isn't an else.
2594           if (it->second.state == DataSourceInstance::STOPPING)
2595             NotifyDataSourceStopped(producer_id, ds_inst_id);
2596         }
2597         it = ds_instances.erase(it);
2598         removed = true;
2599       } else {
2600         ++it;
2601       }
2602     }  // for (data_source_instances)
2603     if (removed)
2604       MaybeNotifyAllDataSourcesStarted(&kv.second);
2605   }  // for (tracing_session)
2606 
2607   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
2608     if (it->second.producer_id == producer_id &&
2609         it->second.descriptor.name() == name) {
2610       data_sources_.erase(it);
2611       return;
2612     }
2613   }
2614 
2615   PERFETTO_DFATAL(
2616       "Tried to unregister a non-existent data source \"%s\" for "
2617       "producer %" PRIu16,
2618       name.c_str(), producer_id);
2619 }
2620 
2621 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
2622     const TraceConfig::DataSource& cfg_data_source,
2623     const TraceConfig::ProducerConfig& producer_config,
2624     const RegisteredDataSource& data_source,
2625     TracingSession* tracing_session) {
2626   PERFETTO_DCHECK_THREAD(thread_checker_);
2627   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
2628   PERFETTO_DCHECK(producer);
2629   // An existing producer that is not ftrace could have registered itself as
2630   // ftrace; we must not enable it in that case.
2631   if (lockdown_mode_ && producer->uid_ != uid_) {
2632     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
2633     return nullptr;
2634   }
2635   // TODO(primiano): Add tests for registration ordering (data sources vs
2636   // consumers).
2637   if (!NameMatchesFilter(producer->name_,
2638                          cfg_data_source.producer_name_filter(),
2639                          cfg_data_source.producer_name_regex_filter())) {
2640     PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
2641                   cfg_data_source.config().name().c_str(),
2642                   producer->name_.c_str());
2643     return nullptr;
2644   }
2645 
2646   auto relative_buffer_id = cfg_data_source.config().target_buffer();
2647   if (relative_buffer_id >= tracing_session->num_buffers()) {
2648     PERFETTO_LOG(
2649         "The TraceConfig for DataSource %s specified a target_buffer out of "
2650         "bound (%d). Skipping it.",
2651         cfg_data_source.config().name().c_str(), relative_buffer_id);
2652     return nullptr;
2653   }
2654 
2655   // Create a copy of the DataSourceConfig specified in the trace config. This
2656   // will be passed to the producer after translating the |target_buffer| id.
2657   // The |target_buffer| parameter passed by the consumer in the trace config is
2658   // relative to the buffers declared in the same trace config. This has to be
2659   // translated to the global BufferID before passing it to the producers, which
2660   // don't know anything about tracing sessions and consumers.
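  // For instance, if this session declared two buffers and was assigned the
  // global BufferIDs {7, 8}, a data source whose config sets target_buffer = 1
  // ends up with ds_config.target_buffer() == 8 (i.e. buffers_index[1]).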
2661 
2662   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
2663   auto insert_iter = tracing_session->data_source_instances.emplace(
2664       std::piecewise_construct,  //
2665       std::forward_as_tuple(producer->id_),
2666       std::forward_as_tuple(
2667           inst_id,
2668           cfg_data_source.config(),  //  Deliberate copy.
2669           data_source.descriptor.name(),
2670           data_source.descriptor.will_notify_on_start(),
2671           data_source.descriptor.will_notify_on_stop(),
2672           data_source.descriptor.handles_incremental_state_clear()));
2673   DataSourceInstance* ds_instance = &insert_iter->second;
2674 
2675   // New data source instance starts out in CONFIGURED state.
2676   if (tracing_session->consumer_maybe_null) {
2677     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2678         *producer, *ds_instance);
2679   }
2680 
2681   DataSourceConfig& ds_config = ds_instance->config;
2682   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
2683   ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
2684   ds_config.set_enable_extra_guardrails(
2685       tracing_session->config.enable_extra_guardrails());
2686   if (tracing_session->consumer_uid == 1066 /* AID_STATSD */ &&
2687       tracing_session->config.statsd_metadata().triggering_config_uid() !=
2688           2000 /* AID_SHELL */
2689       && tracing_session->config.statsd_metadata().triggering_config_uid() !=
2690              0 /* AID_ROOT */) {
2691     // StatsD can be triggered either by shell, root or an app that has DUMP and
2692     // USAGE_STATS permission. When triggered by shell or root, we do not want
2693     // to consider the trace a trusted system trace, as it was initiated by the
2694     // user. Otherwise, it has to come from an app with DUMP and
2695     // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
2696     // system.
2697     // Check for shell / root: https://bit.ly/3b7oZNi
2698     // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
2699     ds_config.set_session_initiator(
2700         DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
2701   } else {
2702     // Unset in case the consumer set it.
2703     // We need to be able to trust this field.
2704     ds_config.set_session_initiator(
2705         DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
2706   }
2707   ds_config.set_tracing_session_id(tracing_session->id);
2708   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
2709   PERFETTO_DCHECK(global_id);
2710   ds_config.set_target_buffer(global_id);
2711 
2712   PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
2713                 ds_config.name().c_str(), global_id);
2714   if (!producer->shared_memory()) {
2715     // Determine the SMB page size. Must be an integer multiple of 4k.
2716     // As for the SMB size below, the decision tree is as follows:
2717     // 1. Give priority to what is defined in the trace config.
2718     // 2. If unset give priority to the hint passed by the producer.
2719     // 3. Keep within bounds and ensure it's a multiple of 4k.
2720     size_t page_size = producer_config.page_size_kb() * 1024;
2721     if (page_size == 0)
2722       page_size = producer->shmem_page_size_hint_bytes_;
2723 
2724     // Determine the SMB size. Must be an integer multiple of the SMB page size.
2725     // The decision tree is as follows:
2726     // 1. Give priority to what is defined in the trace config.
2727     // 2. If unset give priority to the hint passed by the producer.
2728     // 3. Keep within bounds and ensure it's a multiple of the page size.
2729     size_t shm_size = producer_config.shm_size_kb() * 1024;
2730     if (shm_size == 0)
2731       shm_size = producer->shmem_size_hint_bytes_;
2732 
2733     auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
2734     if (valid_sizes != std::tie(shm_size, page_size)) {
2735       PERFETTO_DLOG(
2736           "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
2737           "back to shm_size %zu page_size %zu.",
2738           shm_size, page_size, std::get<0>(valid_sizes),
2739           std::get<1>(valid_sizes));
2740     }
2741     std::tie(shm_size, page_size) = valid_sizes;
2742 
2743     // TODO(primiano): right now Create() will suicide in case of OOM if the
2744     // mmap fails. We should instead gracefully fail the request and tell the
2745     // client to go away.
2746     PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
2747                   producer->name_.c_str());
2748     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
2749     producer->SetupSharedMemory(std::move(shared_memory), page_size,
2750                                 /*provided_by_producer=*/false);
2751   }
2752   producer->SetupDataSource(inst_id, ds_config);
2753   return ds_instance;
2754 }
2755 
2756 // Note: all the fields except the *_trusted ones are untrusted, as in, the
2757 // Producer might be lying / returning garbage contents. |src| and |size| can
2758 // be trusted in terms of being a valid pointer, but not the contents.
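// Roughly, the flow is: producers commit chunks that live in their shared
// memory buffer, and the service copies each chunk into the central,
// service-owned TraceBuffer selected by |buffer_id|, after validating that the
// producer is actually allowed to write into that buffer.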
2759 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
2760     ProducerID producer_id_trusted,
2761     uid_t producer_uid_trusted,
2762     pid_t producer_pid_trusted,
2763     WriterID writer_id,
2764     ChunkID chunk_id,
2765     BufferID buffer_id,
2766     uint16_t num_fragments,
2767     uint8_t chunk_flags,
2768     bool chunk_complete,
2769     const uint8_t* src,
2770     size_t size) {
2771   PERFETTO_DCHECK_THREAD(thread_checker_);
2772 
2773   ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
2774   if (!producer) {
2775     PERFETTO_DFATAL("Producer not found.");
2776     chunks_discarded_++;
2777     return;
2778   }
2779 
2780   TraceBuffer* buf = GetBufferByID(buffer_id);
2781   if (!buf) {
2782     PERFETTO_DLOG("Could not find target buffer %" PRIu16
2783                   " for producer %" PRIu16,
2784                   buffer_id, producer_id_trusted);
2785     chunks_discarded_++;
2786     return;
2787   }
2788 
2789   // Verify that the producer is actually allowed to write into the target
2790   // buffer specified in the request. This prevents a malicious producer from
2791   // injecting data into a log buffer that belongs to a tracing session the
2792   // producer is not part of.
2793   if (!producer->is_allowed_target_buffer(buffer_id)) {
2794     PERFETTO_ELOG("Producer %" PRIu16
2795                   " tried to write into forbidden target buffer %" PRIu16,
2796                   producer_id_trusted, buffer_id);
2797     PERFETTO_DFATAL("Forbidden target buffer");
2798     chunks_discarded_++;
2799     return;
2800   }
2801 
2802   // If the writer was registered by the producer, it should only write into the
2803   // buffer it was registered with.
2804   base::Optional<BufferID> associated_buffer =
2805       producer->buffer_id_for_writer(writer_id);
2806   if (associated_buffer && *associated_buffer != buffer_id) {
2807     PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
2808                   " was registered to write into target buffer %" PRIu16
2809                   ", but tried to write into buffer %" PRIu16,
2810                   writer_id, producer_id_trusted, *associated_buffer,
2811                   buffer_id);
2812     PERFETTO_DFATAL("Wrong target buffer");
2813     chunks_discarded_++;
2814     return;
2815   }
2816 
2817   buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted,
2818                           producer_pid_trusted, writer_id, chunk_id,
2819                           num_fragments, chunk_flags, chunk_complete, src,
2820                           size);
2821 }
2822 
2823 void TracingServiceImpl::ApplyChunkPatches(
2824     ProducerID producer_id_trusted,
2825     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
2826   PERFETTO_DCHECK_THREAD(thread_checker_);
2827 
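  // A patch backfills a small, fixed-size region of an already-committed
  // chunk, typically the length field of a nested message that wasn't known
  // when the chunk was first committed: a few placeholder bytes at
  // |offset_untrusted| are overwritten with the final value.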
2828   for (const auto& chunk : chunks_to_patch) {
2829     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
2830     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
2831     TraceBuffer* buf =
2832         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
2833     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
2834                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
2835     if (!writer_id || writer_id > kMaxWriterID || !buf) {
2836       // This can genuinely happen when the trace is stopped. The producers
2837       // might see the stop signal with some delay and keep sending the
2838       // remaining patches for a short while afterwards.
2839       PERFETTO_DLOG(
2840           "Received invalid chunks_to_patch request from Producer: %" PRIu16
2841           ", BufferID: %" PRIu32 " ChunkID: %" PRIu32 " WriterID: %" PRIu16,
2842           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
2843       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2844       continue;
2845     }
2846 
2847     // Note, there's no need to validate that the producer is allowed to write
2848     // to the specified buffer ID (or that it's the correct buffer ID for a
2849     // registered TraceWriter). That's because TraceBuffer uses the producer ID
2850     // and writer ID to look up the chunk to patch. If the producer specifies an
2851     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
2852     // patches. Because the producer ID is trusted, there's also no way for a
2853     // malicious producer to patch another producer's data.
2854 
2855     // Speculate on the fact that there are going to be a limited number of
2856     // patches per request, so we can allocate the |patches| array on the stack.
2857     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
2858     if (chunk.patches().size() > patches.size()) {
2859       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
2860                     chunk.patches().size());
2861       PERFETTO_DFATAL("Too many patches");
2862       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2863       continue;
2864     }
2865 
2866     size_t i = 0;
2867     for (const auto& patch : chunk.patches()) {
2868       const std::string& patch_data = patch.data();
2869       if (patch_data.size() != patches[i].data.size()) {
2870         PERFETTO_ELOG("Received patch from producer: %" PRIu16
2871                       " of unexpected size %zu",
2872                       producer_id_trusted, patch_data.size());
2873         patches_discarded_++;
2874         continue;
2875       }
2876       patches[i].offset_untrusted = patch.offset();
2877       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
2878       i++;
2879     }
2880     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
2881                                &patches[0], i, chunk.has_more_patches());
2882   }
2883 }
2884 
2885 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
2886     uid_t uid,
2887     const std::string& key) {
2888   PERFETTO_DCHECK_THREAD(thread_checker_);
2889   for (auto& kv : tracing_sessions_) {
2890     TracingSession* session = &kv.second;
2891     if (session->consumer_uid == uid && session->detach_key == key) {
2892       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
2893       return session;
2894     }
2895   }
2896   return nullptr;
2897 }
2898 
2899 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2900     TracingSessionID tsid) {
2901   PERFETTO_DCHECK_THREAD(thread_checker_);
2902   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2903   if (it == tracing_sessions_.end())
2904     return nullptr;
2905   return &it->second;
2906 }
2907 
2908 ProducerID TracingServiceImpl::GetNextProducerID() {
2909   PERFETTO_DCHECK_THREAD(thread_checker_);
2910   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
2911   do {
2912     ++last_producer_id_;
2913   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2914   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2915   return last_producer_id_;
2916 }
2917 
2918 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2919   auto buf_iter = buffers_.find(buffer_id);
2920   if (buf_iter == buffers_.end())
2921     return nullptr;
2922   return &*buf_iter->second;
2923 }
2924 
2925 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2926   // Skip the flush entirely if the trace session doesn't exist anymore.
2927   // This is to prevent misleading error messages from being logged.
2928   //
2929   // Also, if the trace has started from a trigger, we rely on the
2930   // |stop_delay_ms| from the trigger, so don't flush and disable if we've
2931   // moved beyond the CONFIGURED state.
2932   auto* tracing_session_ptr = GetTracingSession(tsid);
2933   if (tracing_session_ptr &&
2934       tracing_session_ptr->state == TracingSession::CONFIGURED) {
2935     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2936                   " since no triggers activated.",
2937                   tsid);
2938     // No data should be returned from ReadBuffers() regardless of if we
2939     // call FreeBuffers() or DisableTracing(). This is because in
2940     // STOP_TRACING we need this promise in either case, and using
2941     // DisableTracing() allows a graceful shutdown. Consumers can follow
2942     // their normal path and check the buffers through ReadBuffers() and
2943     // the code won't hang because the tracing session will still be
2944     // alive just disabled.
2945     DisableTracing(tsid);
2946   }
2947 }
2948 
2949 void TracingServiceImpl::UpdateMemoryGuardrail() {
2950 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
2951   uint64_t total_buffer_bytes = 0;
2952 
2953   // Sum up all the shared memory buffers.
2954   for (const auto& id_to_producer : producers_) {
2955     if (id_to_producer.second->shared_memory())
2956       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2957   }
2958 
2959   // Sum up all the trace buffers.
2960   for (const auto& id_to_buffer : buffers_) {
2961     total_buffer_bytes += id_to_buffer.second->size();
2962   }
2963 
2964   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2965   // interval.
2966   uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
2967   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2968 #endif
2969 }
2970 
2971 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
2972   auto* tracing_session = GetTracingSession(tsid);
2973   if (!tracing_session)
2974     return;
2975   if (tracing_session->state != TracingSession::STARTED)
2976     return;
2977   tracing_session->should_emit_sync_marker = true;
2978   tracing_session->should_emit_stats = true;
2979   MaybeSnapshotClocksIntoRingBuffer(tracing_session);
2980 }
2981 
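// E.g. field_id = TracingServiceEvent::kAllDataSourcesStartedFieldNumber.
// Each event keeps a bounded list of timestamps (up to |max_size|) so that
// repeated occurrences within a session are preserved; the accumulated
// timestamps are drained into the trace by EmitLifecycleEvents().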
2982 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
2983                                                uint32_t field_id,
2984                                                bool snapshot_clocks) {
2985   // field_id should be an id of a field in TracingServiceEvent.
2986   auto& lifecycle_events = tracing_session->lifecycle_events;
2987   auto event_it =
2988       std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
2989                    [field_id](const TracingSession::LifecycleEvent& event) {
2990                      return event.field_id == field_id;
2991                    });
2992 
2993   TracingSession::LifecycleEvent* event;
2994   if (event_it == lifecycle_events.end()) {
2995     lifecycle_events.emplace_back(field_id);
2996     event = &lifecycle_events.back();
2997   } else {
2998     event = &*event_it;
2999   }
3000 
3001   // Snapshot the clocks before capturing the timestamp for the event so we can
3002   // use this snapshot to resolve the event timestamp if necessary.
3003   if (snapshot_clocks)
3004     MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3005 
3006   // Erase before emplacing to prevent an unnecessary doubling of memory if
3007   // not needed.
3008   if (event->timestamps.size() >= event->max_size) {
3009     event->timestamps.erase_front(1 + event->timestamps.size() -
3010                                   event->max_size);
3011   }
3012   event->timestamps.emplace_back(base::GetBootTimeNs().count());
3013 }
3014 
3015 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3016     TracingSession* tracing_session) {
3017   if (tracing_session->config.builtin_data_sources()
3018           .disable_clock_snapshotting()) {
3019     return;
3020   }
3021 
3022   // We are making an explicit copy of the latest snapshot (if it exists)
3023   // because SnapshotClocks reads this data and computes the drift based on its
3024   // content. If the clock drift is high enough, it will update the contents of
3025   // |snapshot| and return true. Otherwise, it will return false.
3026   TracingSession::ClockSnapshotData snapshot =
3027       tracing_session->clock_snapshot_ring_buffer.empty()
3028           ? TracingSession::ClockSnapshotData()
3029           : tracing_session->clock_snapshot_ring_buffer.back();
3030   bool did_update = SnapshotClocks(&snapshot);
3031   if (did_update) {
3032     // This means clocks drifted enough since last snapshot. See the comment
3033     // in SnapshotClocks.
3034     auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3035 
3036     // Erase before emplacing to prevent an unnecessary doubling of memory if
3037     // not needed.
3038     static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3039     if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3040       snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3041                                    kClockSnapshotRingBufferSize);
3042     }
3043     snapshot_buffer->emplace_back(std::move(snapshot));
3044   }
3045 }
3046 
3047 // Returns true when the data in |snapshot_data| is updated with the new state
3048 // of the clocks and false otherwise.
3049 bool TracingServiceImpl::SnapshotClocks(
3050     TracingSession::ClockSnapshotData* snapshot_data) {
3051   // Minimum drift that justifies replacing a prior clock snapshot that hasn't
3052   // been emitted into the trace yet (see comment below).
3053   static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms
3054 
3055   TracingSession::ClockSnapshotData new_snapshot_data;
3056 
3057 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE) && \
3058     !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&   \
3059     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3060   struct {
3061     clockid_t id;
3062     protos::pbzero::BuiltinClock type;
3063     struct timespec ts;
3064   } clocks[] = {
3065       {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
3066       {CLOCK_REALTIME_COARSE,
3067        protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
3068        {0, 0}},
3069       {CLOCK_MONOTONIC_COARSE,
3070        protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
3071        {0, 0}},
3072       {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
3073       {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
3074       {CLOCK_MONOTONIC_RAW,
3075        protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
3076        {0, 0}},
3077   };
3078   // First snapshot all the clocks as atomically as we can.
3079   for (auto& clock : clocks) {
3080     if (clock_gettime(clock.id, &clock.ts) == -1)
3081       PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
3082   }
3083   for (auto& clock : clocks) {
3084     new_snapshot_data.push_back(std::make_pair(
3085         static_cast<uint32_t>(clock.type),
3086         static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
3087   }
3088 #else  // OS_APPLE || OS_WIN || OS_NACL
3089   auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
3090   // The default trace clock is boot time, so we always need to emit a path to
3091   // it. However since we don't actually have a boot time source on these
3092   // platforms, pretend that wall time equals boot time.
3093   new_snapshot_data.push_back(
3094       std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
3095   new_snapshot_data.push_back(
3096       std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
3097 #endif
3098 
3099   // If we're about to update a session's latest clock snapshot that hasn't been
3100   // emitted into the trace yet, check whether the clocks have drifted enough to
3101   // warrant overriding the current snapshot values. The older snapshot would be
3102   // valid for a larger part of the currently buffered trace data because the
3103   // clock sync protocol in trace processor uses the latest clock <= timestamp
3104   // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
3105   // we try to keep it if we can.
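  // For example, if the kept snapshot recorded boottime = 100s and
  // realtime = 200s, and the new readings are boottime = 105s and
  // realtime = 205.02s, then boot_diff is 5s while the realtime diff is 5.02s;
  // the 20ms divergence exceeds kSignificantDriftNs (10ms), so the old
  // snapshot is replaced with the new one.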
3106   if (!snapshot_data->empty()) {
3107     PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
3108     PERFETTO_DCHECK((*snapshot_data)[0].first ==
3109                     protos::gen::BUILTIN_CLOCK_BOOTTIME);
3110 
3111     bool update_snapshot = false;
3112     uint64_t old_boot_ns = (*snapshot_data)[0].second;
3113     uint64_t new_boot_ns = new_snapshot_data[0].second;
3114     int64_t boot_diff =
3115         static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
3116 
3117     for (size_t i = 1; i < snapshot_data->size(); i++) {
3118       uint64_t old_ns = (*snapshot_data)[i].second;
3119       uint64_t new_ns = new_snapshot_data[i].second;
3120 
3121       int64_t diff =
3122           static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
3123 
3124       // Compare the boottime delta against the delta of this clock.
3125       if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
3126         update_snapshot = true;
3127         break;
3128       }
3129     }
3130     if (!update_snapshot)
3131       return false;
3132     snapshot_data->clear();
3133   }
3134 
3135   *snapshot_data = std::move(new_snapshot_data);
3136   return true;
3137 }
3138 
3139 void TracingServiceImpl::EmitClockSnapshot(
3140     TracingSession* tracing_session,
3141     TracingSession::ClockSnapshotData snapshot_data,
3142     std::vector<TracePacket>* packets) {
3143   PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3144                        .disable_clock_snapshotting());
3145 
3146   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3147   auto* snapshot = packet->set_clock_snapshot();
3148 
3149   protos::gen::BuiltinClock trace_clock =
3150       tracing_session->config.builtin_data_sources().primary_trace_clock();
3151   if (!trace_clock)
3152     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3153   snapshot->set_primary_trace_clock(
3154       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3155 
3156   for (auto& clock_id_and_ts : snapshot_data) {
3157     auto* c = snapshot->add_clocks();
3158     c->set_clock_id(clock_id_and_ts.first);
3159     c->set_timestamp(clock_id_and_ts.second);
3160   }
3161 
3162   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3163   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3164   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3165 }
3166 
3167 void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
3168   // The sync markers are used to tokenize large traces efficiently.
3169   // See description in trace_packet.proto.
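  // Readers can scan for the raw kSyncMarker byte sequence to find a packet
  // boundary and resynchronize, which is what allows splitting very large
  // traces into independently-parseable chunks.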
3170   if (sync_marker_packet_size_ == 0) {
3171     // The marker ABI expects that the marker is written after the uid.
3172     // Protozero guarantees that fields are written in the same order of the
3173     // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
3174     protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
3175         &sync_marker_packet_[0], sizeof(sync_marker_packet_));
3176     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3177     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3178 
3179     // Keep this last.
3180     packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
3181     sync_marker_packet_size_ = packet.Finalize();
3182   }
3183   packets->emplace_back();
3184   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
3185 }
3186 
3187 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3188                                    std::vector<TracePacket>* packets) {
3189   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3190   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3191   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3192   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3193   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3194 }
3195 
3196 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
3197   TraceStats trace_stats;
3198   trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
3199   trace_stats.set_producers_seen(last_producer_id_);
3200   trace_stats.set_data_sources_registered(
3201       static_cast<uint32_t>(data_sources_.size()));
3202   trace_stats.set_data_sources_seen(last_data_source_instance_id_);
3203   trace_stats.set_tracing_sessions(
3204       static_cast<uint32_t>(tracing_sessions_.size()));
3205   trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
3206   trace_stats.set_chunks_discarded(chunks_discarded_);
3207   trace_stats.set_patches_discarded(patches_discarded_);
3208   trace_stats.set_invalid_packets(tracing_session->invalid_packets);
3209   trace_stats.set_flushes_requested(tracing_session->flushes_requested);
3210   trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
3211   trace_stats.set_flushes_failed(tracing_session->flushes_failed);
3212   trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);
3213 
3214   if (tracing_session->trace_filter) {
3215     auto* filt_stats = trace_stats.mutable_filter_stats();
3216     filt_stats->set_input_packets(tracing_session->filter_input_packets);
3217     filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
3218     filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
3219     filt_stats->set_errors(tracing_session->filter_errors);
3220   }
3221 
3222   for (BufferID buf_id : tracing_session->buffers_index) {
3223     TraceBuffer* buf = GetBufferByID(buf_id);
3224     if (!buf) {
3225       PERFETTO_DFATAL("Buffer not found.");
3226       continue;
3227     }
3228     *trace_stats.add_buffer_stats() = buf->stats();
3229   }  // for (buf in session).
3230   return trace_stats;
3231 }
3232 
3233 void TracingServiceImpl::MaybeEmitTraceConfig(
3234     TracingSession* tracing_session,
3235     std::vector<TracePacket>* packets) {
3236   if (tracing_session->did_emit_config)
3237     return;
3238   tracing_session->did_emit_config = true;
3239   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3240   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3241   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3242   tracing_session->config.Serialize(packet->set_trace_config());
3243   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3244 }
3245 
3246 void TracingServiceImpl::MaybeEmitSystemInfo(
3247     TracingSession* tracing_session,
3248     std::vector<TracePacket>* packets) {
3249   if (tracing_session->did_emit_system_info)
3250     return;
3251   tracing_session->did_emit_system_info = true;
3252   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3253   auto* info = packet->set_system_info();
3254   info->set_tracing_service_version(base::GetVersionString());
3255 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
3256     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3257   struct utsname uname_info;
3258   if (uname(&uname_info) == 0) {
3259     auto* utsname_info = info->set_utsname();
3260     utsname_info->set_sysname(uname_info.sysname);
3261     utsname_info->set_version(uname_info.version);
3262     utsname_info->set_machine(uname_info.machine);
3263     utsname_info->set_release(uname_info.release);
3264   }
3265 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
3266 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3267   std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
3268   if (!fingerprint_value.empty()) {
3269     info->set_android_build_fingerprint(fingerprint_value);
3270   } else {
3271     PERFETTO_ELOG("Unable to read ro.build.fingerprint");
3272   }
3273 
3274   std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
3275   base::Optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
3276   if (sdk_value.has_value()) {
3277     info->set_android_sdk_version(*sdk_value);
3278   } else {
3279     PERFETTO_ELOG("Unable to read ro.build.version.sdk");
3280   }
3281   info->set_hz(sysconf(_SC_CLK_TCK));
3282 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3283   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3284   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3285   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3286 }
3287 
3288 void TracingServiceImpl::EmitLifecycleEvents(
3289     TracingSession* tracing_session,
3290     std::vector<TracePacket>* packets) {
3291   using TimestampedPacket =
3292       std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3293 
3294   std::vector<TimestampedPacket> timestamped_packets;
3295   for (auto& event : tracing_session->lifecycle_events) {
3296     for (int64_t ts : event.timestamps) {
3297       protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3298       packet->set_timestamp(static_cast<uint64_t>(ts));
3299       packet->set_trusted_uid(static_cast<int32_t>(uid_));
3300       packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3301 
3302       auto* service_event = packet->set_service_event();
3303       service_event->AppendVarInt(event.field_id, 1);
3304       timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3305     }
3306     event.timestamps.clear();
3307   }
3308 
3309   // We sort by timestamp here to ensure that the "sequence" of lifecycle
3310   // packets has monotonic timestamps like other sequences in the trace.
3311   // Note that these events could still be out of order with respect to other
3312   // events on the service packet sequence (e.g. trigger received packets).
3313   std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3314             [](const TimestampedPacket& a, const TimestampedPacket& b) {
3315               return a.first < b.first;
3316             });
3317 
3318   for (const auto& pair : timestamped_packets)
3319     SerializeAndAppendPacket(packets, std::move(pair.second));
3320 }
3321 
3322 void TracingServiceImpl::EmitSeizedForBugreportLifecycleEvent(
3323     std::vector<TracePacket>* packets) {
3324   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3325   packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
3326   packet->set_trusted_uid(static_cast<int32_t>(uid_));
3327   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3328   auto* service_event = packet->set_service_event();
3329   service_event->AppendVarInt(
3330       protos::pbzero::TracingServiceEvent::kSeizedForBugreportFieldNumber, 1);
3331   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3332 }
3333 
3334 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3335     TracingSession* tracing_session,
3336     std::vector<TracePacket>* packets) {
3337   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3338                   tracing_session->received_triggers.size());
3339   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3340        i < tracing_session->received_triggers.size(); ++i) {
3341     const auto& info = tracing_session->received_triggers[i];
3342     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3343     auto* trigger = packet->set_trigger();
3344     trigger->set_trigger_name(info.trigger_name);
3345     trigger->set_producer_name(info.producer_name);
3346     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3347 
3348     packet->set_timestamp(info.boot_time_ns);
3349     packet->set_trusted_uid(static_cast<int32_t>(uid_));
3350     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3351     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3352     ++tracing_session->num_triggers_emitted_into_trace;
3353   }
3354 }
3355 
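// In short: pick the currently STARTED session with the highest
// bugreport_score (> 0), redirect its output to the bugreport temp file and
// flush-and-disable it. Returns false if no eligible session exists or if
// another bugreport save is already in flight.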
3356 bool TracingServiceImpl::MaybeSaveTraceForBugreport(
3357     std::function<void()> callback) {
3358   TracingSession* max_session = nullptr;
3359   TracingSessionID max_tsid = 0;
3360   for (auto& session_id_and_session : tracing_sessions_) {
3361     auto& session = session_id_and_session.second;
3362     const int32_t score = session.config.bugreport_score();
3363     // Exclude sessions with 0 (or below) score. By default tracing sessions
3364     // should NOT be eligible to be attached to bugreports.
3365     if (score <= 0 || session.state != TracingSession::STARTED)
3366       continue;
3367 
3368     // Also don't try to steal long traces with write_into_file if their content
3369     // has already been partially written into a file, as we would get partial
3370     // traces on both sides. We can't just copy the original file into the
3371     // bugreport because the file could be too big (GBs) for bugreports.
3372     // The only case where it's legit to steal traces with write_into_file is
3373     // when the consumer specified a very large write_period_ms (e.g. 24h),
3374     // meaning that this is effectively a ring-buffer trace. Traceur (the
3375     // Android System Tracing app), which uses --detach, does this to have a
3376     // consistent invocation path for long-traces and ring-buffer-mode traces.
3377     if (session.write_into_file && session.bytes_written_into_file > 0)
3378       continue;
3379 
3380     // If we are already in the process of finalizing another trace for
3381     // bugreport, don't even start another one, as they would try to write onto
3382     // the same file.
3383     if (session.on_disable_callback_for_bugreport)
3384       return false;
3385 
3386     if (!max_session || score > max_session->config.bugreport_score()) {
3387       max_session = &session;
3388       max_tsid = session_id_and_session.first;
3389     }
3390   }
3391 
3392   // No eligible trace found.
3393   if (!max_session)
3394     return false;
3395 
3396   PERFETTO_LOG("Seizing trace for bugreport. tsid:%" PRIu64
3397                " state:%d wf:%d score:%d name:\"%s\"",
3398                max_tsid, max_session->state, !!max_session->write_into_file,
3399                max_session->config.bugreport_score(),
3400                max_session->config.unique_session_name().c_str());
3401 
3402   auto br_fd = CreateTraceFile(GetBugreportTmpPath(), /*overwrite=*/true);
3403   if (!br_fd)
3404     return false;
3405 
3406   if (max_session->write_into_file) {
3407     auto fd = *max_session->write_into_file;
3408     // If we are stealing a write_into_file session, add a marker that explains
3409     // why the trace has been stolen rather than creating an empty file. This is
3410     // only for write_into_file traces. A similar code path deals with the case
3411     // of reading-back a seized trace from IPC in ReadBuffersIntoConsumer().
3412     if (!max_session->config.builtin_data_sources().disable_service_events()) {
3413       std::vector<TracePacket> packets;
3414       EmitSeizedForBugreportLifecycleEvent(&packets);
3415       for (auto& packet : packets) {
3416         char* preamble;
3417         size_t preamble_size = 0;
3418         std::tie(preamble, preamble_size) = packet.GetProtoPreamble();
3419         base::WriteAll(fd, preamble, preamble_size);
3420         for (const Slice& slice : packet.slices()) {
3421           base::WriteAll(fd, slice.start, slice.size);
3422         }
3423       }  // for (packets)
3424     }    // if (!disable_service_events())
3425   }      // if (max_session->write_into_file)
3426   max_session->write_into_file = std::move(br_fd);
3427   max_session->on_disable_callback_for_bugreport = std::move(callback);
3428   max_session->seized_for_bugreport = true;
3429 
3430   // Post a task to avoid that early FlushAndDisableTracing() failures invoke
3431   // the callback before we return. That would re-enter in a weird way the
3432   // callstack of the calling ConsumerEndpointImpl::SaveTraceForBugreport().
3433   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3434   task_runner_->PostTask([weak_this, max_tsid] {
3435     if (weak_this)
3436       weak_this->FlushAndDisableTracing(max_tsid);
3437   });
3438   return true;
3439 }
3440 
3441 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3442                                              PerfettoStatsdAtom atom,
3443                                              const std::string& trigger_name) {
3444   if (!ShouldLogEvent(cfg))
3445     return;
3446 
3447   // If the UUID is not set for some reason, don't log anything.
3448   if (cfg.trace_uuid_lsb() == 0 && cfg.trace_uuid_msb() == 0)
3449     return;
3450 
3451   android_stats::MaybeLogUploadEvent(atom, cfg.trace_uuid_lsb(),
3452                                      cfg.trace_uuid_msb(), trigger_name);
3453 }
3454 
3455 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3456                                               PerfettoTriggerAtom atom,
3457                                               const std::string& trigger_name) {
3458   if (!ShouldLogEvent(cfg))
3459     return;
3460   android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3461 }
3462 
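// Garbage-collects trigger history entries older than |trigger_window_ns_| and
// returns how many of the remaining entries match |trigger_name_hash|. Callers
// use the returned count to rate-limit how often a given trigger is honored
// within the window.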
3463 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3464     int64_t now_ns,
3465     uint64_t trigger_name_hash) {
3466   PERFETTO_DCHECK(
3467       std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3468   size_t remove_count = 0;
3469   size_t trigger_count = 0;
3470   for (const TriggerHistory& h : trigger_history_) {
3471     if (h.timestamp_ns < now_ns - trigger_window_ns_) {
3472       remove_count++;
3473     } else if (h.name_hash == trigger_name_hash) {
3474       trigger_count++;
3475     }
3476   }
3477   trigger_history_.erase_front(remove_count);
3478   return trigger_count;
3479 }
3480 
3481 ////////////////////////////////////////////////////////////////////////////////
3482 // TracingServiceImpl::ConsumerEndpointImpl implementation
3483 ////////////////////////////////////////////////////////////////////////////////
3484 
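// ConsumerEndpointImpl is a thin per-consumer proxy: calls coming from the
// Consumer are validated against |tracing_session_id_| and forwarded to the
// central TracingServiceImpl, while results are posted back to the consumer
// through |task_runner_| using weak pointers, so a destroyed endpoint is never
// dereferenced.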
3485 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
3486     TracingServiceImpl* service,
3487     base::TaskRunner* task_runner,
3488     Consumer* consumer,
3489     uid_t uid)
3490     : task_runner_(task_runner),
3491       service_(service),
3492       consumer_(consumer),
3493       uid_(uid),
3494       weak_ptr_factory_(this) {}
3495 
3496 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
3497   service_->DisconnectConsumer(this);
3498   consumer_->OnDisconnect();
3499 }
3500 
3501 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
3502     const std::string& error) {
3503   PERFETTO_DCHECK_THREAD(thread_checker_);
3504   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3505   task_runner_->PostTask([weak_this, error /* deliberate copy */] {
3506     if (weak_this)
3507       weak_this->consumer_->OnTracingDisabled(error);
3508   });
3509 }
3510 
3511 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
3512     const TraceConfig& cfg,
3513     base::ScopedFile fd) {
3514   PERFETTO_DCHECK_THREAD(thread_checker_);
3515   auto status = service_->EnableTracing(this, cfg, std::move(fd));
3516   if (!status.ok())
3517     NotifyOnTracingDisabled(status.message());
3518 }
3519 
3520 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
3521     const TraceConfig& cfg) {
3522   if (!tracing_session_id_) {
3523     PERFETTO_LOG(
3524         "Consumer called ChangeTraceConfig() but tracing was "
3525         "not active");
3526     return;
3527   }
3528   service_->ChangeTraceConfig(this, cfg);
3529 }
3530 
3531 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
3532   PERFETTO_DCHECK_THREAD(thread_checker_);
3533   if (!tracing_session_id_) {
3534     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
3535     return;
3536   }
3537   service_->StartTracing(tracing_session_id_);
3538 }
3539 
3540 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
3541   PERFETTO_DCHECK_THREAD(thread_checker_);
3542   if (!tracing_session_id_) {
3543     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
3544     return;
3545   }
3546   service_->DisableTracing(tracing_session_id_);
3547 }
3548 
3549 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
3550   PERFETTO_DCHECK_THREAD(thread_checker_);
3551   if (!tracing_session_id_) {
3552     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
3553     consumer_->OnTraceData({}, /* has_more = */ false);
3554     return;
3555   }
3556   if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
3557     consumer_->OnTraceData({}, /* has_more = */ false);
3558   }
3559 }
3560 
3561 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
3562   PERFETTO_DCHECK_THREAD(thread_checker_);
3563   if (!tracing_session_id_) {
3564     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
3565     return;
3566   }
3567   service_->FreeBuffers(tracing_session_id_);
3568   tracing_session_id_ = 0;
3569 }
3570 
3571 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
3572                                                      FlushCallback callback) {
3573   PERFETTO_DCHECK_THREAD(thread_checker_);
3574   if (!tracing_session_id_) {
3575     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
3576     return;
3577   }
3578   service_->Flush(tracing_session_id_, timeout_ms, callback);
3579 }
3580 
3581 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
3582   PERFETTO_DCHECK_THREAD(thread_checker_);
3583   bool success = service_->DetachConsumer(this, key);
3584   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3585   task_runner_->PostTask([weak_this, success] {
3586     if (weak_this)
3587       weak_this->consumer_->OnDetach(success);
3588   });
3589 }
3590 
3591 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
3592   PERFETTO_DCHECK_THREAD(thread_checker_);
3593   bool success = service_->AttachConsumer(this, key);
3594   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3595   task_runner_->PostTask([weak_this, success] {
3596     if (!weak_this)
3597       return;
3598     Consumer* consumer = weak_this->consumer_;
3599     TracingSession* session =
3600         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
3601     if (!session) {
3602       consumer->OnAttach(false, TraceConfig());
3603       return;
3604     }
3605     consumer->OnAttach(success, session->config);
3606   });
3607 }
3608 
3609 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
3610   PERFETTO_DCHECK_THREAD(thread_checker_);
3611   bool success = false;
3612   TraceStats stats;
3613   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3614   if (session) {
3615     success = true;
3616     stats = service_->GetTraceStats(session);
3617   }
3618   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3619   task_runner_->PostTask([weak_this, success, stats] {
3620     if (weak_this)
3621       weak_this->consumer_->OnTraceStats(success, stats);
3622   });
3623 }
3624 
3625 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
3626     uint32_t events_mask) {
3627   PERFETTO_DCHECK_THREAD(thread_checker_);
3628   observable_events_mask_ = events_mask;
3629   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3630   if (!session)
3631     return;
3632 
3633   if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
3634     // Issue initial states.
3635     for (const auto& kv : session->data_source_instances) {
3636       ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
3637       PERFETTO_DCHECK(producer);
3638       OnDataSourceInstanceStateChange(*producer, kv.second);
3639     }
3640   }
3641 
3642   // If the ObserveEvents() call happens after data sources have already
3643   // acked, notify immediately.
3644   if (observable_events_mask_ &
3645       ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
3646     service_->MaybeNotifyAllDataSourcesStarted(session);
3647   }
3648 }
3649 
3650 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
3651     const ProducerEndpointImpl& producer,
3652     const DataSourceInstance& instance) {
3653   if (!(observable_events_mask_ &
3654         ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
3655     return;
3656   }
3657 
3658   if (instance.state != DataSourceInstance::CONFIGURED &&
3659       instance.state != DataSourceInstance::STARTED &&
3660       instance.state != DataSourceInstance::STOPPED) {
3661     return;
3662   }
3663 
3664   auto* observable_events = AddObservableEvents();
3665   auto* change = observable_events->add_instance_state_changes();
3666   change->set_producer_name(producer.name_);
3667   change->set_data_source_name(instance.data_source_name);
3668   if (instance.state == DataSourceInstance::STARTED) {
3669     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
3670   } else {
3671     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
3672   }
3673 }
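// Note that only two observable states are exposed to the consumer: STARTED
// maps to DATA_SOURCE_INSTANCE_STATE_STARTED, while both CONFIGURED and
// STOPPED are reported as DATA_SOURCE_INSTANCE_STATE_STOPPED.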
3674 
3675 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
3676   if (!(observable_events_mask_ &
3677         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
3678     return;
3679   }
3680   auto* observable_events = AddObservableEvents();
3681   observable_events->set_all_data_sources_started(true);
3682 }
3683 
3684 ObservableEvents*
3685 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
3686   PERFETTO_DCHECK_THREAD(thread_checker_);
3687   if (!observable_events_) {
3688     observable_events_.reset(new ObservableEvents());
3689     auto weak_this = weak_ptr_factory_.GetWeakPtr();
3690     task_runner_->PostTask([weak_this] {
3691       if (!weak_this)
3692         return;
3693 
3694       // Move into a temporary to allow reentrancy in OnObservableEvents.
3695       auto observable_events = std::move(weak_this->observable_events_);
3696       weak_this->consumer_->OnObservableEvents(*observable_events);
3697     });
3698   }
3699   return observable_events_.get();
3700 }
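// AddObservableEvents() batches notifications: the first caller within a task
// allocates observable_events_ and posts a single flush task; subsequent
// callers in the same task append to the same ObservableEvents message, so the
// consumer receives one coalesced OnObservableEvents() per task-runner
// iteration rather than one call per event.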
3701 
3702 void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
3703     QueryServiceStateCallback callback) {
3704   PERFETTO_DCHECK_THREAD(thread_checker_);
3705   TracingServiceState svc_state;
3706 
3707   const auto& sessions = service_->tracing_sessions_;
3708   svc_state.set_tracing_service_version(base::GetVersionString());
3709   svc_state.set_num_sessions(static_cast<int>(sessions.size()));
3710 
3711   int num_started = 0;
3712   for (const auto& kv : sessions)
3713     num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
3714   svc_state.set_num_sessions_started(static_cast<int>(num_started));
3715 
3716   for (const auto& kv : service_->producers_) {
3717     auto* producer = svc_state.add_producers();
3718     producer->set_id(static_cast<int>(kv.first));
3719     producer->set_name(kv.second->name_);
3720     producer->set_sdk_version(kv.second->sdk_version_);
3721     producer->set_uid(static_cast<int32_t>(kv.second->uid()));
3722     producer->set_pid(static_cast<int32_t>(kv.second->pid()));
3723   }
3724 
3725   for (const auto& kv : service_->data_sources_) {
3726     const auto& registered_data_source = kv.second;
3727     auto* data_source = svc_state.add_data_sources();
3728     *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
3729     data_source->set_producer_id(
3730         static_cast<int>(registered_data_source.producer_id));
3731   }
3732 
3733   svc_state.set_supports_tracing_sessions(true);
3734   for (const auto& kv : service_->tracing_sessions_) {
3735     const TracingSession& s = kv.second;
3736     // List only tracing sessions for the calling UID (or everything for root).
3737     if (uid_ != 0 && uid_ != s.consumer_uid)
3738       continue;
3739     auto* session = svc_state.add_tracing_sessions();
3740     session->set_id(s.id);
3741     session->set_consumer_uid(static_cast<int>(s.consumer_uid));
3742     session->set_duration_ms(s.config.duration_ms());
3743     session->set_num_data_sources(
3744         static_cast<uint32_t>(s.data_source_instances.size()));
3745     session->set_unique_session_name(s.config.unique_session_name());
3746     for (const auto& snap_kv : s.initial_clock_snapshot) {
3747       if (snap_kv.first == protos::pbzero::BUILTIN_CLOCK_REALTIME)
3748         session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.second));
3749     }
3750     for (const auto& buf : s.config.buffers())
3751       session->add_buffer_size_kb(buf.size_kb());
3752 
3753     switch (s.state) {
3754       case TracingSession::State::DISABLED:
3755         session->set_state("DISABLED");
3756         break;
3757       case TracingSession::State::CONFIGURED:
3758         session->set_state("CONFIGURED");
3759         break;
3760       case TracingSession::State::STARTED:
3761         session->set_state("STARTED");
3762         break;
3763       case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
3764         session->set_state("STOP_WAIT");
3765         break;
3766     }
3767   }
3768   callback(/*success=*/true, svc_state);
3769 }
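// A minimal consumer-side usage sketch (the endpoint variable name below is
// assumed for illustration, not defined in this file):
//
//   endpoint->QueryServiceState(
//       [](bool success, const TracingServiceState& state) {
//         if (!success)
//           return;
//         for (const auto& producer : state.producers())
//           PERFETTO_LOG("Producer %d: %s", producer.id(),
//                        producer.name().c_str());
//       });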
3770 
3771 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
3772     QueryCapabilitiesCallback callback) {
3773   PERFETTO_DCHECK_THREAD(thread_checker_);
3774   TracingServiceCapabilities caps;
3775   caps.set_has_query_capabilities(true);
3776   caps.set_has_trace_config_output_path(true);
3777   caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
3778   caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
3779   static_assert(ObservableEvents::Type_MAX ==
3780                     ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED,
3781                 "");
3782   callback(caps);
3783 }
3784 
3785 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
3786     SaveTraceForBugreportCallback consumer_callback) {
3787   PERFETTO_DCHECK_THREAD(thread_checker_);
3788   auto on_complete_callback = [consumer_callback] {
3789     if (rename(GetBugreportTmpPath().c_str(), GetBugreportPath().c_str())) {
3790       consumer_callback(false, "rename(" + GetBugreportTmpPath() + ", " +
3791                                    GetBugreportPath() + ") failed (" +
3792                                    strerror(errno) + ")");
3793     } else {
3794       consumer_callback(true, GetBugreportPath());
3795     }
3796   };
3797   if (!service_->MaybeSaveTraceForBugreport(std::move(on_complete_callback))) {
3798     consumer_callback(false,
3799                       "No trace with TraceConfig.bugreport_score > 0 eligible "
3800                       "for bug reporting was found");
3801   }
3802 }
3803 
3804 ////////////////////////////////////////////////////////////////////////////////
3805 // TracingServiceImpl::ProducerEndpointImpl implementation
3806 ////////////////////////////////////////////////////////////////////////////////
3807 
3808 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
3809     ProducerID id,
3810     uid_t uid,
3811     pid_t pid,
3812     TracingServiceImpl* service,
3813     base::TaskRunner* task_runner,
3814     Producer* producer,
3815     const std::string& producer_name,
3816     const std::string& sdk_version,
3817     bool in_process,
3818     bool smb_scraping_enabled)
3819     : id_(id),
3820       uid_(uid),
3821       pid_(pid),
3822       service_(service),
3823       task_runner_(task_runner),
3824       producer_(producer),
3825       name_(producer_name),
3826       sdk_version_(sdk_version),
3827       in_process_(in_process),
3828       smb_scraping_enabled_(smb_scraping_enabled),
3829       weak_ptr_factory_(this) {}
3830 
3831 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
3832   service_->DisconnectProducer(id_);
3833   producer_->OnDisconnect();
3834 }
3835 
3836 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
3837     const DataSourceDescriptor& desc) {
3838   PERFETTO_DCHECK_THREAD(thread_checker_);
3839   service_->RegisterDataSource(id_, desc);
3840 }
3841 
3842 void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
3843     const DataSourceDescriptor& desc) {
3844   PERFETTO_DCHECK_THREAD(thread_checker_);
3845   service_->UpdateDataSource(id_, desc);
3846 }
3847 
3848 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
3849     const std::string& name) {
3850   PERFETTO_DCHECK_THREAD(thread_checker_);
3851   service_->UnregisterDataSource(id_, name);
3852 }
3853 
3854 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
3855     uint32_t writer_id,
3856     uint32_t target_buffer) {
3857   PERFETTO_DCHECK_THREAD(thread_checker_);
3858   writers_[static_cast<WriterID>(writer_id)] =
3859       static_cast<BufferID>(target_buffer);
3860 }
3861 
3862 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
3863     uint32_t writer_id) {
3864   PERFETTO_DCHECK_THREAD(thread_checker_);
3865   writers_.erase(static_cast<WriterID>(writer_id));
3866 }
3867 
3868 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
3869     const CommitDataRequest& req_untrusted,
3870     CommitDataCallback callback) {
3871   PERFETTO_DCHECK_THREAD(thread_checker_);
3872 
3873   if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
3874     PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
3875                                EncodeCommitDataRequest(id_, req_untrusted));
3876   }
3877 
3878   if (!shared_memory_) {
3879     PERFETTO_DLOG(
3880         "Attempted to commit data before the shared memory was allocated.");
3881     return;
3882   }
3883   PERFETTO_DCHECK(shmem_abi_.is_valid());
3884   for (const auto& entry : req_untrusted.chunks_to_move()) {
3885     const uint32_t page_idx = entry.page();
3886     if (page_idx >= shmem_abi_.num_pages())
3887       continue;  // A buggy or malicious producer.
3888 
3889     SharedMemoryABI::Chunk chunk =
3890         shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
3891     if (!chunk.is_valid()) {
3892       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
3893                     entry.page(), entry.chunk());
3894       continue;
3895     }
3896 
3897     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
3898     // the ABI contract expects the producer not to touch the chunk anymore
3899     // (until the service marks it as free). This is why all the reads below
3900     // are just memory_order_relaxed. Also, the code here assumes that all this
3901     // data can be malicious and just gives up if anything is malformed.
3902     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
3903     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
3904     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
3905     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
3906     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
3907     uint16_t num_fragments = packets.count;
3908     uint8_t chunk_flags = packets.flags;
3909 
3910     service_->CopyProducerPageIntoLogBuffer(
3911         id_, uid_, pid_, writer_id, chunk_id, buffer_id, num_fragments,
3912         chunk_flags,
3913         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
3914 
3915     // This one has release-store semantics.
3916     shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
3917   }  // for(chunks_to_move)
3918 
3919   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
3920 
3921   if (req_untrusted.flush_request_id()) {
3922     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
3923   }
3924 
3925   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
3926   // callback being invoked within the same callstack and not posted. If this
3927   // changes, the code there needs to be changed accordingly.
3928   if (callback)
3929     callback();
3930 }
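// A minimal sketch of the producer-side request that feeds the loop above.
// The field names come from the getters used here (page, chunk, target_buffer,
// flush_request_id); the exact setter spellings are assumed from the generated
// CommitDataRequest class:
//
//   CommitDataRequest req;
//   auto* move = req.add_chunks_to_move();
//   move->set_page(page_idx);         // page index inside the shared memory buffer
//   move->set_chunk(chunk_idx);       // chunk index within that page
//   move->set_target_buffer(buf_id);  // central BufferID to copy the chunk into
//   producer_endpoint->CommitData(req, /*callback=*/{});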
3931 
3932 void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
3933     std::unique_ptr<SharedMemory> shared_memory,
3934     size_t page_size_bytes,
3935     bool provided_by_producer) {
3936   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
3937   PERFETTO_DCHECK(page_size_bytes % 1024 == 0);
3938 
3939   shared_memory_ = std::move(shared_memory);
3940   shared_buffer_page_size_kb_ = page_size_bytes / 1024;
3941   is_shmem_provided_by_producer_ = provided_by_producer;
3942 
3943   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
3944                         shared_memory_->size(),
3945                         shared_buffer_page_size_kb() * 1024);
3946   if (in_process_) {
3947     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
3948         shared_memory_->start(), shared_memory_->size(),
3949         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
3950     inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
3951   }
3952 
3953   OnTracingSetup();
3954   service_->UpdateMemoryGuardrail();
3955 }
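// Worked example of the arithmetic above: a 2 MiB SMB set up with
// page_size_bytes = 4096 yields shared_buffer_page_size_kb_ = 4, and
// SharedMemoryABI::Initialize() partitions the region into
// 2 * 1024 * 1024 / 4096 = 512 pages, each independently divisible into
// chunks by the producer.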
3956 
3957 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
3958   PERFETTO_DCHECK_THREAD(thread_checker_);
3959   return shared_memory_.get();
3960 }
3961 
3962 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
3963     const {
3964   return shared_buffer_page_size_kb_;
3965 }
3966 
3967 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
3968     const std::vector<std::string>& triggers) {
3969   service_->ActivateTriggers(id_, triggers);
3970 }
3971 
3972 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
3973     DataSourceInstanceID ds_inst_id) {
3974   // TODO(primiano): Once we support tearing down the SMB, we should send the
3975   // Producer a TearDownTracing here if all its data sources have been
3976   // disabled (see b/77532839 and aosp/655179 PS1).
3977   PERFETTO_DCHECK_THREAD(thread_checker_);
3978   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3979   task_runner_->PostTask([weak_this, ds_inst_id] {
3980     if (weak_this)
3981       weak_this->producer_->StopDataSource(ds_inst_id);
3982   });
3983 }
3984 
3985 SharedMemoryArbiter*
3986 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
3987   if (!inproc_shmem_arbiter_) {
3988     PERFETTO_FATAL(
3989         "The in-process SharedMemoryArbiter can only be used when "
3990         "CreateProducer has been called with in_process=true and after tracing "
3991         "has started.");
3992   }
3993 
3994   PERFETTO_DCHECK(in_process_);
3995   return inproc_shmem_arbiter_.get();
3996 }
3997 
3998 bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
3999     const {
4000   return is_shmem_provided_by_producer_;
4001 }
4002 
4003 // Can be called on any thread.
4004 std::unique_ptr<TraceWriter>
4005 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
4006     BufferID buf_id,
4007     BufferExhaustedPolicy buffer_exhausted_policy) {
4008   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4009   return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
4010                                                        buffer_exhausted_policy);
4011 }
4012 
4013 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
4014     FlushRequestID id) {
4015   PERFETTO_DCHECK_THREAD(thread_checker_);
4016   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4017   return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
4018 }
4019 
4020 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4021   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4022   task_runner_->PostTask([weak_this] {
4023     if (weak_this)
4024       weak_this->producer_->OnTracingSetup();
4025   });
4026 }
4027 
4028 void TracingServiceImpl::ProducerEndpointImpl::Flush(
4029     FlushRequestID flush_request_id,
4030     const std::vector<DataSourceInstanceID>& data_sources) {
4031   PERFETTO_DCHECK_THREAD(thread_checker_);
4032   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4033   task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
4034     if (weak_this) {
4035       weak_this->producer_->Flush(flush_request_id, data_sources.data(),
4036                                   data_sources.size());
4037     }
4038   });
4039 }
4040 
4041 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4042     DataSourceInstanceID ds_id,
4043     const DataSourceConfig& config) {
4044   PERFETTO_DCHECK_THREAD(thread_checker_);
4045   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4046   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4047   task_runner_->PostTask([weak_this, ds_id, config] {
4048     if (weak_this)
4049       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
4050   });
4051 }
4052 
4053 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4054     DataSourceInstanceID ds_id,
4055     const DataSourceConfig& config) {
4056   PERFETTO_DCHECK_THREAD(thread_checker_);
4057   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4058   task_runner_->PostTask([weak_this, ds_id, config] {
4059     if (weak_this)
4060       weak_this->producer_->StartDataSource(ds_id, std::move(config));
4061   });
4062 }
4063 
4064 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
4065     DataSourceInstanceID data_source_id) {
4066   PERFETTO_DCHECK_THREAD(thread_checker_);
4067   service_->NotifyDataSourceStarted(id_, data_source_id);
4068 }
4069 
4070 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
4071     DataSourceInstanceID data_source_id) {
4072   PERFETTO_DCHECK_THREAD(thread_checker_);
4073   service_->NotifyDataSourceStopped(id_, data_source_id);
4074 }
4075 
4076 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4077     const std::vector<BufferID>& target_buffers) {
4078   if (allowed_target_buffers_.empty())
4079     return;
4080   for (BufferID buffer : target_buffers)
4081     allowed_target_buffers_.erase(buffer);
4082 }
4083 
4084 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4085     const std::vector<DataSourceInstanceID>& data_sources) {
4086   PERFETTO_DCHECK_THREAD(thread_checker_);
4087   auto weak_this = weak_ptr_factory_.GetWeakPtr();
4088   task_runner_->PostTask([weak_this, data_sources] {
4089     if (weak_this) {
4090       base::StringView producer_name(weak_this->name_);
4091       weak_this->producer_->ClearIncrementalState(data_sources.data(),
4092                                                   data_sources.size());
4093     }
4094   });
4095 }
4096 
4097 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4098     std::function<void()> callback) {
4099   task_runner_->PostTask(callback);
4100 }
4101 
4102 ////////////////////////////////////////////////////////////////////////////////
4103 // TracingServiceImpl::TracingSession implementation
4104 ////////////////////////////////////////////////////////////////////////////////
4105 
4106 TracingServiceImpl::TracingSession::TracingSession(
4107     TracingSessionID session_id,
4108     ConsumerEndpointImpl* consumer,
4109     const TraceConfig& new_config,
4110     base::TaskRunner* task_runner)
4111     : id(session_id),
4112       consumer_maybe_null(consumer),
4113       consumer_uid(consumer->uid_),
4114       config(new_config),
4115       snapshot_periodic_task(task_runner) {
4116   // all_data_sources_flushed is special because we store up to 64 events of
4117   // this type. Other events will go through the default case in
4118   // SnapshotLifecycleEvent() where they will be given a max history of 1.
4119   lifecycle_events.emplace_back(
4120       protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
4121       64 /* max_size */);
4122 }
4123 
4124 }  // namespace perfetto
4125