1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/service/tracing_service_impl.h"
18
19 #include <limits.h>
20 #include <string.h>
21
22 #include <cinttypes>
23 #include <cstdint>
24 #include <limits>
25 #include <optional>
26 #include <regex>
27 #include <string>
28 #include <unordered_set>
29 #include "perfetto/base/time.h"
30 #include "perfetto/ext/tracing/core/client_identity.h"
31 #include "perfetto/tracing/core/clock_snapshots.h"
32
33 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
34 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
35 #include <sys/uio.h>
36 #include <sys/utsname.h>
37 #include <unistd.h>
38 #endif
39
40 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
41 PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
42 #include "src/android_internal/lazy_library_loader.h" // nogncheck
43 #include "src/android_internal/tracing_service_proxy.h" // nogncheck
44 #endif
45
46 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
47 PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
48 PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
49 #define PERFETTO_HAS_CHMOD
50 #include <sys/stat.h>
51 #endif
52
53 #include <algorithm>
54
55 #include "perfetto/base/build_config.h"
56 #include "perfetto/base/status.h"
57 #include "perfetto/base/task_runner.h"
58 #include "perfetto/ext/base/android_utils.h"
59 #include "perfetto/ext/base/file_utils.h"
60 #include "perfetto/ext/base/metatrace.h"
61 #include "perfetto/ext/base/string_utils.h"
62 #include "perfetto/ext/base/string_view.h"
63 #include "perfetto/ext/base/sys_types.h"
64 #include "perfetto/ext/base/utils.h"
65 #include "perfetto/ext/base/uuid.h"
66 #include "perfetto/ext/base/version.h"
67 #include "perfetto/ext/base/watchdog.h"
68 #include "perfetto/ext/tracing/core/basic_types.h"
69 #include "perfetto/ext/tracing/core/consumer.h"
70 #include "perfetto/ext/tracing/core/observable_events.h"
71 #include "perfetto/ext/tracing/core/producer.h"
72 #include "perfetto/ext/tracing/core/shared_memory.h"
73 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
74 #include "perfetto/ext/tracing/core/trace_packet.h"
75 #include "perfetto/ext/tracing/core/trace_writer.h"
76 #include "perfetto/protozero/scattered_heap_buffer.h"
77 #include "perfetto/protozero/static_buffer.h"
78 #include "perfetto/tracing/core/data_source_descriptor.h"
79 #include "perfetto/tracing/core/tracing_service_capabilities.h"
80 #include "perfetto/tracing/core/tracing_service_state.h"
81 #include "src/android_stats/statsd_logging_helper.h"
82 #include "src/protozero/filtering/message_filter.h"
83 #include "src/protozero/filtering/string_filter.h"
84 #include "src/tracing/core/shared_memory_arbiter_impl.h"
85 #include "src/tracing/service/packet_stream_validator.h"
86 #include "src/tracing/service/trace_buffer.h"
87
88 #include "protos/perfetto/common/builtin_clock.gen.h"
89 #include "protos/perfetto/common/builtin_clock.pbzero.h"
90 #include "protos/perfetto/common/trace_stats.pbzero.h"
91 #include "protos/perfetto/config/trace_config.pbzero.h"
92 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
93 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
94 #include "protos/perfetto/trace/remote_clock_sync.pbzero.h"
95 #include "protos/perfetto/trace/system_info.pbzero.h"
96 #include "protos/perfetto/trace/trace_packet.pbzero.h"
97 #include "protos/perfetto/trace/trace_uuid.pbzero.h"
98 #include "protos/perfetto/trace/trigger.pbzero.h"
99
// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or their arguments are sane / within bounds.

// This is a macro because we want the call-site line number for the ELOG.
// It logs the error and, via the comma operator, evaluates to an ErrStatus
// so callers can write `return PERFETTO_SVC_ERR(...)` directly.
#define PERFETTO_SVC_ERR(...) \
  (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
108
109 namespace perfetto {
110
111 namespace {
// Hard cap on the number of buffers a single trace config may specify.
constexpr int kMaxBuffersPerConsumer = 128;
// Default interval for periodic snapshots (see kDefaultSnapshotsIntervalMs
// usage below — TODO confirm which snapshots this paces from this chunk).
constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
// Limits on concurrently active tracing sessions, globally and per-uid.
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
// Guardrail: minimum spacing between traces (seconds).
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;

constexpr uint32_t kMillisPerHour = 3600000;
constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
// Absolute cap on trace duration: 7 days.
constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
127
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
// Minimal stand-in for the POSIX struct on platforms without <sys/uio.h>;
// used only by the writev() shim below.
struct iovec {
  void* iov_base;  // Address
  size_t iov_len;  // Block size
};

// Simple implementation of writev. Note that this does not give the atomicity
// guarantees of a real writev, but we don't depend on these (we aren't writing
// to the same file from another thread).
// Returns the total number of bytes written, or -1 if any chunk fails or is
// only partially written.
ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
  ssize_t total_size = 0;
  for (int i = 0; i < iovcnt; ++i) {
    ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
    if (current_size != static_cast<ssize_t>(iov[i].iov_len))
      return -1;
    total_size += current_size;
  }
  return total_size;
}

#define IOV_MAX 1024  // Linux compatible limit.

#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
        // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
152
153 // Partially encodes a CommitDataRequest in an int32 for the purposes of
154 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
155 // (which is technically 16 bits wide).
156 //
157 // Format (by bit range):
158 // [ 31 ][ 30 ][ 29:20 ][ 19:10 ][ 9:0]
159 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)160 int32_t EncodeCommitDataRequest(ProducerID producer_id,
161 const CommitDataRequest& req_untrusted) {
162 uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
163 uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
164 uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
165
166 uint32_t mask = (1 << 10) - 1;
167 uint32_t acc = 0;
168 acc |= has_flush_id << 30;
169 acc |= (cpatch & mask) << 20;
170 acc |= (cmov & mask) << 10;
171 acc |= (producer_id & mask);
172 return static_cast<int32_t>(acc);
173 }
174
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)175 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
176 std::vector<uint8_t> packet) {
177 Slice slice = Slice::Allocate(packet.size());
178 memcpy(slice.own_data(), packet.data(), packet.size());
179 packets->emplace_back();
180 packets->back().AddSlice(std::move(slice));
181 }
182
EnsureValidShmSizes(size_t shm_size,size_t page_size)183 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
184 size_t shm_size,
185 size_t page_size) {
186 // Theoretically the max page size supported by the ABI is 64KB.
187 // However, the current implementation of TraceBuffer (the non-shared
188 // userspace buffer where the service copies data) supports at most
189 // 32K. Setting 64K "works" from the producer<>consumer viewpoint
190 // but then causes the data to be discarded when copying it into
191 // TraceBuffer.
192 constexpr size_t kMaxPageSize = 32 * 1024;
193 static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
194
195 if (page_size == 0)
196 page_size = TracingServiceImpl::kDefaultShmPageSize;
197 if (shm_size == 0)
198 shm_size = TracingServiceImpl::kDefaultShmSize;
199
200 page_size = std::min<size_t>(page_size, kMaxPageSize);
201 shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
202
203 // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
204 // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
205 // matter here, because the tracing page size is just a logical partitioning
206 // and does not have any dependencies on kernel mm syscalls (read: it's fine
207 // to have trace page sizes of 4K on a system where the kernel page size is
208 // 16K).
209 bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
210 page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
211
212 // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
213 size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
214 page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
215
216 if (!page_size_is_valid || shm_size < page_size ||
217 shm_size % page_size != 0) {
218 return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
219 TracingServiceImpl::kDefaultShmPageSize);
220 }
221 return std::make_tuple(shm_size, page_size);
222 }
223
// Returns true iff |name| passes the given allow-list filters. An empty filter
// (no exact names and no regexes) matches everything. Otherwise |name| must
// either appear verbatim in |name_filter| or fully match one of the POSIX
// extended regexes in |name_regex_filter|.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  const bool filter_is_set = !name_filter.empty() || !name_regex_filter.empty();
  if (!filter_is_set)
    return true;
  // Check the cheap exact-match list first. Short-circuiting here avoids
  // compiling and evaluating every regex (the previous code always did both
  // and OR'd the results).
  if (std::find(name_filter.begin(), name_filter.end(), name) !=
      name_filter.end()) {
    return true;
  }
  return std::any_of(name_regex_filter.begin(), name_regex_filter.end(),
                     [&name](const std::string& regex) {
                       return std::regex_match(
                           name, std::regex(regex, std::regex::extended));
                     });
}
240
// Used when TraceConfig.write_into_file == true and output_path is not empty.
// Creates (or, if |overwrite| is set, truncates) the trace output file at
// |path| with mode 0644. Returns an invalid ScopedFile on failure.
base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
    PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
  // This is NOT trying to preserve any security property, SELinux does that.
  // It just improves the actionability of the error when people try to save the
  // trace in a location that is not SELinux-allowed (a generic "permission
  // denied" vs "don't put it here, put it there").
  // These are the only SELinux approved dir for trace files that are created
  // directly by traced.
  static const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
  if (!base::StartsWith(path, kTraceDirBasePath)) {
    PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
                  path.c_str(), kTraceDirBasePath);
    return base::ScopedFile();
  }
#endif
  // O_CREAT | O_EXCL will fail if the file exists already.
  const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
  auto fd = base::OpenFile(path, flags, 0600);
  if (fd) {
#if defined(PERFETTO_HAS_CHMOD)
    // Passing 0644 directly above won't work because of umask.
    PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
#endif
  } else {
    PERFETTO_PLOG("Failed to create %s", path.c_str());
  }
  return fd;
}
271
ShouldLogEvent(const TraceConfig & cfg)272 bool ShouldLogEvent(const TraceConfig& cfg) {
273 switch (cfg.statsd_logging()) {
274 case TraceConfig::STATSD_LOGGING_ENABLED:
275 return true;
276 case TraceConfig::STATSD_LOGGING_DISABLED:
277 return false;
278 case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
279 break;
280 }
281 // For backward compatibility with older versions of perfetto_cmd.
282 return cfg.enable_extra_guardrails();
283 }
284
285 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
286 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)287 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
288 size_t size,
289 size_t max_slice_size,
290 perfetto::TracePacket* packet) {
291 if (size <= max_slice_size) {
292 packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
293 return;
294 }
295 uint8_t* src_ptr = data.get();
296 for (size_t size_left = size; size_left > 0;) {
297 const size_t slice_size = std::min(size_left, max_slice_size);
298
299 Slice slice = Slice::Allocate(slice_size);
300 memcpy(slice.own_data(), src_ptr, slice_size);
301 packet->AddSlice(std::move(slice));
302
303 src_ptr += slice_size;
304 size_left -= slice_size;
305 }
306 }
307
308 using TraceFilter = protos::gen::TraceConfig::TraceFilter;
ConvertPolicy(TraceFilter::StringFilterPolicy policy)309 std::optional<protozero::StringFilter::Policy> ConvertPolicy(
310 TraceFilter::StringFilterPolicy policy) {
311 switch (policy) {
312 case TraceFilter::SFP_UNSPECIFIED:
313 return std::nullopt;
314 case TraceFilter::SFP_MATCH_REDACT_GROUPS:
315 return protozero::StringFilter::Policy::kMatchRedactGroups;
316 case TraceFilter::SFP_ATRACE_MATCH_REDACT_GROUPS:
317 return protozero::StringFilter::Policy::kAtraceMatchRedactGroups;
318 case TraceFilter::SFP_MATCH_BREAK:
319 return protozero::StringFilter::Policy::kMatchBreak;
320 case TraceFilter::SFP_ATRACE_MATCH_BREAK:
321 return protozero::StringFilter::Policy::kAtraceMatchBreak;
322 case TraceFilter::SFP_ATRACE_REPEATED_SEARCH_REDACT_GROUPS:
323 return protozero::StringFilter::Policy::kAtraceRepeatedSearchRedactGroups;
324 }
325 return std::nullopt;
326 }
327
328 } // namespace
329
330 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner,InitOpts init_opts)331 std::unique_ptr<TracingService> TracingService::CreateInstance(
332 std::unique_ptr<SharedMemory::Factory> shm_factory,
333 base::TaskRunner* task_runner,
334 InitOpts init_opts) {
335 return std::unique_ptr<TracingService>(
336 new TracingServiceImpl(std::move(shm_factory), task_runner, init_opts));
337 }
338
TracingServiceImpl::TracingServiceImpl(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner,
    InitOpts init_opts)
    : task_runner_(task_runner),
      init_opts_(init_opts),
      shm_factory_(std::move(shm_factory)),
      uid_(base::GetCurrentUserId()),
      buffer_ids_(kMaxTraceBufferID),
      // Seed the trigger-probability RNG with the current wall-clock time.
      trigger_probability_rand_(
          static_cast<uint32_t>(base::GetWallTimeNs().count())),
      weak_ptr_factory_(this) {
  // A task runner is mandatory: connect/disconnect notifications are posted.
  PERFETTO_DCHECK(task_runner_);
}
353
TracingServiceImpl::~TracingServiceImpl() {
  // TODO(fmayer): handle teardown of all Producer.
}
357
// Registers a new producer endpoint. Returns nullptr if the connection is
// rejected (lockdown mode with a mismatched uid, or too many producers).
// The returned endpoint is owned by the caller; the service keeps a raw
// pointer to it in |producers_| until DisconnectProducer().
std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
                                    const ClientIdentity& client_identity,
                                    const std::string& producer_name,
                                    size_t shared_memory_size_hint_bytes,
                                    bool in_process,
                                    ProducerSMBScrapingMode smb_scraping_mode,
                                    size_t shared_memory_page_size_hint_bytes,
                                    std::unique_ptr<SharedMemory> shm,
                                    const std::string& sdk_version) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto uid = client_identity.uid();
  // In lockdown mode only producers running as the same user as the service
  // may connect.
  if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
    PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
                  static_cast<unsigned long>(uid));
    return nullptr;
  }

  if (producers_.size() >= kMaxProducerID) {
    PERFETTO_DFATAL("Too many producers.");
    return nullptr;
  }
  const ProducerID id = GetNextProducerID();
  PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
                static_cast<int>(uid));
  // Resolve the effective SMB-scraping setting: the per-producer mode, when
  // not kDefault, overrides the service-wide default.
  bool smb_scraping_enabled = smb_scraping_enabled_;
  switch (smb_scraping_mode) {
    case ProducerSMBScrapingMode::kDefault:
      break;
    case ProducerSMBScrapingMode::kEnabled:
      smb_scraping_enabled = true;
      break;
    case ProducerSMBScrapingMode::kDisabled:
      smb_scraping_enabled = false;
      break;
  }

  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
      id, client_identity, this, task_runner_, producer, producer_name,
      sdk_version, in_process, smb_scraping_enabled));
  auto it_and_inserted = producers_.emplace(id, endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
  endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;

  // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
  // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->producer_->OnConnect();
  });

  if (shm) {
    // The producer supplied an SMB. This is used only by Chrome; in the most
    // common cases the SMB is created by the service and passed via
    // OnTracingSetup(). Verify that it is correctly sized before we attempt to
    // use it. The transport layer has to verify the integrity of the SMB (e.g.
    // ensure that the producer can't resize if after the fact).
    size_t shm_size, page_size;
    std::tie(shm_size, page_size) =
        EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
    if (shm_size == shm->size() &&
        page_size == endpoint->shmem_page_size_hint_bytes_) {
      PERFETTO_DLOG(
          "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
          shm_size / 1024, endpoint->name_.c_str());
      endpoint->SetupSharedMemory(std::move(shm), page_size,
                                  /*provided_by_producer=*/true);
    } else {
      // The requested sizes were rejected by EnsureValidShmSizes(): drop the
      // producer's SMB and fall back to a service-provided one.
      PERFETTO_LOG(
          "Discarding incorrectly sized producer-provided SMB for producer "
          "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
          "B total, %zu B page size; suggested corrected sizes: %zu B total, "
          "%zu B page size",
          endpoint->name_.c_str(), shm->size(),
          endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
      shm.reset();
    }
  }

  return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}
442
// Tears down the state associated with a disconnecting producer: scrapes its
// remaining SMB chunks into all active sessions, unregisters its data sources
// and drops it from |producers_|.
void TracingServiceImpl::DisconnectProducer(ProducerID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
  PERFETTO_DCHECK(producers_.count(id));

  // Scrape remaining chunks for this producer to ensure we don't lose data.
  if (auto* producer = GetProducer(id)) {
    for (auto& session_id_and_session : tracing_sessions_)
      ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
  }

  for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    // Advance before calling UnregisterDataSource(), which can erase the
    // current entry and invalidate |it|.
    auto next = it;
    next++;
    if (it->second.producer_id == id)
      UnregisterDataSource(id, it->second.descriptor.name());
    it = next;
  }

  producers_.erase(id);
  // The producer's SMB no longer counts towards the memory guardrail.
  UpdateMemoryGuardrail();
}
465
GetProducer(ProducerID id) const466 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
467 ProducerID id) const {
468 PERFETTO_DCHECK_THREAD(thread_checker_);
469 auto it = producers_.find(id);
470 if (it == producers_.end())
471 return nullptr;
472 return it->second;
473 }
474
// Registers a new consumer endpoint and posts its OnConnect() notification.
// The returned endpoint is owned by the caller; the service tracks a raw
// pointer in |consumers_| until DisconnectConsumer().
std::unique_ptr<TracingService::ConsumerEndpoint>
TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
                reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
  std::unique_ptr<ConsumerEndpointImpl> endpoint(
      new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
  auto it_and_inserted = consumers_.emplace(endpoint.get());
  PERFETTO_DCHECK(it_and_inserted.second);
  // Consumer might go away before we're able to send the connect notification,
  // if that is the case just bail out.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->consumer_->OnConnect();
  });
  return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
}
493
// Unregisters a consumer. If the consumer had an attached tracing session,
// its buffers are freed (which also disables tracing for that session).
void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
  PERFETTO_DCHECK(consumers_.count(consumer));

  if (consumer->tracing_session_id_)
    FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
  consumers_.erase(consumer);

  // At this point no more pointers to |consumer| should be around.
  PERFETTO_DCHECK(!std::any_of(
      tracing_sessions_.begin(), tracing_sessions_.end(),
      [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
        return kv.second.consumer_maybe_null == consumer;
      }));
}
510
// Detaches |consumer| from its tracing session, leaving the session running
// under |key| so that another (or the same) consumer can AttachConsumer()
// later. Returns false if the consumer has no session or the key is already
// used by another detached session.
bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
                                        const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
  PERFETTO_DCHECK(consumers_.count(consumer));

  TracingSessionID tsid = consumer->tracing_session_id_;
  TracingSession* tracing_session;
  if (!tsid || !(tracing_session = GetTracingSession(tsid)))
    return false;

  // Detach keys must be unique per uid (see GetDetachedSession).
  if (GetDetachedSession(consumer->uid_, key)) {
    PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
                  key.c_str());
    return false;
  }

  PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
  tracing_session->consumer_maybe_null = nullptr;
  tracing_session->detach_key = key;
  consumer->tracing_session_id_ = 0;
  return true;
}
534
535 std::unique_ptr<TracingService::RelayEndpoint>
ConnectRelayClient(RelayClientID relay_client_id)536 TracingServiceImpl::ConnectRelayClient(RelayClientID relay_client_id) {
537 PERFETTO_DCHECK_THREAD(thread_checker_);
538
539 auto endpoint = std::make_unique<RelayEndpointImpl>(relay_client_id, this);
540 relay_clients_[relay_client_id] = endpoint.get();
541
542 return std::move(endpoint);
543 }
544
DisconnectRelayClient(RelayClientID relay_client_id)545 void TracingServiceImpl::DisconnectRelayClient(RelayClientID relay_client_id) {
546 PERFETTO_DCHECK_THREAD(thread_checker_);
547
548 if (relay_clients_.find(relay_client_id) == relay_clients_.end())
549 return;
550 relay_clients_.erase(relay_client_id);
551 }
552
// Re-attaches |consumer| to a session previously parked via DetachConsumer()
// under |key|. Returns false if the consumer is already attached to a session
// or no detached session with that key exists for the consumer's uid.
bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
                                        const std::string& key) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Consumer %p attaching to session %s",
                reinterpret_cast<void*>(consumer), key.c_str());
  PERFETTO_DCHECK(consumers_.count(consumer));

  if (consumer->tracing_session_id_) {
    PERFETTO_ELOG(
        "Cannot reattach consumer to session %s"
        " while it already attached tracing session ID %" PRIu64,
        key.c_str(), consumer->tracing_session_id_);
    return false;
  }

  auto* tracing_session = GetDetachedSession(consumer->uid_, key);
  if (!tracing_session) {
    PERFETTO_ELOG(
        "Failed to attach consumer, session '%s' not found for uid %d",
        key.c_str(), static_cast<int>(consumer->uid_));
    return false;
  }

  // Link the consumer and session both ways and clear the detach key so the
  // session is no longer considered detached.
  consumer->tracing_session_id_ = tracing_session->id;
  tracing_session->consumer_maybe_null = consumer;
  tracing_session->detach_key.clear();
  return true;
}
581
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)582 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
583 const TraceConfig& cfg,
584 base::ScopedFile fd) {
585 PERFETTO_DCHECK_THREAD(thread_checker_);
586
587 // If the producer is specifying a UUID, respect that (at least for the first
588 // snapshot). Otherwise generate a new UUID.
589 base::Uuid uuid(cfg.trace_uuid_lsb(), cfg.trace_uuid_msb());
590 if (!uuid)
591 uuid = base::Uuidv4();
592
593 PERFETTO_DLOG("Enabling tracing for consumer %p, UUID: %s",
594 reinterpret_cast<void*>(consumer),
595 uuid.ToPrettyString().c_str());
596 MaybeLogUploadEvent(cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracing);
597 if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
598 lockdown_mode_ = true;
599 if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
600 lockdown_mode_ = false;
601
602 // Scope |tracing_session| to this block to prevent accidental use of a null
603 // pointer later in this function.
604 {
605 TracingSession* tracing_session =
606 GetTracingSession(consumer->tracing_session_id_);
607 if (tracing_session) {
608 MaybeLogUploadEvent(
609 cfg, uuid,
610 PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
611 return PERFETTO_SVC_ERR(
612 "A Consumer is trying to EnableTracing() but another tracing "
613 "session is already active (forgot a call to FreeBuffers() ?)");
614 }
615 }
616
617 const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
618 ? kGuardrailsMaxTracingDurationMillis
619 : kMaxTracingDurationMillis;
620 if (cfg.duration_ms() > max_duration_ms) {
621 MaybeLogUploadEvent(cfg, uuid,
622 PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
623 return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
624 "ms > %" PRIu32 " ms)",
625 cfg.duration_ms(), max_duration_ms);
626 }
627
628 const bool has_trigger_config =
629 GetTriggerMode(cfg) != TraceConfig::TriggerConfig::UNSPECIFIED;
630 if (has_trigger_config &&
631 (cfg.trigger_config().trigger_timeout_ms() == 0 ||
632 cfg.trigger_config().trigger_timeout_ms() > max_duration_ms)) {
633 MaybeLogUploadEvent(
634 cfg, uuid,
635 PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
636 return PERFETTO_SVC_ERR(
637 "Traces with START_TRACING triggers must provide a positive "
638 "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
639 cfg.trigger_config().trigger_timeout_ms());
640 }
641
642 // This check has been introduced in May 2023 after finding b/274931668.
643 if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
644 TraceConfig::TriggerConfig::TriggerMode_MAX) {
645 MaybeLogUploadEvent(
646 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
647 return PERFETTO_SVC_ERR(
648 "The trace config specified an invalid trigger_mode");
649 }
650
651 if (cfg.trigger_config().use_clone_snapshot_if_available() &&
652 cfg.trigger_config().trigger_mode() !=
653 TraceConfig::TriggerConfig::STOP_TRACING) {
654 MaybeLogUploadEvent(
655 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
656 return PERFETTO_SVC_ERR(
657 "trigger_mode must be STOP_TRACING when "
658 "use_clone_snapshot_if_available=true");
659 }
660
661 if (has_trigger_config && cfg.duration_ms() != 0) {
662 MaybeLogUploadEvent(
663 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
664 return PERFETTO_SVC_ERR(
665 "duration_ms was set, this must not be set for traces with triggers.");
666 }
667
668 for (char c : cfg.bugreport_filename()) {
669 if (!((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
670 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.')) {
671 MaybeLogUploadEvent(
672 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidBrFilename);
673 return PERFETTO_SVC_ERR(
674 "bugreport_filename contains invalid chars. Use [a-zA-Z0-9-_.]+");
675 }
676 }
677
678 if ((GetTriggerMode(cfg) == TraceConfig::TriggerConfig::STOP_TRACING ||
679 GetTriggerMode(cfg) == TraceConfig::TriggerConfig::CLONE_SNAPSHOT) &&
680 cfg.write_into_file()) {
681 // We don't support this usecase because there are subtle assumptions which
682 // break around TracingServiceEvents and windowed sorting (i.e. if we don't
683 // drain the events in ReadBuffersIntoFile because we are waiting for
684 // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
685 // emitting them wildy out of order breaking windowed sorting in trace
686 // processor).
687 MaybeLogUploadEvent(
688 cfg, uuid,
689 PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
690 return PERFETTO_SVC_ERR(
691 "Specifying trigger mode STOP_TRACING/CLONE_SNAPSHOT and "
692 "write_into_file together is unsupported");
693 }
694
695 std::unordered_set<std::string> triggers;
696 for (const auto& trigger : cfg.trigger_config().triggers()) {
697 if (!triggers.insert(trigger.name()).second) {
698 MaybeLogUploadEvent(
699 cfg, uuid,
700 PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
701 return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
702 trigger.name().c_str());
703 }
704 }
705
706 if (cfg.enable_extra_guardrails()) {
707 if (cfg.deferred_start()) {
708 MaybeLogUploadEvent(
709 cfg, uuid,
710 PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
711 return PERFETTO_SVC_ERR(
712 "deferred_start=true is not supported in unsupervised traces");
713 }
714 uint64_t buf_size_sum = 0;
715 for (const auto& buf : cfg.buffers()) {
716 if (buf.size_kb() % 4 != 0) {
717 MaybeLogUploadEvent(
718 cfg, uuid,
719 PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
720 return PERFETTO_SVC_ERR(
721 "buffers.size_kb must be a multiple of 4, got %" PRIu32,
722 buf.size_kb());
723 }
724 buf_size_sum += buf.size_kb();
725 }
726
727 uint32_t max_tracing_buffer_size_kb =
728 std::max(kGuardrailsMaxTracingBufferSizeKb,
729 cfg.guardrail_overrides().max_tracing_buffer_size_kb());
730 if (buf_size_sum > max_tracing_buffer_size_kb) {
731 MaybeLogUploadEvent(
732 cfg, uuid,
733 PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
734 return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
735 "kB > %" PRIu32 " kB)",
736 buf_size_sum, max_tracing_buffer_size_kb);
737 }
738 }
739
740 if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
741 MaybeLogUploadEvent(cfg, uuid,
742 PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
743 return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
744 cfg.buffers_size());
745 }
746 // Check that the config specifies all buffers for its data sources. This
747 // is also checked in SetupDataSource, but it is simpler to return a proper
748 // error to the consumer from here (and there will be less state to undo).
749 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
750 size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
751 size_t target_buffer = cfg_data_source.config().target_buffer();
752 if (target_buffer >= num_buffers) {
753 MaybeLogUploadEvent(
754 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
755 return PERFETTO_SVC_ERR(
756 "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
757 "%zu)",
758 cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
759 }
760 }
761
762 if (!cfg.unique_session_name().empty()) {
763 const std::string& name = cfg.unique_session_name();
764 for (auto& kv : tracing_sessions_) {
765 if (kv.second.state == TracingSession::CLONED_READ_ONLY)
766 continue; // Don't consider cloned sessions in uniqueness checks.
767 if (kv.second.config.unique_session_name() == name) {
768 MaybeLogUploadEvent(
769 cfg, uuid,
770 PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
771 static const char fmt[] =
772 "A trace with this unique session name (%s) already exists";
773 // This happens frequently, don't make it an "E"LOG.
774 PERFETTO_LOG(fmt, name.c_str());
775 return base::ErrStatus(fmt, name.c_str());
776 }
777 }
778 }
779
780 if (!cfg.session_semaphores().empty()) {
781 struct SemaphoreSessionsState {
782 uint64_t smallest_max_other_session_count =
783 std::numeric_limits<uint64_t>::max();
784 uint64_t session_count = 0;
785 };
786 // For each semaphore, compute the number of active sessions and the
787 // MIN(limit).
788 std::unordered_map<std::string, SemaphoreSessionsState>
789 sem_to_sessions_state;
790 for (const auto& id_and_session : tracing_sessions_) {
791 const auto& session = id_and_session.second;
792 if (session.state == TracingSession::CLONED_READ_ONLY ||
793 session.state == TracingSession::DISABLED) {
794 // Don't consider cloned or disabled sessions in checks.
795 continue;
796 }
797 for (const auto& sem : session.config.session_semaphores()) {
798 auto& sessions_state = sem_to_sessions_state[sem.name()];
799 sessions_state.smallest_max_other_session_count =
800 std::min(sessions_state.smallest_max_other_session_count,
801 sem.max_other_session_count());
802 sessions_state.session_count++;
803 }
804 }
805
806 // Check if any of the semaphores declared by the config clashes with any of
807 // the currently active semaphores.
808 for (const auto& semaphore : cfg.session_semaphores()) {
809 auto it = sem_to_sessions_state.find(semaphore.name());
810 if (it == sem_to_sessions_state.end()) {
811 continue;
812 }
813 uint64_t max_other_session_count =
814 std::min(semaphore.max_other_session_count(),
815 it->second.smallest_max_other_session_count);
816 if (it->second.session_count > max_other_session_count) {
817 MaybeLogUploadEvent(
818 cfg, uuid,
819 PerfettoStatsdAtom::
820 kTracedEnableTracingFailedSessionSemaphoreCheck);
821 return PERFETTO_SVC_ERR(
822 "Semaphore \"%s\" exceeds maximum allowed other session count "
823 "(%" PRIu64 " > min(%" PRIu64 ", %" PRIu64 "))",
824 semaphore.name().c_str(), it->second.session_count,
825 semaphore.max_other_session_count(),
826 it->second.smallest_max_other_session_count);
827 }
828 }
829 }
830
831 if (cfg.enable_extra_guardrails()) {
832 // unique_session_name can be empty
833 const std::string& name = cfg.unique_session_name();
834 int64_t now_s = base::GetBootTimeS().count();
835
836 // Remove any entries where the time limit has passed so this map doesn't
837 // grow indefinitely:
838 std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
839 for (auto it = sessions.cbegin(); it != sessions.cend();) {
840 if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
841 it = sessions.erase(it);
842 } else {
843 ++it;
844 }
845 }
846
847 int64_t& previous_s = session_to_last_trace_s_[name];
848 if (previous_s == 0) {
849 previous_s = now_s;
850 } else {
851 MaybeLogUploadEvent(
852 cfg, uuid,
853 PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
854 return PERFETTO_SVC_ERR(
855 "A trace with unique session name \"%s\" began less than %" PRId64
856 "s ago (%" PRId64 "s)",
857 name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
858 }
859 }
860
861 const int sessions_for_uid = static_cast<int>(std::count_if(
862 tracing_sessions_.begin(), tracing_sessions_.end(),
863 [consumer](const decltype(tracing_sessions_)::value_type& s) {
864 return s.second.consumer_uid == consumer->uid_;
865 }));
866
867 int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
868 if (consumer->uid_ == 1066 /* AID_STATSD*/) {
869 per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
870 }
871 if (sessions_for_uid >= per_uid_limit) {
872 MaybeLogUploadEvent(
873 cfg, uuid,
874 PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
875 return PERFETTO_SVC_ERR(
876 "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
877 sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
878 }
879
880 // TODO(primiano): This is a workaround to prevent that a producer gets stuck
881 // in a state where it stalls by design by having more TraceWriterImpl
882 // instances than free pages in the buffer. This is really a bug in
883 // trace_probes and the way it handles stalls in the shmem buffer.
884 if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
885 MaybeLogUploadEvent(
886 cfg, uuid,
887 PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
888 return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
889 tracing_sessions_.size());
890 }
891
892 // If the trace config provides a filter bytecode, setup the filter now.
893 // If the filter loading fails, abort the tracing session rather than running
894 // unfiltered.
895 std::unique_ptr<protozero::MessageFilter> trace_filter;
896 if (cfg.has_trace_filter()) {
897 const auto& filt = cfg.trace_filter();
898 trace_filter.reset(new protozero::MessageFilter());
899
900 protozero::StringFilter& string_filter = trace_filter->string_filter();
901 for (const auto& rule : filt.string_filter_chain().rules()) {
902 auto opt_policy = ConvertPolicy(rule.policy());
903 if (!opt_policy.has_value()) {
904 MaybeLogUploadEvent(
905 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
906 return PERFETTO_SVC_ERR(
907 "Trace filter has invalid string filtering rules, aborting");
908 }
909 string_filter.AddRule(*opt_policy, rule.regex_pattern(),
910 rule.atrace_payload_starts_with());
911 }
912
913 const std::string& bytecode_v1 = filt.bytecode();
914 const std::string& bytecode_v2 = filt.bytecode_v2();
915 const std::string& bytecode =
916 bytecode_v2.empty() ? bytecode_v1 : bytecode_v2;
917 if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
918 MaybeLogUploadEvent(
919 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
920 return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
921 }
922
923 // The filter is created using perfetto.protos.Trace as root message
924 // (because that makes it possible to play around with the `proto_filter`
925 // tool on actual traces). Here in the service, however, we deal with
926 // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
927 // The IPC client (or the write_into_filte logic in here) are responsible
928 // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
929 // the preamble is not there at ReadBuffer time. Hence we change the root of
930 // the filtering to start at the Trace.packet level.
931 if (!trace_filter->SetFilterRoot({TracePacket::kPacketFieldNumber})) {
932 MaybeLogUploadEvent(
933 cfg, uuid, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
934 return PERFETTO_SVC_ERR("Failed to set filter root.");
935 }
936 }
937
938 const TracingSessionID tsid = ++last_tracing_session_id_;
939 TracingSession* tracing_session =
940 &tracing_sessions_
941 .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
942 std::forward_as_tuple(tsid, consumer, cfg, task_runner_))
943 .first->second;
944
945 tracing_session->trace_uuid = uuid;
946
947 if (trace_filter)
948 tracing_session->trace_filter = std::move(trace_filter);
949
950 if (cfg.write_into_file()) {
951 if (!fd ^ !cfg.output_path().empty()) {
952 MaybeLogUploadEvent(
953 tracing_session->config, uuid,
954 PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
955 tracing_sessions_.erase(tsid);
956 return PERFETTO_SVC_ERR(
957 "When write_into_file==true either a FD needs to be passed or "
958 "output_path must be populated (but not both)");
959 }
960 if (!cfg.output_path().empty()) {
961 fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
962 if (!fd) {
963 MaybeLogUploadEvent(
964 tracing_session->config, uuid,
965 PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
966 tracing_sessions_.erase(tsid);
967 return PERFETTO_SVC_ERR("Failed to create the trace file %s",
968 cfg.output_path().c_str());
969 }
970 }
971 tracing_session->write_into_file = std::move(fd);
972 uint32_t write_period_ms = cfg.file_write_period_ms();
973 if (write_period_ms == 0)
974 write_period_ms = kDefaultWriteIntoFilePeriodMs;
975 if (write_period_ms < min_write_period_ms_)
976 write_period_ms = min_write_period_ms_;
977 tracing_session->write_period_ms = write_period_ms;
978 tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
979 tracing_session->bytes_written_into_file = 0;
980 }
981
982 if (cfg.compression_type() == TraceConfig::COMPRESSION_TYPE_DEFLATE) {
983 if (init_opts_.compressor_fn) {
984 tracing_session->compress_deflate = true;
985 } else {
986 PERFETTO_LOG(
987 "COMPRESSION_TYPE_DEFLATE is not supported in the current build "
988 "configuration. Skipping compression");
989 }
990 }
991
992 // Initialize the log buffers.
993 bool did_allocate_all_buffers = true;
994 bool invalid_buffer_config = false;
995
996 // Allocate the trace buffers. Also create a map to translate a consumer
997 // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
998 // corresponding BufferID, which is a global ID namespace for the service and
999 // all producers.
1000 size_t total_buf_size_kb = 0;
1001 const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
1002 tracing_session->buffers_index.reserve(num_buffers);
1003 for (size_t i = 0; i < num_buffers; i++) {
1004 const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
1005 BufferID global_id = buffer_ids_.Allocate();
1006 if (!global_id) {
1007 did_allocate_all_buffers = false; // We ran out of IDs.
1008 break;
1009 }
1010 tracing_session->buffers_index.push_back(global_id);
1011 // TraceBuffer size is limited to 32-bit.
1012 const uint32_t buf_size_kb = buffer_cfg.size_kb();
1013 const uint64_t buf_size_bytes = buf_size_kb * static_cast<uint64_t>(1024);
1014 const size_t buf_size = static_cast<size_t>(buf_size_bytes);
1015 if (buf_size_bytes == 0 ||
1016 buf_size_bytes > std::numeric_limits<uint32_t>::max() ||
1017 buf_size != buf_size_bytes) {
1018 invalid_buffer_config = true;
1019 did_allocate_all_buffers = false;
1020 break;
1021 }
1022 total_buf_size_kb += buf_size_kb;
1023 TraceBuffer::OverwritePolicy policy =
1024 buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
1025 ? TraceBuffer::kDiscard
1026 : TraceBuffer::kOverwrite;
1027 auto it_and_inserted =
1028 buffers_.emplace(global_id, TraceBuffer::Create(buf_size, policy));
1029 PERFETTO_DCHECK(it_and_inserted.second); // buffers_.count(global_id) == 0.
1030 std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
1031 if (!trace_buffer) {
1032 did_allocate_all_buffers = false;
1033 break;
1034 }
1035 }
1036
1037 // This can happen if either:
1038 // - All the kMaxTraceBufferID slots are taken.
1039 // - OOM, or, more realistically, we exhausted virtual memory.
1040 // - The buffer size in the config is invalid.
1041 // In any case, free all the previously allocated buffers and abort.
1042 if (!did_allocate_all_buffers) {
1043 for (BufferID global_id : tracing_session->buffers_index) {
1044 buffer_ids_.Free(global_id);
1045 buffers_.erase(global_id);
1046 }
1047 MaybeLogUploadEvent(tracing_session->config, uuid,
1048 PerfettoStatsdAtom::kTracedEnableTracingOom);
1049 tracing_sessions_.erase(tsid);
1050 if (invalid_buffer_config) {
1051 return PERFETTO_SVC_ERR(
1052 "Failed to allocate tracing buffers: Invalid buffer sizes");
1053 }
1054 return PERFETTO_SVC_ERR(
1055 "Failed to allocate tracing buffers: OOM or too many buffers");
1056 }
1057
1058 UpdateMemoryGuardrail();
1059
1060 consumer->tracing_session_id_ = tsid;
1061
1062 // Setup the data sources on the producers without starting them.
1063 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
1064 // Scan all the registered data sources with a matching name.
1065 auto range = data_sources_.equal_range(cfg_data_source.config().name());
1066 for (auto it = range.first; it != range.second; it++) {
1067 TraceConfig::ProducerConfig producer_config;
1068 for (const auto& config : cfg.producers()) {
1069 if (GetProducer(it->second.producer_id)->name_ ==
1070 config.producer_name()) {
1071 producer_config = config;
1072 break;
1073 }
1074 }
1075 SetupDataSource(cfg_data_source, producer_config, it->second,
1076 tracing_session);
1077 }
1078 }
1079
1080 bool has_start_trigger = false;
1081 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1082 switch (GetTriggerMode(cfg)) {
1083 case TraceConfig::TriggerConfig::UNSPECIFIED:
1084 // no triggers are specified so this isn't a trace that is using triggers.
1085 PERFETTO_DCHECK(!has_trigger_config);
1086 break;
1087 case TraceConfig::TriggerConfig::START_TRACING:
1088 // For traces which use START_TRACE triggers we need to ensure that the
1089 // tracing session will be cleaned up when it times out.
1090 has_start_trigger = true;
1091 task_runner_->PostDelayedTask(
1092 [weak_this, tsid]() {
1093 if (weak_this)
1094 weak_this->OnStartTriggersTimeout(tsid);
1095 },
1096 cfg.trigger_config().trigger_timeout_ms());
1097 break;
1098 case TraceConfig::TriggerConfig::STOP_TRACING:
1099 case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
1100 // Update the tracing_session's duration_ms to ensure that if no trigger
1101 // is received the session will end and be cleaned up equal to the
1102 // timeout.
1103 //
1104 // TODO(nuskos): Refactor this so that rather then modifying the config we
1105 // have a field we look at on the tracing_session.
1106 tracing_session->config.set_duration_ms(
1107 cfg.trigger_config().trigger_timeout_ms());
1108 break;
1109
1110 // The case of unknown modes (coming from future versions of the service)
1111 // is handled few lines above (search for TriggerMode_MAX).
1112 }
1113
1114 tracing_session->state = TracingSession::CONFIGURED;
1115 PERFETTO_LOG(
1116 "Configured tracing session %" PRIu64
1117 ", #sources:%zu, duration:%d ms%s, #buffers:%d, total "
1118 "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
1119 tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
1120 tracing_session->config.prefer_suspend_clock_for_duration()
1121 ? " (suspend_clock)"
1122 : "",
1123 cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
1124 static_cast<unsigned int>(consumer->uid_),
1125 cfg.unique_session_name().c_str());
1126
1127 // Start the data sources, unless this is a case of early setup + fast
1128 // triggering, either through TraceConfig.deferred_start or
1129 // TraceConfig.trigger_config(). If both are specified which ever one occurs
1130 // first will initiate the trace.
1131 if (!cfg.deferred_start() && !has_start_trigger)
1132 return StartTracing(tsid);
1133
1134 return base::OkStatus();
1135 }
1136
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)1137 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
1138 const TraceConfig& updated_cfg) {
1139 PERFETTO_DCHECK_THREAD(thread_checker_);
1140 TracingSession* tracing_session =
1141 GetTracingSession(consumer->tracing_session_id_);
1142 PERFETTO_DCHECK(tracing_session);
1143
1144 if ((tracing_session->state != TracingSession::STARTED) &&
1145 (tracing_session->state != TracingSession::CONFIGURED)) {
1146 PERFETTO_ELOG(
1147 "ChangeTraceConfig() was called for a tracing session which isn't "
1148 "running.");
1149 return;
1150 }
1151
1152 // We only support updating producer_name_{,regex}_filter (and pass-through
1153 // configs) for now; null out any changeable fields and make sure the rest are
1154 // identical.
1155 TraceConfig new_config_copy(updated_cfg);
1156 for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
1157 ds_cfg.clear_producer_name_filter();
1158 ds_cfg.clear_producer_name_regex_filter();
1159 }
1160
1161 TraceConfig current_config_copy(tracing_session->config);
1162 for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
1163 ds_cfg.clear_producer_name_filter();
1164 ds_cfg.clear_producer_name_regex_filter();
1165 }
1166
1167 if (new_config_copy != current_config_copy) {
1168 PERFETTO_LOG(
1169 "ChangeTraceConfig() was called with a config containing unsupported "
1170 "changes; only adding to the producer_name_{,regex}_filter is "
1171 "currently supported and will have an effect.");
1172 }
1173
1174 for (TraceConfig::DataSource& cfg_data_source :
1175 *tracing_session->config.mutable_data_sources()) {
1176 // Find the updated producer_filter in the new config.
1177 std::vector<std::string> new_producer_name_filter;
1178 std::vector<std::string> new_producer_name_regex_filter;
1179 bool found_data_source = false;
1180 for (const auto& it : updated_cfg.data_sources()) {
1181 if (cfg_data_source.config().name() == it.config().name()) {
1182 new_producer_name_filter = it.producer_name_filter();
1183 new_producer_name_regex_filter = it.producer_name_regex_filter();
1184 found_data_source = true;
1185 break;
1186 }
1187 }
1188
1189 // Bail out if data source not present in the new config.
1190 if (!found_data_source) {
1191 PERFETTO_ELOG(
1192 "ChangeTraceConfig() called without a current data source also "
1193 "present in the new config: %s",
1194 cfg_data_source.config().name().c_str());
1195 continue;
1196 }
1197
1198 // TODO(oysteine): Just replacing the filter means that if
1199 // there are any filter entries which were present in the original config,
1200 // but removed from the config passed to ChangeTraceConfig, any matching
1201 // producers will keep producing but newly added producers after this
1202 // point will never start.
1203 *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
1204 *cfg_data_source.mutable_producer_name_regex_filter() =
1205 new_producer_name_regex_filter;
1206
1207 // Get the list of producers that are already set up.
1208 std::unordered_set<uint16_t> set_up_producers;
1209 auto& ds_instances = tracing_session->data_source_instances;
1210 for (auto instance_it = ds_instances.begin();
1211 instance_it != ds_instances.end(); ++instance_it) {
1212 set_up_producers.insert(instance_it->first);
1213 }
1214
1215 // Scan all the registered data sources with a matching name.
1216 auto range = data_sources_.equal_range(cfg_data_source.config().name());
1217 for (auto it = range.first; it != range.second; it++) {
1218 ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
1219 PERFETTO_DCHECK(producer);
1220
1221 // Check if the producer name of this data source is present
1222 // in the name filters. We currently only support new filters, not
1223 // removing old ones.
1224 if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
1225 new_producer_name_regex_filter)) {
1226 continue;
1227 }
1228
1229 // If this producer is already set up, we assume that all datasources
1230 // in it started already.
1231 if (set_up_producers.count(it->second.producer_id))
1232 continue;
1233
1234 // If it wasn't previously setup, set it up now.
1235 // (The per-producer config is optional).
1236 TraceConfig::ProducerConfig producer_config;
1237 for (const auto& config : tracing_session->config.producers()) {
1238 if (producer->name_ == config.producer_name()) {
1239 producer_config = config;
1240 break;
1241 }
1242 }
1243
1244 DataSourceInstance* ds_inst = SetupDataSource(
1245 cfg_data_source, producer_config, it->second, tracing_session);
1246
1247 if (ds_inst && tracing_session->state == TracingSession::STARTED)
1248 StartDataSourceInstance(producer, tracing_session, ds_inst);
1249 }
1250 }
1251 }
1252
StartTracing(TracingSessionID tsid)1253 base::Status TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1254 PERFETTO_DCHECK_THREAD(thread_checker_);
1255
1256 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1257 TracingSession* tracing_session = GetTracingSession(tsid);
1258 if (!tracing_session) {
1259 return PERFETTO_SVC_ERR(
1260 "StartTracing() failed, invalid session ID %" PRIu64, tsid);
1261 }
1262
1263 MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1264 PerfettoStatsdAtom::kTracedStartTracing);
1265
1266 if (tracing_session->state != TracingSession::CONFIGURED) {
1267 MaybeLogUploadEvent(
1268 tracing_session->config, tracing_session->trace_uuid,
1269 PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1270 return PERFETTO_SVC_ERR("StartTracing() failed, invalid session state: %d",
1271 tracing_session->state);
1272 }
1273
1274 tracing_session->state = TracingSession::STARTED;
1275
1276 // We store the start of trace snapshot separately as it's important to make
1277 // sure we can interpret all the data in the trace and storing it in the ring
1278 // buffer means it could be overwritten by a later snapshot.
1279 if (!tracing_session->config.builtin_data_sources()
1280 .disable_clock_snapshotting()) {
1281 SnapshotClocks(&tracing_session->initial_clock_snapshot);
1282 }
1283
1284 // We don't snapshot the clocks here because we just did this above.
1285 SnapshotLifecyleEvent(
1286 tracing_session,
1287 protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1288 false /* snapshot_clocks */);
1289
1290 // Periodically snapshot clocks, stats, sync markers while the trace is
1291 // active. The snapshots are emitted on the future ReadBuffers() calls, which
1292 // means that:
1293 // (a) If we're streaming to a file (or to a consumer) while tracing, we
1294 // write snapshots periodically into the trace.
1295 // (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1296 // snapshot into the trace. For clock snapshots, we keep track of the
1297 // snapshot recorded at the beginning of the session
1298 // (initial_clock_snapshot above), as well as the most recent sampled
1299 // snapshots that showed significant new drift between different clocks.
1300 // The latter clock snapshots are sampled periodically and at lifecycle
1301 // events.
1302 base::PeriodicTask::Args snapshot_task_args;
1303 snapshot_task_args.start_first_task_immediately = true;
1304 snapshot_task_args.use_suspend_aware_timer =
1305 tracing_session->config.builtin_data_sources()
1306 .prefer_suspend_clock_for_snapshot();
1307 snapshot_task_args.task = [weak_this, tsid] {
1308 if (weak_this)
1309 weak_this->PeriodicSnapshotTask(tsid);
1310 };
1311 snapshot_task_args.period_ms =
1312 tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1313 if (!snapshot_task_args.period_ms)
1314 snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1315 tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1316
1317 // Trigger delayed task if the trace is time limited.
1318 const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1319 if (trace_duration_ms > 0) {
1320 auto stop_task =
1321 std::bind(&TracingServiceImpl::StopOnDurationMsExpiry, weak_this, tsid);
1322 if (tracing_session->config.prefer_suspend_clock_for_duration()) {
1323 base::PeriodicTask::Args stop_args;
1324 stop_args.use_suspend_aware_timer = true;
1325 stop_args.period_ms = trace_duration_ms;
1326 stop_args.one_shot = true;
1327 stop_args.task = std::move(stop_task);
1328 tracing_session->timed_stop_task.Start(stop_args);
1329 } else {
1330 task_runner_->PostDelayedTask(std::move(stop_task), trace_duration_ms);
1331 }
1332 } // if (trace_duration_ms > 0).
1333
1334 // Start the periodic drain tasks if we should to save the trace into a file.
1335 if (tracing_session->config.write_into_file()) {
1336 task_runner_->PostDelayedTask(
1337 [weak_this, tsid] {
1338 if (weak_this)
1339 weak_this->ReadBuffersIntoFile(tsid);
1340 },
1341 tracing_session->delay_to_next_write_period_ms());
1342 }
1343
1344 // Start the periodic flush tasks if the config specified a flush period.
1345 if (tracing_session->config.flush_period_ms())
1346 PeriodicFlushTask(tsid, /*post_next_only=*/true);
1347
1348 // Start the periodic incremental state clear tasks if the config specified a
1349 // period.
1350 if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1351 PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1352 }
1353
1354 for (auto& [prod_id, data_source] : tracing_session->data_source_instances) {
1355 ProducerEndpointImpl* producer = GetProducer(prod_id);
1356 if (!producer) {
1357 PERFETTO_DFATAL("Producer does not exist.");
1358 continue;
1359 }
1360 StartDataSourceInstance(producer, tracing_session, &data_source);
1361 }
1362
1363 MaybeNotifyAllDataSourcesStarted(tracing_session);
1364 return base::OkStatus();
1365 }
1366
1367 // static
StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl> weak_this,TracingSessionID tsid)1368 void TracingServiceImpl::StopOnDurationMsExpiry(
1369 base::WeakPtr<TracingServiceImpl> weak_this,
1370 TracingSessionID tsid) {
1371 // Skip entirely the flush if the trace session doesn't exist anymore.
1372 // This is to prevent misleading error messages to be logged.
1373 if (!weak_this)
1374 return;
1375 auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
1376 if (!tracing_session_ptr)
1377 return;
1378 // If this trace was using STOP_TRACING triggers and we've seen
1379 // one, then the trigger overrides the normal timeout. In this
1380 // case we just return and let the other task clean up this trace.
1381 if (GetTriggerMode(tracing_session_ptr->config) ==
1382 TraceConfig::TriggerConfig::STOP_TRACING &&
1383 !tracing_session_ptr->received_triggers.empty())
1384 return;
1385 // In all other cases (START_TRACING or no triggers) we flush
1386 // after |trace_duration_ms| unconditionally.
1387 weak_this->FlushAndDisableTracing(tsid);
1388 }
1389
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1390 void TracingServiceImpl::StartDataSourceInstance(
1391 ProducerEndpointImpl* producer,
1392 TracingSession* tracing_session,
1393 TracingServiceImpl::DataSourceInstance* instance) {
1394 PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1395 if (instance->will_notify_on_start) {
1396 instance->state = DataSourceInstance::STARTING;
1397 } else {
1398 instance->state = DataSourceInstance::STARTED;
1399 }
1400 if (tracing_session->consumer_maybe_null) {
1401 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1402 *producer, *instance);
1403 }
1404 producer->StartDataSource(instance->instance_id, instance->config);
1405
1406 // If all data sources are started, notify the consumer.
1407 if (instance->state == DataSourceInstance::STARTED)
1408 MaybeNotifyAllDataSourcesStarted(tracing_session);
1409 }
1410
1411 // DisableTracing just stops the data sources but doesn't free up any buffer.
1412 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1413 // and then drain the buffers. The actual teardown of the TracingSession happens
1414 // in FreeBuffers().
void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
                                        bool disable_immediately) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // Can happen if the consumer calls this before EnableTracing() or after
    // FreeBuffers().
    PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    return;
  }

  MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
                      PerfettoStatsdAtom::kTracedDisableTracing);

  // What happens next depends on the session's current state. Only the
  // CONFIGURED and STARTED cases fall through to the stop logic below.
  switch (tracing_session->state) {
    // Spurious call to DisableTracing() while already disabled, nothing to do.
    case TracingSession::DISABLED:
      PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
      return;

    case TracingSession::CLONED_READ_ONLY:
      PERFETTO_DLOG("DisableTracing() cannot be called on a cloned session");
      return;

    // This is either:
    // A) The case of a graceful DisableTracing() call followed by a call to
    //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    //    to forcefully transition in the disabled state without waiting for the
    //    outstanding acks because the buffers are going to be destroyed soon.
    // B) A spurious call, iff |disable_immediately| == false, in which case
    //    there is nothing to do.
    case TracingSession::DISABLING_WAITING_STOP_ACKS:
      PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
      if (disable_immediately)
        DisableTracingNotifyConsumerAndFlushFile(tracing_session);
      return;

    // Continues below.
    case TracingSession::CONFIGURED:
      // If the session didn't even start there is no need to orchestrate a
      // graceful stop of data sources.
      disable_immediately = true;
      break;

    // This is the nominal case, continues below.
    case TracingSession::STARTED:
      break;
  }

  // Initiate the stop on every data source instance of this session,
  // forwarding |disable_immediately| to StopDataSourceInstance().
  for (auto& data_source_inst : tracing_session->data_source_instances) {
    const ProducerID producer_id = data_source_inst.first;
    DataSourceInstance& instance = data_source_inst.second;
    ProducerEndpointImpl* producer = GetProducer(producer_id);
    PERFETTO_DCHECK(producer);
    PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
                    instance.state == DataSourceInstance::STARTING ||
                    instance.state == DataSourceInstance::STARTED);
    StopDataSourceInstance(producer, tracing_session, &instance,
                           disable_immediately);
  }

  // If the periodic task is running, we can stop the periodic snapshot timer
  // here instead of waiting until FreeBuffers to prevent useless snapshots
  // which won't be read.
  tracing_session->snapshot_periodic_task.Reset();

  // Either this request is flagged with |disable_immediately| or there are no
  // data sources that are requesting a final handshake. In both cases just mark
  // the session as disabled immediately, notify the consumer and flush the
  // trace file (if used).
  if (tracing_session->AllDataSourceInstancesStopped())
    return DisableTracingNotifyConsumerAndFlushFile(tracing_session);

  // Otherwise enter the waiting-for-acks state and arm a timeout task so the
  // session cannot hang forever if a producer never acks the stop.
  tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_this, tsid] {
        if (weak_this)
          weak_this->OnDisableTracingTimeout(tsid);
      },
      tracing_session->data_source_stop_timeout_ms());

  // Deliberately NOT removing the session from |tracing_session_|, it's still
  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
}
1500
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1501 void TracingServiceImpl::NotifyDataSourceStarted(
1502 ProducerID producer_id,
1503 DataSourceInstanceID instance_id) {
1504 PERFETTO_DCHECK_THREAD(thread_checker_);
1505 for (auto& kv : tracing_sessions_) {
1506 TracingSession& tracing_session = kv.second;
1507 DataSourceInstance* instance =
1508 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1509
1510 if (!instance)
1511 continue;
1512
1513 // If the tracing session was already stopped, ignore this notification.
1514 if (tracing_session.state != TracingSession::STARTED)
1515 continue;
1516
1517 if (instance->state != DataSourceInstance::STARTING) {
1518 PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1519 instance->state);
1520 continue;
1521 }
1522
1523 instance->state = DataSourceInstance::STARTED;
1524
1525 ProducerEndpointImpl* producer = GetProducer(producer_id);
1526 PERFETTO_DCHECK(producer);
1527 if (tracing_session.consumer_maybe_null) {
1528 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1529 *producer, *instance);
1530 }
1531
1532 // If all data sources are started, notify the consumer.
1533 MaybeNotifyAllDataSourcesStarted(&tracing_session);
1534 } // for (tracing_session)
1535 }
1536
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1537 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1538 TracingSession* tracing_session) {
1539 if (!tracing_session->consumer_maybe_null)
1540 return;
1541
1542 if (!tracing_session->AllDataSourceInstancesStarted())
1543 return;
1544
1545 // In some rare cases, we can get in this state more than once. Consider the
1546 // following scenario: 3 data sources are registered -> trace starts ->
1547 // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1548 // Imagine now that a 4th data source registers while the trace is ongoing.
1549 // This would hit the AllDataSourceInstancesStarted() condition again.
1550 // In this case, however, we don't want to re-notify the consumer again.
1551 // That would be unexpected (even if, perhaps, technically correct) and
1552 // trigger bugs in the consumer.
1553 if (tracing_session->did_notify_all_data_source_started)
1554 return;
1555
1556 PERFETTO_DLOG("All data sources started");
1557
1558 SnapshotLifecyleEvent(
1559 tracing_session,
1560 protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1561 true /* snapshot_clocks */);
1562
1563 tracing_session->did_notify_all_data_source_started = true;
1564 tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1565 }
1566
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1567 void TracingServiceImpl::NotifyDataSourceStopped(
1568 ProducerID producer_id,
1569 DataSourceInstanceID instance_id) {
1570 PERFETTO_DCHECK_THREAD(thread_checker_);
1571 for (auto& kv : tracing_sessions_) {
1572 TracingSession& tracing_session = kv.second;
1573 DataSourceInstance* instance =
1574 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1575
1576 if (!instance)
1577 continue;
1578
1579 if (instance->state != DataSourceInstance::STOPPING) {
1580 PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1581 instance->state);
1582 continue;
1583 }
1584
1585 instance->state = DataSourceInstance::STOPPED;
1586
1587 ProducerEndpointImpl* producer = GetProducer(producer_id);
1588 PERFETTO_DCHECK(producer);
1589 if (tracing_session.consumer_maybe_null) {
1590 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1591 *producer, *instance);
1592 }
1593
1594 if (!tracing_session.AllDataSourceInstancesStopped())
1595 continue;
1596
1597 if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1598 continue;
1599
1600 // All data sources acked the termination.
1601 DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1602 } // for (tracing_session)
1603 }
1604
// Handles a batch of trigger names sent by producer |producer_id|. For each
// name this scans all active tracing sessions, applies the per-trigger
// filters (producer-name regex, skip probability, 24h rate limit) and, on a
// match, performs the action dictated by the session's trigger mode
// (START_TRACING / STOP_TRACING / CLONE_SNAPSHOT).
void TracingServiceImpl::ActivateTriggers(
    ProducerID producer_id,
    const std::vector<std::string>& triggers) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);

  int64_t now_ns = base::GetBootTimeNs().count();
  for (const auto& trigger_name : triggers) {
    PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
                  trigger_name.c_str());
    base::Hasher hash;
    hash.Update(trigger_name.c_str(), trigger_name.size());
    // Bookkeeping for the final "trigger activated" log line below.
    std::string triggered_session_name;
    base::Uuid triggered_session_uuid;
    TracingSessionID triggered_session_id = 0;
    auto trigger_mode = TraceConfig::TriggerConfig::UNSPECIFIED;

    // Rate limiting is keyed on a hash of the trigger name; this also drops
    // history entries older than the rate-limit window.
    uint64_t trigger_name_hash = hash.digest();
    size_t count_in_window =
        PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);

    bool trigger_matched = false;
    bool trigger_activated = false;
    for (auto& id_and_tracing_session : tracing_sessions_) {
      auto& tracing_session = id_and_tracing_session.second;
      TracingSessionID tsid = id_and_tracing_session.first;
      auto iter = std::find_if(
          tracing_session.config.trigger_config().triggers().begin(),
          tracing_session.config.trigger_config().triggers().end(),
          [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
            return trigger.name() == trigger_name;
          });
      if (iter == tracing_session.config.trigger_config().triggers().end())
        continue;
      // Cloned snapshots are read-only; triggers must not act on them.
      if (tracing_session.state == TracingSession::CLONED_READ_ONLY)
        continue;

      // If this trigger requires a certain producer to have sent it
      // (non-empty producer_name()) ensure the producer who sent this trigger
      // matches.
      if (!iter->producer_name_regex().empty() &&
          !std::regex_match(
              producer->name_,
              std::regex(iter->producer_name_regex(), std::regex::extended))) {
        continue;
      }

      // Use a random number between 0 and 1 to check if we should allow this
      // trigger through or not.
      double trigger_rnd =
          trigger_rnd_override_for_testing_ > 0
              ? trigger_rnd_override_for_testing_
              : trigger_probability_dist_(trigger_probability_rand_);
      PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
      if (trigger_rnd < iter->skip_probability()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitProbability,
                             trigger_name);
        continue;
      }

      // If we already triggered more times than the limit, silently ignore
      // this trigger.
      if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
        MaybeLogTriggerEvent(tracing_session.config,
                             PerfettoTriggerAtom::kTracedLimitMaxPer24h,
                             trigger_name);
        continue;
      }
      trigger_matched = true;
      triggered_session_id = tracing_session.id;
      triggered_session_name = tracing_session.config.unique_session_name();
      triggered_session_uuid.set_lsb_msb(tracing_session.trace_uuid.lsb(),
                                         tracing_session.trace_uuid.msb());
      trigger_mode = GetTriggerMode(tracing_session.config);

      // Captured BEFORE the push_back below, so it reflects whether any
      // earlier trigger had already been recorded for this session.
      const bool triggers_already_received =
          !tracing_session.received_triggers.empty();
      tracing_session.received_triggers.push_back(
          {static_cast<uint64_t>(now_ns), iter->name(), producer->name_,
           producer->uid()});
      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      switch (trigger_mode) {
        case TraceConfig::TriggerConfig::START_TRACING:
          // If the session has already been triggered and moved past
          // CONFIGURED then we don't need to repeat StartTracing. This would
          // work fine (StartTracing would return false) but would add error
          // logs.
          if (tracing_session.state != TracingSession::CONFIGURED)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStartTracing, iter->name());

          // We override the trace duration to be the trigger's requested
          // value, this ensures that the trace will end after this amount
          // of time has passed.
          tracing_session.config.set_duration_ms(iter->stop_delay_ms());
          StartTracing(tsid);
          break;
        case TraceConfig::TriggerConfig::STOP_TRACING:
          // Only stop the trace once to avoid confusing log messages. I.e.
          // when we've already hit the first trigger we've already posted the
          // task to FlushAndDisable, so all future triggers just break out.
          if (triggers_already_received)
            break;

          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerStopTracing, iter->name());

          // Now that we've seen a trigger we need to stop, flush, and disable
          // this session after the configured |stop_delay_ms|.
          task_runner_->PostDelayedTask(
              [weak_this, tsid] {
                // Skip entirely the flush if the trace session doesn't exist
                // anymore. This is to prevent misleading error messages to be
                // logged.
                if (weak_this && weak_this->GetTracingSession(tsid))
                  weak_this->FlushAndDisableTracing(tsid);
              },
              // If |stop_delay_ms| is zero the task is eligible to run
              // immediately and will execute shortly after.
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::CLONE_SNAPSHOT:
          // Snapshot clones are delegated to the consumer, which is notified
          // (after |stop_delay_ms|) and decides whether/how to clone.
          trigger_activated = true;
          MaybeLogUploadEvent(
              tracing_session.config, tracing_session.trace_uuid,
              PerfettoStatsdAtom::kTracedTriggerCloneSnapshot, iter->name());
          task_runner_->PostDelayedTask(
              [weak_this, tsid, trigger_name = iter->name()] {
                if (!weak_this)
                  return;
                auto* tsess = weak_this->GetTracingSession(tsid);
                if (!tsess || !tsess->consumer_maybe_null)
                  return;
                tsess->consumer_maybe_null->NotifyCloneSnapshotTrigger(
                    trigger_name);
              },
              iter->stop_delay_ms());
          break;

        case TraceConfig::TriggerConfig::UNSPECIFIED:
          PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
          break;
      }
    }  // for (.. : tracing_sessions_)

    // Matched triggers count against the 24h rate-limit window even when the
    // trigger ends up not activating anything (e.g. repeated STOP_TRACING).
    if (trigger_matched) {
      trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
    }

    if (trigger_activated) {
      // Log only the trigger that actually caused a trace stop/start, don't log
      // the follow-up ones, even if they matched.
      PERFETTO_LOG(
          "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
          "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
          trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
          triggered_session_uuid.ToPrettyString().c_str(),
          triggered_session_id);
    }
  }  // for (trigger_name : triggers)
}
1776
1777 // Always invoked TraceConfig.data_source_stop_timeout_ms (by default
1778 // kDataSourceStopTimeoutMs) after DisableTracing(). In nominal conditions all
1779 // data sources should have acked the stop and this will early out.
OnDisableTracingTimeout(TracingSessionID tsid)1780 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1781 PERFETTO_DCHECK_THREAD(thread_checker_);
1782 TracingSession* tracing_session = GetTracingSession(tsid);
1783 if (!tracing_session ||
1784 tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1785 return; // Tracing session was successfully disabled.
1786 }
1787
1788 PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1789 tsid);
1790 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1791 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1792 }
1793
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1794 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1795 TracingSession* tracing_session) {
1796 PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1797 for (auto& inst_kv : tracing_session->data_source_instances) {
1798 if (inst_kv.second.state == DataSourceInstance::STOPPED)
1799 continue;
1800 inst_kv.second.state = DataSourceInstance::STOPPED;
1801 ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1802 PERFETTO_DCHECK(producer);
1803 if (tracing_session->consumer_maybe_null) {
1804 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1805 *producer, inst_kv.second);
1806 }
1807 }
1808 tracing_session->state = TracingSession::DISABLED;
1809
1810 // Scrape any remaining chunks that weren't flushed by the producers.
1811 for (auto& producer_id_and_producer : producers_)
1812 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1813
1814 SnapshotLifecyleEvent(
1815 tracing_session,
1816 protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1817 true /* snapshot_clocks */);
1818
1819 if (tracing_session->write_into_file) {
1820 tracing_session->write_period_ms = 0;
1821 ReadBuffersIntoFile(tracing_session->id);
1822 }
1823
1824 MaybeLogUploadEvent(tracing_session->config, tracing_session->trace_uuid,
1825 PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1826
1827 if (tracing_session->consumer_maybe_null)
1828 tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1829 }
1830
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1831 void TracingServiceImpl::Flush(TracingSessionID tsid,
1832 uint32_t timeout_ms,
1833 ConsumerEndpoint::FlushCallback callback,
1834 FlushFlags flush_flags) {
1835 PERFETTO_DCHECK_THREAD(thread_checker_);
1836 TracingSession* tracing_session = GetTracingSession(tsid);
1837 if (!tracing_session) {
1838 PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1839 return;
1840 }
1841
1842 std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
1843 for (const auto& [producer_id, ds_inst] :
1844 tracing_session->data_source_instances) {
1845 if (ds_inst.no_flush)
1846 continue;
1847 data_source_instances[producer_id].push_back(ds_inst.instance_id);
1848 }
1849 FlushDataSourceInstances(tracing_session, timeout_ms, data_source_instances,
1850 std::move(callback), flush_flags);
1851 }
1852
FlushDataSourceInstances(TracingSession * tracing_session,uint32_t timeout_ms,const std::map<ProducerID,std::vector<DataSourceInstanceID>> & data_source_instances,ConsumerEndpoint::FlushCallback callback,FlushFlags flush_flags)1853 void TracingServiceImpl::FlushDataSourceInstances(
1854 TracingSession* tracing_session,
1855 uint32_t timeout_ms,
1856 const std::map<ProducerID, std::vector<DataSourceInstanceID>>&
1857 data_source_instances,
1858 ConsumerEndpoint::FlushCallback callback,
1859 FlushFlags flush_flags) {
1860 PERFETTO_DCHECK_THREAD(thread_checker_);
1861 if (!timeout_ms)
1862 timeout_ms = tracing_session->flush_timeout_ms();
1863
1864 if (tracing_session->pending_flushes.size() > 1000) {
1865 PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1866 tracing_session->pending_flushes.size());
1867 callback(false);
1868 return;
1869 }
1870
1871 if (tracing_session->state != TracingSession::STARTED) {
1872 PERFETTO_LOG("Flush() called, but tracing has not been started");
1873 callback(false);
1874 return;
1875 }
1876
1877 ++tracing_session->flushes_requested;
1878 FlushRequestID flush_request_id = ++last_flush_request_id_;
1879 PendingFlush& pending_flush =
1880 tracing_session->pending_flushes
1881 .emplace_hint(tracing_session->pending_flushes.end(),
1882 flush_request_id, PendingFlush(std::move(callback)))
1883 ->second;
1884
1885 // Send a flush request to each producer involved in the tracing session. In
1886 // order to issue a flush request we have to build a map of all data source
1887 // instance ids enabled for each producer.
1888
1889 for (const auto& [producer_id, data_sources] : data_source_instances) {
1890 ProducerEndpointImpl* producer = GetProducer(producer_id);
1891 producer->Flush(flush_request_id, data_sources, flush_flags);
1892 pending_flush.producers.insert(producer_id);
1893 }
1894
1895 // If there are no producers to flush (realistically this happens only in
1896 // some tests) fire OnFlushTimeout() straight away, without waiting.
1897 if (data_source_instances.empty())
1898 timeout_ms = 0;
1899
1900 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1901 task_runner_->PostDelayedTask(
1902 [weak_this, tsid = tracing_session->id, flush_request_id] {
1903 if (weak_this)
1904 weak_this->OnFlushTimeout(tsid, flush_request_id);
1905 },
1906 timeout_ms);
1907 }
1908
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1909 void TracingServiceImpl::NotifyFlushDoneForProducer(
1910 ProducerID producer_id,
1911 FlushRequestID flush_request_id) {
1912 for (auto& kv : tracing_sessions_) {
1913 // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1914 auto& pending_flushes = kv.second.pending_flushes;
1915 auto end_it = pending_flushes.upper_bound(flush_request_id);
1916 for (auto it = pending_flushes.begin(); it != end_it;) {
1917 PendingFlush& pending_flush = it->second;
1918 pending_flush.producers.erase(producer_id);
1919 if (pending_flush.producers.empty()) {
1920 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1921 TracingSessionID tsid = kv.first;
1922 auto callback = std::move(pending_flush.callback);
1923 task_runner_->PostTask([weak_this, tsid, callback]() {
1924 if (weak_this) {
1925 weak_this->CompleteFlush(tsid, std::move(callback),
1926 /*success=*/true);
1927 }
1928 });
1929 it = pending_flushes.erase(it);
1930 } else {
1931 it++;
1932 }
1933 } // for (pending_flushes)
1934 } // for (tracing_session)
1935 }
1936
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1937 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1938 FlushRequestID flush_request_id) {
1939 TracingSession* tracing_session = GetTracingSession(tsid);
1940 if (!tracing_session)
1941 return;
1942 auto it = tracing_session->pending_flushes.find(flush_request_id);
1943 if (it == tracing_session->pending_flushes.end())
1944 return; // Nominal case: flush was completed and acked on time.
1945
1946 // If there were no producers to flush, consider it a success.
1947 bool success = it->second.producers.empty();
1948 auto callback = std::move(it->second.callback);
1949 tracing_session->pending_flushes.erase(it);
1950 CompleteFlush(tsid, std::move(callback), success);
1951 }
1952
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1953 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1954 ConsumerEndpoint::FlushCallback callback,
1955 bool success) {
1956 TracingSession* tracing_session = GetTracingSession(tsid);
1957 if (!tracing_session) {
1958 callback(false);
1959 return;
1960 }
1961 // Producers may not have been able to flush all their data, even if they
1962 // indicated flush completion. If possible, also collect uncommitted chunks
1963 // to make sure we have everything they wrote so far.
1964 for (auto& producer_id_and_producer : producers_) {
1965 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1966 }
1967 SnapshotLifecyleEvent(
1968 tracing_session,
1969 protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
1970 true /* snapshot_clocks */);
1971
1972 tracing_session->flushes_succeeded += success ? 1 : 0;
1973 tracing_session->flushes_failed += success ? 0 : 1;
1974 callback(success);
1975 }
1976
// Copies ("scrapes") any uncommitted-but-usable chunks from |producer|'s
// shared memory buffer into the log buffers of |tracing_session|. Invoked on
// flush completion and session teardown so that data sitting in the SMB is
// not lost. Safe against a concurrently-writing (or malicious) producer: see
// the long comment below.
void TracingServiceImpl::ScrapeSharedMemoryBuffers(
    TracingSession* tracing_session,
    ProducerEndpointImpl* producer) {
  if (!producer->smb_scraping_enabled_)
    return;

  // Can't copy chunks if we don't know about any trace writers.
  if (producer->writers_.empty())
    return;

  // Performance optimization: On flush or session disconnect, this method is
  // called for each producer. If the producer doesn't participate in the
  // session, there's no need to scrape its chunks right now. We can tell if a
  // producer participates in the session by checking if the producer is allowed
  // to write into the session's log buffers.
  const auto& session_buffers = tracing_session->buffers_index;
  bool producer_in_session =
      std::any_of(session_buffers.begin(), session_buffers.end(),
                  [producer](BufferID buffer_id) {
                    return producer->allowed_target_buffers_.count(buffer_id);
                  });
  if (!producer_in_session)
    return;

  PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);

  // Find and copy any uncommitted chunks from the SMB.
  //
  // In nominal conditions, the page layout of the used SMB pages should never
  // change because the service is the only one who is supposed to modify used
  // pages (to make them free again).
  //
  // However, the code here needs to deal with the case of a malicious producer
  // altering the SMB in unpredictable ways. Thankfully the SMB size is
  // immutable, so a chunk will always point to some valid memory, even if the
  // producer alters the intended layout and chunk header concurrently.
  // Ultimately a malicious producer altering the SMB's chunk layout while we
  // are iterating in this function is not any different from the case of a
  // malicious producer asking to commit a chunk made of random data, which is
  // something this class has to deal with regardless.
  //
  // The only legitimate mutations that can happen from sane producers,
  // concurrently to this function, are:
  //   A. free pages being partitioned,
  //   B. free chunks being migrated to kChunkBeingWritten,
  //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.

  SharedMemoryABI* abi = &producer->shmem_abi_;
  // num_pages() is immutable after the SMB is initialized and cannot be changed
  // even by a producer even if malicious.
  for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
    uint32_t layout = abi->GetPageLayout(page_idx);

    uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
    // Skip empty pages.
    if (used_chunks == 0)
      continue;

    // Scrape the chunks that are currently used. These should be either in
    // state kChunkBeingWritten or kChunkComplete.
    // Iterate over the bitmap, shifting it right once per chunk, so that the
    // chunk under consideration is always bit 0.
    for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
      if (!(used_chunks & 1))
        continue;

      SharedMemoryABI::ChunkState state =
          SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
      PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
                      state == SharedMemoryABI::kChunkComplete);
      bool chunk_complete = state == SharedMemoryABI::kChunkComplete;

      SharedMemoryABI::Chunk chunk =
          abi->GetChunkUnchecked(page_idx, layout, chunk_idx);

      uint16_t packet_count;
      uint8_t flags;
      // GetPacketCountAndFlags has acquire_load semantics.
      std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();

      // It only makes sense to copy an incomplete chunk if there's at least
      // one full packet available. (The producer may not have completed the
      // last packet in it yet, so we need at least 2.)
      if (!chunk_complete && packet_count < 2)
        continue;

      // At this point, it is safe to access the remaining header fields of
      // the chunk. Even if the chunk was only just transferred from
      // kChunkFree into kChunkBeingWritten state, the header should be
      // written completely once the packet count increased above 1 (it was
      // reset to 0 by the service when the chunk was freed).

      WriterID writer_id = chunk.writer_id();
      std::optional<BufferID> target_buffer_id =
          producer->buffer_id_for_writer(writer_id);

      // We can only scrape this chunk if we know which log buffer to copy it
      // into.
      if (!target_buffer_id)
        continue;

      // Skip chunks that don't belong to the requested tracing session.
      bool target_buffer_belongs_to_session =
          std::find(session_buffers.begin(), session_buffers.end(),
                    *target_buffer_id) != session_buffers.end();
      if (!target_buffer_belongs_to_session)
        continue;

      uint32_t chunk_id =
          chunk.header()->chunk_id.load(std::memory_order_relaxed);

      CopyProducerPageIntoLogBuffer(
          producer->id_, producer->client_identity_, writer_id, chunk_id,
          *target_buffer_id, packet_count, flags, chunk_complete,
          chunk.payload_begin(), chunk.payload_size());
    }
  }
}
2093
FlushAndDisableTracing(TracingSessionID tsid)2094 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
2095 PERFETTO_DCHECK_THREAD(thread_checker_);
2096 PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
2097 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2098 Flush(
2099 tsid, 0,
2100 [weak_this, tsid](bool success) {
2101 // This was a DLOG up to Jun 2021 (v16, Android S).
2102 PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d",
2103 tsid, success);
2104 if (!weak_this)
2105 return;
2106 TracingSession* session = weak_this->GetTracingSession(tsid);
2107 if (!session) {
2108 return;
2109 }
2110 session->final_flush_outcome = success
2111 ? TraceStats::FINAL_FLUSH_SUCCEEDED
2112 : TraceStats::FINAL_FLUSH_FAILED;
2113 if (session->consumer_maybe_null) {
2114 // If the consumer is still attached, just disable the session but
2115 // give it a chance to read the contents.
2116 weak_this->DisableTracing(tsid);
2117 } else {
2118 // If the consumer detached, destroy the session. If the consumer did
2119 // start the session in long-tracing mode, the service will have saved
2120 // the contents to the passed file. If not, the contents will be
2121 // destroyed.
2122 weak_this->FreeBuffers(tsid);
2123 }
2124 },
2125 FlushFlags(FlushFlags::Initiator::kTraced,
2126 FlushFlags::Reason::kTraceStop));
2127 }
2128
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)2129 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
2130 bool post_next_only) {
2131 PERFETTO_DCHECK_THREAD(thread_checker_);
2132 TracingSession* tracing_session = GetTracingSession(tsid);
2133 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2134 return;
2135
2136 uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
2137 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2138 task_runner_->PostDelayedTask(
2139 [weak_this, tsid] {
2140 if (weak_this)
2141 weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
2142 },
2143 flush_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2144 flush_period_ms));
2145
2146 if (post_next_only)
2147 return;
2148
2149 PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
2150 Flush(
2151 tsid, 0,
2152 [](bool success) {
2153 if (!success)
2154 PERFETTO_ELOG("Periodic flush timed out");
2155 },
2156 FlushFlags(FlushFlags::Initiator::kTraced,
2157 FlushFlags::Reason::kPeriodic));
2158 }
2159
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)2160 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
2161 TracingSessionID tsid,
2162 bool post_next_only) {
2163 PERFETTO_DCHECK_THREAD(thread_checker_);
2164 TracingSession* tracing_session = GetTracingSession(tsid);
2165 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
2166 return;
2167
2168 uint32_t clear_period_ms =
2169 tracing_session->config.incremental_state_config().clear_period_ms();
2170 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2171 task_runner_->PostDelayedTask(
2172 [weak_this, tsid] {
2173 if (weak_this)
2174 weak_this->PeriodicClearIncrementalStateTask(
2175 tsid, /*post_next_only=*/false);
2176 },
2177 clear_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
2178 clear_period_ms));
2179
2180 if (post_next_only)
2181 return;
2182
2183 PERFETTO_DLOG(
2184 "Performing periodic incremental state clear for trace session %" PRIu64,
2185 tsid);
2186
2187 // Queue the IPCs to producers with active data sources that opted in.
2188 std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
2189 for (const auto& kv : tracing_session->data_source_instances) {
2190 ProducerID producer_id = kv.first;
2191 const DataSourceInstance& data_source = kv.second;
2192 if (data_source.handles_incremental_state_clear) {
2193 clear_map[producer_id].push_back(data_source.instance_id);
2194 }
2195 }
2196
2197 for (const auto& kv : clear_map) {
2198 ProducerID producer_id = kv.first;
2199 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
2200 ProducerEndpointImpl* producer = GetProducer(producer_id);
2201 if (!producer) {
2202 PERFETTO_DFATAL("Producer does not exist.");
2203 continue;
2204 }
2205 producer->ClearIncrementalState(data_sources);
2206 }
2207 }
2208
ReadBuffersIntoConsumer(TracingSessionID tsid,ConsumerEndpointImpl * consumer)2209 bool TracingServiceImpl::ReadBuffersIntoConsumer(
2210 TracingSessionID tsid,
2211 ConsumerEndpointImpl* consumer) {
2212 PERFETTO_DCHECK(consumer);
2213 PERFETTO_DCHECK_THREAD(thread_checker_);
2214 TracingSession* tracing_session = GetTracingSession(tsid);
2215 if (!tracing_session) {
2216 PERFETTO_DLOG(
2217 "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
2218 return false;
2219 }
2220
2221 if (tracing_session->write_into_file) {
2222 // If the consumer enabled tracing and asked to save the contents into the
2223 // passed file makes little sense to also try to read the buffers over IPC,
2224 // as that would just steal data from the periodic draining task.
2225 PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
2226 return false;
2227 }
2228
2229 if (IsWaitingForTrigger(tracing_session))
2230 return false;
2231
2232 // This is a rough threshold to determine how much to read from the buffer in
2233 // each task. This is to avoid executing a single huge sending task for too
2234 // long and risk to hit the watchdog. This is *not* an upper bound: we just
2235 // stop accumulating new packets and PostTask *after* we cross this threshold.
2236 // This constant essentially balances the PostTask and IPC overhead vs the
2237 // responsiveness of the service. An extremely small value will cause one IPC
2238 // and one PostTask for each slice but will keep the service extremely
2239 // responsive. An extremely large value will batch the send for the full
2240 // buffer in one large task, will hit the blocking send() once the socket
2241 // buffers are full and hang the service for a bit (until the consumer
2242 // catches up).
2243 static constexpr size_t kApproxBytesPerTask = 32768;
2244 bool has_more;
2245 std::vector<TracePacket> packets =
2246 ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);
2247
2248 if (has_more) {
2249 auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
2250 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2251 task_runner_->PostTask([weak_this, weak_consumer, tsid] {
2252 if (!weak_this || !weak_consumer)
2253 return;
2254 weak_this->ReadBuffersIntoConsumer(tsid, weak_consumer.get());
2255 });
2256 }
2257
2258 // Keep this as tail call, just in case the consumer re-enters.
2259 consumer->consumer_->OnTraceData(std::move(packets), has_more);
2260 return true;
2261 }
2262
ReadBuffersIntoFile(TracingSessionID tsid)2263 bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
2264 PERFETTO_DCHECK_THREAD(thread_checker_);
2265 TracingSession* tracing_session = GetTracingSession(tsid);
2266 if (!tracing_session) {
2267 // This will be hit systematically from the PostDelayedTask. Avoid logging,
2268 // it would be just spam.
2269 return false;
2270 }
2271
2272 // This can happen if the file is closed by a previous task because it reaches
2273 // |max_file_size_bytes|.
2274 if (!tracing_session->write_into_file)
2275 return false;
2276
2277 if (IsWaitingForTrigger(tracing_session))
2278 return false;
2279
2280 // ReadBuffers() can allocate memory internally, for filtering. By limiting
2281 // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
2282 // we limit the amount of memory used on each iteration.
2283 //
2284 // It would be tempting to split this into multiple tasks like in
2285 // ReadBuffersIntoConsumer, but that's not currently possible.
2286 // ReadBuffersIntoFile has to read the whole available data before returning,
2287 // to support the disable_immediately=true code paths.
2288 bool has_more = true;
2289 bool stop_writing_into_file = false;
2290 do {
2291 std::vector<TracePacket> packets =
2292 ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);
2293
2294 stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
2295 } while (has_more && !stop_writing_into_file);
2296
2297 if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
2298 // Ensure all data was written to the file before we close it.
2299 base::FlushFile(tracing_session->write_into_file.get());
2300 tracing_session->write_into_file.reset();
2301 tracing_session->write_period_ms = 0;
2302 if (tracing_session->state == TracingSession::STARTED)
2303 DisableTracing(tsid);
2304 return true;
2305 }
2306
2307 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2308 task_runner_->PostDelayedTask(
2309 [weak_this, tsid] {
2310 if (weak_this)
2311 weak_this->ReadBuffersIntoFile(tsid);
2312 },
2313 tracing_session->delay_to_next_write_period_ms());
2314 return true;
2315 }
2316
IsWaitingForTrigger(TracingSession * tracing_session)2317 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2318 // Ignore the logic below for cloned tracing sessions. In this case we
2319 // actually want to read the (cloned) trace buffers even if no trigger was
2320 // hit.
2321 if (tracing_session->state == TracingSession::CLONED_READ_ONLY) {
2322 return false;
2323 }
2324
2325 // When a tracing session is waiting for a trigger, it is considered empty. If
2326 // a tracing session finishes and moves into DISABLED without ever receiving a
2327 // trigger, the trace should never return any data. This includes the
2328 // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2329 // early and let the consumer know there is no data.
2330 if (!tracing_session->config.trigger_config().triggers().empty() &&
2331 tracing_session->received_triggers.empty()) {
2332 PERFETTO_DLOG(
2333 "ReadBuffers(): tracing session has not received a trigger yet.");
2334 return true;
2335 }
2336
2337 // Traces with CLONE_SNAPSHOT triggers are a special case of the above. They
2338 // can be read only via a CloneSession() request. This is to keep the
2339 // behavior consistent with the STOP_TRACING+triggers case and avoid periodic
2340 // finalizations and uploads of the main CLONE_SNAPSHOT triggers.
2341 if (GetTriggerMode(tracing_session->config) ==
2342 TraceConfig::TriggerConfig::CLONE_SNAPSHOT) {
2343 PERFETTO_DLOG(
2344 "ReadBuffers(): skipping because the tracing session has "
2345 "CLONE_SNAPSHOT triggers defined");
2346 return true;
2347 }
2348
2349 return false;
2350 }
2351
// Reads up to ~|threshold| bytes of trace data from the session's buffers and
// returns it as a vector of TracePackets. Synthetic packets owed to the
// session (clock snapshots, sync marker, trace config, uuid, system info,
// lifecycle events, stats) are emitted around the buffer data. Each packet
// read from a buffer gets an appended "trusted" slice (uid, sequence id, pid,
// machine id) that producers cannot spoof. |*has_more| is set to true iff the
// read stopped because of the threshold and more buffer data is pending.
std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
    TracingSession* tracing_session,
    size_t threshold,
    bool* has_more) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(tracing_session);
  *has_more = false;

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  // Emit the initial clock snapshot (once) and any periodic snapshots queued
  // since the last read. Both are consumed (moved out) here.
  if (!tracing_session->initial_clock_snapshot.empty()) {
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->initial_clock_snapshot),
                      &packets);
  }

  for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
    PERFETTO_DCHECK(!snapshot.empty());
    EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
  }
  tracing_session->clock_snapshot_ring_buffer.clear();

  if (tracing_session->should_emit_sync_marker) {
    EmitSyncMarker(&packets);
    tracing_session->should_emit_sync_marker = false;
  }

  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitTraceConfig(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  // Uuid and system info are emitted only on the first ReadBuffers() call of
  // the session.
  if (!tracing_session->did_emit_initial_packets) {
    EmitUuid(tracing_session, &packets);
    if (!tracing_session->config.builtin_data_sources().disable_system_info())
      EmitSystemInfo(&packets);
  }
  tracing_session->did_emit_initial_packets = true;

  // Note that in the proto comment, we guarantee that the tracing_started
  // lifecycle event will be emitted before any data packets so make sure to
  // keep this before reading the tracing buffers.
  if (!tracing_session->config.builtin_data_sources().disable_service_events())
    EmitLifecycleEvents(tracing_session, &packets);

  // In a multi-machine tracing session, emit clock synchronization messages
  // for remote machines.
  if (!relay_clients_.empty())
    MaybeEmitRemoteClockSync(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).

  // Add up size for packets added by the Maybe* calls above, so that the
  // synthetic packets count towards |threshold| too.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
  }

  bool did_hit_threshold = false;

  // Drain each buffer of the session in order, until either all buffers are
  // empty or |threshold| bytes have been accumulated.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;  // This buffer is drained; move to the next one.
      }
      packet.set_buffer_index_for_stats(static_cast<uint32_t>(buf_idx));
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.client_identity_trusted.has_uid());
      // Not checking sequence_properties.client_identity_trusted.has_pid():
      // it is false if the platform doesn't support it.

      PERFETTO_DCHECK(packet.size() > 0);
      // Reject packets that (maliciously or not) contain trusted fields set
      // by the producer.
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      const auto& client_identity_trusted =
          sequence_properties.client_identity_trusted;
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(client_identity_trusted.uid()));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              client_identity_trusted.machine_id(),
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (client_identity_trusted.has_pid()) {
        // Not supported on all platforms.
        trusted_packet->set_trusted_pid(
            static_cast<int32_t>(client_identity_trusted.pid()));
      }
      if (client_identity_trusted.has_non_default_machine_id()) {
        trusted_packet->set_machine_id(client_identity_trusted.machine_id());
      }
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      did_hit_threshold = packets_bytes >= threshold;
      packets.emplace_back(std::move(packet));
    }  // for(packets...)
  }  // for(buffers...)

  *has_more = did_hit_threshold;

  // Only emit the "read complete" lifetime event when there is no more trace
  // data available to read. These events are used as safe points to limit
  // sorting in trace processor: the code shouldn't emit the event unless the
  // buffers are empty.
  if (!*has_more && !tracing_session->config.builtin_data_sources()
                         .disable_service_events()) {
    // We don't bother snapshotting clocks here because we wouldn't be able to
    // emit it and we shouldn't have significant drift from the last snapshot
    // in any case.
    SnapshotLifecyleEvent(tracing_session,
                          protos::pbzero::TracingServiceEvent::
                              kReadTracingBuffersCompletedFieldNumber,
                          false /* snapshot_clocks */);
    EmitLifecycleEvents(tracing_session, &packets);
  }

  // Only emit the stats when there is no more trace data is available to read.
  // That way, any problems that occur while reading from the buffers are
  // reflected in the emitted stats. This is particularly important for use
  // cases where ReadBuffers is only ever called after the tracing session is
  // stopped.
  if (!*has_more && tracing_session->should_emit_stats) {
    EmitStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;
  }

  MaybeFilterPackets(tracing_session, &packets);

  MaybeCompressPackets(tracing_session, &packets);

  if (!*has_more) {
    // We've observed some extremely high memory usage by scudo after
    // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
    // now, but this code asks scudo to release memory just in case.
    base::MaybeReleaseAllocatorMemToOS();
  }

  return packets;
}
2524
MaybeFilterPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2525 void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
2526 std::vector<TracePacket>* packets) {
2527 // If the tracing session specified a filter, run all packets through the
2528 // filter and replace them with the filter results.
2529 // The process below mantains the cardinality of input packets. Even if an
2530 // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
2531 // makes debugging and reasoning about the trace stats easier.
2532 // This place swaps the contents of each |packets| entry in place.
2533 if (!tracing_session->trace_filter) {
2534 return;
2535 }
2536 protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
2537 // The filter root should be reset from protos.Trace to protos.TracePacket
2538 // by the earlier call to SetFilterRoot() in EnableTracing().
2539 PERFETTO_DCHECK(trace_filter.config().root_msg_index() != 0);
2540 std::vector<protozero::MessageFilter::InputSlice> filter_input;
2541 auto start = base::GetWallTimeNs();
2542 for (TracePacket& packet : *packets) {
2543 const auto& packet_slices = packet.slices();
2544 const size_t input_packet_size = packet.size();
2545 filter_input.clear();
2546 filter_input.resize(packet_slices.size());
2547 ++tracing_session->filter_input_packets;
2548 tracing_session->filter_input_bytes += input_packet_size;
2549 for (size_t i = 0; i < packet_slices.size(); ++i)
2550 filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
2551 auto filtered_packet = trace_filter.FilterMessageFragments(
2552 &filter_input[0], filter_input.size());
2553
2554 // Replace the packet in-place with the filtered one (unless failed).
2555 std::optional<uint32_t> maybe_buffer_idx = packet.buffer_index_for_stats();
2556 packet = TracePacket();
2557 if (filtered_packet.error) {
2558 ++tracing_session->filter_errors;
2559 PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
2560 tracing_session->filter_input_packets);
2561 continue;
2562 }
2563 tracing_session->filter_output_bytes += filtered_packet.size;
2564 if (maybe_buffer_idx.has_value()) {
2565 // Keep the per-buffer stats updated. Also propagate the
2566 // buffer_index_for_stats in the output packet to allow accounting by
2567 // other parts of the ReadBuffer pipeline.
2568 uint32_t buffer_idx = maybe_buffer_idx.value();
2569 packet.set_buffer_index_for_stats(buffer_idx);
2570 auto& vec = tracing_session->filter_bytes_discarded_per_buffer;
2571 if (static_cast<size_t>(buffer_idx) >= vec.size())
2572 vec.resize(buffer_idx + 1);
2573 PERFETTO_DCHECK(input_packet_size >= filtered_packet.size);
2574 size_t bytes_filtered_out = input_packet_size - filtered_packet.size;
2575 vec[buffer_idx] += bytes_filtered_out;
2576 }
2577 AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
2578 filtered_packet.size, kMaxTracePacketSliceSize,
2579 &packet);
2580 }
2581 auto end = base::GetWallTimeNs();
2582 tracing_session->filter_time_taken_ns +=
2583 static_cast<uint64_t>((end - start).count());
2584 }
2585
MaybeCompressPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2586 void TracingServiceImpl::MaybeCompressPackets(
2587 TracingSession* tracing_session,
2588 std::vector<TracePacket>* packets) {
2589 if (!tracing_session->compress_deflate) {
2590 return;
2591 }
2592
2593 init_opts_.compressor_fn(packets);
2594 }
2595
WriteIntoFile(TracingSession * tracing_session,std::vector<TracePacket> packets)2596 bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
2597 std::vector<TracePacket> packets) {
2598 if (!tracing_session->write_into_file) {
2599 return false;
2600 }
2601 const uint64_t max_size = tracing_session->max_file_size_bytes
2602 ? tracing_session->max_file_size_bytes
2603 : std::numeric_limits<size_t>::max();
2604
2605 size_t total_slices = 0;
2606 for (const TracePacket& packet : packets) {
2607 total_slices += packet.slices().size();
2608 }
2609 // When writing into a file, the file should look like a root trace.proto
2610 // message. Each packet should be prepended with a proto preamble stating
2611 // its field id (within trace.proto) and size. Hence the addition below.
2612 const size_t max_iovecs = total_slices + packets.size();
2613
2614 size_t num_iovecs = 0;
2615 bool stop_writing_into_file = false;
2616 std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
2617 size_t num_iovecs_at_last_packet = 0;
2618 uint64_t bytes_about_to_be_written = 0;
2619 for (TracePacket& packet : packets) {
2620 std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
2621 packet.GetProtoPreamble();
2622 bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
2623 num_iovecs++;
2624 for (const Slice& slice : packet.slices()) {
2625 // writev() doesn't change the passed pointer. However, struct iovec
2626 // take a non-const ptr because it's the same struct used by readv().
2627 // Hence the const_cast here.
2628 char* start = static_cast<char*>(const_cast<void*>(slice.start));
2629 bytes_about_to_be_written += slice.size;
2630 iovecs[num_iovecs++] = {start, slice.size};
2631 }
2632
2633 if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
2634 max_size) {
2635 stop_writing_into_file = true;
2636 num_iovecs = num_iovecs_at_last_packet;
2637 break;
2638 }
2639
2640 num_iovecs_at_last_packet = num_iovecs;
2641 }
2642 PERFETTO_DCHECK(num_iovecs <= max_iovecs);
2643 int fd = *tracing_session->write_into_file;
2644
2645 uint64_t total_wr_size = 0;
2646
2647 // writev() can take at most IOV_MAX entries per call. Batch them.
2648 constexpr size_t kIOVMax = IOV_MAX;
2649 for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
2650 int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
2651 ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
2652 if (wr_size <= 0) {
2653 PERFETTO_PLOG("writev() failed");
2654 stop_writing_into_file = true;
2655 break;
2656 }
2657 total_wr_size += static_cast<size_t>(wr_size);
2658 }
2659
2660 tracing_session->bytes_written_into_file += total_wr_size;
2661
2662 PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
2663 (total_wr_size + 1023) / 1024, stop_writing_into_file);
2664 return stop_writing_into_file;
2665 }
2666
// Tears down a tracing session: stops tracing synchronously if still running,
// releases the session's trace buffers, notifies producers, aborts pending
// clone operations and erases the session.
void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    return;  // TODO(primiano): signal failure?
  }
  // Synchronous stop: after this returns all data source instances must be in
  // the stopped state (checked by the DCHECK below).
  DisableTracing(tsid, /*disable_immediately=*/true);

  PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
  tracing_session->data_source_instances.clear();

  // Tell every producer that the buffers it may still reference are going
  // away.
  for (auto& producer_entry : producers_) {
    ProducerEndpointImpl* producer = producer_entry.second;
    producer->OnFreeBuffers(tracing_session->buffers_index);
  }

  // Return the buffer IDs to the global ID pool and destroy the TraceBuffers.
  for (BufferID buffer_id : tracing_session->buffers_index) {
    buffer_ids_.Free(buffer_id);
    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    buffers_.erase(buffer_id);
  }
  // Capture everything needed after the erase below: |tracing_session| is
  // invalidated once the session is removed from |tracing_sessions_|.
  bool notify_traceur =
      tracing_session->config.notify_traceur() &&
      tracing_session->state != TracingSession::CLONED_READ_ONLY;
  bool is_long_trace =
      (tracing_session->config.write_into_file() &&
       tracing_session->config.file_write_period_ms() < kMillisPerDay);
  auto pending_clones = std::move(tracing_session->pending_clones);
  tracing_sessions_.erase(tsid);
  tracing_session = nullptr;  // Defensive: the pointer is now dangling.
  UpdateMemoryGuardrail();

  // Fail any clone operation that was still waiting on this session.
  for (const auto& id_to_clone_op : pending_clones) {
    const PendingClone& clone_op = id_to_clone_op.second;
    if (clone_op.weak_consumer) {
      task_runner_->PostTask([weak_consumer = clone_op.weak_consumer] {
        if (weak_consumer) {
          weak_consumer->consumer_->OnSessionCloned(
              {false, "Original session ended", {}});
        }
      });
    }
  }

  PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
               tracing_sessions_.size());
#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
    PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  if (notify_traceur && is_long_trace) {
    PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
    if (!notify_fn || !notify_fn(/*session_stolen=*/false))
      PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
  }
#else
  base::ignore_result(notify_traceur);
  base::ignore_result(is_long_trace);
#endif
}
2727
// Registers a data source advertised by |producer_id| and, if any existing
// tracing session's config enables it, immediately sets it up (and starts it
// when the session is already STARTED).
void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  // Check that the producer doesn't register two data sources with the same
  // ID. Note that we tolerate |id| == 0 because until Android T / v22 the
  // |id| field didn't exist.
  for (const auto& kv : data_sources_) {
    if (desc.id() && kv.second.producer_id == producer_id &&
        kv.second.descriptor.id() == desc.id()) {
      PERFETTO_ELOG(
          "Failed to register data source \"%s\". A data source with the same "
          "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
          desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
          producer_id);
      return;
    }
  }

  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  // |data_sources_| is a multimap keyed by name: several producers may
  // register data sources with the same name.
  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  // If there are existing tracing sessions, we need to check if the new
  // data source is enabled by any of them.
  for (auto& iter : tracing_sessions_) {
    TracingSession& tracing_session = iter.second;
    // Only sessions that are (or are about to be) running can pick up the
    // late-registered data source.
    if (tracing_session.state != TracingSession::STARTED &&
        tracing_session.state != TracingSession::CONFIGURED) {
      continue;
    }

    // Find the per-producer config (SMB sizes etc.) for this producer, if the
    // session's config has one; otherwise use the defaults.
    TraceConfig::ProducerConfig producer_config;
    for (const auto& config : tracing_session.config.producers()) {
      if (producer->name_ == config.producer_name()) {
        producer_config = config;
        break;
      }
    }
    for (const TraceConfig::DataSource& cfg_data_source :
         tracing_session.config.data_sources()) {
      if (cfg_data_source.config().name() != desc.name())
        continue;
      DataSourceInstance* ds_inst = SetupDataSource(
          cfg_data_source, producer_config, reg_ds->second, &tracing_session);
      // Sessions still in CONFIGURED will start the instance later, together
      // with the others, when the session itself starts.
      if (ds_inst && tracing_session.state == TracingSession::STARTED)
        StartDataSourceInstance(producer, &tracing_session, ds_inst);
    }
  }  // for(iter : tracing_sessions_)
}
2790
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2791 void TracingServiceImpl::UpdateDataSource(
2792 ProducerID producer_id,
2793 const DataSourceDescriptor& new_desc) {
2794 if (new_desc.id() == 0) {
2795 PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2796 return;
2797 }
2798
2799 // If this producer has already registered a matching descriptor name and id,
2800 // just update the descriptor.
2801 RegisteredDataSource* data_source = nullptr;
2802 auto range = data_sources_.equal_range(new_desc.name());
2803 for (auto it = range.first; it != range.second; ++it) {
2804 if (it->second.producer_id == producer_id &&
2805 it->second.descriptor.id() == new_desc.id()) {
2806 data_source = &it->second;
2807 break;
2808 }
2809 }
2810
2811 if (!data_source) {
2812 PERFETTO_ELOG(
2813 "UpdateDataSource() failed, could not find an existing data source "
2814 "with name=\"%s\" id=%" PRIu64,
2815 new_desc.name().c_str(), new_desc.id());
2816 return;
2817 }
2818
2819 data_source->descriptor = new_desc;
2820 }
2821
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2822 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2823 TracingSession* tracing_session,
2824 DataSourceInstance* instance,
2825 bool disable_immediately) {
2826 const DataSourceInstanceID ds_inst_id = instance->instance_id;
2827 if (instance->will_notify_on_stop && !disable_immediately) {
2828 instance->state = DataSourceInstance::STOPPING;
2829 } else {
2830 instance->state = DataSourceInstance::STOPPED;
2831 }
2832 if (tracing_session->consumer_maybe_null) {
2833 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2834 *producer, *instance);
2835 }
2836 producer->StopDataSource(ds_inst_id);
2837 }
2838
// Removes the data source |name| previously registered by |producer_id|:
// stops and erases any live instances of it across all tracing sessions,
// then drops its registration entry.
void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
                                              const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
                producer_id, name.c_str());
  PERFETTO_CHECK(producer_id);
  ProducerEndpointImpl* producer = GetProducer(producer_id);
  PERFETTO_DCHECK(producer);
  for (auto& kv : tracing_sessions_) {
    auto& ds_instances = kv.second.data_source_instances;
    bool removed = false;
    // Erase-while-iterating: |it| is advanced via the erase() return value.
    for (auto it = ds_instances.begin(); it != ds_instances.end();) {
      if (it->first == producer_id && it->second.data_source_name == name) {
        DataSourceInstanceID ds_inst_id = it->second.instance_id;
        if (it->second.state != DataSourceInstance::STOPPED) {
          if (it->second.state != DataSourceInstance::STOPPING) {
            StopDataSourceInstance(producer, &kv.second, &it->second,
                                   /* disable_immediately = */ false);
          }

          // Mark the instance as stopped immediately, since we are
          // unregistering it below.
          //
          // The StopDataSourceInstance above might have set the state to
          // STOPPING so this condition isn't an else.
          if (it->second.state == DataSourceInstance::STOPPING)
            NotifyDataSourceStopped(producer_id, ds_inst_id);
        }
        it = ds_instances.erase(it);
        removed = true;
      } else {
        ++it;
      }
    }  // for (data_source_instances)
    // Removing an instance may unblock a session waiting for all its data
    // sources to reach the started state.
    if (removed)
      MaybeNotifyAllDataSourcesStarted(&kv.second);
  }  // for (tracing_session)

  // Drop the registration entry. |data_sources_| is a multimap; only the
  // entry belonging to this producer is erased.
  for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    if (it->second.producer_id == producer_id &&
        it->second.descriptor.name() == name) {
      data_sources_.erase(it);
      return;
    }
  }

  PERFETTO_DFATAL(
      "Tried to unregister a non-existent data source \"%s\" for "
      "producer %" PRIu16,
      name.c_str(), producer_id);
}
2890
IsInitiatorPrivileged(const TracingSession & tracing_session)2891 bool TracingServiceImpl::IsInitiatorPrivileged(
2892 const TracingSession& tracing_session) {
2893 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2894 if (tracing_session.consumer_uid == 1066 /* AID_STATSD */ &&
2895 tracing_session.config.statsd_metadata().triggering_config_uid() !=
2896 2000 /* AID_SHELL */
2897 && tracing_session.config.statsd_metadata().triggering_config_uid() !=
2898 0 /* AID_ROOT */) {
2899 // StatsD can be triggered either by shell, root or an app that has DUMP and
2900 // USAGE_STATS permission. When triggered by shell or root, we do not want
2901 // to consider the trace a trusted system trace, as it was initiated by the
2902 // user. Otherwise, it has to come from an app with DUMP and
2903 // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
2904 // system.
2905 // Check for shell / root: https://bit.ly/3b7oZNi
2906 // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
2907 return true;
2908 }
2909 if (tracing_session.consumer_uid == 1000 /* AID_SYSTEM */) {
2910 // AID_SYSTEM is considered a privileged initiator so that system_server can
2911 // profile apps that are not profileable by shell. Other AID_SYSTEM
2912 // processes are not allowed by SELinux to connect to the consumer socket or
2913 // to exec perfetto.
2914 return true;
2915 }
2916 #else
2917 base::ignore_result(tracing_session);
2918 #endif
2919 return false;
2920 }
2921
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)2922 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
2923 const TraceConfig::DataSource& cfg_data_source,
2924 const TraceConfig::ProducerConfig& producer_config,
2925 const RegisteredDataSource& data_source,
2926 TracingSession* tracing_session) {
2927 PERFETTO_DCHECK_THREAD(thread_checker_);
2928 ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
2929 PERFETTO_DCHECK(producer);
2930 // An existing producer that is not ftrace could have registered itself as
2931 // ftrace, we must not enable it in that case.
2932 if (lockdown_mode_ && producer->uid() != uid_) {
2933 PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
2934 return nullptr;
2935 }
2936 // TODO(primiano): Add tests for registration ordering (data sources vs
2937 // consumers).
2938 if (!NameMatchesFilter(producer->name_,
2939 cfg_data_source.producer_name_filter(),
2940 cfg_data_source.producer_name_regex_filter())) {
2941 PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
2942 cfg_data_source.config().name().c_str(),
2943 producer->name_.c_str());
2944 return nullptr;
2945 }
2946
2947 auto relative_buffer_id = cfg_data_source.config().target_buffer();
2948 if (relative_buffer_id >= tracing_session->num_buffers()) {
2949 PERFETTO_LOG(
2950 "The TraceConfig for DataSource %s specified a target_buffer out of "
2951 "bound (%d). Skipping it.",
2952 cfg_data_source.config().name().c_str(), relative_buffer_id);
2953 return nullptr;
2954 }
2955
2956 // Create a copy of the DataSourceConfig specified in the trace config. This
2957 // will be passed to the producer after translating the |target_buffer| id.
2958 // The |target_buffer| parameter passed by the consumer in the trace config is
2959 // relative to the buffers declared in the same trace config. This has to be
2960 // translated to the global BufferID before passing it to the producers, which
2961 // don't know anything about tracing sessions and consumers.
2962
2963 DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
2964 auto insert_iter = tracing_session->data_source_instances.emplace(
2965 std::piecewise_construct, //
2966 std::forward_as_tuple(producer->id_),
2967 std::forward_as_tuple(
2968 inst_id,
2969 cfg_data_source.config(), // Deliberate copy.
2970 data_source.descriptor.name(),
2971 data_source.descriptor.will_notify_on_start(),
2972 data_source.descriptor.will_notify_on_stop(),
2973 data_source.descriptor.handles_incremental_state_clear(),
2974 data_source.descriptor.no_flush()));
2975 DataSourceInstance* ds_instance = &insert_iter->second;
2976
2977 // New data source instance starts out in CONFIGURED state.
2978 if (tracing_session->consumer_maybe_null) {
2979 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2980 *producer, *ds_instance);
2981 }
2982
2983 DataSourceConfig& ds_config = ds_instance->config;
2984 ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
2985
2986 // Rationale for `if (prefer) set_prefer(true)`, rather than `set(prefer)`:
2987 // ComputeStartupConfigHash() in tracing_muxer_impl.cc compares hashes of the
2988 // DataSourceConfig and expects to know (and clear) the fields generated by
2989 // the tracing service. Unconditionally adding a new field breaks backward
2990 // compatibility of startup tracing with older SDKs, because the serialization
2991 // also propagates unkonwn fields, breaking the hash matching check.
2992 if (tracing_session->config.prefer_suspend_clock_for_duration())
2993 ds_config.set_prefer_suspend_clock_for_duration(true);
2994
2995 ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
2996 ds_config.set_enable_extra_guardrails(
2997 tracing_session->config.enable_extra_guardrails());
2998 if (IsInitiatorPrivileged(*tracing_session)) {
2999 ds_config.set_session_initiator(
3000 DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
3001 } else {
3002 // Unset in case the consumer set it.
3003 // We need to be able to trust this field.
3004 ds_config.set_session_initiator(
3005 DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
3006 }
3007 ds_config.set_tracing_session_id(tracing_session->id);
3008 BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
3009 PERFETTO_DCHECK(global_id);
3010 ds_config.set_target_buffer(global_id);
3011
3012 PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
3013 ds_config.name().c_str(), global_id);
3014 if (!producer->shared_memory()) {
3015 // Determine the SMB page size. Must be an integer multiple of 4k.
3016 // As for the SMB size below, the decision tree is as follows:
3017 // 1. Give priority to what is defined in the trace config.
3018 // 2. If unset give priority to the hint passed by the producer.
3019 // 3. Keep within bounds and ensure it's a multiple of 4k.
3020 size_t page_size = producer_config.page_size_kb() * 1024;
3021 if (page_size == 0)
3022 page_size = producer->shmem_page_size_hint_bytes_;
3023
3024 // Determine the SMB size. Must be an integer multiple of the SMB page size.
3025 // The decision tree is as follows:
  // 1. Give priority to what is defined in the trace config.
3027 // 2. If unset give priority to the hint passed by the producer.
3028 // 3. Keep within bounds and ensure it's a multiple of the page size.
3029 size_t shm_size = producer_config.shm_size_kb() * 1024;
3030 if (shm_size == 0)
3031 shm_size = producer->shmem_size_hint_bytes_;
3032
3033 auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
3034 if (valid_sizes != std::tie(shm_size, page_size)) {
3035 PERFETTO_DLOG(
3036 "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
3037 "back to shm_size %zu page_size %zu.",
3038 shm_size, page_size, std::get<0>(valid_sizes),
3039 std::get<1>(valid_sizes));
3040 }
3041 std::tie(shm_size, page_size) = valid_sizes;
3042
3043 // TODO(primiano): right now Create() will suicide in case of OOM if the
3044 // mmap fails. We should instead gracefully fail the request and tell the
3045 // client to go away.
3046 PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
3047 producer->name_.c_str());
3048 auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
3049 producer->SetupSharedMemory(std::move(shared_memory), page_size,
3050 /*provided_by_producer=*/false);
3051 }
3052 producer->SetupDataSource(inst_id, ds_config);
3053 return ds_instance;
3054 }
3055
// Note: all the fields except the *_trusted ones are untrusted, as in, the
// Producer might be lying / returning garbage contents. |src| and |size| can
// be trusted in terms of being a valid pointer, but not the contents.
void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
    ProducerID producer_id_trusted,
    const ClientIdentity& client_identity_trusted,
    WriterID writer_id,
    ChunkID chunk_id,
    BufferID buffer_id,
    uint16_t num_fragments,
    uint8_t chunk_flags,
    bool chunk_complete,
    const uint8_t* src,
    size_t size) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // |producer_id_trusted| is assigned by the service itself, so a failed
  // lookup here indicates a service-side inconsistency rather than something
  // a (potentially malicious) client can trigger.
  ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    chunks_discarded_++;  // Surfaced to consumers via TraceStats.
    return;
  }

  // |buffer_id| comes from the untrusted producer request: it may reference a
  // buffer that was already freed (e.g. the tracing session just ended).
  TraceBuffer* buf = GetBufferByID(buffer_id);
  if (!buf) {
    PERFETTO_DLOG("Could not find target buffer %" PRIu16
                  " for producer %" PRIu16,
                  buffer_id, producer_id_trusted);
    chunks_discarded_++;
    return;
  }

  // Verify that the producer is actually allowed to write into the target
  // buffer specified in the request. This prevents a malicious producer from
  // injecting data into a log buffer that belongs to a tracing session the
  // producer is not part of.
  if (!producer->is_allowed_target_buffer(buffer_id)) {
    PERFETTO_ELOG("Producer %" PRIu16
                  " tried to write into forbidden target buffer %" PRIu16,
                  producer_id_trusted, buffer_id);
    PERFETTO_DFATAL("Forbidden target buffer");
    chunks_discarded_++;
    return;
  }

  // If the writer was registered by the producer, it should only write into the
  // buffer it was registered with.
  std::optional<BufferID> associated_buffer =
      producer->buffer_id_for_writer(writer_id);
  if (associated_buffer && *associated_buffer != buffer_id) {
    PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
                  " was registered to write into target buffer %" PRIu16
                  ", but tried to write into buffer %" PRIu16,
                  writer_id, producer_id_trusted, *associated_buffer,
                  buffer_id);
    PERFETTO_DFATAL("Wrong target buffer");
    chunks_discarded_++;
    return;
  }

  // All routing checks passed. The chunk payload itself is still untrusted
  // (see the comment above this function); the buffer copy is named
  // accordingly.
  buf->CopyChunkUntrusted(producer_id_trusted, client_identity_trusted,
                          writer_id, chunk_id, num_fragments, chunk_flags,
                          chunk_complete, src, size);
}
3120
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)3121 void TracingServiceImpl::ApplyChunkPatches(
3122 ProducerID producer_id_trusted,
3123 const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
3124 PERFETTO_DCHECK_THREAD(thread_checker_);
3125
3126 for (const auto& chunk : chunks_to_patch) {
3127 const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
3128 const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
3129 TraceBuffer* buf =
3130 GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
3131 static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
3132 "Add a '|| chunk_id > kMaxChunkID' below if this fails");
3133 if (!writer_id || writer_id > kMaxWriterID || !buf) {
3134 // This can genuinely happen when the trace is stopped. The producers
3135 // might see the stop signal with some delay and try to keep sending
3136 // patches left soon after.
3137 PERFETTO_DLOG(
3138 "Received invalid chunks_to_patch request from Producer: %" PRIu16
3139 ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
3140 producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
3141 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3142 continue;
3143 }
3144
3145 // Note, there's no need to validate that the producer is allowed to write
3146 // to the specified buffer ID (or that it's the correct buffer ID for a
3147 // registered TraceWriter). That's because TraceBuffer uses the producer ID
3148 // and writer ID to look up the chunk to patch. If the producer specifies an
3149 // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
3150 // patches. Because the producer ID is trusted, there's also no way for a
3151 // malicious producer to patch another producer's data.
3152
3153 // Speculate on the fact that there are going to be a limited amount of
3154 // patches per request, so we can allocate the |patches| array on the stack.
3155 std::array<TraceBuffer::Patch, 1024> patches; // Uninitialized.
3156 if (chunk.patches().size() > patches.size()) {
3157 PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
3158 patches.size());
3159 PERFETTO_DFATAL("Too many patches");
3160 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
3161 continue;
3162 }
3163
3164 size_t i = 0;
3165 for (const auto& patch : chunk.patches()) {
3166 const std::string& patch_data = patch.data();
3167 if (patch_data.size() != patches[i].data.size()) {
3168 PERFETTO_ELOG("Received patch from producer: %" PRIu16
3169 " of unexpected size %zu",
3170 producer_id_trusted, patch_data.size());
3171 patches_discarded_++;
3172 continue;
3173 }
3174 patches[i].offset_untrusted = patch.offset();
3175 memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
3176 i++;
3177 }
3178 buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
3179 &patches[0], i, chunk.has_more_patches());
3180 }
3181 }
3182
GetDetachedSession(uid_t uid,const std::string & key)3183 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
3184 uid_t uid,
3185 const std::string& key) {
3186 PERFETTO_DCHECK_THREAD(thread_checker_);
3187 for (auto& kv : tracing_sessions_) {
3188 TracingSession* session = &kv.second;
3189 if (session->consumer_uid == uid && session->detach_key == key) {
3190 PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
3191 return session;
3192 }
3193 }
3194 return nullptr;
3195 }
3196
GetTracingSession(TracingSessionID tsid)3197 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
3198 TracingSessionID tsid) {
3199 PERFETTO_DCHECK_THREAD(thread_checker_);
3200 auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
3201 if (it == tracing_sessions_.end())
3202 return nullptr;
3203 return &it->second;
3204 }
3205
3206 TracingServiceImpl::TracingSession*
FindTracingSessionWithMaxBugreportScore()3207 TracingServiceImpl::FindTracingSessionWithMaxBugreportScore() {
3208 TracingSession* max_session = nullptr;
3209 for (auto& session_id_and_session : tracing_sessions_) {
3210 auto& session = session_id_and_session.second;
3211 const int32_t score = session.config.bugreport_score();
3212 // Exclude sessions with 0 (or below) score. By default tracing sessions
3213 // should NOT be eligible to be attached to bugreports.
3214 if (score <= 0 || session.state != TracingSession::STARTED)
3215 continue;
3216
3217 if (!max_session || score > max_session->config.bugreport_score())
3218 max_session = &session;
3219 }
3220 return max_session;
3221 }
3222
GetNextProducerID()3223 ProducerID TracingServiceImpl::GetNextProducerID() {
3224 PERFETTO_DCHECK_THREAD(thread_checker_);
3225 PERFETTO_CHECK(producers_.size() < kMaxProducerID);
3226 do {
3227 ++last_producer_id_;
3228 } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
3229 PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
3230 return last_producer_id_;
3231 }
3232
GetBufferByID(BufferID buffer_id)3233 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
3234 auto buf_iter = buffers_.find(buffer_id);
3235 if (buf_iter == buffers_.end())
3236 return nullptr;
3237 return &*buf_iter->second;
3238 }
3239
OnStartTriggersTimeout(TracingSessionID tsid)3240 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
3241 // Skip entirely the flush if the trace session doesn't exist anymore.
3242 // This is to prevent misleading error messages to be logged.
3243 //
3244 // if the trace has started from the trigger we rely on
3245 // the |stop_delay_ms| from the trigger so don't flush and
3246 // disable if we've moved beyond a CONFIGURED state
3247 auto* tracing_session_ptr = GetTracingSession(tsid);
3248 if (tracing_session_ptr &&
3249 tracing_session_ptr->state == TracingSession::CONFIGURED) {
3250 PERFETTO_DLOG("Disabling TracingSession %" PRIu64
3251 " since no triggers activated.",
3252 tsid);
3253 // No data should be returned from ReadBuffers() regardless of if we
3254 // call FreeBuffers() or DisableTracing(). This is because in
3255 // STOP_TRACING we need this promise in either case, and using
3256 // DisableTracing() allows a graceful shutdown. Consumers can follow
3257 // their normal path and check the buffers through ReadBuffers() and
3258 // the code won't hang because the tracing session will still be
3259 // alive just disabled.
3260 DisableTracing(tsid);
3261 }
3262 }
3263
UpdateMemoryGuardrail()3264 void TracingServiceImpl::UpdateMemoryGuardrail() {
3265 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
3266 uint64_t total_buffer_bytes = 0;
3267
3268 // Sum up all the shared memory buffers.
3269 for (const auto& id_to_producer : producers_) {
3270 if (id_to_producer.second->shared_memory())
3271 total_buffer_bytes += id_to_producer.second->shared_memory()->size();
3272 }
3273
3274 // Sum up all the trace buffers.
3275 for (const auto& id_to_buffer : buffers_) {
3276 total_buffer_bytes += id_to_buffer.second->size();
3277 }
3278
3279 // Sum up all the cloned traced buffers.
3280 for (const auto& id_to_ts : tracing_sessions_) {
3281 const TracingSession& ts = id_to_ts.second;
3282 for (const auto& id_to_clone_op : ts.pending_clones) {
3283 const PendingClone& clone_op = id_to_clone_op.second;
3284 for (const std::unique_ptr<TraceBuffer>& buf : clone_op.buffers) {
3285 if (buf) {
3286 total_buffer_bytes += buf->size();
3287 }
3288 }
3289 }
3290 }
3291
3292 // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
3293 // interval.
3294 uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
3295 base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
3296 #endif
3297 }
3298
PeriodicSnapshotTask(TracingSessionID tsid)3299 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
3300 auto* tracing_session = GetTracingSession(tsid);
3301 if (!tracing_session)
3302 return;
3303 if (tracing_session->state != TracingSession::STARTED)
3304 return;
3305 tracing_session->should_emit_sync_marker = true;
3306 tracing_session->should_emit_stats = true;
3307 MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3308 }
3309
SnapshotLifecyleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)3310 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
3311 uint32_t field_id,
3312 bool snapshot_clocks) {
3313 // field_id should be an id of a field in TracingServiceEvent.
3314 auto& lifecycle_events = tracing_session->lifecycle_events;
3315 auto event_it =
3316 std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
3317 [field_id](const TracingSession::LifecycleEvent& event) {
3318 return event.field_id == field_id;
3319 });
3320
3321 TracingSession::LifecycleEvent* event;
3322 if (event_it == lifecycle_events.end()) {
3323 lifecycle_events.emplace_back(field_id);
3324 event = &lifecycle_events.back();
3325 } else {
3326 event = &*event_it;
3327 }
3328
3329 // Snapshot the clocks before capturing the timestamp for the event so we can
3330 // use this snapshot to resolve the event timestamp if necessary.
3331 if (snapshot_clocks)
3332 MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3333
3334 // Erase before emplacing to prevent a unncessary doubling of memory if
3335 // not needed.
3336 if (event->timestamps.size() >= event->max_size) {
3337 event->timestamps.erase_front(1 + event->timestamps.size() -
3338 event->max_size);
3339 }
3340 event->timestamps.emplace_back(base::GetBootTimeNs().count());
3341 }
3342
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3343 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3344 TracingSession* tracing_session) {
3345 if (tracing_session->config.builtin_data_sources()
3346 .disable_clock_snapshotting()) {
3347 return;
3348 }
3349
3350 // We are making an explicit copy of the latest snapshot (if it exists)
3351 // because SnapshotClocks reads this data and computes the drift based on its
3352 // content. If the clock drift is high enough, it will update the contents of
3353 // |snapshot| and return true. Otherwise, it will return false.
3354 TracingSession::ClockSnapshotData snapshot =
3355 tracing_session->clock_snapshot_ring_buffer.empty()
3356 ? TracingSession::ClockSnapshotData()
3357 : tracing_session->clock_snapshot_ring_buffer.back();
3358 bool did_update = SnapshotClocks(&snapshot);
3359 if (did_update) {
3360 // This means clocks drifted enough since last snapshot. See the comment
3361 // in SnapshotClocks.
3362 auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3363
3364 // Erase before emplacing to prevent a unncessary doubling of memory if
3365 // not needed.
3366 static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3367 if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3368 snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3369 kClockSnapshotRingBufferSize);
3370 }
3371 snapshot_buffer->emplace_back(std::move(snapshot));
3372 }
3373 }
3374
3375 // Returns true when the data in |snapshot_data| is updated with the new state
3376 // of the clocks and false otherwise.
bool TracingServiceImpl::SnapshotClocks(
    TracingSession::ClockSnapshotData* snapshot_data) {
  // Minimum drift that justifies replacing a prior clock snapshot that hasn't
  // been emitted into the trace yet (see comment below).
  static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms

  TracingSession::ClockSnapshotData new_snapshot_data = CaptureClockSnapshots();
  // If we're about to update a session's latest clock snapshot that hasn't been
  // emitted into the trace yet, check whether the clocks have drifted enough to
  // warrant overriding the current snapshot values. The older snapshot would be
  // valid for a larger part of the currently buffered trace data because the
  // clock sync protocol in trace processor uses the latest clock <= timestamp
  // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
  // we try to keep it if we can.
  if (!snapshot_data->empty()) {
    // Snapshots are expected to have a fixed layout: same number of clocks,
    // with BOOTTIME at index 0 (used below as the drift reference).
    PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
    PERFETTO_DCHECK((*snapshot_data)[0].clock_id ==
                    protos::gen::BUILTIN_CLOCK_BOOTTIME);

    bool update_snapshot = false;
    uint64_t old_boot_ns = (*snapshot_data)[0].timestamp;
    uint64_t new_boot_ns = new_snapshot_data[0].timestamp;
    int64_t boot_diff =
        static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);

    for (size_t i = 1; i < snapshot_data->size(); i++) {
      uint64_t old_ns = (*snapshot_data)[i].timestamp;
      uint64_t new_ns = new_snapshot_data[i].timestamp;

      int64_t diff =
          static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);

      // Compare the boottime delta against the delta of this clock. A clock
      // that advanced at a different rate than BOOTTIME by at least
      // kSignificantDriftNs makes the old snapshot stale: replace it.
      if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
        update_snapshot = true;
        break;
      }
    }
    // No clock drifted significantly: keep the older (more widely valid)
    // snapshot and tell the caller nothing changed.
    if (!update_snapshot)
      return false;
    snapshot_data->clear();
  }

  *snapshot_data = std::move(new_snapshot_data);
  return true;
}
3423
void TracingServiceImpl::EmitClockSnapshot(
    TracingSession* tracing_session,
    TracingSession::ClockSnapshotData snapshot_data,
    std::vector<TracePacket>* packets) {
  // Callers must not ask for a snapshot when the config disabled them.
  PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
                       .disable_clock_snapshotting());

  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  auto* snapshot = packet->set_clock_snapshot();

  // The trace-wide reference clock defaults to BOOTTIME unless the config
  // explicitly selected another builtin clock.
  protos::gen::BuiltinClock trace_clock =
      tracing_session->config.builtin_data_sources().primary_trace_clock();
  if (!trace_clock)
    trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
  snapshot->set_primary_trace_clock(
      static_cast<protos::pbzero::BuiltinClock>(trace_clock));

  // Copy every (clock_id, timestamp) pair into the ClockSnapshot proto.
  for (auto& clock_id_and_ts : snapshot_data) {
    auto* c = snapshot->add_clocks();
    c->set_clock_id(clock_id_and_ts.clock_id);
    c->set_timestamp(clock_id_and_ts.timestamp);
  }

  // Stamp the packet as coming from the trusted service sequence.
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
3451
void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
  // The sync marks are used to tokenize large traces efficiently.
  // See description in trace_packet.proto.
  if (sync_marker_packet_size_ == 0) {
    // Lazily serialize the marker packet once into |sync_marker_packet_| and
    // cache its size; subsequent calls re-emit the same bytes.
    // The marker ABI expects that the marker is written after the uid.
    // Protozero guarantees that fields are written in the same order of the
    // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
    protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
        &sync_marker_packet_[0], sizeof(sync_marker_packet_));
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);

    // Keep this last.
    packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
    sync_marker_packet_size_ = packet.Finalize();
  }
  // The slice only references the cached bytes (no copy); this is safe
  // because |sync_marker_packet_| is a service member outliving the packets.
  packets->emplace_back();
  packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
}
3471
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3472 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3473 std::vector<TracePacket>* packets) {
3474 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3475 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3476 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3477 GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3478 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3479 }
3480
TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
  // Builds the TraceStats proto: service-wide counters first, then
  // session-scoped counters, optional filter stats, per-buffer stats and
  // (unless disabled) per-writer chunk usage histograms.
  TraceStats trace_stats;
  trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
  trace_stats.set_producers_seen(last_producer_id_);
  trace_stats.set_data_sources_registered(
      static_cast<uint32_t>(data_sources_.size()));
  trace_stats.set_data_sources_seen(last_data_source_instance_id_);
  trace_stats.set_tracing_sessions(
      static_cast<uint32_t>(tracing_sessions_.size()));
  trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
  trace_stats.set_chunks_discarded(chunks_discarded_);
  trace_stats.set_patches_discarded(patches_discarded_);
  trace_stats.set_invalid_packets(tracing_session->invalid_packets);
  trace_stats.set_flushes_requested(tracing_session->flushes_requested);
  trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
  trace_stats.set_flushes_failed(tracing_session->flushes_failed);
  trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);

  // Filter stats are only meaningful when a trace filter is configured.
  if (tracing_session->trace_filter) {
    auto* filt_stats = trace_stats.mutable_filter_stats();
    filt_stats->set_input_packets(tracing_session->filter_input_packets);
    filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
    filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
    filt_stats->set_errors(tracing_session->filter_errors);
    filt_stats->set_time_taken_ns(tracing_session->filter_time_taken_ns);
    for (uint64_t value : tracing_session->filter_bytes_discarded_per_buffer)
      filt_stats->add_bytes_discarded_per_buffer(value);
  }

  // Per-buffer stats, in the session's buffer order. A missing buffer is an
  // internal inconsistency (DFATAL) but is skipped gracefully in release.
  for (BufferID buf_id : tracing_session->buffers_index) {
    TraceBuffer* buf = GetBufferByID(buf_id);
    if (!buf) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    *trace_stats.add_buffer_stats() = buf->stats();
  }  // for (buf in session).

  if (!tracing_session->config.builtin_data_sources()
           .disable_chunk_usage_histograms()) {
    // Emit chunk usage stats broken down by sequence ID (i.e. by trace-writer).
    // Writer stats are updated by each TraceBuffer object at ReadBuffers time,
    // and there can be >1 buffer per session. A trace writer never writes to
    // more than one buffer (it's technically allowed but doesn't happen in the
    // current impl of the tracing SDK).

    bool has_written_bucket_definition = false;
    uint32_t buf_idx = static_cast<uint32_t>(-1);  // Pre-incremented below.
    for (const BufferID buf_id : tracing_session->buffers_index) {
      ++buf_idx;
      const TraceBuffer* buf = GetBufferByID(buf_id);
      if (!buf)
        continue;
      for (auto it = buf->writer_stats().GetIterator(); it; ++it) {
        const auto& hist = it.value().used_chunk_hist;
        ProducerID p;
        WriterID w;
        GetProducerAndWriterID(it.key(), &p, &w);
        if (!has_written_bucket_definition) {
          // Serialize one-off the histogram bucket definition, which is the
          // same for all entries in the map.
          has_written_bucket_definition = true;
          // The -1 in the loop below is to skip the implicit overflow bucket.
          for (size_t i = 0; i < hist.num_buckets() - 1; ++i) {
            trace_stats.add_chunk_payload_histogram_def(hist.GetBucketThres(i));
          }
        }  // if(!has_written_bucket_definition)
        auto* wri_stats = trace_stats.add_writer_stats();
        wri_stats->set_sequence_id(
            tracing_session->GetPacketSequenceID(kDefaultMachineID, p, w));
        wri_stats->set_buffer(buf_idx);
        for (size_t i = 0; i < hist.num_buckets(); ++i) {
          wri_stats->add_chunk_payload_histogram_counts(hist.GetBucketCount(i));
          wri_stats->add_chunk_payload_histogram_sum(hist.GetBucketSum(i));
        }
      }  // for each sequence (writer).
    }  // for each buffer.
  }  // if (!disable_chunk_usage_histograms)

  return trace_stats;
}
3562
EmitUuid(TracingSession * tracing_session,std::vector<TracePacket> * packets)3563 void TracingServiceImpl::EmitUuid(TracingSession* tracing_session,
3564 std::vector<TracePacket>* packets) {
3565 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3566 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3567 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3568 auto* uuid = packet->set_trace_uuid();
3569 uuid->set_lsb(tracing_session->trace_uuid.lsb());
3570 uuid->set_msb(tracing_session->trace_uuid.msb());
3571 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3572 }
3573
void TracingServiceImpl::MaybeEmitTraceConfig(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  // The config is one of the session's initial packets; skip if those were
  // already emitted (the flag itself is managed by the caller's flow).
  if (tracing_session->did_emit_initial_packets)
    return;
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  tracing_session->config.Serialize(packet->set_trace_config());
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
3585
void TracingServiceImpl::EmitSystemInfo(std::vector<TracePacket>* packets) {
  // Emits a SystemInfo packet describing the host: service version, timezone
  // offset, uname/page size/CPU count on POSIX, and build props on Android.
  protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
  auto* info = packet->set_system_info();
  info->set_tracing_service_version(base::GetVersionString());

  std::optional<int32_t> tzoff = base::GetTimezoneOffsetMins();
  if (tzoff.has_value())
    info->set_timezone_off_mins(*tzoff);

#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
    !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
  // uname() failure just omits the utsname sub-message; it is not an error.
  struct utsname uname_info;
  if (uname(&uname_info) == 0) {
    auto* utsname_info = info->set_utsname();
    utsname_info->set_sysname(uname_info.sysname);
    utsname_info->set_version(uname_info.version);
    utsname_info->set_machine(uname_info.machine);
    utsname_info->set_release(uname_info.release);
  }
  info->set_page_size(static_cast<uint32_t>(sysconf(_SC_PAGESIZE)));
  info->set_num_cpus(static_cast<uint32_t>(sysconf(_SC_NPROCESSORS_CONF)));
#endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // Unreadable Android properties are logged but never fatal: the packet is
  // emitted with whatever fields could be read.
  std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
  if (!fingerprint_value.empty()) {
    info->set_android_build_fingerprint(fingerprint_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.fingerprint");
  }

  std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
  std::optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
  if (sdk_value.has_value()) {
    info->set_android_sdk_version(*sdk_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.build.version.sdk");
  }

  std::string soc_model_value = base::GetAndroidProp("ro.soc.model");
  if (!soc_model_value.empty()) {
    info->set_android_soc_model(soc_model_value);
  } else {
    PERFETTO_ELOG("Unable to read ro.soc.model");
  }
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  packet->set_trusted_uid(static_cast<int32_t>(uid_));
  packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
  SerializeAndAppendPacket(packets, packet.SerializeAsArray());
}
3635
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3636 void TracingServiceImpl::EmitLifecycleEvents(
3637 TracingSession* tracing_session,
3638 std::vector<TracePacket>* packets) {
3639 using TimestampedPacket =
3640 std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3641
3642 std::vector<TimestampedPacket> timestamped_packets;
3643 for (auto& event : tracing_session->lifecycle_events) {
3644 for (int64_t ts : event.timestamps) {
3645 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3646 packet->set_timestamp(static_cast<uint64_t>(ts));
3647 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3648 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3649
3650 auto* service_event = packet->set_service_event();
3651 service_event->AppendVarInt(event.field_id, 1);
3652 timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3653 }
3654 event.timestamps.clear();
3655 }
3656
3657 // We sort by timestamp here to ensure that the "sequence" of lifecycle
3658 // packets has monotonic timestamps like other sequences in the trace.
3659 // Note that these events could still be out of order with respect to other
3660 // events on the service packet sequence (e.g. trigger received packets).
3661 std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3662 [](const TimestampedPacket& a, const TimestampedPacket& b) {
3663 return a.first < b.first;
3664 });
3665
3666 for (auto& pair : timestamped_packets)
3667 SerializeAndAppendPacket(packets, std::move(pair.second));
3668 }
3669
void TracingServiceImpl::MaybeEmitRemoteClockSync(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  // Emits, at most once per session, one RemoteClockSync packet per remote
  // machine, containing the client/host clock snapshot exchanges collected so
  // far from the relay clients.
  if (tracing_session->did_emit_remote_clock_sync_)
    return;

  std::unordered_set<MachineID> did_emit_machines;
  for (const auto& id_and_relay_client : relay_clients_) {
    const auto& relay_client = id_and_relay_client.second;
    auto machine_id = relay_client->machine_id();
    if (did_emit_machines.find(machine_id) != did_emit_machines.end())
      continue;  // Already emitted for the machine (e.g. multiple clients).

    auto& sync_clock_snapshots = relay_client->synced_clocks();
    if (sync_clock_snapshots.empty()) {
      PERFETTO_DLOG("Clock not synchronized for machine ID = %" PRIu32,
                    machine_id);
      continue;
    }

    // Don't emit twice for the same machine.
    did_emit_machines.insert(machine_id);

    protozero::HeapBuffered<protos::pbzero::TracePacket> sync_packet;
    sync_packet->set_machine_id(machine_id);
    sync_packet->set_trusted_uid(static_cast<int32_t>(uid_));
    auto* remote_clock_sync = sync_packet->set_remote_clock_sync();
    // Each exchange carries two snapshots: the clocks as seen by the remote
    // (client) machine and the corresponding clocks seen by this host.
    for (const auto& sync_exchange : relay_client->synced_clocks()) {
      auto* sync_exchange_msg = remote_clock_sync->add_synced_clocks();

      auto* client_snapshots = sync_exchange_msg->set_client_clocks();
      for (const auto& client_clock : sync_exchange.client_clocks) {
        auto* clock = client_snapshots->add_clocks();
        clock->set_clock_id(client_clock.clock_id);
        clock->set_timestamp(client_clock.timestamp);
      }

      auto* host_snapshots = sync_exchange_msg->set_host_clocks();
      for (const auto& host_clock : sync_exchange.host_clocks) {
        auto* clock = host_snapshots->add_clocks();
        clock->set_clock_id(host_clock.clock_id);
        clock->set_timestamp(host_clock.timestamp);
      }
    }

    SerializeAndAppendPacket(packets, sync_packet.SerializeAsArray());
  }

  tracing_session->did_emit_remote_clock_sync_ = true;
}
3720
void TracingServiceImpl::MaybeEmitReceivedTriggers(
    TracingSession* tracing_session,
    std::vector<TracePacket>* packets) {
  // Emits one Trigger packet for each trigger received since the last call.
  // |num_triggers_emitted_into_trace| is a high-water mark ensuring every
  // trigger is written into the trace exactly once.
  PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
                  tracing_session->received_triggers.size());
  for (size_t i = tracing_session->num_triggers_emitted_into_trace;
       i < tracing_session->received_triggers.size(); ++i) {
    const auto& info = tracing_session->received_triggers[i];
    protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
    auto* trigger = packet->set_trigger();
    trigger->set_trigger_name(info.trigger_name);
    trigger->set_producer_name(info.producer_name);
    trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));

    // Timestamp the packet with the moment the trigger was received, not the
    // moment it gets serialized here.
    packet->set_timestamp(info.boot_time_ns);
    packet->set_trusted_uid(static_cast<int32_t>(uid_));
    packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
    SerializeAndAppendPacket(packets, packet.SerializeAsArray());
    ++tracing_session->num_triggers_emitted_into_trace;
  }
}
3742
MaybeLogUploadEvent(const TraceConfig & cfg,const base::Uuid & uuid,PerfettoStatsdAtom atom,const std::string & trigger_name)3743 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3744 const base::Uuid& uuid,
3745 PerfettoStatsdAtom atom,
3746 const std::string& trigger_name) {
3747 if (!ShouldLogEvent(cfg))
3748 return;
3749
3750 PERFETTO_DCHECK(uuid); // The UUID must be set at this point.
3751 android_stats::MaybeLogUploadEvent(atom, uuid.lsb(), uuid.msb(),
3752 trigger_name);
3753 }
3754
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3755 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3756 PerfettoTriggerAtom atom,
3757 const std::string& trigger_name) {
3758 if (!ShouldLogEvent(cfg))
3759 return;
3760 android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3761 }
3762
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3763 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3764 int64_t now_ns,
3765 uint64_t trigger_name_hash) {
3766 PERFETTO_DCHECK(
3767 std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3768 size_t remove_count = 0;
3769 size_t trigger_count = 0;
3770 for (const TriggerHistory& h : trigger_history_) {
3771 if (h.timestamp_ns < now_ns - trigger_window_ns_) {
3772 remove_count++;
3773 } else if (h.name_hash == trigger_name_hash) {
3774 trigger_count++;
3775 }
3776 }
3777 trigger_history_.erase_front(remove_count);
3778 return trigger_count;
3779 }
3780
// Starts cloning (snapshotting) a tracing session on behalf of `consumer`.
// This function only validates the request, optionally clears buffers marked
// clear_before_clone, and kicks off the flushes; the actual buffer cloning and
// session creation happen asynchronously in OnFlushDoneForClone() /
// FinishCloneSession() once the flushes complete.
base::Status TracingServiceImpl::FlushAndCloneSession(
    ConsumerEndpointImpl* consumer,
    TracingSessionID tsid,
    bool skip_trace_filter,
    bool for_bugreport) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto clone_target = FlushFlags::CloneTarget::kUnknown;

  if (tsid == kBugreportSessionId) {
    // This branch is only here to support the legacy protocol where we could
    // clone only a single session using the magic ID kBugreportSessionId.
    // The newer perfetto --clone-all-for-bugreport first queries the existing
    // sessions and then issues individual clone requests specifying real
    // session IDs, setting args.{for_bugreport,skip_trace_filter}=true.
    PERFETTO_LOG("Looking for sessions for bugreport");
    TracingSession* session = FindTracingSessionWithMaxBugreportScore();
    if (!session) {
      return base::ErrStatus(
          "No tracing sessions eligible for bugreport found");
    }
    tsid = session->id;
    clone_target = FlushFlags::CloneTarget::kBugreport;
    skip_trace_filter = true;
    for_bugreport = true;
  } else if (for_bugreport) {
    clone_target = FlushFlags::CloneTarget::kBugreport;
  }

  TracingSession* session = GetTracingSession(tsid);
  if (!session) {
    return base::ErrStatus("Tracing session not found");
  }

  // Skip the UID check for sessions marked with a bugreport_score > 0.
  // Those sessions, by design, can be stolen by any other consumer for the
  // sake of creating snapshots for bugreports.
  if (!session->IsCloneAllowed(consumer->uid_)) {
    return PERFETTO_SVC_ERR("Not allowed to clone a session from another UID");
  }

  // If any of the buffers are marked as clear_before_clone, reset them before
  // issuing the Flush(kCloneReason).
  size_t buf_idx = 0;
  for (BufferID src_buf_id : session->buffers_index) {
    if (!session->config.buffers()[buf_idx++].clear_before_clone())
      continue;
    auto buf_iter = buffers_.find(src_buf_id);
    PERFETTO_CHECK(buf_iter != buffers_.end());
    std::unique_ptr<TraceBuffer>& buf = buf_iter->second;

    // No need to reset the buffer if nothing has been written into it yet.
    // This is the canonical case if producers behave nicely and don't timeout
    // the handling of writes during the flush.
    // This check avoids a useless re-mmap upon every Clone() if the buffer is
    // already empty (when used in combination with `transfer_on_clone`).
    if (!buf->has_data())
      continue;

    // Some leftover data was left in the buffer. Recreate it to empty it.
    const auto buf_policy = buf->overwrite_policy();
    const auto buf_size = buf->size();
    std::unique_ptr<TraceBuffer> old_buf = std::move(buf);
    buf = TraceBuffer::Create(buf_size, buf_policy);
    if (!buf) {
      // This is extremely rare but could happen on 32-bit. If the new buffer
      // allocation failed, put back the buffer where it was and fail the clone.
      // We cannot leave the original tracing session buffer-less as it would
      // cause crashes when data sources commit new data.
      buf = std::move(old_buf);
      return base::ErrStatus(
          "Buffer allocation failed while attempting to clone");
    }
  }

  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  auto weak_consumer = consumer->GetWeakPtr();

  // Register the pending clone operation; it is completed (and erased) in
  // OnFlushDoneForClone() when the last flush group reports back.
  const PendingCloneID clone_id = session->last_pending_clone_id_++;

  auto& clone_op = session->pending_clones[clone_id];
  clone_op.pending_flush_cnt = 0;
  clone_op.buffers =
      std::vector<std::unique_ptr<TraceBuffer>>(session->buffers_index.size());
  clone_op.weak_consumer = weak_consumer;
  clone_op.skip_trace_filter = skip_trace_filter;

  // Issue separate flush requests for separate buffer groups. The buffer marked
  // as transfer_on_clone will be flushed and cloned separately: even if they're
  // slower (like in the case of Winscope tracing), they will not delay the
  // snapshot of the other buffers.
  //
  // In the future we might want to split the buffer into more groups and maybe
  // allow this to be configurable.
  std::array<std::set<BufferID>, 2> bufs_groups;
  for (size_t i = 0; i < session->buffers_index.size(); i++) {
    if (session->config.buffers()[i].transfer_on_clone()) {
      bufs_groups[0].insert(session->buffers_index[i]);
    } else {
      bufs_groups[1].insert(session->buffers_index[i]);
    }
  }

  clone_op.pending_flush_cnt = bufs_groups.size();
  for (const std::set<BufferID>& buf_group : bufs_groups) {
    FlushDataSourceInstances(
        session, 0,
        GetFlushableDataSourceInstancesForBuffers(session, buf_group),
        [tsid, clone_id, buf_group, weak_this](bool final_flush) {
          if (!weak_this)
            return;
          weak_this->OnFlushDoneForClone(tsid, clone_id, buf_group,
                                         final_flush);
        },
        FlushFlags(FlushFlags::Initiator::kTraced,
                   FlushFlags::Reason::kTraceClone, clone_target));
  }

  return base::OkStatus();
}
3900
3901 std::map<ProducerID, std::vector<DataSourceInstanceID>>
GetFlushableDataSourceInstancesForBuffers(TracingSession * session,const std::set<BufferID> & bufs)3902 TracingServiceImpl::GetFlushableDataSourceInstancesForBuffers(
3903 TracingSession* session,
3904 const std::set<BufferID>& bufs) {
3905 std::map<ProducerID, std::vector<DataSourceInstanceID>> data_source_instances;
3906
3907 for (const auto& [producer_id, ds_inst] : session->data_source_instances) {
3908 // TODO(ddiproietto): Consider if we should skip instances if ds_inst.state
3909 // != DataSourceInstance::STARTED
3910 if (ds_inst.no_flush) {
3911 continue;
3912 }
3913 if (!bufs.count(static_cast<BufferID>(ds_inst.config.target_buffer()))) {
3914 continue;
3915 }
3916 data_source_instances[producer_id].push_back(ds_inst.instance_id);
3917 }
3918
3919 return data_source_instances;
3920 }
3921
// Called once per flushed buffer group (see FlushAndCloneSession()). Each call
// clones the just-flushed buffers into the pending clone op; the last call
// (when pending_flush_cnt drops to zero) finalizes the cloned session and
// notifies the consumer.
void TracingServiceImpl::OnFlushDoneForClone(TracingSessionID tsid,
                                             PendingCloneID clone_id,
                                             const std::set<BufferID>& buf_ids,
                                             bool final_flush_outcome) {
  TracingSession* src = GetTracingSession(tsid);
  // The session might be gone by the time we try to clone it.
  if (!src) {
    return;
  }

  // The clone op might have been erased by an earlier failing callback.
  auto it = src->pending_clones.find(clone_id);
  if (it == src->pending_clones.end()) {
    return;
  }
  auto& clone_op = it->second;

  // A single failed flush marks the whole clone as flush-failed; this is
  // recorded in the cloned session's final_flush_outcome stat.
  if (final_flush_outcome == false) {
    clone_op.flush_failed = true;
  }

  base::Status result;
  base::Uuid uuid;

  // First clone the flushed TraceBuffer(s). This can fail because of ENOMEM. If
  // it happens bail out early before creating any session.
  if (!DoCloneBuffers(src, buf_ids, &clone_op.buffers)) {
    result = PERFETTO_SVC_ERR("Buffer allocation failed");
  }

  if (result.ok()) {
    UpdateMemoryGuardrail();

    if (--clone_op.pending_flush_cnt != 0) {
      // Wait for more pending flushes.
      return;
    }

    PERFETTO_LOG("FlushAndCloneSession(%" PRIu64 ") started, success=%d", tsid,
                 final_flush_outcome);

    // The consumer that requested the clone might have disconnected while the
    // flushes were in flight; in that case skip session creation entirely.
    if (clone_op.weak_consumer) {
      result = FinishCloneSession(
          &*clone_op.weak_consumer, tsid, std::move(clone_op.buffers),
          clone_op.skip_trace_filter, !clone_op.flush_failed, &uuid);
    }
  }  // if (result.ok())

  // Deliver the final outcome (success or error) to the requesting consumer.
  if (clone_op.weak_consumer) {
    clone_op.weak_consumer->consumer_->OnSessionCloned(
        {result.ok(), result.message(), uuid});
  }

  src->pending_clones.erase(it);
  UpdateMemoryGuardrail();
}
3977
// Clones the subset of `src`'s buffers listed in `buf_ids` into `buf_snaps`
// (indexed by the buffer's position in src->buffers_index). Buffers marked
// transfer_on_clone are moved out of the source session and replaced with a
// fresh empty buffer; all others get a read-only copy via CloneReadOnly().
// Returns false if any buffer allocation fails (e.g. OOM), leaving the source
// session intact.
bool TracingServiceImpl::DoCloneBuffers(
    TracingSession* src,
    const std::set<BufferID>& buf_ids,
    std::vector<std::unique_ptr<TraceBuffer>>* buf_snaps) {
  PERFETTO_DCHECK(src->num_buffers() == src->config.buffers().size());
  buf_snaps->resize(src->buffers_index.size());

  for (size_t buf_idx = 0; buf_idx < src->buffers_index.size(); buf_idx++) {
    BufferID src_buf_id = src->buffers_index[buf_idx];
    // Only the buffers belonging to this flush group are cloned now; other
    // groups are handled by separate OnFlushDoneForClone() callbacks.
    if (buf_ids.count(src_buf_id) == 0)
      continue;
    auto buf_iter = buffers_.find(src_buf_id);
    PERFETTO_CHECK(buf_iter != buffers_.end());
    std::unique_ptr<TraceBuffer>& src_buf = buf_iter->second;
    std::unique_ptr<TraceBuffer> new_buf;
    if (src->config.buffers()[buf_idx].transfer_on_clone()) {
      // Move the buffer into the snapshot and re-create an empty one with the
      // same size/policy for the source session.
      const auto buf_policy = src_buf->overwrite_policy();
      const auto buf_size = src_buf->size();
      new_buf = std::move(src_buf);
      src_buf = TraceBuffer::Create(buf_size, buf_policy);
      if (!src_buf) {
        // If the allocation fails put the buffer back and let the code below
        // handle the failure gracefully.
        src_buf = std::move(new_buf);
      }
    } else {
      new_buf = src_buf->CloneReadOnly();
    }
    if (!new_buf.get()) {
      return false;
    }
    (*buf_snaps)[buf_idx] = std::move(new_buf);
  }
  return true;
}
4013
// Final step of the clone flow: materializes a new CLONED_READ_ONLY tracing
// session for `consumer` out of the buffer snapshots collected by
// DoCloneBuffers(), copying over the stats/metadata the service emits into the
// trace. On success writes the new session's UUID into `new_uuid`.
base::Status TracingServiceImpl::FinishCloneSession(
    ConsumerEndpointImpl* consumer,
    TracingSessionID src_tsid,
    std::vector<std::unique_ptr<TraceBuffer>> buf_snaps,
    bool skip_trace_filter,
    bool final_flush_outcome,
    base::Uuid* new_uuid) {
  PERFETTO_DLOG("CloneSession(%" PRIu64
                ", skip_trace_filter=%d) started, consumer uid: %d",
                src_tsid, skip_trace_filter, static_cast<int>(consumer->uid_));

  TracingSession* src = GetTracingSession(src_tsid);

  // The session might be gone by the time we try to clone it.
  if (!src)
    return PERFETTO_SVC_ERR("session not found");

  // A consumer can own at most one session; the requester must be detached.
  if (consumer->tracing_session_id_) {
    return PERFETTO_SVC_ERR(
        "The consumer is already attached to another tracing session");
  }

  // Allocate fresh global buffer IDs for the snapshots.
  std::vector<BufferID> buf_ids =
      buffer_ids_.AllocateMultiple(buf_snaps.size());
  if (buf_ids.size() != buf_snaps.size()) {
    return PERFETTO_SVC_ERR("Buffer id allocation failed");
  }

  // By this point every flush group must have filled in its slot.
  PERFETTO_CHECK(std::none_of(
      buf_snaps.begin(), buf_snaps.end(),
      [](const std::unique_ptr<TraceBuffer>& buf) { return buf == nullptr; }));

  const TracingSessionID tsid = ++last_tracing_session_id_;
  TracingSession* cloned_session =
      &tracing_sessions_
           .emplace(
               std::piecewise_construct, std::forward_as_tuple(tsid),
               std::forward_as_tuple(tsid, consumer, src->config, task_runner_))
           .first->second;

  // Generate a new UUID for the cloned session, but preserve the LSB. In some
  // contexts the LSB is used to tie the trace back to the statsd subscription
  // that triggered it. See the corresponding code in perfetto_cmd.cc which
  // reads at triggering_subscription_id().
  const int64_t orig_uuid_lsb = src->trace_uuid.lsb();
  cloned_session->state = TracingSession::CLONED_READ_ONLY;
  cloned_session->trace_uuid = base::Uuidv4();
  cloned_session->trace_uuid.set_lsb(orig_uuid_lsb);
  *new_uuid = cloned_session->trace_uuid;

  // Register the snapshot buffers under their new global IDs and attach them
  // to the cloned session.
  for (size_t i = 0; i < buf_snaps.size(); i++) {
    BufferID buf_global_id = buf_ids[i];
    std::unique_ptr<TraceBuffer>& buf = buf_snaps[i];
    // This is only needed for transfer_on_clone. Other buffers are already
    // marked as read-only by CloneReadOnly(). We cannot do this early because
    // in case of an allocation failure we will put std::move() the original
    // buffer back in its place and in that case should not be made read-only.
    buf->set_read_only();
    buffers_.emplace(buf_global_id, std::move(buf));
    cloned_session->buffers_index.emplace_back(buf_global_id);
  }
  UpdateMemoryGuardrail();

  // Copy over relevant state that we want to persist in the cloned session.
  // Mostly stats and metadata that is emitted in the trace file by the service.
  // Also clear the received trigger list in the main tracing session. A
  // CLONE_SNAPSHOT session can go in ring buffer mode for several hours and get
  // snapshotted several times. This causes two issues with `received_triggers`:
  // 1. Adding noise in the cloned trace emitting triggers that happened too
  //    far back (see b/290799105).
  // 2. Bloating memory (see b/290798988).
  cloned_session->should_emit_stats = true;
  cloned_session->received_triggers = std::move(src->received_triggers);
  src->received_triggers.clear();
  src->num_triggers_emitted_into_trace = 0;
  cloned_session->lifecycle_events =
      std::vector<TracingSession::LifecycleEvent>(src->lifecycle_events);
  cloned_session->initial_clock_snapshot = src->initial_clock_snapshot;
  cloned_session->clock_snapshot_ring_buffer = src->clock_snapshot_ring_buffer;
  cloned_session->invalid_packets = src->invalid_packets;
  cloned_session->flushes_requested = src->flushes_requested;
  cloned_session->flushes_succeeded = src->flushes_succeeded;
  cloned_session->flushes_failed = src->flushes_failed;
  cloned_session->compress_deflate = src->compress_deflate;
  if (src->trace_filter && !skip_trace_filter) {
    // Copy the trace filter, unless it's a clone-for-bugreport (b/317065412).
    cloned_session->trace_filter.reset(
        new protozero::MessageFilter(src->trace_filter->config()));
  }

  SnapshotLifecyleEvent(
      cloned_session,
      protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
      true /* snapshot_clocks */);

  PERFETTO_DLOG("Consumer (uid:%d) cloned tracing session %" PRIu64
                " -> %" PRIu64,
                static_cast<int>(consumer->uid_), src_tsid, tsid);

  consumer->tracing_session_id_ = tsid;
  cloned_session->final_flush_outcome = final_flush_outcome
                                            ? TraceStats::FINAL_FLUSH_SUCCEEDED
                                            : TraceStats::FINAL_FLUSH_FAILED;
  return base::OkStatus();
}
4119
IsCloneAllowed(uid_t clone_uid) const4120 bool TracingServiceImpl::TracingSession::IsCloneAllowed(uid_t clone_uid) const {
4121 if (clone_uid == 0)
4122 return true; // Root is always allowed to clone everything.
4123 if (clone_uid == this->consumer_uid)
4124 return true; // Allow cloning if the uids match.
4125 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
4126 // On Android allow shell to clone sessions marked as exported for bugreport.
4127 // Dumpstate (invoked by adb bugreport) invokes commands as shell.
4128 if (clone_uid == AID_SHELL && this->config.bugreport_score() > 0)
4129 return true;
4130 #endif
4131 return false;
4132 }
4133
4134 ////////////////////////////////////////////////////////////////////////////////
4135 // TracingServiceImpl::ConsumerEndpointImpl implementation
4136 ////////////////////////////////////////////////////////////////////////////////
4137
// Binds a Consumer to the service. The endpoint stores only non-owning
// pointers to the service/task runner/consumer; `uid` is the caller's uid used
// for session ownership and clone-permission checks.
TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Consumer* consumer,
    uid_t uid)
    : task_runner_(task_runner),
      service_(service),
      consumer_(consumer),
      uid_(uid),
      weak_ptr_factory_(this) {}
4148
// Destroying the endpoint is the way a consumer disconnects: unregister from
// the service first, then notify the Consumer implementation.
TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  service_->DisconnectConsumer(this);
  consumer_->OnDisconnect();
}
4153
NotifyOnTracingDisabled(const std::string & error)4154 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
4155 const std::string& error) {
4156 PERFETTO_DCHECK_THREAD(thread_checker_);
4157 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4158 task_runner_->PostTask([weak_this, error /* deliberate copy */] {
4159 if (weak_this)
4160 weak_this->consumer_->OnTracingDisabled(error);
4161 });
4162 }
4163
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)4164 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
4165 const TraceConfig& cfg,
4166 base::ScopedFile fd) {
4167 PERFETTO_DCHECK_THREAD(thread_checker_);
4168 auto status = service_->EnableTracing(this, cfg, std::move(fd));
4169 if (!status.ok())
4170 NotifyOnTracingDisabled(status.message());
4171 }
4172
ChangeTraceConfig(const TraceConfig & cfg)4173 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
4174 const TraceConfig& cfg) {
4175 if (!tracing_session_id_) {
4176 PERFETTO_LOG(
4177 "Consumer called ChangeTraceConfig() but tracing was "
4178 "not active");
4179 return;
4180 }
4181 service_->ChangeTraceConfig(this, cfg);
4182 }
4183
StartTracing()4184 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
4185 PERFETTO_DCHECK_THREAD(thread_checker_);
4186 if (!tracing_session_id_) {
4187 PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
4188 return;
4189 }
4190 service_->StartTracing(tracing_session_id_);
4191 }
4192
DisableTracing()4193 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
4194 PERFETTO_DCHECK_THREAD(thread_checker_);
4195 if (!tracing_session_id_) {
4196 PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
4197 return;
4198 }
4199 service_->DisableTracing(tracing_session_id_);
4200 }
4201
ReadBuffers()4202 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
4203 PERFETTO_DCHECK_THREAD(thread_checker_);
4204 if (!tracing_session_id_) {
4205 PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
4206 consumer_->OnTraceData({}, /* has_more = */ false);
4207 return;
4208 }
4209 if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
4210 consumer_->OnTraceData({}, /* has_more = */ false);
4211 }
4212 }
4213
FreeBuffers()4214 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
4215 PERFETTO_DCHECK_THREAD(thread_checker_);
4216 if (!tracing_session_id_) {
4217 PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
4218 return;
4219 }
4220 service_->FreeBuffers(tracing_session_id_);
4221 tracing_session_id_ = 0;
4222 }
4223
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)4224 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
4225 FlushCallback callback,
4226 FlushFlags flush_flags) {
4227 PERFETTO_DCHECK_THREAD(thread_checker_);
4228 if (!tracing_session_id_) {
4229 PERFETTO_LOG("Consumer called Flush() but tracing was not active");
4230 return;
4231 }
4232 service_->Flush(tracing_session_id_, timeout_ms, callback, flush_flags);
4233 }
4234
Detach(const std::string & key)4235 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
4236 PERFETTO_DCHECK_THREAD(thread_checker_);
4237 bool success = service_->DetachConsumer(this, key);
4238 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4239 task_runner_->PostTask([weak_this, success] {
4240 if (weak_this)
4241 weak_this->consumer_->OnDetach(success);
4242 });
4243 }
4244
Attach(const std::string & key)4245 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
4246 PERFETTO_DCHECK_THREAD(thread_checker_);
4247 bool success = service_->AttachConsumer(this, key);
4248 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4249 task_runner_->PostTask([weak_this, success] {
4250 if (!weak_this)
4251 return;
4252 Consumer* consumer = weak_this->consumer_;
4253 TracingSession* session =
4254 weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
4255 if (!session) {
4256 consumer->OnAttach(false, TraceConfig());
4257 return;
4258 }
4259 consumer->OnAttach(success, session->config);
4260 });
4261 }
4262
GetTraceStats()4263 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
4264 PERFETTO_DCHECK_THREAD(thread_checker_);
4265 bool success = false;
4266 TraceStats stats;
4267 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4268 if (session) {
4269 success = true;
4270 stats = service_->GetTraceStats(session);
4271 }
4272 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4273 task_runner_->PostTask([weak_this, success, stats] {
4274 if (weak_this)
4275 weak_this->consumer_->OnTraceStats(success, stats);
4276 });
4277 }
4278
ObserveEvents(uint32_t events_mask)4279 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
4280 uint32_t events_mask) {
4281 PERFETTO_DCHECK_THREAD(thread_checker_);
4282 observable_events_mask_ = events_mask;
4283 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
4284 if (!session)
4285 return;
4286
4287 if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
4288 // Issue initial states.
4289 for (const auto& kv : session->data_source_instances) {
4290 ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
4291 PERFETTO_DCHECK(producer);
4292 OnDataSourceInstanceStateChange(*producer, kv.second);
4293 }
4294 }
4295
4296 // If the ObserveEvents() call happens after data sources have acked already
4297 // notify immediately.
4298 if (observable_events_mask_ &
4299 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
4300 service_->MaybeNotifyAllDataSourcesStarted(session);
4301 }
4302 }
4303
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)4304 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
4305 const ProducerEndpointImpl& producer,
4306 const DataSourceInstance& instance) {
4307 if (!(observable_events_mask_ &
4308 ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
4309 return;
4310 }
4311
4312 if (instance.state != DataSourceInstance::CONFIGURED &&
4313 instance.state != DataSourceInstance::STARTED &&
4314 instance.state != DataSourceInstance::STOPPED) {
4315 return;
4316 }
4317
4318 auto* observable_events = AddObservableEvents();
4319 auto* change = observable_events->add_instance_state_changes();
4320 change->set_producer_name(producer.name_);
4321 change->set_data_source_name(instance.data_source_name);
4322 if (instance.state == DataSourceInstance::STARTED) {
4323 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
4324 } else {
4325 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
4326 }
4327 }
4328
OnAllDataSourcesStarted()4329 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
4330 if (!(observable_events_mask_ &
4331 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
4332 return;
4333 }
4334 auto* observable_events = AddObservableEvents();
4335 observable_events->set_all_data_sources_started(true);
4336 }
4337
NotifyCloneSnapshotTrigger(const std::string & trigger_name)4338 void TracingServiceImpl::ConsumerEndpointImpl::NotifyCloneSnapshotTrigger(
4339 const std::string& trigger_name) {
4340 if (!(observable_events_mask_ & ObservableEvents::TYPE_CLONE_TRIGGER_HIT)) {
4341 return;
4342 }
4343 auto* observable_events = AddObservableEvents();
4344 auto* clone_trig = observable_events->mutable_clone_trigger_hit();
4345 clone_trig->set_tracing_session_id(static_cast<int64_t>(tracing_session_id_));
4346 clone_trig->set_trigger_name(trigger_name);
4347 }
4348
4349 ObservableEvents*
AddObservableEvents()4350 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
4351 PERFETTO_DCHECK_THREAD(thread_checker_);
4352 if (!observable_events_) {
4353 observable_events_.reset(new ObservableEvents());
4354 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4355 task_runner_->PostTask([weak_this] {
4356 if (!weak_this)
4357 return;
4358
4359 // Move into a temporary to allow reentrancy in OnObservableEvents.
4360 auto observable_events = std::move(weak_this->observable_events_);
4361 weak_this->consumer_->OnObservableEvents(*observable_events);
4362 });
4363 }
4364 return observable_events_.get();
4365 }
4366
// Builds a snapshot of the service state (producers, registered data sources
// and tracing sessions) and returns it synchronously through `callback`.
// With args.sessions_only set, producer and data-source listings are skipped.
void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
    QueryServiceStateArgs args,
    QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceState svc_state;

  const auto& sessions = service_->tracing_sessions_;
  svc_state.set_tracing_service_version(base::GetVersionString());
  svc_state.set_num_sessions(static_cast<int>(sessions.size()));

  int num_started = 0;
  for (const auto& kv : sessions)
    num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
  svc_state.set_num_sessions_started(num_started);

  // Connected producers.
  for (const auto& kv : service_->producers_) {
    if (args.sessions_only)
      break;
    auto* producer = svc_state.add_producers();
    producer->set_id(static_cast<int>(kv.first));
    producer->set_name(kv.second->name_);
    producer->set_sdk_version(kv.second->sdk_version_);
    producer->set_uid(static_cast<int32_t>(kv.second->uid()));
    producer->set_pid(static_cast<int32_t>(kv.second->pid()));
  }

  // Registered data sources.
  for (const auto& kv : service_->data_sources_) {
    if (args.sessions_only)
      break;
    const auto& registered_data_source = kv.second;
    auto* data_source = svc_state.add_data_sources();
    *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
    data_source->set_producer_id(
        static_cast<int>(registered_data_source.producer_id));
  }

  svc_state.set_supports_tracing_sessions(true);
  for (const auto& kv : service_->tracing_sessions_) {
    const TracingSession& s = kv.second;
    // Only expose sessions this caller would be allowed to clone.
    if (!s.IsCloneAllowed(uid_))
      continue;
    auto* session = svc_state.add_tracing_sessions();
    session->set_id(s.id);
    session->set_consumer_uid(static_cast<int>(s.consumer_uid));
    session->set_duration_ms(s.config.duration_ms());
    session->set_num_data_sources(
        static_cast<uint32_t>(s.data_source_instances.size()));
    session->set_unique_session_name(s.config.unique_session_name());
    if (s.config.has_bugreport_score())
      session->set_bugreport_score(s.config.bugreport_score());
    if (s.config.has_bugreport_filename())
      session->set_bugreport_filename(s.config.bugreport_filename());
    // Expose the session's wall-clock start time, if it was snapshotted.
    for (const auto& snap_kv : s.initial_clock_snapshot) {
      if (snap_kv.clock_id == protos::pbzero::BUILTIN_CLOCK_REALTIME)
        session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.timestamp));
    }
    for (const auto& buf : s.config.buffers())
      session->add_buffer_size_kb(buf.size_kb());

    switch (s.state) {
      case TracingSession::State::DISABLED:
        session->set_state("DISABLED");
        break;
      case TracingSession::State::CONFIGURED:
        session->set_state("CONFIGURED");
        break;
      case TracingSession::State::STARTED:
        session->set_is_started(true);
        session->set_state("STARTED");
        break;
      case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
        session->set_state("STOP_WAIT");
        break;
      case TracingSession::State::CLONED_READ_ONLY:
        session->set_state("CLONED_READ_ONLY");
        break;
    }
  }
  callback(/*success=*/true, svc_state);
}
4447
// Reports the feature set of this service implementation to the consumer.
void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
    QueryCapabilitiesCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingServiceCapabilities caps;
  caps.set_has_query_capabilities(true);
  caps.set_has_trace_config_output_path(true);
  caps.set_has_clone_session(true);
  caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
  caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
  caps.add_observable_events(ObservableEvents::TYPE_CLONE_TRIGGER_HIT);
  // Compile-time reminder to extend the list above whenever a new observable
  // event type is added to the proto.
  static_assert(
      ObservableEvents::Type_MAX == ObservableEvents::TYPE_CLONE_TRIGGER_HIT,
      "");
  callback(caps);
}
4463
SaveTraceForBugreport(SaveTraceForBugreportCallback consumer_callback)4464 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
4465 SaveTraceForBugreportCallback consumer_callback) {
4466 consumer_callback(false,
4467 "SaveTraceForBugreport is deprecated. Use "
4468 "CloneSession(kBugreportSessionId) instead.");
4469 }
4470
CloneSession(TracingSessionID tsid,CloneSessionArgs args)4471 void TracingServiceImpl::ConsumerEndpointImpl::CloneSession(
4472 TracingSessionID tsid,
4473 CloneSessionArgs args) {
4474 PERFETTO_DCHECK_THREAD(thread_checker_);
4475 // FlushAndCloneSession will call OnSessionCloned after the async flush.
4476 base::Status result = service_->FlushAndCloneSession(
4477 this, tsid, args.skip_trace_filter, args.for_bugreport);
4478
4479 if (!result.ok()) {
4480 consumer_->OnSessionCloned({false, result.message(), {}});
4481 }
4482 }
4483
4484 ////////////////////////////////////////////////////////////////////////////////
4485 // TracingServiceImpl::ProducerEndpointImpl implementation
4486 ////////////////////////////////////////////////////////////////////////////////
4487
// Binds a Producer to the service. The endpoint stores only non-owning
// pointers; `id` is the service-assigned producer ID, `client_identity`
// carries the peer's uid/pid, and `smb_scraping_enabled` controls whether the
// service may scrape this producer's shared memory buffer.
TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    const ClientIdentity& client_identity,
    TracingServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    const std::string& producer_name,
    const std::string& sdk_version,
    bool in_process,
    bool smb_scraping_enabled)
    : id_(id),
      client_identity_(client_identity),
      service_(service),
      task_runner_(task_runner),
      producer_(producer),
      name_(producer_name),
      sdk_version_(sdk_version),
      in_process_(in_process),
      smb_scraping_enabled_(smb_scraping_enabled),
      weak_ptr_factory_(this) {}
4508
// Unregisters this producer from the service and notifies the Producer
// implementation that the connection is gone.
TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  service_->DisconnectProducer(id_);
  producer_->OnDisconnect();
}
4513
// Explicit disconnect is intentionally unsupported on this endpoint.
void TracingServiceImpl::ProducerEndpointImpl::Disconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // Disconnection is only supported via destroying the ProducerEndpoint.
  PERFETTO_FATAL("Not supported");
}
4519
// Forwards the data source registration to the service, tagged with this
// producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->RegisterDataSource(id_, desc);
}
4525
// Forwards an update of a previously registered data source descriptor to
// the service, tagged with this producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UpdateDataSource(id_, desc);
}
4531
// Forwards the data source unregistration (by name) to the service, tagged
// with this producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    const std::string& name) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->UnregisterDataSource(id_, name);
}
4537
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)4538 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
4539 uint32_t writer_id,
4540 uint32_t target_buffer) {
4541 PERFETTO_DCHECK_THREAD(thread_checker_);
4542 writers_[static_cast<WriterID>(writer_id)] =
4543 static_cast<BufferID>(target_buffer);
4544 }
4545
// Drops the writer -> target buffer association created by
// RegisterTraceWriter(). No-op if the writer was never registered.
void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
    uint32_t writer_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  writers_.erase(static_cast<WriterID>(writer_id));
}
4551
// Moves completed chunks from this producer into the central trace buffers.
// Chunks normally live in the shared memory buffer; as a fallback they can be
// inlined in the request itself (commit-over-IPC). All fields of
// |req_untrusted| are treated as potentially malicious: malformed entries are
// skipped, never trusted.
void TracingServiceImpl::ProducerEndpointImpl::CommitData(
    const CommitDataRequest& req_untrusted,
    CommitDataCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
    PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
                               EncodeCommitDataRequest(id_, req_untrusted));
  }

  // A commit can only be processed once the SMB exists (SetupSharedMemory()).
  if (!shared_memory_) {
    PERFETTO_DLOG(
        "Attempted to commit data before the shared memory was allocated.");
    return;
  }
  PERFETTO_DCHECK(shmem_abi_.is_valid());
  for (const auto& entry : req_untrusted.chunks_to_move()) {
    const uint32_t page_idx = entry.page();
    if (page_idx >= shmem_abi_.num_pages())
      continue;  // A buggy or malicious producer.

    SharedMemoryABI::Chunk chunk;
    bool commit_data_over_ipc = entry.has_data();
    if (PERFETTO_UNLIKELY(commit_data_over_ipc)) {
      // Chunk data is passed over the wire. Create a chunk using the serialized
      // protobuf message.
      const std::string& data = entry.data();
      if (data.size() > SharedMemoryABI::Chunk::kMaxSize) {
        PERFETTO_DFATAL("IPC data commit too large: %zu", data.size());
        continue;  // A malicious or buggy producer
      }
      // |data| is not altered, but we need to const_cast because Chunk data
      // members are non-const.
      chunk = SharedMemoryABI::MakeChunkFromSerializedData(
          reinterpret_cast<uint8_t*>(const_cast<char*>(data.data())),
          static_cast<uint16_t>(entry.data().size()),
          static_cast<uint8_t>(entry.chunk()));
    } else
      chunk = shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
    if (!chunk.is_valid()) {
      PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
                    entry.page(), entry.chunk());
      continue;
    }

    // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
    // the ABI contract expects the producer to not touch the chunk anymore
    // (until the service marks that as free). This is why all the reads below
    // are just memory_order_relaxed. Also, the code here assumes that all this
    // data can be malicious and just gives up if anything is malformed.
    BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
    const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
    WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
    ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
    auto packets = chunk_header.packets.load(std::memory_order_relaxed);
    uint16_t num_fragments = packets.count;
    uint8_t chunk_flags = packets.flags;

    service_->CopyProducerPageIntoLogBuffer(
        id_, client_identity_, writer_id, chunk_id, buffer_id, num_fragments,
        chunk_flags,
        /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());

    // IPC-committed chunks don't live in the SMB, so there is nothing to
    // release in that case.
    if (!commit_data_over_ipc) {
      // This one has release-store semantics.
      shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
    }
  }  // for(chunks_to_move)

  service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());

  if (req_untrusted.flush_request_id()) {
    service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
  }

  // Keep this invocation last. ProducerIPCService::CommitData() relies on this
  // callback being invoked within the same callstack and not posted. If this
  // changes, the code there needs to be changed accordingly.
  if (callback)
    callback();
}
4633
// Attaches the shared memory buffer (provided either by the service or by the
// producer, per |provided_by_producer|) and initializes the SMB ABI on top of
// it. For in-process producers it also creates the local SharedMemoryArbiter.
// Must be called at most once (see the DCHECK below).
void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
    std::unique_ptr<SharedMemory> shared_memory,
    size_t page_size_bytes,
    bool provided_by_producer) {
  PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
  // Page size is stored in KB below, so it must be a multiple of 1024.
  PERFETTO_DCHECK(page_size_bytes % 1024 == 0);

  shared_memory_ = std::move(shared_memory);
  shared_buffer_page_size_kb_ = page_size_bytes / 1024;
  is_shmem_provided_by_producer_ = provided_by_producer;

  shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                        shared_memory_->size(),
                        shared_buffer_page_size_kb() * 1024,
                        SharedMemoryABI::ShmemMode::kDefault);
  if (in_process_) {
    inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
        shared_memory_->start(), shared_memory_->size(),
        SharedMemoryABI::ShmemMode::kDefault,
        shared_buffer_page_size_kb_ * 1024, this, task_runner_));
    inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
  }

  // Notify the producer (asynchronously) and let the service re-evaluate its
  // memory guardrail now that a new SMB exists.
  OnTracingSetup();
  service_->UpdateMemoryGuardrail();
}
4660
// Returns the shared memory buffer, or nullptr if SetupSharedMemory() hasn't
// run yet.
SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return shared_memory_.get();
}
4665
// Returns the SMB page size in KB, as recorded by SetupSharedMemory().
size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
    const {
  return shared_buffer_page_size_kb_;
}
4670
// Forwards trigger activation (by trigger name) to the service, tagged with
// this producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
    const std::vector<std::string>& triggers) {
  service_->ActivateTriggers(id_, triggers);
}
4675
StopDataSource(DataSourceInstanceID ds_inst_id)4676 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
4677 DataSourceInstanceID ds_inst_id) {
4678 // TODO(primiano): When we'll support tearing down the SMB, at this point we
4679 // should send the Producer a TearDownTracing if all its data sources have
4680 // been disabled (see b/77532839 and aosp/655179 PS1).
4681 PERFETTO_DCHECK_THREAD(thread_checker_);
4682 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4683 task_runner_->PostTask([weak_this, ds_inst_id] {
4684 if (weak_this)
4685 weak_this->producer_->StopDataSource(ds_inst_id);
4686 });
4687 }
4688
4689 SharedMemoryArbiter*
MaybeSharedMemoryArbiter()4690 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
4691 if (!inproc_shmem_arbiter_) {
4692 PERFETTO_FATAL(
4693 "The in-process SharedMemoryArbiter can only be used when "
4694 "CreateProducer has been called with in_process=true and after tracing "
4695 "has started.");
4696 }
4697
4698 PERFETTO_DCHECK(in_process_);
4699 return inproc_shmem_arbiter_.get();
4700 }
4701
// True iff the SMB attached via SetupSharedMemory() came from the producer
// rather than being allocated by the service.
bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
    const {
  return is_shmem_provided_by_producer_;
}
4706
// Creates a TraceWriter targeting |buf_id| via the in-process arbiter.
// Can be called on any thread.
std::unique_ptr<TraceWriter>
TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
    BufferID buf_id,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  // MaybeSharedMemoryArbiter() fatals unless this is a fully set up
  // in-process producer, so this is effectively in-process only.
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
                                                       buffer_exhausted_policy);
}
4716
// Forwards the flush-complete notification to the in-process arbiter
// (in-process producers only; see MaybeSharedMemoryArbiter()).
void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
    FlushRequestID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
  return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
}
4723
OnTracingSetup()4724 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4725 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4726 task_runner_->PostTask([weak_this] {
4727 if (weak_this)
4728 weak_this->producer_->OnTracingSetup();
4729 });
4730 }
4731
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources,FlushFlags flush_flags)4732 void TracingServiceImpl::ProducerEndpointImpl::Flush(
4733 FlushRequestID flush_request_id,
4734 const std::vector<DataSourceInstanceID>& data_sources,
4735 FlushFlags flush_flags) {
4736 PERFETTO_DCHECK_THREAD(thread_checker_);
4737 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4738 task_runner_->PostTask(
4739 [weak_this, flush_request_id, data_sources, flush_flags] {
4740 if (weak_this) {
4741 weak_this->producer_->Flush(flush_request_id, data_sources.data(),
4742 data_sources.size(), flush_flags);
4743 }
4744 });
4745 }
4746
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4747 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4748 DataSourceInstanceID ds_id,
4749 const DataSourceConfig& config) {
4750 PERFETTO_DCHECK_THREAD(thread_checker_);
4751 allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4752 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4753 task_runner_->PostTask([weak_this, ds_id, config] {
4754 if (weak_this)
4755 weak_this->producer_->SetupDataSource(ds_id, std::move(config));
4756 });
4757 }
4758
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4759 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4760 DataSourceInstanceID ds_id,
4761 const DataSourceConfig& config) {
4762 PERFETTO_DCHECK_THREAD(thread_checker_);
4763 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4764 task_runner_->PostTask([weak_this, ds_id, config] {
4765 if (weak_this)
4766 weak_this->producer_->StartDataSource(ds_id, std::move(config));
4767 });
4768 }
4769
// Forwards the producer's started-ack for a data source instance to the
// service, tagged with this producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStarted(id_, data_source_id);
}
4775
// Forwards the producer's stopped-ack for a data source instance to the
// service, tagged with this producer's ID.
void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
    DataSourceInstanceID data_source_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->NotifyDataSourceStopped(id_, data_source_id);
}
4781
OnFreeBuffers(const std::vector<BufferID> & target_buffers)4782 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4783 const std::vector<BufferID>& target_buffers) {
4784 if (allowed_target_buffers_.empty())
4785 return;
4786 for (BufferID buffer : target_buffers)
4787 allowed_target_buffers_.erase(buffer);
4788 }
4789
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)4790 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4791 const std::vector<DataSourceInstanceID>& data_sources) {
4792 PERFETTO_DCHECK_THREAD(thread_checker_);
4793 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4794 task_runner_->PostTask([weak_this, data_sources] {
4795 if (weak_this) {
4796 base::StringView producer_name(weak_this->name_);
4797 weak_this->producer_->ClearIncrementalState(data_sources.data(),
4798 data_sources.size());
4799 }
4800 });
4801 }
4802
Sync(std::function<void ()> callback)4803 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4804 std::function<void()> callback) {
4805 task_runner_->PostTask(callback);
4806 }
4807
4808 ////////////////////////////////////////////////////////////////////////////////
4809 // TracingServiceImpl::TracingSession implementation
4810 ////////////////////////////////////////////////////////////////////////////////
4811
// A single tracing session, created on EnableTracing. Captures the owning
// consumer (and its uid, which outlives a consumer detach) and the config,
// and binds the periodic-snapshot / timed-stop tasks to |task_runner|.
TracingServiceImpl::TracingSession::TracingSession(
    TracingSessionID session_id,
    ConsumerEndpointImpl* consumer,
    const TraceConfig& new_config,
    base::TaskRunner* task_runner)
    : id(session_id),
      consumer_maybe_null(consumer),
      consumer_uid(consumer->uid_),
      config(new_config),
      snapshot_periodic_task(task_runner),
      timed_stop_task(task_runner) {
  // all_data_sources_flushed is special because we store up to 64 events of
  // this type. Other events will go through the default case in
  // SnapshotLifecycleEvent() where they will be given a max history of 1.
  lifecycle_events.emplace_back(
      protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
      64 /* max_size */);
}
4830
4831 ////////////////////////////////////////////////////////////////////////////////
4832 // TracingServiceImpl::RelayEndpointImpl implementation
4833 ////////////////////////////////////////////////////////////////////////////////
// Endpoint for a relay client, identified by |relay_client_id|. Only stores
// the identity and the owning service; clock snapshots arrive later via
// SyncClocks().
TracingServiceImpl::RelayEndpointImpl::RelayEndpointImpl(
    RelayClientID relay_client_id,
    TracingServiceImpl* service)
    : relay_client_id_(relay_client_id), service_(service) {}
TracingServiceImpl::RelayEndpointImpl::~RelayEndpointImpl() = default;
4839
SyncClocks(SyncMode sync_mode,ClockSnapshotVector client_clocks,ClockSnapshotVector host_clocks)4840 void TracingServiceImpl::RelayEndpointImpl::SyncClocks(
4841 SyncMode sync_mode,
4842 ClockSnapshotVector client_clocks,
4843 ClockSnapshotVector host_clocks) {
4844 // We keep only the most recent 5 clock sync snapshots.
4845 static constexpr size_t kNumSyncClocks = 5;
4846 if (synced_clocks_.size() >= kNumSyncClocks)
4847 synced_clocks_.pop_front();
4848
4849 synced_clocks_.emplace_back(sync_mode, std::move(client_clocks),
4850 std::move(host_clocks));
4851 }
4852
// Unregisters this relay client from the service.
void TracingServiceImpl::RelayEndpointImpl::Disconnect() {
  service_->DisconnectRelayClient(relay_client_id_);
}
4856
4857 } // namespace perfetto
4858